@@ -17,7 +17,6 @@ use futures_channel::oneshot;
1717use futures_core:: ready;
1818use tracing:: { debug, trace} ;
1919
20- use hyper:: rt:: Sleep ;
2120use hyper:: rt:: Timer as _;
2221
2322use crate :: common:: { exec, exec:: Exec , timer:: Timer } ;
@@ -440,13 +439,11 @@ impl<T: Poolable, K: Key> PoolInner<T, K> {
440439 let interval = IdleTask {
441440 timer : timer. clone ( ) ,
442441 duration : dur,
443- deadline : Instant :: now ( ) ,
444- fut : timer. sleep_until ( Instant :: now ( ) ) , // ready at first tick
445442 pool : WeakOpt :: downgrade ( pool_ref) ,
446443 pool_drop_notifier : rx,
447444 } ;
448445
449- self . exec . execute ( interval) ;
446+ self . exec . execute ( interval. run ( ) ) ;
450447 }
451448}
452449
@@ -766,55 +763,44 @@ impl Expiration {
766763 }
767764}
768765
769- pin_project_lite:: pin_project! {
770- struct IdleTask <T , K : Key > {
771- timer: Timer ,
772- duration: Duration ,
773- deadline: Instant ,
774- fut: Pin <Box <dyn Sleep >>,
775- pool: WeakOpt <Mutex <PoolInner <T , K >>>,
776- // This allows the IdleTask to be notified as soon as the entire
777- // Pool is fully dropped, and shutdown. This channel is never sent on,
778- // but Err(Canceled) will be received when the Pool is dropped.
779- #[ pin]
780- pool_drop_notifier: oneshot:: Receiver <Infallible >,
781- }
766+ struct IdleTask < T , K : Key > {
767+ timer : Timer ,
768+ duration : Duration ,
769+ pool : WeakOpt < Mutex < PoolInner < T , K > > > ,
770+ // This allows the IdleTask to be notified as soon as the entire
771+ // Pool is fully dropped, and shutdown. This channel is never sent on,
772+ // but Err(Canceled) will be received when the Pool is dropped.
773+ pool_drop_notifier : oneshot:: Receiver < Infallible > ,
782774}
783775
784- impl < T : Poolable + ' static , K : Key > Future for IdleTask < T , K > {
785- type Output = ( ) ;
776+ impl < T : Poolable + ' static , K : Key > IdleTask < T , K > {
777+ async fn run ( self ) {
778+ use futures_util:: future;
786779
787- fn poll ( self : Pin < & mut Self > , cx : & mut task :: Context < ' _ > ) -> Poll < Self :: Output > {
788- let mut this = self . project ( ) ;
780+ let mut sleep = self . timer . sleep_until ( Instant :: now ( ) + self . duration ) ;
781+ let mut on_pool_drop = self . pool_drop_notifier ;
789782 loop {
790- match this. pool_drop_notifier . as_mut ( ) . poll ( cx) {
791- Poll :: Ready ( Ok ( n) ) => match n { } ,
792- Poll :: Pending => ( ) ,
793- Poll :: Ready ( Err ( _canceled) ) => {
794- trace ! ( "pool closed, canceling idle interval" ) ;
795- return Poll :: Ready ( ( ) ) ;
783+ match future:: select ( & mut on_pool_drop, & mut sleep) . await {
784+ future:: Either :: Left ( _) => {
785+ // pool dropped, bah-bye
786+ break ;
796787 }
797- }
798-
799- ready ! ( Pin :: new( & mut this. fut) . poll( cx) ) ;
800- // Set this task to run after the next deadline
801- // If the poll missed the deadline by a lot, set the deadline
802- // from the current time instead
803- * this. deadline += * this. duration ;
804- if * this. deadline < Instant :: now ( ) - Duration :: from_millis ( 5 ) {
805- * this. deadline = Instant :: now ( ) + * this. duration ;
806- }
807- * this. fut = this. timer . sleep_until ( * this. deadline ) ;
788+ future:: Either :: Right ( ( ( ) , _) ) => {
789+ if let Some ( inner) = self . pool . upgrade ( ) {
790+ if let Ok ( mut inner) = inner. lock ( ) {
791+ trace ! ( "idle interval checking for expired" ) ;
792+ inner. clear_expired ( ) ;
793+ }
794+ }
808795
809- if let Some ( inner) = this. pool . upgrade ( ) {
810- if let Ok ( mut inner) = inner. lock ( ) {
811- trace ! ( "idle interval checking for expired" ) ;
812- inner. clear_expired ( ) ;
813- continue ;
796+ let deadline = Instant :: now ( ) + self . duration ;
797+ self . timer . reset ( & mut sleep, deadline) ;
814798 }
815799 }
816- return Poll :: Ready ( ( ) ) ;
817800 }
801+
802+ trace ! ( "pool closed, canceling idle interval" ) ;
803+ return ;
818804 }
819805}
820806
0 commit comments