11//! Implements threads.
22
3- use std:: mem;
43use std:: sync:: atomic:: Ordering :: Relaxed ;
54use std:: task:: Poll ;
65use std:: time:: { Duration , SystemTime } ;
6+ use std:: { io, mem} ;
77
88use rand:: seq:: IteratorRandom ;
99use rustc_abi:: ExternAbi ;
@@ -25,10 +25,11 @@ use crate::*;
2525enum SchedulingAction {
2626 /// Execute step on the active thread.
2727 ExecuteStep ,
28- /// Execute a timeout callback.
29- ExecuteTimeoutCallback ,
30- /// Wait for a bit, until there is a timeout to be called.
31- Sleep ( Duration ) ,
28+ /// Wait for a bit, but at most as long as the duration specified.
29+ /// We wake up early if an I/O event happened.
30+ /// If the duration is [`None`], we sleep indefinitely. This is
31+ /// only allowed when isolation is disabled and there are threads waiting for I/O!
32+ SleepAndWaitForIo ( Option < Duration > ) ,
3233}
3334
3435/// What to do with TLS allocations from terminated threads
@@ -111,6 +112,8 @@ pub enum BlockReason {
111112 Eventfd ,
112113 /// Blocked on unnamed_socket.
113114 UnnamedSocket ,
115+ /// Blocked on an IO operation.
116+ IO ,
114117 /// Blocked for any reason related to GenMC, such as `assume` statements (GenMC mode only).
115118 /// Will be implicitly unblocked when GenMC schedules this thread again.
116119 Genmc ,
@@ -765,26 +768,45 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
765768 }
766769
767770 // We are not in GenMC mode, so we control the scheduling.
768- let thread_manager = & mut this. machine . threads ;
769- let clock = & this. machine . monotonic_clock ;
770- let rng = this. machine . rng . get_mut ( ) ;
771+ let thread_manager = & this. machine . threads ;
771772 // This thread and the program can keep going.
772773 if thread_manager. threads [ thread_manager. active_thread ] . state . is_enabled ( )
773774 && !thread_manager. yield_active_thread
774775 {
775776 // The currently active thread is still enabled, just continue with it.
776777 return interp_ok ( SchedulingAction :: ExecuteStep ) ;
777778 }
778- // The active thread yielded or got terminated. Let's see if there are any timeouts to take
779- // care of. We do this *before* running any other thread, to ensure that timeouts "in the
780- // past" fire before any other thread can take an action. This ensures that for
779+
780+ // The active thread yielded or got terminated. Let's see if there are any I/O events
781+ // or timeouts to take care of.
782+
783+ if this. machine . communicate ( ) {
784+ // When isolation is disabled we need to check for events for
785+ // threads which are blocked on host I/O.
786+ // We do this before running any other threads such that the threads
787+ // which received events are available for scheduling afterwards.
788+
789+ // Perform a non-blocking poll for newly available I/O events from the OS.
790+ this. poll_and_unblock ( Some ( Duration :: ZERO ) ) ?;
791+ }
792+
793+ let thread_manager = & this. machine . threads ;
794+ let clock = & this. machine . monotonic_clock ;
795+
796+ // We also check timeouts before running any other thread, to ensure that timeouts
797+ // "in the past" fire before any other thread can take an action. This ensures that for
781798 // `pthread_cond_timedwait`, "an error is returned if [...] the absolute time specified by
782799 // abstime has already been passed at the time of the call".
783800 // <https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_timedwait.html>
784801 let potential_sleep_time = thread_manager. next_callback_wait_time ( clock) ;
785802 if potential_sleep_time == Some ( Duration :: ZERO ) {
786- return interp_ok ( SchedulingAction :: ExecuteTimeoutCallback ) ;
803+ // The timeout exceeded for some thread so we unblock the thread and execute its timeout callback.
804+ this. run_timeout_callback ( ) ?;
787805 }
806+
807+ let thread_manager = & mut this. machine . threads ;
808+ let rng = this. machine . rng . get_mut ( ) ;
809+
788810 // No callbacks immediately scheduled, pick a regular thread to execute.
789811 // The active thread blocked or yielded. So we go search for another enabled thread.
790812 // We build the list of threads by starting with the threads after the current one, followed by
@@ -832,7 +854,16 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
832854 // All threads are currently blocked, but we have unexecuted
833855 // timeout_callbacks, which may unblock some of the threads. Hence,
834856 // sleep until the first callback.
835- interp_ok ( SchedulingAction :: Sleep ( sleep_time) )
857+ interp_ok ( SchedulingAction :: SleepAndWaitForIo ( Some ( sleep_time) ) )
858+ } else if thread_manager
859+ . threads
860+ . iter ( )
861+ . any ( |thread| thread. state . is_blocked_on ( BlockReason :: IO ) )
862+ {
863+ // At least one thread is blocked on host I/O but doesn't
864+ // have a timeout set. Hence, we sleep indefinitely in the
865+ // hope that eventually an I/O event for this thread happens.
866+ interp_ok ( SchedulingAction :: SleepAndWaitForIo ( None ) )
836867 } else {
837868 throw_machine_stop ! ( TerminationInfo :: GlobalDeadlock ) ;
838869 }
@@ -1300,13 +1331,38 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
13001331 }
13011332 }
13021333 }
1303- SchedulingAction :: ExecuteTimeoutCallback => {
1304- this. run_timeout_callback ( ) ?;
1305- }
1306- SchedulingAction :: Sleep ( duration) => {
1307- this. machine . monotonic_clock . sleep ( duration) ;
1334+ SchedulingAction :: SleepAndWaitForIo ( duration) => {
1335+ if this. machine . communicate ( ) {
1336+ // When we're running with isolation disabled, instead of
1337+ // strictly sleeping the duration we allow waking up
1338+ // early for I/O events from the OS.
1339+
1340+ this. poll_and_unblock ( duration) ?;
1341+ } else {
1342+ let duration = duration. expect (
1343+ "Infinite sleep should not be triggered when isolation is enabled" ,
1344+ ) ;
1345+ this. machine . monotonic_clock . sleep ( duration) ;
1346+ }
13081347 }
13091348 }
13101349 }
13111350 }
1351+
1352+ /// Poll for I/O events until either an I/O event happened or the timeout expired.
1353+ /// The different timeout values are described in [`BlockingIoManager::poll`].
1354+ fn poll_and_unblock ( & mut self , timeout : Option < Duration > ) -> InterpResult < ' tcx > {
1355+ let this = self . eval_context_mut ( ) ;
1356+
1357+ let ready = match this. machine . blocking_io . poll ( timeout) {
1358+ Ok ( ready) => ready,
1359+ // We can ignore errors originating from interrupts; that's just a spurious wakeup.
1360+ Err ( e) if e. kind ( ) == io:: ErrorKind :: Interrupted => return interp_ok ( ( ) ) ,
1361+ // For other errors we panic. On Linux and BSD hosts this should only be
1362+ // reachable when a system resource error (e.g. ENOMEM or ENOSPC) occurred.
1363+ Err ( e) => panic ! ( "{e}" ) ,
1364+ } ;
1365+
1366+ ready. into_iter ( ) . try_for_each ( |thread_id| this. unblock_thread ( thread_id, BlockReason :: IO ) )
1367+ }
13121368}
0 commit comments