@@ -269,7 +269,7 @@ pub mod source {
269269 use std:: sync:: Arc ;
270270 use timely:: dataflow:: { Scope , Stream , operators:: { Capability , CapabilitySet } } ;
271271 use timely:: progress:: Timestamp ;
272- use timely:: scheduling:: { SyncActivator , activate :: SyncActivateOnDrop } ;
272+ use timely:: scheduling:: SyncActivator ;
273273
274274 // TODO(guswynn): implement this generally in timely
275275 struct DropActivator {
@@ -351,14 +351,14 @@ pub mod source {
351351 let address = messages_op. operator_info ( ) . address ;
352352 let activator = scope. sync_activator_for ( & address) ;
353353 let activator2 = scope. activator_for ( & address) ;
354- let drop_activator = Arc :: new ( SyncActivateOnDrop :: new ( ( ) , scope. sync_activator_for ( & address) ) ) ;
354+ let drop_activator = DropActivator { activator : Arc :: new ( scope. sync_activator_for ( & address) ) } ;
355355 let mut source = source_builder ( activator) ;
356356 let ( mut updates_out, updates) = messages_op. new_output ( ) ;
357357 let ( mut progress_out, progress) = messages_op. new_output ( ) ;
358358 messages_op. build ( |capabilities| {
359359
360360 // A Weak that communicates whether the returned token has been dropped.
361- let drop_activator_weak = Arc :: downgrade ( & drop_activator) ;
361+ let drop_activator_weak = Arc :: downgrade ( & drop_activator. activator ) ;
362362
363363 token = Some ( drop_activator) ;
364364
0 commit comments