diff --git a/lib/Control/Concurrent.hs b/lib/Control/Concurrent.hs
index 7e65bc650..90ba7ac9e 100644
--- a/lib/Control/Concurrent.hs
+++ b/lib/Control/Concurrent.hs
@@ -119,11 +119,12 @@ threadStatus thr = do
   st <- primThreadStatus thr
   return $
     case st of
-      0 -> ThreadRunning                 -- ts_runnable
-      1 -> ThreadBlocked BlockedOnMVar   -- ts_wait_mvar
-      2 -> ThreadBlocked BlockedOnOther  -- ts_wait_time
-      3 -> ThreadFinished                -- ts_finished
-      4 -> ThreadDied                    -- ts_died
+      0 -> ThreadRunning                       -- ts_runnable
+      1 -> ThreadBlocked BlockedOnMVar         -- ts_wait_mvar
+      2 -> ThreadBlocked BlockedOnOther        -- ts_wait_time
+      3 -> ThreadFinished                      -- ts_finished
+      4 -> ThreadDied                          -- ts_died
+      5 -> ThreadBlocked BlockedOnForeignCall  -- ts_wait_io
 
 -------------------------------------------------------
 -- Just for GHC compatibility.
diff --git a/lib/Primitives.hs b/lib/Primitives.hs
index b61e5b43b..17e116b53 100644
--- a/lib/Primitives.hs
+++ b/lib/Primitives.hs
@@ -389,6 +389,12 @@ primTryPutMVar = _primitive "IO.tryputmvar"
 primTryReadMVar :: MVar a -> IO b {-(Maybe a)-}
 primTryReadMVar = _primitive "IO.tryreadmvar"
 
+primWaitWriteFD :: Int -> IO Int
+primWaitWriteFD = _primitive "IO.waitwrfd"
+
+primWaitReadFD :: Int -> IO Int
+primWaitReadFD = _primitive "IO.waitrdfd"
+
 primThreadDelay :: Int -> IO ()
 primThreadDelay = _primitive "IO.threaddelay"
 
diff --git a/lib/System/IO/FD.hs b/lib/System/IO/FD.hs
new file mode 100644
index 000000000..90993035c
--- /dev/null
+++ b/lib/System/IO/FD.hs
@@ -0,0 +1,17 @@
+-- | Wait for file descriptor readiness from a Haskell thread.
+module System.IO.FD (waitForReadFD, waitForWriteFD) where
+
+import Primitives
+
+-- | Suspend the calling thread until the file descriptor is ready for
+-- reading, or reports an error or hang-up.
+--
+-- Returns @1@ after being woken. A runtime built without IO polling
+-- does not wait and returns @-1@ immediately.
+waitForReadFD :: Int -> IO Int
+waitForReadFD = primWaitReadFD
+
+-- | Like 'waitForReadFD', but waits until the descriptor is ready for
+-- writing.
+waitForWriteFD :: Int -> IO Int
+waitForWriteFD = primWaitWriteFD
diff --git a/lib/base.cabal b/lib/base.cabal
index 252916aa7..9f49399c2 100644
--- a/lib/base.cabal
+++ b/lib/base.cabal
@@ -190,6 +190,7 @@ library base
         System.Environment
         System.Exit
         System.IO
         System.IO.Error
+        System.IO.FD
         System.IO.MD5
         System.IO.PrintOrRun
diff --git a/src/MicroHs/Translate.hs b/src/MicroHs/Translate.hs
index 295211039..78f8618e0 100644
--- a/src/MicroHs/Translate.hs
+++ b/src/MicroHs/Translate.hs
@@ -272,6 +272,8 @@ primTable = [
   ("bs2fp", _primitive "bs2fp"),
   ("IO.fork", _primitive "IO.fork"),
   ("IO.thid", _primitive "IO.thid"),
+  ("IO.waitrdfd", _primitive "IO.waitrdfd"),
+  ("IO.waitwrfd", _primitive "IO.waitwrfd"),
   ("thnum", _primitive "thnum"),
   ("IO.throwto", _primitive "IO.throwto"),
   ("IO.yield", _primitive "IO.yield"),
diff --git a/src/runtime/eval.c b/src/runtime/eval.c
index b17753f94..369b8f35d 100644
--- a/src/runtime/eval.c
+++ b/src/runtime/eval.c
@@ -43,6 +43,9 @@
 #if WANT_SIGINT
 #include <signal.h>
 #endif
+#if MHS_IO_POLL
+#include "io_poll.h"
+#endif
 
 extern char **environ; /* should probably be behind some WANT_ */
 
@@ -542,6 +545,7 @@ enum node_tag { T_FREE, T_IND, T_AP, T_INT, T_INT64, T_DBL, T_FLT32, T_PTR, T_FU
                 T_WKNEWFIN, T_WKNEW, T_WKDEREF, T_WKFINAL,
                 T_IO_PP, /* for debugging */
                 T_IO_STDIN, T_IO_STDOUT, T_IO_STDERR,
+                T_IO_WAITRDFD, T_IO_WAITWRFD,
                 T_LAST_TAG,
 };
 
@@ -899,7 +903,14 @@ dump_tick_table(FILE *f)
 enum th_sched { mt_main, mt_resched, mt_raise };
 
 /* The two enums below are known by the Haskell code. Do not change order */
-enum th_state { ts_runnable, ts_wait_mvar, ts_wait_time, ts_finished, ts_died };
+enum th_state {
+  ts_runnable,
+  ts_wait_mvar,
+  ts_wait_time,
+  ts_finished,
+  ts_died,
+  ts_wait_io, /* threadStatus reports this as BlockedOnForeignCall; must stay after ts_died */
+};
 enum mask_state { mask_unmasked, mask_interruptible, mask_uninterruptible };
 
 /***************** HANDLER *****************/
@@ -912,6 +923,9 @@ struct handler {
 
 /***************** THREAD ******************/
 
+#define IO_POLL_WAITING_FOR_NONE   (-1)
+#define IO_POLL_EVENT_HAS_HAPPENED (-2)
+
 struct mthread {
   enum th_state mt_state; /* thread state */
   enum mask_state mt_mask; /* making state. */
@@ -924,6 +938,10 @@ struct mthread {
   NODEPTR mt_mval; /* filled after waiting for take/read */
   bool mt_mark; /* marked as accessible */
   uvalue_t mt_id; /* thread number, thread 1 is the main thread */
+#if MHS_IO_POLL
+  int mt_fd; /* The file descriptor we are waiting on, or IO_POLL_WAITING_FOR_NONE / IO_POLL_EVENT_HAS_HAPPENED */
+  int mt_events; /* IO_POLL_READ or IO_POLL_WRITE; -1 until the thread first waits */
+#endif
 #if defined(CLOCK_INIT)
   CLOCK_T mt_at; /* time to wake up when in threadDelay */
 #endif
@@ -1115,6 +1133,19 @@ add_runq_tail(struct mthread *mt)
   add_q_tail(&runq, mt);
 }
 
+#if MHS_IO_POLL
+/* this is the callback that is sent to the io_poll framework.
+ * It is invoked when an event a thread is waiting for becomes ready.
+ */
+static void
+io_thread_ready(void *ptr)
+{
+  struct mthread *mt = (struct mthread *)ptr;
+  mt->mt_fd = IO_POLL_EVENT_HAS_HAPPENED;
+  add_runq_tail(mt);
+}
+#endif
+
 struct mthread*
 remove_q_head(struct mqueue *q)
 {
@@ -1302,7 +1333,16 @@ yield(void)
   check_timeq();
   check_thrown(false);
   check_sigint();
+
+#if MHS_IO_POLL
+  if (io_waiter_count() > 0) {
+    /* Check if any threads blocked on IO can be scheduled. Since we pass in a delay of 0, checking
+       for the events should not block. */
+    io_poll(0, io_thread_ready);
+  }
+#endif
+
   // printf("yield %p %d\n", runq, (int)stack_ptr);
   /* if there is nothing after in the runq then there is no need to reschedule */
   if (!runq.mq_head->mt_queue) {
 #if THREAD_DEBUG
@@ -1347,6 +1387,10 @@ new_thread(NODEPTR root)
   mt->mt_mark = false;
   mt->mt_num_slices = 0;
   mt->mt_id = num_thread_create++;
+#if MHS_IO_POLL
+  mt->mt_fd = IO_POLL_WAITING_FOR_NONE;
+  mt->mt_events = -1;
+#endif
 #if defined(CLOCK_INIT)
   mt->mt_at = 0; /* delay has not expired */
 #endif
@@ -1599,6 +1643,43 @@ thread_delay(uvalue_t usecs)
 void
 pause_exec(void)
 {
+/* End up here if the run queue is empty. If there is no thread waiting for
+ * a delay to expire, we will never resume operation and we are deadlocked. However, if
+ * we compile with MHS_IO_POLL there might be threads waiting for IO events, so in
+ * that case we check for them as well. If there is no thread waiting for a delay or an
+ * IO event, we are deadlocked.
+ */
+#if MHS_IO_POLL
+
+  /* Check for deadlock situation */
+  if (io_waiter_count() == 0
+#if defined(CLOCK_INIT)
+      && !timeq.mq_head
+#endif
+      ) ERR("deadlock");
+
+  /* Loop until at least one thread is runnable.*/
+  while (!runq.mq_head) {
+    int timeout_ms = -1; /* block indefinitely if only io_waiters */
+#if defined(CLOCK_INIT)
+    /* If there are threads blocked on delays, recompute the timeout_ms to account
+       for that. */
+    if (timeq.mq_head) {
+      CLOCK_T delta = timeq.mq_head->mt_at - CLOCK_GET();
+      /* io_poll wants milliseconds but delta is in microseconds. Round up (+999) so that a
+         delta below 1000 does not truncate to zero and spin at full CPU speed, and cap the
+         wait at one hour so a huge delay cannot overflow the int; the loop just polls again. */
+      timeout_ms = delta <= 0 ? 0 : delta / 1000 >= 3600000 ? 3600000 : (int)((delta + 999) / 1000);
+    }
+#endif
+    io_poll(timeout_ms, io_thread_ready);
+#if defined(CLOCK_INIT)
+    check_timeq();
+#endif
+  }
+
+#else /* !MHS_IO_POLL */
+
 #if defined(CLOCK_INIT)
   if (timeq.mq_head) {
     struct mthread *mt;
@@ -1636,6 +1717,7 @@ pause_exec(void)
 #else /* CLOCK_INIT */
   ERR("no clock");
 #endif /* CLOCK_INIT */
+#endif /* MHS_IO_POLL */
 }
 
 /* Interrupt a sleeping thread in a throwTo/threadDelay */
@@ -1823,6 +1905,10 @@ new_ap(NODEPTR f, NODEPTR a)
   return n;
 }
 
+#if MHS_IO_POLL
+#include "io_poll_impl.c"
+#endif
+
 NODEPTR evali(NODEPTR n);
 
 /* If this is non-0 it means that the threading system is active. */
@@ -1837,6 +1923,10 @@ start_exec(NODEPTR root)
   mt->mt_id = MAIN_THREAD; /* make it the main thread in case this is foreign export calling */
   main_thread = mt;
 
+#if MHS_IO_POLL
+  io_init();
+#endif
+
   switch(setjmp(sched)) {
   case mt_main:
     break;
@@ -2173,6 +2263,8 @@ struct {
   { "binbs1", T_BINBS1 },
   { "unint1", T_UNINT1 },
   { "undbl1", T_UNDBL1 },
+  { "IO.waitrdfd", T_IO_WAITRDFD },
+  { "IO.waitwrfd", T_IO_WAITWRFD },
 #if WANT_INT64
   { "I+", T_ADD64, T_ADD64 },
   { "I-", T_SUB64, T_SUBR64 },
@@ -5996,6 +6088,42 @@ evali(NODEPTR an)
       POP(2);
       GOPAIR(mkInt(mt->mt_state));
       }
+    case T_IO_WAITRDFD:
+    case T_IO_WAITWRFD: {
+#if MHS_IO_POLL
+      CHKARG2NP; /* x = the filedescriptor, y = RealWorld; no pop yet */
+
+      /* io_thread_ready sets mt_fd=IO_POLL_EVENT_HAS_HAPPENED when waking the thread.
+         If we did not do this check we would just register again.
+
+         This seems to be how T_IO_THREADDELAY works, with mt_at == -1.
+      */
+      if (runq.mq_head->mt_fd == IO_POLL_EVENT_HAS_HAPPENED) {
+        runq.mq_head->mt_fd = IO_POLL_WAITING_FOR_NONE;
+        POP(2);
+        GOPAIR(mkInt(1));
+      }
+
+      POP(2);
+      int fd = evalint(x);
+      int events = (tag == T_IO_WAITRDFD) ? IO_POLL_READ : IO_POLL_WRITE;
+
+      /* Set up the waiting thread's state, preparing it to leave the run queue
+         until an event is ready for it */
+      struct mthread *mt = remove_q_head(&runq);
+      mt->mt_fd = fd;
+      mt->mt_events = events;
+      mt->mt_state = ts_wait_io;
+
+      io_register(fd, events, mt);
+
+      resched(mt, ts_wait_io);
+#else
+      CHKARG2NP;
+      POP(2);
+      GOPAIR(mkInt(-1));
+#endif
+    }
     case T_IO_GETMASKINGSTATE:
       CHKARG1; /* x = ST */
       GOPAIR(mkInt(runq.mq_head->mt_mask));
diff --git a/src/runtime/io_poll.h b/src/runtime/io_poll.h
new file mode 100644
index 000000000..b280eb824
--- /dev/null
+++ b/src/runtime/io_poll.h
@@ -0,0 +1,18 @@
+/* Copyright 2026 Robert Krook
+ * See LICENSE file for full license.
+ */
+
+/*
+ * OS-agnostic interface for non-blocking IO polling.
+ * Platform-specific implementations live in <platform>/io_poll_impl.c and
+ * are included into eval.c via #include "io_poll_impl.c".
+ *
+ */
+
+#define IO_POLL_READ 1
+#define IO_POLL_WRITE 2
+
+void io_init(void);
+void io_poll(int timeout_ms, void (*on_ready)(void *cookie)); /* cookie is a struct mthread*, passed in from eval.c */
+void io_register(int fd, int events, void *cookie);
+int io_waiter_count(void);
diff --git a/src/runtime/unix/config.h b/src/runtime/unix/config.h
index d3658ef71..701b86525 100644
--- a/src/runtime/unix/config.h
+++ b/src/runtime/unix/config.h
@@ -84,6 +84,17 @@
  */
 #define WANT_OVERFLOW 1
 
+/*
+ * Enable non-blocking IO polling.
+ * Linux uses epoll
+ * The backend is selected in unix/io_poll_impl.c.
+ */
+#if defined(__linux__)
+#define MHS_IO_POLL 1
+#else
+#define MHS_IO_POLL 0
+#endif
+
 /*
  * Use CPU counters.
  * Only available on:
diff --git a/src/runtime/unix/io_poll_epoll.c b/src/runtime/unix/io_poll_epoll.c
new file mode 100644
index 000000000..6f0a9b969
--- /dev/null
+++ b/src/runtime/unix/io_poll_epoll.c
@@ -0,0 +1,134 @@
+/* Copyright 2026 Robert Krook
+ * See LICENSE file for full license.
+ */
+
+/*
+ * epoll backend for non-blocking IO polling (Linux only).
+ * This file is #included into eval.c via io_poll_impl.c.
+ *
+ * A single TCP socket fd can have both a reader and a writer waiting on it
+ * simultaneously — e.g. a receive loop blocked on EPOLLIN while a send path
+ * is blocked on EPOLLOUT. The original design used one cookie per fd with
+ * EPOLL_CTL_ADD, which fails with EEXIST in that scenario.
+ *
+ * This version tracks separate read/write cookies per fd in a static table
+ * and uses EPOLL_CTL_MOD when the fd is already in the epoll set, so both
+ * directions stay live in a single epoll entry.
+ */
+
+#include <errno.h>
+#include <stdint.h>
+#include <sys/epoll.h>
+
+/* Per-fd waiter state. */
+struct fd_state {
+  void *read_cookie;  /* non-NULL while a thread is waiting for EPOLLIN */
+  void *write_cookie; /* non-NULL while a thread is waiting for EPOLLOUT */
+};
+
+#define MAX_FDS 256
+static struct fd_state fd_table[MAX_FDS]; /* zero-initialised by the C runtime */
+
+static int epoll_fd = -1;
+static int io_waiters = 0;
+
+/* Create the epoll instance used by io_register and io_poll. A repeated call
+   is a no-op (NOTE(review): start_exec presumably runs again for foreign export
+   calls), so the existing epoll fd and its registered waiters are kept. */
+void
+io_init(void)
+{
+  if (epoll_fd >= 0)
+    return;
+  epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+  if (epoll_fd < 0) {
+    ERR("epoll_create1 failed");
+  }
+}
+
+/* Wake every waiter whose event is ready by calling on_ready(cookie).
+   If timeout_ms is 0 this is a non-blocking check; otherwise it blocks up to
+   timeout_ms milliseconds (or indefinitely when timeout_ms == -1). */
+void
+io_poll(int timeout_ms, void (*on_ready)(void *cookie))
+{
+  struct epoll_event evs[64];
+  int n = epoll_wait(epoll_fd, evs, 64, timeout_ms);
+  if (n < 0) {
+    /* An interrupted wait is harmless: report nothing and let the caller poll
+       again. Any other failure would make a blocking caller retry in a tight loop. */
+    if (errno == EINTR)
+      return;
+    ERR("epoll_wait failed");
+  }
+  for (int i = 0; i < n; i++) {
+    int fd = evs[i].data.fd;
+    uint32_t got = evs[i].events;
+    void *rc = NULL, *wc = NULL;
+
+    /* Collect whichever cookies fired and clear them from the table. */
+    if ((got & (EPOLLIN | EPOLLERR | EPOLLHUP)) && fd_table[fd].read_cookie) {
+      rc = fd_table[fd].read_cookie;
+      fd_table[fd].read_cookie = NULL;
+      io_waiters--;
+    }
+    if ((got & (EPOLLOUT | EPOLLERR | EPOLLHUP)) && fd_table[fd].write_cookie) {
+      wc = fd_table[fd].write_cookie;
+      fd_table[fd].write_cookie = NULL;
+      io_waiters--;
+    }
+
+    /* EPOLLONESHOT disabled the fd after firing. Re-arm for any remaining
+       interest, or remove the fd from the set entirely. */
+    uint32_t remaining = 0;
+    if (fd_table[fd].read_cookie) remaining |= EPOLLIN;
+    if (fd_table[fd].write_cookie) remaining |= EPOLLOUT;
+
+    if (remaining) {
+      struct epoll_event ev = { .events = remaining | EPOLLONESHOT, .data.fd = fd };
+      epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ev);
+    } else {
+      epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL);
+    }
+
+    if (rc) on_ready(rc);
+    if (wc) on_ready(wc);
+  }
+}
+
+/* Register fd to call on_ready(cookie) when the requested event is ready.
+   events is IO_POLL_READ or IO_POLL_WRITE.
+   Uses EPOLL_CTL_MOD when the fd already has an interest registered in the
+   other direction, so both directions coexist in a single epoll entry. */
+void
+io_register(int fd, int events, void *cookie)
+{
+  if (fd < 0 || fd >= MAX_FDS) ERR("io_register: fd out of range");
+
+  struct fd_state *st = &fd_table[fd];
+  void **slot = (events == IO_POLL_READ) ? &st->read_cookie : &st->write_cookie;
+
+  /* Only one waiter per fd and direction can be tracked. Overwriting the slot
+     would strand the earlier thread and leave io_waiters permanently too high. */
+  if (*slot != NULL) ERR("io_register: fd already has a waiter for this event");
+
+  int already_registered = (st->read_cookie != NULL || st->write_cookie != NULL);
+  *slot = cookie;
+
+  uint32_t ev_flags = EPOLLONESHOT;
+  if (st->read_cookie) ev_flags |= EPOLLIN;
+  if (st->write_cookie) ev_flags |= EPOLLOUT;
+
+  struct epoll_event ev = { .events = ev_flags, .data.fd = fd };
+  int op = already_registered ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
+  if (epoll_ctl(epoll_fd, op, fd, &ev) < 0) {
+    ERR("io_register epoll_ctl failed");
+  }
+  io_waiters++;
+}
+
+int
+io_waiter_count(void)
+{
+  return io_waiters;
+}
diff --git a/src/runtime/unix/io_poll_impl.c b/src/runtime/unix/io_poll_impl.c
new file mode 100644
index 000000000..574970f94
--- /dev/null
+++ b/src/runtime/unix/io_poll_impl.c
@@ -0,0 +1,13 @@
+/* Copyright 2026 Robert Krook
+ * See LICENSE file for full license.
+ */
+
+/*
+ * IO poll backend for Unix.
+ *
+ * Currently Linux only, using epoll. When a second implementation for MACOS is
+ * available, do some tricks here to dispatch the correct one.
+ *
+ */
+
+#include "io_poll_epoll.c"
diff --git a/src/runtime/windows/io_poll_impl.c b/src/runtime/windows/io_poll_impl.c
new file mode 100644
index 000000000..34840c96d
--- /dev/null
+++ b/src/runtime/windows/io_poll_impl.c
@@ -0,0 +1,9 @@
+/* Copyright 2026 Robert Krook
+ * See LICENSE file for full license.
+ */
+
+/*
+ * Selects the platform-specific IO poll backend for Windows.
+ */
+
+#include "io_poll_iocp.c"
diff --git a/src/runtime/windows/io_poll_iocp.c b/src/runtime/windows/io_poll_iocp.c
new file mode 100644
index 000000000..a72c0f554
--- /dev/null
+++ b/src/runtime/windows/io_poll_iocp.c
@@ -0,0 +1,10 @@
+/* Copyright 2026 Robert Krook
+ * See LICENSE file for full license.
+ */
+
+/*
+ * IOCP backend for non-blocking IO polling (Windows).
+ * Not yet implemented.
+ */
+
+#error "IOCP IO poll backend not yet implemented for Windows"
diff --git a/tests/Makefile b/tests/Makefile
index 072e65f23..50efdd0b2 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -5,7 +5,7 @@ TMHS=$(MHS) $(MHSTARGET) $(MHSOUTPUT)
 EVAL=../bin/mhseval +RTS -H1M -RTS
 .PHONY: test nfib clean errtest alltest cache info
 
-alltest:	info test errtest testforimp testforexp testforimpexp testapplysp
+alltest:	info test errtest testforimp testforexp testforimpexp testapplysp testpipenonblockio
 # interactivetest
 
 cache:
@@ -138,6 +138,9 @@ testforimpexp:
 testapplysp:
 	$(TMHS) -optc -I. ApplySP hsasp.c -oApplySP.exe && ./ApplySP.exe > ApplySP.out && diff ApplySP.ref ApplySP.out
 
+testpipenonblockio:
+	MHSDIR=.. $(TMHS) -oPipeNonBlockIO.exe PipeNonBlockIO && ./PipeNonBlockIO.exe > PipeNonBlockIO.out && diff PipeNonBlockIO.ref PipeNonBlockIO.out
+
 errtest:
 	sh errtester.sh $(MHS) < errmsg.test
 
diff --git a/tests/PipeNonBlockIO.hs b/tests/PipeNonBlockIO.hs
new file mode 100644
index 000000000..0ca1828ad
--- /dev/null
+++ b/tests/PipeNonBlockIO.hs
@@ -0,0 +1,70 @@
+module PipeNonBlockIO where
+
+import Control.Monad (when)
+import Control.Concurrent
+import Control.Concurrent.MVar
+import Data.Word
+import Foreign.C.Error
+import Foreign.C.Types
+import Foreign.Marshal.Alloc
+import Foreign.Ptr
+import Foreign.Storable
+import System.IO.FD
+
+foreign import ccall "unistd.h pipe" c_pipe :: Ptr CInt -> IO CInt
+foreign import ccall "unistd.h read" c_read :: CInt -> Ptr Word8 -> CInt -> IO CInt
+foreign import ccall "unistd.h write" c_write :: CInt -> Ptr Word8 -> CInt -> IO CInt
+foreign import ccall "fcntl.h fcntl" c_fcntl_setfl :: CInt -> CInt -> CInt -> IO CInt
+foreign import capi "fcntl.h value F_SETFL" fSETFL :: CInt
+foreign import capi "fcntl.h value O_NONBLOCK" oNONBLOCK :: CInt
+
+-- | Put the descriptor into non-blocking mode, failing loudly if fcntl fails.
+setNonBlocking :: CInt -> IO ()
+setNonBlocking fd = do
+  r <- c_fcntl_setfl fd fSETFL oNONBLOCK
+  when (r < 0) $ throwErrno "fcntl"
+
+-- | Read one byte, parking the thread in 'waitForReadFD' whenever the
+-- read would block.
+blockOnRead :: CInt -> IO Word8
+blockOnRead fd = alloca $ \p -> go p
+  where
+    go p = do
+      r <- c_read fd p 1
+      if r == 1
+        then peek p
+        else do
+          errno <- getErrno
+          if errno == eAGAIN || errno == eWOULDBLOCK
+            then waitForReadFD (fromIntegral fd) >> go p
+            else throwErrno "read"
+
+reader :: CInt -> MVar () -> IO ()
+reader fd done = do
+  putStrLn "reader: waiting for data"
+  b <- blockOnRead fd
+  putStrLn $ "reader: received " ++ show b
+  putMVar done ()
+
+main :: IO ()
+main =
+  -- pipe fills in an array of two C ints: the read end, then the write end
+  allocaBytes (2 * sizeOf (0 :: CInt)) $ \fds -> do
+    rc <- c_pipe fds
+    when (rc /= 0) $ throwErrno "pipe"
+    readFd <- peekElemOff fds 0
+    writeFd <- peekElemOff fds 1
+    setNonBlocking readFd
+
+    done <- newEmptyMVar :: IO (MVar ())
+    forkIO $ reader readFd done
+    yield -- let reader reach waitForReadFD
+
+    threadDelay 1000000 -- let the other thread reach its blocking state
+    putStrLn "main: writing to pipe"
+    alloca $ \p -> do
+      poke p (42 :: Word8)
+      n <- c_write writeFd p 1
+      when (n /= 1) $ throwErrno "write"
+
+    takeMVar done
diff --git a/tests/PipeNonBlockIO.ref b/tests/PipeNonBlockIO.ref
new file mode 100644
index 000000000..f72c1fda9
--- /dev/null
+++ b/tests/PipeNonBlockIO.ref
@@ -0,0 +1,3 @@
+reader: waiting for data
+main: writing to pipe
+reader: received 42