Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions lib/Control/Concurrent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,12 @@ threadStatus thr = do
st <- primThreadStatus thr
return $
case st of
0 -> ThreadRunning -- ts_runnable
1 -> ThreadBlocked BlockedOnMVar -- ts_wait_mvar
2 -> ThreadBlocked BlockedOnOther -- ts_wait_time
3 -> ThreadFinished -- ts_finished
4 -> ThreadDied -- ts_died
0 -> ThreadRunning -- ts_runnable
1 -> ThreadBlocked BlockedOnMVar -- ts_wait_mvar
2 -> ThreadBlocked BlockedOnOther -- ts_wait_time
3 -> ThreadFinished -- ts_finished
4 -> ThreadDied -- ts_died
5 -> ThreadBlocked BlockedOnForeignCall -- ts_wait_io

-------------------------------------------------------
-- Just for GHC compatibility.
Expand Down
6 changes: 6 additions & 0 deletions lib/Primitives.hs
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,12 @@ primTryPutMVar = _primitive "IO.tryputmvar"
primTryReadMVar :: MVar a -> IO b {-(Maybe a)-}
primTryReadMVar = _primitive "IO.tryreadmvar"

primWaitWriteFD :: Int -> IO Int
primWaitWriteFD = _primitive "IO.waitwrfd"

primWaitReadFD :: Int -> IO Int
primWaitReadFD = _primitive "IO.waitrdfd"

primThreadDelay :: Int -> IO ()
primThreadDelay = _primitive "IO.threaddelay"

Expand Down
9 changes: 9 additions & 0 deletions lib/System/IO/FD.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module System.IO.FD (waitForReadFD, waitForWriteFD) where

import Primitives

waitForReadFD :: Int -> IO Int
waitForReadFD = primWaitReadFD

waitForWriteFD :: Int -> IO Int
waitForWriteFD = primWaitWriteFD
1 change: 1 addition & 0 deletions lib/base.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ library base
System.Environment
System.Exit
System.IO
System.IO.FD
System.IO.Error
System.IO.MD5
System.IO.PrintOrRun
Expand Down
2 changes: 2 additions & 0 deletions src/MicroHs/Translate.hs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ primTable = [
("bs2fp", _primitive "bs2fp"),
("IO.fork", _primitive "IO.fork"),
("IO.thid", _primitive "IO.thid"),
("IO.waitrdfd", _primitive "IO.waitrdfd"),
("IO.waitwrfd", _primitive "IO.waitwrfd"),
("thnum", _primitive "thnum"),
("IO.throwto", _primitive "IO.throwto"),
("IO.yield", _primitive "IO.yield"),
Expand Down
133 changes: 131 additions & 2 deletions src/runtime/eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
#if WANT_SIGINT
#include <signal.h>
#endif
#if MHS_IO_POLL
#include "io_poll.h"
#endif

extern char **environ; /* should probably be behind some WANT_ */

Expand Down Expand Up @@ -126,6 +129,7 @@ int num_ffi;
#define THREAD_DEBUG 0
#endif


#define VERSION "v8.3\n"

#define PRIvalue PRIdPTR
Expand Down Expand Up @@ -542,6 +546,7 @@ enum node_tag { T_FREE, T_IND, T_AP, T_INT, T_INT64, T_DBL, T_FLT32, T_PTR, T_FU
T_WKNEWFIN, T_WKNEW, T_WKDEREF, T_WKFINAL,
T_IO_PP, /* for debugging */
T_IO_STDIN, T_IO_STDOUT, T_IO_STDERR,
T_IO_WAITRDFD, T_IO_WAITWRFD,
T_LAST_TAG,
};

Expand Down Expand Up @@ -899,7 +904,14 @@ dump_tick_table(FILE *f)

enum th_sched { mt_main, mt_resched, mt_raise };
/* The two enums below are known by the Haskell code. Do not change order */
enum th_state { ts_runnable, ts_wait_mvar, ts_wait_time, ts_finished, ts_died };
enum th_state {
ts_runnable,
ts_wait_mvar,
ts_wait_time,
ts_finished,
ts_died,
ts_wait_io, /* not visible to Haskell; must stay after ts_died */
};
enum mask_state { mask_unmasked, mask_interruptible, mask_uninterruptible };

/***************** HANDLER *****************/
Expand All @@ -912,6 +924,9 @@ struct handler {

/***************** THREAD ******************/

#define IO_POLL_WAITING_FOR_NONE (-1)
#define IO_POLL_EVENT_HAS_HAPPENED (-2)

struct mthread {
enum th_state mt_state; /* thread state */
enum mask_state mt_mask; /* making state. */
Expand All @@ -924,6 +939,10 @@ struct mthread {
NODEPTR mt_mval; /* filled after waiting for take/read */
bool mt_mark; /* marked as accessible */
uvalue_t mt_id; /* thread number, thread 1 is the main thread */
#if MHS_IO_POLL
int mt_fd; /* The file descriptor that we are waiting on (will be either IO_POLL_WAITING_FOR_NONE or IO_POLL_EVENT_HAS_HAPPENED) */
int mt_events; /* IO_POLL_READ or IO_POLL_WRITE */
#endif
#if defined(CLOCK_INIT)
CLOCK_T mt_at; /* time to wake up when in threadDelay */
#endif
Expand Down Expand Up @@ -1115,6 +1134,19 @@ add_runq_tail(struct mthread *mt)
add_q_tail(&runq, mt);
}

#if MHS_IO_POLL
/* this is the callback that is sent to the io_poll framework.
* It is invoked when an event a thread is waiting for becomes ready.
*/
static void
io_thread_ready(void *ptr)
{
struct mthread *mt = (struct mthread *)ptr;
mt->mt_fd = IO_POLL_EVENT_HAS_HAPPENED;
add_runq_tail(mt);
}
#endif

struct mthread*
remove_q_head(struct mqueue *q)
{
Expand Down Expand Up @@ -1302,7 +1334,16 @@ yield(void)
check_timeq();
check_thrown(false);
check_sigint();
// printf("yield %p %d\n", runq, (int)stack_ptr);

#if MHS_IO_POLL
if (io_waiter_count() > 0) {
/* Check if any threads blocked on IO can be scheduled. Since we pass in a delay of 0, checking
for the events should not block. */
io_poll(0, io_thread_ready);
}
#endif

// printf("yield %p %d\n", runq, (int)stack_ptr);
/* if there is nothing after in the runq then there is no need to reschedule */
if (!runq.mq_head->mt_queue) {
#if THREAD_DEBUG
Expand Down Expand Up @@ -1347,6 +1388,10 @@ new_thread(NODEPTR root)
mt->mt_mark = false;
mt->mt_num_slices = 0;
mt->mt_id = num_thread_create++;
#if MHS_IO_POLL
mt->mt_fd = IO_POLL_WAITING_FOR_NONE;
mt->mt_events = -1;
#endif
#if defined(CLOCK_INIT)
mt->mt_at = 0; /* delay has not expired */
#endif
Expand Down Expand Up @@ -1599,6 +1644,43 @@ thread_delay(uvalue_t usecs)
void
pause_exec(void)
{
/* End up here if the run queue is empty. If there is no thread waiting for
Comment thread
Rewbert marked this conversation as resolved.
* a delay to expire, we will never resume operation and we are deadlocked. However, if
* we compile with MHS_IO_POLL there might be threads waiting for IO events, so in
* that case we check for them as well. If there is no thread waiting for a delay or an
* IO event, we are deadlocked.
*/
#if MHS_IO_POLL

/* Check for deadlock situation */
if (io_waiter_count() == 0
#if defined(CLOCK_INIT)
&& !timeq.mq_head
#endif
) ERR("deadlock");

/* Loop until at least one thread is runnable.*/
while (!runq.mq_head) {
int timeout_ms = -1; /* block indefinitely if only io_waiters */
#if defined(CLOCK_INIT)
/* If there are threads blocked on delays, recompute the timeout_ms to account
for that. */
if (timeq.mq_head) {
CLOCK_T delta = timeq.mq_head->mt_at - CLOCK_GET();
/* +999 emulates a ceiling function, adding at most 0.999 ms. io_poll wants milliseconds,
not microseconds as delta represents. When delta is < 1000, without +999, we'll truncate
to zero and enter a loop at full CPU speed. */
timeout_ms = (delta > 0) ? (int)((delta + 999) / 1000) : 0;
}
#endif
io_poll(timeout_ms, io_thread_ready);
#if defined(CLOCK_INIT)
check_timeq();
#endif
}

#else /* !MHS_IO_POLL */

#if defined(CLOCK_INIT)
if (timeq.mq_head) {
struct mthread *mt;
Expand Down Expand Up @@ -1636,6 +1718,7 @@ pause_exec(void)
#else /* CLOCK_INIT */
ERR("no clock");
#endif /* CLOCK_INIT */
#endif /* MHS_IO_POLL */
}

/* Interrupt a sleeping thread in a throwTo/threadDelay */
Expand Down Expand Up @@ -1823,6 +1906,10 @@ new_ap(NODEPTR f, NODEPTR a)
return n;
}

#if MHS_IO_POLL
#include "io_poll_impl.c"
#endif

NODEPTR evali(NODEPTR n);

/* If this is non-0 it means that the threading system is active. */
Expand All @@ -1837,6 +1924,10 @@ start_exec(NODEPTR root)
mt->mt_id = MAIN_THREAD; /* make it the main thread in case this is foreign export calling */
main_thread = mt;

#if MHS_IO_POLL
io_init();
#endif

switch(setjmp(sched)) {
case mt_main:
break;
Expand Down Expand Up @@ -2173,6 +2264,8 @@ struct {
{ "binbs1", T_BINBS1 },
{ "unint1", T_UNINT1 },
{ "undbl1", T_UNDBL1 },
{ "IO.waitrdfd", T_IO_WAITRDFD},
{ "IO.waitwrfd", T_IO_WAITWRFD},
#if WANT_INT64
{ "I+", T_ADD64, T_ADD64 },
{ "I-", T_SUB64, T_SUBR64 },
Expand Down Expand Up @@ -5996,6 +6089,42 @@ evali(NODEPTR an)
POP(2);
GOPAIR(mkInt(mt->mt_state));
}
case T_IO_WAITRDFD:
case T_IO_WAITWRFD: {
#if MHS_IO_POLL
CHKARG2NP; /* x = the filedescriptor, y = RealWorld; no pop yet */

/* io_thread_ready sets mt_fd=IO_POLL_EVENT_HAS_HAPPENED when waking the thread.
If we did not do this check we would just register again.

This seems to be how T_IO_THREADDELAY works, with mt_at == -1.
*/
if (runq.mq_head->mt_fd == IO_POLL_EVENT_HAS_HAPPENED) {
runq.mq_head->mt_fd = IO_POLL_WAITING_FOR_NONE;
POP(2);
GOPAIR(mkInt(1));
}

POP(2);
int fd = evalint(x);
int events = (tag == T_IO_WAITRDFD) ? IO_POLL_READ : IO_POLL_WRITE;

/* Set up the waiting thread's state, preparing it to leave the run queue
until an event is ready for it */
struct mthread *mt = remove_q_head(&runq);
mt->mt_fd = fd;
mt->mt_events = events;
mt->mt_state = ts_wait_io;

io_register(fd, events, mt);

resched(mt, ts_wait_io);
#else
CHKARG2NP;
POP(2);
GOPAIR(mkInt(-1));
#endif
}
case T_IO_GETMASKINGSTATE:
CHKARG1; /* x = ST */
GOPAIR(mkInt(runq.mq_head->mt_mask));
Expand Down
18 changes: 18 additions & 0 deletions src/runtime/io_poll.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/* Copyright 2026 Robert Krook
* See LICENSE file for full license.
*/

/*
* OS-agnostic interface for non-blocking IO polling.
* Platform-specific implementations live in <platform>/io_poll_impl.c and
* are included into eval.c via #include "io_poll_impl.c".
*
*/

#define IO_POLL_READ 1
#define IO_POLL_WRITE 2

void io_init(void);
void io_poll(int timeout_ms, void (*on_ready)(void *cookie)); /* cookie is a struct mthread*, passed in from eval.c */
void io_register(int fd, int events, void *cookie);
int io_waiter_count(void);
11 changes: 11 additions & 0 deletions src/runtime/unix/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@
*/
#define WANT_OVERFLOW 1

/*
* Enable non-blocking IO polling.
* Linux uses epoll
* The backend is selected in unix/io_poll_impl.c.
*/
#if defined(__linux__)
#define MHS_IO_POLL 1
#else
#define MHS_IO_POLL 0
#endif

/*
* Use CPU counters.
* Only available on:
Expand Down
Loading
Loading