diff --git a/.envrc b/.envrc new file mode 100644 index 0000000..3550a30 --- /dev/null +++ b/.envrc @@ -0,0 +1 @@ +use flake diff --git a/.github/workflows/build_nix.yml b/.github/workflows/build_nix.yml new file mode 100644 index 0000000..2a45c1f --- /dev/null +++ b/.github/workflows/build_nix.yml @@ -0,0 +1,33 @@ +name: CI + +on: + push: + branches: [ "main", "hopperx" ] + pull_request: + branches: [ "main", "hopperx" ] + +jobs: + build-x86_64: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Install Nix + uses: cachix/install-nix-action@v26 + + - name: Build Hopper x86_64 + run: nix build .#hopper.cross-x86_64-linux + + build-aarch64: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Install Nix + uses: cachix/install-nix-action@v26 + + - name: Build Hopper aarch64 + run: nix build .#hopper.cross-aarch64-linux + diff --git a/.gitignore b/.gitignore index adce25a..d24df54 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,16 @@ -pipes/ +.cache/ +.direnv/ +build/ +compile_commands.json +result + +# Python-generated files __pycache__/ -dist/ +*.py[oc] build/ -hopper.egg-info/ -tmplog.txt -.gdb_history +dist/ +wheels/ +*.egg-info + +# Virtual environments +.venv diff --git a/.vscode/launch.json b/.vscode/launch.json deleted file mode 100644 index 72f09a5..0000000 --- a/.vscode/launch.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - // Use IntelliSense to learn about possible attributes. - // Hover to view descriptions of existing attributes. - // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 - "version": "0.2.0", - "configurations": [ - { - "name": "Run Hopper Server", - "type": "debugpy", - "request": "launch", - "module": "hopper.server", - "args": ["pipes"] - } - ] -} \ No newline at end of file diff --git a/README.md b/README.md index 2de9ca2..9a05d42 100644 --- a/README.md +++ b/README.md @@ -1,191 +1,9 @@ # hopper -Pipe multiplexer for internal communication. +Hopper is a general-purpose, stream-oriented, broadcast IPC system for Linux systems +systems. -Hopper is the core part of the new communication system that ties together -various systems within the (updated) RoboCon brains. Hopper is only relevant -in brains for 2026 (and perhaps later) competitions. Where applicable, Hopper -is a primarily dependency for Shepherd and the robot library, but is also -needed for other smaller services. In the future, Hopper will be a dependency -of Wardog (unfinished as of 2026). +## License -Hopper's primary purpose is to provide a multiplexer for named pipes, -specifically, FIFOs. I (OldUser101, Nathan Gill) may use the terms -"FIFO", "pipe", and "named pipe" interchangeably. If you really want to, -go and research the differences between these terms. In simple terms, -Hopper is a service that acts as a one-way broadcaster between various -data channels. In even simpler terms, one program can send a message, and -any number of other programs which are listening for it, can get the message. -It is functionally similar to MQTT (which was originally discussed prior to -Hopper's design in late 2025, and Hopper's inspiration), but doesn't touch -the network stack. The advantage of this is lower latency compared to the -network-based MQTT. The primary disadvantage is that external clients -(outside the brain) cannot communicate with the system directly, adding -network overhead and complexity for, say, the arena. - -Originally, Hopper was written completely in Python, mainly for -interoperability with other services such as Shepherd, which (at the time of -writing) is also written in Python. Hopper is formed of multiple modules. -Each module of Hopper will be explained briefly, and in detail later. - -The first part is the `server` module, which is a standalone program -that handles the multiplexing between named pipes in a directory. Due to -potential latency issues, and the sheer overhead of Python, the server module -was rewritten in C, during the development of the (still unfinished) Wardog -hardware server. - -The second is the `client` module, which provides Python wrappers around Hopper -communication. In effect, it provides basic functions to allow Python clients -to open, read, write, and close named pipes in such a way that the Hopper -server can use them. - -The third module is named `common`. This module provides common **Python**, -functionality, such as named pipe handling. Ironically, since the rewrite of -the Hopper server, the `common` module only provides functionality for the -`client` module, rather than being shared, as the name suggests. - -Finally, the `util` module consists purely of programs that can be used to -test the functionality of both the Hopper `server`, and `client` modules. -Like `common`, it is badly named, and should probably be called `tests`, -but I can't be bothered to rename it. - -## `server` - -The `server` module is the standalone pipe multiplexer server that facilitates -Hopper itself. It is (now) written in C. If you don't know POSIX and/or Linux -APIs in general, this section may seem complicated. This section focuses on the -current C-based Hopper server, which improves upon the Python version (which is -largely undocumented). The core focus of the rewrite was significantly reducing -the latency when sending/reciving data through the server. This is critical for -both system stability, and the planned Wardog hardware server, which requires -near-zero latency for precise hardware timing. - -In the Python version, -a single message would have an average latency of around 0.25 seconds, for -data buffers <1 KiB. Above this size, delays of seconds or longer could occur. -This was not only due to the overhead of Python, but also inefficiencies and -filesystem constraints. The primary part of this was that pipes were opened -in non-blocking mode, since the server was single threaded, and blocking is -non-ideal. Because of this, a 0.25 second delay was added to prevent the -process from consuming all system resources, constantly, and crashing Shepherd. - -**OUT OF DATE**:The way the current Hopper server achieves this near-zero latency, and low -system resource usage, involves the use of Linux and POSIX APIs that are not -trivially exposed in Python. More specifically, `epoll` is used to block -the server when no data needs to be transferred, while keeping file descriptors -open in non-blocking mode for other purposes; the use of `splice`, `tee`, and -intermediate pipes mean that the transferred data buffers are never copied -into user space, allowing the kernel to effectively manage them. In effect, -the Hopper server doesn't actually copy any data itself, reducing latency -from the copy process, and allowing for the 1 KiB limit to be increased to -1 MiB in a single transfer. - -**UPDATED**: Due to complexities and issues with using kernel buffers, a simple -ring buffer is used instead, which is slightly slower, but probably fine -for out use case. - -For coordinating which FIFOs need to receive what data, Hopper uses a filename -format system. - -`I/O__` - -- `I/O`, input or output pipe. This is relative to the server, and is a little -counterintuitive. An client providing an "input" pipe is actually *sending* -through the Hopper server, not receiving it. - -- ``, the handler ID. This should really be called the channel -identifier, as it controls the group of FIFOs that data is shared between. - -- ``, a unique name for the client. - -For example, a FIFO with name `O_log_helper`, is an output pipe, that will -receive data from the Hopper server. It will receive all data from FIFOs that -have the matching handler `log` (for log messages), e.g. `I_log_robot` (logs -sent from usercode, the robot library). It is given the unique name `helper`. -This name corresponds to the `helper.py` service, provided alongside Shepherd. -As the name suggests, this FIFO will recieve logs, which need to be sent to -Sheep, and the arena. - -Neither the handler ID or client name can contain underscores, as it messes -with the format system. However, this may not be checked by the rewritten -server? - -The handler ID was originally given its name in the older Python server. -In that system, a handler was a particular class that processed data -sent through a pipe, invoked by the server. These handlers could manipulate -the data in transit, such as appending timestamps, or saving to a separate -file. The handler concept was ditched in the server rewrite, but the name, -and old values, still remain. - -Handler IDs, and their respective internal mappings are defined in -`handler.h`, and `handler.c`. - -A better way to identify channels of FIFOs could be a directory structure -like this: - -``` -/home/pi/pipes -| -----log -| | -| ----in -| | | -| | ----robot -| | | -| | ----runner -| | | -| | ... -| | -| ----out -| | -| ----helper -| | -| ----shepherd -| | -| ... -| -... -``` - -Rather than using filenames, the standard directory based model could be used. -This is much cleaner, and probably easier to understand, than the current -model. - -The new Hopper server also uses `inotify` to detect when FIFOs and directories -change, as well as other mechanisms. - -If you couldn't figure out already, the Hopper server takes a single argument: - -- ``, a path pointing to the directory of FIFOs to multiplex. - -In practice, this is only ever `/home/pi/pipes`. It you ever get a fault where -the brain boots, the Wi-Fi network is operational, but doesn't get to flashy, -and Shepherd doesn't work, try SSHing and creating this directory, as -Hopper doesn't create it, and the failure of Hopper will prevent anything else -from starting at all. - -## `client` - -The second module of Hopper is the Python client bindings. These are relatively -easy to use. `hopper` should be installed as a Python module available to -use. If not, a `setup.py` is provided. - -The client APIs are relatively easy to use, and basic examples can be found in -the `read.py` and `write.py` files in the `util` module. - -The API also provides a `JsonReader` class, which is used in the development -version of Wardog, and simplifies reading JSON from Hopper. If you want -examples of this, check out Wardog. - -## `common` - -This module provides functionality to the Hopper `client`. This was previously -used by the `server` module, before the rewrite. - -The `common` module is almost entirely re-exported by the `client` module, -so you should look at the documentation for that instead. - -## `util` - -The `util` module is (hopefully) not something you need documentation for, if -you've read everything above. +Hopper is licensed under the BSD 2-Clause license, see [LICENSE](./LICENSE) +for details. diff --git a/build.sh b/build.sh deleted file mode 100755 index b22c3d1..0000000 --- a/build.sh +++ /dev/null @@ -1,21 +0,0 @@ -#!/bin/bash - -set -e - -CC=gcc - -echo "[*] Building Hopper server..." - -mkdir -p build/ -$CC -o build/hopper.server hopper/server/server.c hopper/server/pipe.c hopper/server/handler.c -Ihopper/server -Wall -Wextra -g - -echo "[*] Installing Hopper server..." - -hopper_server=$(realpath build/hopper.server) -ln -s "$hopper_server" /usr/bin/hopper.server - -echo "[*] Installing Hopper client..." - -pip install -e . - -echo "[✓] Done." diff --git a/client/lib/lib.c b/client/lib/lib.c new file mode 100644 index 0000000..bd5bdf5 --- /dev/null +++ b/client/lib/lib.c @@ -0,0 +1,132 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "hopper/hopper.h" + +static int get_open_flags(int flags) { + int open_flags = 0; + + if (flags & HOPPER_IN) + open_flags |= O_WRONLY; + if (flags & HOPPER_OUT) + open_flags |= O_RDONLY; + if (flags & HOPPER_NONBLOCK) + open_flags |= O_NONBLOCK; + + return open_flags; +} + +static char *get_endpoint_path(struct hopper_pipe *pipe) { + char *path = (char *)malloc(sizeof(char) * PATH_MAX); + sprintf(path, "%s/%s", pipe->hopper, pipe->endpoint); + return path; +} + +static char *get_pipe_path(struct hopper_pipe *pipe) { + char *path = (char *)malloc(sizeof(char) * PATH_MAX); + const char *suffix = (pipe->flags & HOPPER_IN ? "in" : "out"); + + // {HOPPER}/{ENDPOINT}/{NAME}.{TYPE} + sprintf(path, "%s/%s/%s.%s", pipe->hopper, pipe->endpoint, pipe->name, + suffix); + + return path; +} + +int hopper_open(struct hopper_pipe *pipe) { + int res = 0; + + if (pipe->name == NULL || pipe->endpoint == NULL || pipe->hopper == NULL) { + // pipe, endpoint name, hopper path are (obviously) required + errno = EINVAL; + return -1; + } + + if (!(pipe->flags & HOPPER_IN) == !(pipe->flags & HOPPER_OUT)) { + // either both or none of the input/output flags are set + errno = EINVAL; + return -1; + } + + char *endpoint_path = get_endpoint_path(pipe); + res = mkdir(endpoint_path, 0755); + free(endpoint_path); + if (res == -1 && errno != EEXIST) { + pipe->fd = -1; + return -1; + } + + char *pipe_path = get_pipe_path(pipe); + if (mkfifo(pipe_path, 0660) < 0 && errno != EEXIST) { + // mkfifo failed in some way, preserve errno + res = -1; + goto cleanup; + } + + int open_flags = get_open_flags(pipe->flags); + int fd = open(pipe_path, open_flags); + if (fd < 0) { + // preserve errno if open fails + res = -1; + goto cleanup; + } + + // acquire a file lock on the fifo, we don't want other things using it + if (flock(fd, (pipe->flags & HOPPER_IN ? LOCK_EX : LOCK_SH) | LOCK_NB) != + 0) { + if (errno == EWOULDBLOCK) + errno = EBUSY; // this makes more sense for clients + + int errsv = errno; // preserve errno across close + close(fd); + errno = errsv; + + res = -1; + goto cleanup; + } + + pipe->fd = fd; + +cleanup: + free(pipe_path); + if (res != 0) + pipe->fd = -1; + + return res; +} + +int hopper_close(struct hopper_pipe *pipe) { + if (pipe->fd == -1) + return 0; + + if (flock(pipe->fd, LOCK_UN) != 0) + return -1; + + if (close(pipe->fd) != 0) + return -1; + + pipe->fd = -1; + + return 0; +} + +ssize_t hopper_read(struct hopper_pipe *pipe, void *dst, size_t len) { + ssize_t res = read(pipe->fd, dst, len); + if (res < 0 && errno == EWOULDBLOCK) + return 0; // EWOULDBLOCK isn't an error for non-block pipes + return res; +} + +ssize_t hopper_write(struct hopper_pipe *pipe, void *src, size_t len) { + ssize_t res = write(pipe->fd, src, len); + if (res < 0 && errno == EWOULDBLOCK) + return 0; // EWOULDBLOCK isn't an error for non-block pipes + return res; +} + diff --git a/client/lib/meson.build b/client/lib/meson.build new file mode 100644 index 0000000..05c12c0 --- /dev/null +++ b/client/lib/meson.build @@ -0,0 +1,20 @@ +client_inc = include_directories('.', '../../include') + +libhopper = static_library( + 'hopper', + 'lib.c', + include_directories: client_inc, + pic: true, + install: true, +) + +pkg = import('pkgconfig') + +pkg.generate( + libhopper, + name: 'hopper', + description: 'Hopper client library', + version: meson.project_version(), + subdirs: 'hopper', +) + diff --git a/client/meson.build b/client/meson.build new file mode 100644 index 0000000..c2f563b --- /dev/null +++ b/client/meson.build @@ -0,0 +1 @@ +subdir('lib') diff --git a/client/py/__init__.py b/client/py/__init__.py new file mode 100644 index 0000000..87b6cae --- /dev/null +++ b/client/py/__init__.py @@ -0,0 +1,7 @@ +from .hopper import HopperPipe, HopperPipeType + +__all__ = [ + "HopperPipe", + "HopperPipeType", +] + diff --git a/client/py/__pycache__/hopper.cpython-313.pyc b/client/py/__pycache__/hopper.cpython-313.pyc new file mode 100644 index 0000000..c9d0410 Binary files /dev/null and b/client/py/__pycache__/hopper.cpython-313.pyc differ diff --git a/client/py/hopper.py b/client/py/hopper.py new file mode 100644 index 0000000..8ec85f2 --- /dev/null +++ b/client/py/hopper.py @@ -0,0 +1,97 @@ +import os +import fcntl +import errno + +from enum import Enum + +class HopperPipeType(str, Enum): + IN = "in" + OUT = "out" + +class HopperPipe: + def __init__(self, type: HopperPipeType, name: str, endpoint: str, + hopper: str = "", nonblock: bool = False): + self.type = type + self.name = name + self.endpoint = endpoint + self.hopper = hopper + self.nonblock = nonblock + self.fd = -1 + + def _get_open_flags(self): + flags = 0 + if self.type == HopperPipeType.IN: + flags |= os.O_WRONLY + elif self.type == HopperPipeType.OUT: + flags |= os.O_RDONLY + if self.nonblock: + flags |= os.O_NONBLOCK + return flags + + def _get_endpoint_path(self): + return os.path.join(self.hopper, self.endpoint) + + def _get_path(self): + return os.path.join(self._get_endpoint_path(), f"{self.name}.{str(self.type.value)}") + + def open(self): + if self.name == "" or self.endpoint == "": + raise ValueError("Name and endpoint must be set") + + if self.hopper == "": + hopper = os.getenv("HOPPER_PATH") + if hopper: + self.hopper = hopper + else: + raise ValueError("Hopper path not set, or HOPPER_PATH not available") + + endpoint_path = self._get_endpoint_path() + os.makedirs(endpoint_path, exist_ok=True) + + path = self._get_path() + + try: + os.mkfifo(path, mode=0o660) + except OSError as e: + if e.errno != errno.EEXIST: + raise e + + open_flags = self._get_open_flags() + fd = os.open(path, open_flags) + + try: + fcntl.flock(fd, (fcntl.LOCK_EX if self.type == HopperPipeType.IN else fcntl.LOCK_SH) | fcntl.LOCK_NB); + except OSError as e: + if e.errno == errno.EWOULDBLOCK: + e.errno = EBUSY + errsv = e.errno + os.close(fd) + raise OSError(errno=errsv) + + self.fd = fd + + def close(self): + fcntl.flock(self.fd, fcntl.LOCK_UN) + os.close(self.fd) + self.fd = -1 + + def read(self, len: int): + try: + buf = os.read(self.fd, len) + return buf + except OSError as e: + if e.errno == EWOULDBLOCK: + return b'' + else: + raise e + + def write(self, buf: bytes): + try: + res = os.write(self.fd, buf) + return res + except OSError as e: + if e.errno == EWOULDBLOCK: + return 0; + else: + raise e + diff --git a/daemon/buffer.cpp b/daemon/buffer.cpp new file mode 100644 index 0000000..5612ea6 --- /dev/null +++ b/daemon/buffer.cpp @@ -0,0 +1,159 @@ +#include +#include + +#include "hopper/daemon/buffer.hpp" +#include "hopper/daemon/marker.hpp" +#include "hopper/daemon/pipe.hpp" + +namespace hopper { + +/* BufferMarker */ + +void BufferMarker::seek(size_t offset, size_t max, SeekDirection dir) { + if (dir == SeekDirection::FORWARD) + m_pos = (m_pos + offset) % max; + else + m_pos = ((m_pos > offset) ? m_pos - offset : max - (offset - m_pos)); +} + +/* HopperBuffer */ + +HopperBuffer::HopperBuffer(size_t len) : m_edge(0) { m_buf.resize(len); } + +BufferMarker *HopperBuffer::create_marker() { + auto *m = new BufferMarker(m_edge); + m_markers.push_back(m); + return m; +} + +void HopperBuffer::delete_marker(BufferMarker *marker) { + auto tgt = std::find(m_markers.begin(), m_markers.end(), marker); + if (tgt != m_markers.end()) + m_markers.erase(tgt); +} + +size_t HopperBuffer::write(void *src, size_t len) { + size_t max_len = std::min(len, max_write()); + size_t done_len = 0; + + // Write up to the buffer length, but only if len > buf_len + size_t next_len = std::min((m_buf.size() - m_edge), max_len); + std::memcpy(&m_buf[m_edge], src, next_len); + m_edge = (m_edge + next_len) % m_buf.size(); + + // Enough bytes have been written, return + done_len += next_len; + if (done_len >= max_len) + return done_len; + + // We are guaranteed tp have space for whatever's left + next_len = max_len - next_len; + std::memcpy(&m_buf[m_edge], reinterpret_cast(src) + done_len, + next_len); + m_edge = (m_edge + next_len) % m_buf.size(); + + done_len += next_len; + return done_len; +} + +size_t HopperBuffer::write(HopperPipe *pipe) { + size_t max_len = max_write(); + size_t done_len = 0; + + size_t next_len = std::min((m_buf.size() - m_edge), max_len); + size_t res = pipe->read_pipe(&m_buf[m_edge], next_len); + if (res == (size_t)-1) + // -1 indicates read error + return -1; + + m_edge = (m_edge + res) % m_buf.size(); + done_len += res; + if (res <= next_len) + // read_pipe reads as much as possible up to next_len, so the pipe is + // empty here + return done_len; + + next_len = max_len - next_len; + res = pipe->read_pipe(&m_buf[m_edge], next_len); + if (res == (size_t)-1) + return -1; + + m_edge = (m_edge + res) % m_buf.size(); + done_len += res; + return done_len; +} + +size_t HopperBuffer::read(BufferMarker *m, void *dst, size_t len) { + // Just see the arithmetic in `write`, it's the same here. + + size_t max_len = std::min(len, max_read(m)); + size_t done_len = 0; + + size_t next_len = std::min((m_buf.size() - m->pos()), max_len); + std::memcpy(dst, &m_buf[m->pos()], next_len); + m->seek(next_len, m_buf.size(), SeekDirection::FORWARD); + + done_len += next_len; + if (done_len >= max_len) + return done_len; + + next_len = max_len - next_len; + std::memcpy(reinterpret_cast(dst) + done_len, &m_buf[m->pos()], + next_len); + m->seek(next_len, m_buf.size(), SeekDirection::FORWARD); + + done_len += next_len; + return done_len; +} + +size_t HopperBuffer::read(HopperPipe *pipe) { + BufferMarker *m = pipe->marker(); + size_t max_len = max_read(m); + size_t done_len = 0; + + size_t next_len = std::min((m_buf.size() - m->pos()), max_len); + size_t res = pipe->write_pipe(&m_buf[m->pos()], next_len); + if (res == (size_t)-1) + return -1; + + m->seek(res, m_buf.size(), SeekDirection::FORWARD); + done_len += res; + if (res <= next_len) + return done_len; + + next_len = max_len - next_len; + res = pipe->write_pipe(&m_buf[m->pos()], next_len); + if (res == (size_t)-1) + return -1; + + m->seek(res, m_buf.size(), SeekDirection::FORWARD); + done_len += res; + return done_len; +} + +size_t HopperBuffer::max_write() { + size_t cap = m_buf.size(); + if (m_markers.empty()) + return cap; + + size_t min_dist = cap; + + for (auto *m : m_markers) { + size_t d = (m->pos() - m_edge + cap) % cap; + + if (d == 0) // Marker is at m_edge, full buffer left + d = cap; + + if (d < min_dist) + min_dist = d; + } + + return min_dist; +} + +size_t HopperBuffer::max_read(BufferMarker *m) { + size_t cap = m_buf.size(); + return (m_edge - m->pos() + cap) % cap; +} + +}; // namespace hopper diff --git a/daemon/daemon.cpp b/daemon/daemon.cpp new file mode 100644 index 0000000..37fa2c5 --- /dev/null +++ b/daemon/daemon.cpp @@ -0,0 +1,83 @@ +#include +#include +#include + +#include +#include +#include + +#include "hopper/daemon/daemon.hpp" +#include "hopper/daemon/endpoint.hpp" +#include "hopper/daemon/util.hpp" + +namespace hopper { + +HopperDaemon::HopperDaemon(std::filesystem::path path, int max_events, + int timeout) + : m_max_events(max_events), m_timeout(timeout), m_path(path) { + if (!std::filesystem::exists(path)) { + std::filesystem::create_directories(path); + } + + // just ignore SIGPIPEs + std::signal(SIGPIPE, SIG_IGN); + + if ((m_epoll_fd = epoll_create1(0)) < 0) + throw_errno("epoll_create1"); + + setup_inotify(); +} + +HopperDaemon::~HopperDaemon() { + for (const auto &[_, endpoint] : m_endpoints) + delete endpoint; +} + +void HopperDaemon::process_events(struct epoll_event *events, int n_events) { + for (int i = 0; i < n_events; i++) { + struct epoll_event ev = events[i]; + + // inotify events use 0 as ID + if (ev.data.u64 == 0) { + handle_inotify(); + continue; + } + + uint32_t endpoint_id = (ev.data.u64 >> 40) & 0xFFFFFFFFFF; + if (!m_endpoints.contains(endpoint_id)) + continue; + + HopperEndpoint *endpoint = m_endpoints[endpoint_id]; + + if (ev.events & EPOLLIN) + endpoint->on_pipe_readable(ev.data.u64); + if (ev.events & EPOLLHUP || ev.events & EPOLLERR) + remove_pipe(endpoint, ev.data.u64); + } +} + +int HopperDaemon::run() { + int res = 0; + + std::unique_ptr events( + new struct epoll_event[m_max_events]); + + while (res == 0) { + + int n = epoll_wait(m_epoll_fd, events.get(), m_max_events, m_timeout); + if (n < 0) { + if (errno == EINTR) + continue; + + throw_errno("epoll_wait"); + return -1; + } + + process_events(events.get(), n); + refresh_pipes(); + } + + return res; +} + +}; // namespace hopper diff --git a/daemon/daemon_endpoint.cpp b/daemon/daemon_endpoint.cpp new file mode 100644 index 0000000..5c368de --- /dev/null +++ b/daemon/daemon_endpoint.cpp @@ -0,0 +1,68 @@ +#include "hopper/daemon/daemon.hpp" +#include "hopper/daemon/util.hpp" +#include +#include + +namespace hopper { + +uint32_t HopperDaemon::create_endpoint(const std::filesystem::path &path) { + uint32_t endpoint_id = next_endpoint_id(); + if (endpoint_id == 0) + return 0; + + int inotify_watch_fd = + inotify_add_watch(m_inotify_fd, path.c_str(), IN_CREATE | IN_DELETE); + if (inotify_watch_fd < 0) + return 0; + + auto *endpoint = new HopperEndpoint(endpoint_id, inotify_watch_fd, path); + m_endpoints[endpoint_id] = endpoint; + + std::cout << "CREATE " << *endpoint << std::endl; + + // Open anything that may already exist in the endpoint + for (const auto &dir_entry : std::filesystem::directory_iterator{path}) { + const auto &p = dir_entry.path(); + + PipeType pipe_type = detect_pipe_type(p); + if (pipe_type == PipeType::NONE) + continue; + + HopperPipe *pipe = + (pipe_type == PipeType::IN ? endpoint->add_input_pipe(p) + : endpoint->add_output_pipe(p)); + + if (pipe != nullptr) + add_pipe(pipe); + } + + return endpoint_id; +} + +void HopperDaemon::delete_endpoint(uint32_t id) { + if (!m_endpoints.contains(id)) + return; + + std::cout << "DELETE " << *(m_endpoints[id]) << std::endl; + + delete m_endpoints[id]; + m_endpoints.erase(id); +} + +void HopperDaemon::delete_endpoint(const std::filesystem::path &path) { + for (const auto &[_, endpoint] : m_endpoints) { + if (endpoint->path() == path) { + delete_endpoint(endpoint->id()); + break; + } + } +} + +HopperEndpoint *HopperDaemon::endpoint_by_watch(int watch) { + for (const auto &[_, endpoint] : m_endpoints) + if (endpoint->watch_fd() == watch) + return endpoint; + return nullptr; +} + +}; // namespace hopper diff --git a/daemon/daemon_inotify.cpp b/daemon/daemon_inotify.cpp new file mode 100644 index 0000000..7cbbc2e --- /dev/null +++ b/daemon/daemon_inotify.cpp @@ -0,0 +1,113 @@ +#include "hopper/daemon/daemon.hpp" +#include "hopper/daemon/util.hpp" + +#include +#include +#include + +namespace hopper { + +void HopperDaemon::setup_inotify() { + if ((m_inotify_fd = inotify_init()) < 0) + throw_errno("inotify_init"); + + struct epoll_event inotify_ev = {}; + inotify_ev.events = EPOLLIN; + inotify_ev.data.u64 = 0; + + if (epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_inotify_fd, &inotify_ev) != 0) + throw_errno("epoll_ctl ADD"); + + if ((m_inotify_root_watch = inotify_add_watch( + m_inotify_fd, m_path.c_str(), + IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_ISDIR)) < 0) + throw_errno("inotify_add_watch"); +} + +void HopperDaemon::handle_root_inotify(struct inotify_event *ev) { + if (ev->mask & IN_DELETE_SELF) { + std::cerr << "Hopper " << m_path << " got deleted, exiting... :(" + << std::endl; + _exit(1); + } + + if (ev->mask & IN_CREATE) { + std::filesystem::path p = m_path; + p /= ev->name; + + if (create_endpoint(p) == 0) + std::cerr << "Endpoint creation failed! Out of IDs?" << std::endl; + } + + if (ev->mask & IN_DELETE) { + std::filesystem::path p = m_path; + p /= ev->name; + + delete_endpoint(p); + } +} + +void HopperDaemon::handle_endpoint_inotify(struct inotify_event *ev, + HopperEndpoint *endpoint) { + if (ev->mask & IN_CREATE) { + std::filesystem::path p = endpoint->path(); + p /= ev->name; + + PipeType pipe_type = detect_pipe_type(p); + if (pipe_type == PipeType::NONE) + return; + + HopperPipe *pipe = + (pipe_type == PipeType::IN ? endpoint->add_input_pipe(p) + : endpoint->add_output_pipe(p)); + + if (pipe != nullptr) + add_pipe(pipe); + } + + if (ev->mask & IN_DELETE) { + std::filesystem::path p = endpoint->path(); + p /= ev->name; + + HopperPipe *pipe = endpoint->pipe_by_path(p); + + if (pipe != nullptr) { + if (pipe->status() == PipeStatus::ACTIVE) + remove_pipe(endpoint, pipe->id()); + + endpoint->remove_by_id(pipe->id()); + } + } +} + +void HopperDaemon::handle_inotify() { + struct inotify_event *iev = reinterpret_cast( + std::malloc(sizeof(struct inotify_event) + NAME_MAX + 1)); + + if (read(m_inotify_fd, iev, sizeof(struct inotify_event) + NAME_MAX + 1) <= + 0) + return; + + if (iev->wd == m_inotify_root_watch) { + handle_root_inotify(iev); + std::free(iev); + return; + } + + HopperEndpoint *endpoint = endpoint_by_watch(iev->wd); + if (endpoint == nullptr) { + std::cout << "No endpoint found for watch ID " << iev->wd << std::endl; + std::free(iev); + return; + } + + handle_endpoint_inotify(iev, endpoint); + + // The endpoint is now closed + if (iev->mask & IN_IGNORED) + delete_endpoint(endpoint->id()); + + std::free(iev); +} + +}; // namespace hopper diff --git a/daemon/daemon_pipe.cpp b/daemon/daemon_pipe.cpp new file mode 100644 index 0000000..7ca6043 --- /dev/null +++ b/daemon/daemon_pipe.cpp @@ -0,0 +1,61 @@ +#include "hopper/daemon/daemon.hpp" +#include "hopper/daemon/util.hpp" +#include + +namespace hopper { + +void HopperDaemon::remove_pipe(HopperEndpoint *endpoint, uint64_t pipe_id) { + PipeType type = (pipe_id & 0x1 ? PipeType::IN : PipeType::OUT); + for (const auto &[id, pipe] : + (type == PipeType::IN ? endpoint->inputs() : endpoint->outputs())) { + + if (pipe->id() != pipe_id) + continue; + + if (epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, pipe->fd(), nullptr) != 0) + throw_errno("epoll_ctl DEL"); + pipe->close_pipe(); + + std::cout << "DOWN " << *pipe << "\n"; + } +} + +void HopperDaemon::add_pipe(HopperPipe *pipe) { + if (pipe == nullptr) + return; + + // Pipe has bad ID or bad FD + if (pipe->id() == 0 || pipe->fd() == -1) + return; + + struct epoll_event ev = {}; + ev.events = (pipe->type() == PipeType::IN ? EPOLLIN | EPOLLHUP | EPOLLET + : EPOLLHUP); + ev.data.u64 = pipe->id(); + + if (epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, pipe->fd(), &ev) != 0) + throw_errno("epoll_ctl ADD"); + + std::cout << "UP " << *pipe << "\n"; +} + +void HopperDaemon::refresh_pipes() { + // Try to open any inactive pipes again + for (const auto &[_, endpoint] : m_endpoints) { + for (const auto &[id, pipe] : endpoint->inputs()) { + if (pipe->status() == PipeStatus::ACTIVE || !pipe->open_pipe()) + continue; + add_pipe(pipe); + } + for (const auto &[id, pipe] : endpoint->outputs()) { + if (pipe->status() == PipeStatus::ACTIVE || !pipe->open_pipe()) + continue; + add_pipe(pipe); + } + + // Try to empty buffers into pipes + endpoint->flush_pipes(); + } +} + +}; // namespace hopper diff --git a/daemon/endpoint.cpp b/daemon/endpoint.cpp new file mode 100644 index 0000000..4c8d022 --- /dev/null +++ b/daemon/endpoint.cpp @@ -0,0 +1,129 @@ +#include +#include + +#include "hopper/daemon/endpoint.hpp" +#include "hopper/daemon/pipe.hpp" +#include "hopper/daemon/util.hpp" + +namespace hopper { + +HopperEndpoint::HopperEndpoint(uint32_t id, int watch_fd, + std::filesystem::path path) + : m_path(path), m_id(id), m_watch_fd(watch_fd) { + m_name = path.filename(); +} + +HopperEndpoint::~HopperEndpoint() { + for (const auto &[_, pipe] : m_inputs) + delete pipe; + for (const auto &[_, pipe] : m_outputs) + delete pipe; +} + +void HopperEndpoint::on_pipe_readable(uint64_t id) { + if (!m_inputs.contains(id)) + return; + + HopperPipe *pipe = m_inputs[id]; + + size_t res = m_buffer.write(pipe); + if (res == (size_t)-1) + throw_errno("read"); + + std::cout << *pipe << " -> " << res << " bytes\n"; +} + +void HopperEndpoint::flush_pipes() { + for (const auto &[_, pipe] : m_outputs) { + if (pipe->status() == PipeStatus::INACTIVE) + continue; + + size_t res = m_buffer.read(pipe); + if (res > 0) + std::cout << *pipe << " <- " << res << " bytes\n"; + } +} + +HopperPipe *HopperEndpoint::pipe_by_path(const std::filesystem::path &path) { + for (const auto &[_, pipe] : m_outputs) + if (pipe->path() == path) + return pipe; + for (const auto &[_, pipe] : m_inputs) + if (pipe->path() == path) + return pipe; + + return 0; +} + +HopperPipe *HopperEndpoint::add_input_pipe(const std::filesystem::path &path) { + if (!std::filesystem::is_fifo(path)) + return nullptr; + + uint64_t id = next_pipe_id(1); // Type 1 for input + if (id == 0) // ID 0 is never valid + return nullptr; + + HopperPipe *p = new HopperPipe(id, m_name, PipeType::IN, path, nullptr); + m_inputs[id] = p; + + std::cout << "OPEN " << *p << "\n"; + + return p; +} + +HopperPipe *HopperEndpoint::add_output_pipe(const std::filesystem::path &path) { + if (!std::filesystem::is_fifo(path)) + return nullptr; + + BufferMarker *marker = m_buffer.create_marker(); + uint64_t id = next_pipe_id(0); // Type 0 for output + if (id == 0) + return nullptr; + + HopperPipe *p = new HopperPipe(id, m_name, PipeType::OUT, path, marker); + m_outputs[id] = p; + + std::cout << "OPEN " << *p << "\n"; + + return p; +} + +void HopperEndpoint::remove_by_id(uint64_t pipe_id) { + PipeType type = (pipe_id & 0x1 ? PipeType::IN : PipeType::OUT); + + if (type == PipeType::IN && m_inputs.contains(pipe_id)) { + HopperPipe *pipe = m_inputs[pipe_id]; + m_buffer.delete_marker(pipe->marker()); + std::cout << "CLOSE " << *pipe << "\n"; + + delete pipe; + m_inputs.erase(pipe_id); + } else if (type == PipeType::OUT && m_outputs.contains(pipe_id)) { + HopperPipe *pipe = m_outputs[pipe_id]; + m_buffer.delete_marker(pipe->marker()); + std::cout << "CLOSE " << *pipe << "\n"; + + delete pipe; + m_outputs.erase(pipe_id); + } +} + +void HopperEndpoint::remove_input_pipe(const std::filesystem::path &path) { + for (const auto &[id, pipe] : m_inputs) { + if (pipe->path() == path) { + remove_by_id(id); + break; + } + } +} + +void HopperEndpoint::remove_output_pipe(const std::filesystem::path &path) { + for (const auto &[id, pipe] : m_outputs) { + if (pipe->path() == path) { + remove_by_id(id); + break; + } + } +} + +}; // namespace hopper diff --git a/daemon/main.cpp b/daemon/main.cpp new file mode 100644 index 0000000..cd106b3 --- /dev/null +++ b/daemon/main.cpp @@ -0,0 +1,34 @@ +#include +#include +#include + +#include "hopper/daemon/daemon.hpp" + +int main(int argc, char *argv[]) { + try { + if (argc > 2) { + std::cout << "Usage: hopperd " << std::endl; + return 1; + } else if (argc == 2) { + std::cout << "HOPPER " << argv[1] << std::endl; + auto daemon = hopper::HopperDaemon(argv[1]); + return daemon.run(); + } else if (char *p = getenv("HOPPER_PATH")) { + std::cout << "HOPPER " << p << std::endl; + auto daemon = hopper::HopperDaemon(p); + return daemon.run(); + } else { + std::cerr + << "Could not find Hopper instance, try setting HOPPER_PATH, " + "or passing as argument!" + << std::endl; + return 1; + } + } catch (const std::exception &e) { + std::cerr << "Exception: " << e.what() << std::endl; + return 1; + } catch (...) { + std::cerr << "Unknown exception" << std::endl; + return 1; + } +} diff --git a/daemon/meson.build b/daemon/meson.build new file mode 100644 index 0000000..3cbe14c --- /dev/null +++ b/daemon/meson.build @@ -0,0 +1,15 @@ +executable( + 'hopperd', + 'main.cpp', + 'buffer.cpp', + 'pipe.cpp', + 'daemon.cpp', + 'daemon_endpoint.cpp', + 'daemon_inotify.cpp', + 'daemon_pipe.cpp', + 'endpoint.cpp', + 'util.cpp', + include_directories: include_directories('.', '../include'), + install: true, +) + diff --git a/daemon/pipe.cpp b/daemon/pipe.cpp new file mode 100644 index 0000000..3e3db2c --- /dev/null +++ b/daemon/pipe.cpp @@ -0,0 +1,113 @@ +#include +#include + +#include "hopper/daemon/pipe.hpp" +#include "hopper/daemon/util.hpp" + +namespace hopper { + +/* HopperPipe */ + +HopperPipe::HopperPipe(uint64_t id, const std::string &endpoint_name, + PipeType type, std::filesystem::path path, + BufferMarker *marker) + : m_marker(marker), m_type(type), m_path(path), + m_endpoint_name(endpoint_name), m_id(id) { + m_name = path.replace_extension("").filename(); + open_pipe(); +} + +HopperPipe::~HopperPipe() { + if (m_fd != -1) + close(m_fd); +} + +int HopperPipe::open_pipe() { + if (m_status == PipeStatus::ACTIVE) + return 1; + + int fd = open(m_path.c_str(), + (m_type == PipeType::IN ? O_RDONLY : O_WRONLY) | O_NONBLOCK); + if (fd < 0) { + if (m_type == PipeType::OUT && errno == ENXIO) { + // No readers available + m_status = PipeStatus::INACTIVE; + m_fd = -1; + return 0; + } + + m_status = PipeStatus::INACTIVE; + m_fd = -1; + throw_errno("open"); + return 0; + } + + m_fd = fd; + m_status = PipeStatus::ACTIVE; + + return 1; +} + +void HopperPipe::close_pipe() { + if (m_status == PipeStatus::INACTIVE) + return; + + m_status = PipeStatus::INACTIVE; + + if (m_fd != -1) + close(m_fd); +} + +size_t HopperPipe::write_pipe(void *src, size_t len) { + if (m_type == PipeType::IN) + return -1; + + if (len == 0) + return 0; + + size_t done_len = 0; + + while (done_len < len) { + ssize_t res = write(m_fd, reinterpret_cast(src) + done_len, + len - done_len); + + if (res == -1 && (errno == EWOULDBLOCK || errno == EINTR)) + return done_len; + else if (res == -1) { + perror("write"); + return -1; + } + + done_len += res; + } + + return done_len; +} + +size_t HopperPipe::read_pipe(void *dst, size_t len) { + if (m_type == PipeType::OUT) + return -1; + + if (len == 0) + return 0; + + size_t done_len = 0; + + while (done_len < len) { + ssize_t res = read(m_fd, reinterpret_cast(dst) + done_len, + len - done_len); + + if (res == -1 && (errno == EWOULDBLOCK || errno == EINTR)) + return done_len; + else if (res == -1) { + perror("read"); + return -1; + } else if (res == 0) + break; + + done_len += res; + } + + return done_len; +} +}; // namespace hopper diff --git a/daemon/util.cpp b/daemon/util.cpp new file mode 100644 index 0000000..f3638de --- /dev/null +++ b/daemon/util.cpp @@ -0,0 +1,18 @@ +#include "hopper/daemon/util.hpp" + +namespace hopper { + +PipeType detect_pipe_type(const std::filesystem::path &path) { + if (!path.has_extension()) + return PipeType::NONE; + + if (path.extension() == ".in") + return PipeType::IN; + + if (path.extension() == ".out") + return PipeType::OUT; + + return PipeType::NONE; +} + +}; // namespace hopper diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..8a3011e --- /dev/null +++ b/flake.lock @@ -0,0 +1,61 @@ +{ + "nodes": { + "nixpkgs": { + "locked": { + "lastModified": 1768032153, + "narHash": "sha256-6kD1MdY9fsE6FgSwdnx29hdH2UcBKs3/+JJleMShuJg=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "3146c6aa9995e7351a398e17470e15305e6e18ff", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixpkgs-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "nixpkgs": "nixpkgs", + "utils": "utils" + } + }, + "systems": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } + }, + "utils": { + "inputs": { + "systems": "systems" + }, + "locked": { + "lastModified": 1731533236, + "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..a8bf585 --- /dev/null +++ b/flake.nix @@ -0,0 +1,51 @@ +{ + description = ""; + + inputs = { + nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable"; + utils.url = "github:numtide/flake-utils"; + }; + + outputs = + { + self, + nixpkgs, + utils, + }: + let + systems = [ + "x86_64-linux" + "aarch64-linux" + ]; + in + utils.lib.eachSystem systems ( + system: + let + pkgs = import nixpkgs { inherit system; }; + in + { + packages = { + hopper = { + default = pkgs.callPackage ./nix/package.nix { }; + cross-x86_64-linux = pkgs.pkgsCross.gnu64.pkgsStatic.callPackage ./nix/package.nix { }; + cross-aarch64-linux = pkgs.pkgsCross.aarch64-multiplatform.pkgsStatic.callPackage ./nix/package.nix { }; + }; + }; + + devShell = pkgs.mkShell { + packages = with pkgs; [ + clang-tools + meson + ninja + pkg-config + + python313 + + # one probably wants these too + gdb + valgrind + ]; + }; + } + ); +} diff --git a/hopper/client/__init__.py b/hopper/client/__init__.py deleted file mode 100644 index d60a978..0000000 --- a/hopper/client/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .client import HopperClient -from .reader import * - -__all__ = ["HopperClient", "PipeReader", "JsonReader"] diff --git a/hopper/client/client.py b/hopper/client/client.py deleted file mode 100644 index ee83130..0000000 --- a/hopper/client/client.py +++ /dev/null @@ -1,57 +0,0 @@ -from hopper.common import * - - -class HopperClient: - def __init__(self): - """ - Initialize a new HopperClient. - """ - self.__pipes = [] - - def open_pipe(self, pn, create=True, delete=False, blocking=False, use_read_buffer=False, read_buffer_terminator=b'\n'): - """ - Open a pipe specified by a PipeName `pn`. - """ - pipe = Pipe(pn, create=create, delete=delete, blocking=blocking, - use_read_buffer=use_read_buffer, read_buffer_terminator=read_buffer_terminator) - self.__pipes.append(pipe) - - def close_pipe(self, pn): - """ - Close a pipe specified by a PipeName `pn`. - """ - p = self.get_pipe_by_pipe_name(pn) - if p == None: - raise - - p.close() - self.__pipes.remove(p) - - def get_pipe_by_pipe_name(self, pn): - """ - Return the Pipe object specified by PipeName `pn`. - """ - for p in self.__pipes: - if p.pipe_name == pn: - return p - return None - - def read(self, pn, _buf_size=-1): - """ - Read content from the pipe specified by `pn`. - """ - p = self.get_pipe_by_pipe_name(pn) - if p == None: - raise - - return p.read(_buf_size=_buf_size) - - def write(self, pn, buf): - """ - Write `buf` to the PipeName specified by `pn`. - """ - p = self.get_pipe_by_pipe_name(pn) - if p == None: - raise - - return p.write(buf + b'\n') diff --git a/hopper/client/reader.py b/hopper/client/reader.py deleted file mode 100644 index 54b3b1d..0000000 --- a/hopper/client/reader.py +++ /dev/null @@ -1,69 +0,0 @@ -import json - - -class PipeReader: - def __init__(self, client, pipe_name): - self._HOPPER_CLIENT = client - self._PIPE_NAME = pipe_name - - def read(self): - return self._HOPPER_CLIENT.read(self._PIPE_NAME) - - -class JsonReader(PipeReader): - def __init__(self, client, pipe_name, read_validator=None): - super().__init__(client, pipe_name) - - self.read_validator = read_validator if read_validator else self.default_read_validator - self.tail = "" - - if not self._HOPPER_CLIENT.get_pipe_by_pipe_name(pipe_name).blocking: - print("WARN: Non-blocking reads may crash the brain!") - - @staticmethod - def default_read_validator(_): - return True - - def _try_decode_json(self, s): - decoder = json.JSONDecoder() - idx = 0 - while idx < len(s): - slice = s[idx:].lstrip() - if not slice: - break - offset = len(s[idx:]) - len(slice) - try: - obj, end = decoder.raw_decode(slice) - if self.read_validator(obj): - idx += offset + end - return obj, s[idx:].rstrip() - else: - idx += 1 - except json.JSONDecodeError as e: - idx += 1 - return None, s - - def read(self): - buffer = self.tail - chunk_size = 1024 - max_buffer_size = 1024 * 1024 - - while len(buffer) < max_buffer_size: - try: - chunk = self._HOPPER_CLIENT.read( - self._PIPE_NAME, _buf_size=chunk_size) - if not chunk: - break - - buffer += chunk.decode("utf-8") - - obj, tail = self._try_decode_json(buffer) - - if obj: - self.tail = tail - return obj - - except BlockingIOError: - continue - - return None diff --git a/hopper/common/__init__.py b/hopper/common/__init__.py deleted file mode 100644 index 5e9290f..0000000 --- a/hopper/common/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .pipe import Pipe -from .pipe_type import PipeType -from .pipe_name import PipeName - -__all__ = ["PipeType", "PipeName", "Pipe"] \ No newline at end of file diff --git a/hopper/common/pipe.py b/hopper/common/pipe.py deleted file mode 100644 index 63c3552..0000000 --- a/hopper/common/pipe.py +++ /dev/null @@ -1,143 +0,0 @@ -import os - -from .pipe_name import PipeName - -""" -Pipe class: - Represents a single input/output pipe. - Implements basic read/write operations. -""" - - -class Pipe: - __BUF_SIZE = 1024 - __fd = 0 - __pn = None - __handler = None - - def __init__(self, pn: PipeName, create=False, delete=False, blocking=False, use_read_buffer=False, read_buffer_terminator=b'\n', buffer_size=1024): - self.__pn = pn - self.__create = create - self.__delete = delete - self.__blocking = blocking - self.__use_read_buffer = use_read_buffer - self.__read_buffer_terminator = read_buffer_terminator - self.__BUF_SIZE = buffer_size - - # The read buffer with non-blocking I/O, blocks, but not nicely - if not self.__blocking and self.__use_read_buffer: - print("WARN: Buffered reads with non-blocking I/O can crash the brain!") - - self.__open() - - def __open(self): - if self.__pn == None: - return - - pipe_path = self.__pn.pipe_path - - if not os.path.exists(pipe_path) and self.__create: - os.mkfifo(pipe_path) - elif not os.path.exists(pipe_path): - raise FileNotFoundError(f"Cannot find pipe at '{pipe_path}'.") - - flags = os.O_RDWR - flags |= (0 if self.__blocking else os.O_NONBLOCK) - - self.__fd = os.open(pipe_path, flags) - self.__inode_number = os.stat(pipe_path).st_ino - - def read(self, _buf_size=-1): - if self.__use_read_buffer: - buf = b'' - # Consume greedily, until we reach the terminator - while True: - try: - b = os.read(self.__fd, 1) - buf += b - if b == self.__read_buffer_terminator: - break - except BlockingIOError: - continue - except: - return None - return buf - - try: - buf = os.read( - self.__fd, self.__BUF_SIZE if _buf_size == -1 else _buf_size) - except: - return None - return buf - - def write(self, buf): - try: - os.write(self.__fd, buf) - except: - raise - - def close(self): - if self.__pn == None: - return - - os.close(self.__fd) - if self.__delete: - os.remove(self.__pn.pipe_path) - - def __del__(self): - self.close() - - def set_handler(self, handlers): - if self.__pn == None: - raise ValueError("Bad pipe name") - - try: - self.__handler = handlers[self.__pn.handler_id] - except: - raise - - @property - def type(self): - if self.__pn == None: - return None - return self.__pn.type - - @property - def id(self): - if self.__pn == None: - return None - return self.__pn.id - - @property - def handler_id(self): - if self.__pn == None: - return None - return self.__pn.handler_id - - @property - def pipe_name(self): - if self.__pn == None: - return None - return self.__pn - - @property - def pipe_path(self): - if self.__pn == None: - return None - return self.__pn.pipe_path - - @property - def handler(self): - return self.__handler - - @property - def fd(self): - return self.__fd - - @property - def inode_number(self): - return self.__inode_number - - @property - def blocking(self): - return self.__blocking diff --git a/hopper/common/pipe_name.py b/hopper/common/pipe_name.py deleted file mode 100644 index cfdaf69..0000000 --- a/hopper/common/pipe_name.py +++ /dev/null @@ -1,59 +0,0 @@ -import os - -from .pipe_type import PipeType - -class PipeName: - __type = None - __handler_id = None - __id = None - __root = "" - - def __init__(self, x, root = ""): - self.__root = root - - if type(x) is str: - self.__from_str(x) - elif type(x) is tuple: - self.__from_tuple(x) - else: - raise ValueError("Invalid pipe name object.") - - def __from_str(self, s): - p = s.split("_") - self.__type = (PipeType.INPUT if p[0] == 'I' else PipeType.OUTPUT) - self.__handler_id = p[1] - self.__id = p[2] - - def __from_tuple(self, t): - self.__type = t[0] - self.__handler_id = t[1] - self.__id = t[2] - - def __str__(self): - return f"{'I' if self.__type == PipeType.INPUT else 'O'}_{self.__handler_id}_{self.__id}" - - def __eq__(self, pn): - if not isinstance(pn, PipeName): - return NotImplemented - return (str(pn) == str(self)) and (pn.root_path == self.__root) - - @property - def pipe_path(self): - return os.path.join(self.__root, str(self)) - - @property - def type(self): - return self.__type - - @property - def handler_id(self): - return self.__handler_id - - @property - def id(self): - return self.__id - - @property - def root_path(self): - return self.__root - diff --git a/hopper/common/pipe_type.py b/hopper/common/pipe_type.py deleted file mode 100644 index dfae38d..0000000 --- a/hopper/common/pipe_type.py +++ /dev/null @@ -1,3 +0,0 @@ -class PipeType: - OUTPUT, RECEIVING = 0, 0 - INPUT, SENDING = 1, 1 \ No newline at end of file diff --git a/hopper/server/handler.c b/hopper/server/handler.c deleted file mode 100644 index fe312e9..0000000 --- a/hopper/server/handler.c +++ /dev/null @@ -1,34 +0,0 @@ -#include -#include - -#include "handler.h" - -/// A mapping between a handler ID and name -struct HandlerMapping { - short handler; - char *name; -}; - -/// An array of handler mappings -static struct HandlerMapping HANDLER_MAP[] = { - HANDLER_GENERIC, HANDLER_LOG, - -#ifdef UNUSED_HANDLERS - HANDLER_FULL_LOG, HANDLER_COMPLETE_LOG, -#endif - - HANDLER_START_BUTTON, HANDLER_STARTER, HANDLER_HARDWARE, -}; - -/// Maps a handler string to an ID number -short map_handler_to_id(char *handler) { - int n_handlers = sizeof(HANDLER_MAP) / sizeof(struct HandlerMapping); - - for (int i = 0; i < n_handlers; i++) - if (!strcmp(handler, HANDLER_MAP[i].name)) - return HANDLER_MAP[i].handler; - - printf("pipe handler '%s' is not recognised\n", handler); - - return HANDLER_UNKNOWN; -} diff --git a/hopper/server/handler.h b/hopper/server/handler.h deleted file mode 100644 index 1f307bb..0000000 --- a/hopper/server/handler.h +++ /dev/null @@ -1,24 +0,0 @@ -#ifndef handler_h_INCLUDED -#define handler_h_INCLUDED - -#define HANDLER_UNKNOWN 0 -#define HANDLER_GENERIC {1, "generic"} -#define HANDLER_LOG {2, "log"} - -#ifdef UNUSED_HANDLERS - -#define HANDLER_FULL_LOG {3, "fulllog"} -#define HANDLER_COMPLETE_LOG {4, "complog"} - -#endif - -#define HANDLER_START_BUTTON {5, "start-button"} -#define HANDLER_STARTER {6, "starter"} -#define HANDLER_HARDWARE {7, "hardware"} - -#define MAX_HANDLER_ID 7 - -short map_handler_to_id(char *handler); - - -#endif // handler_h_INCLUDED diff --git a/hopper/server/pipe.c b/hopper/server/pipe.c deleted file mode 100644 index 66f75ac..0000000 --- a/hopper/server/pipe.c +++ /dev/null @@ -1,241 +0,0 @@ -#define _GNU_SOURCE - -#include -#include -#include -#include -#include -#include - -#include "handler.h" -#include "pipe.h" - -/// Close all file descriptors in a PipeSet object -/// Threads should be joined first -void close_pipe_set(struct PipeSet *set) { - close(set->fd); - set->fd = -1; -} - -/// Free a PipeSet, file descriptors should be closed first -void free_pipe_set(struct PipeSet **set) { - if (!set) - return; - - struct PipeSet *_set = *set; - free(_set->info->id); - free(_set->info); - free(_set); - - // Set the pointer to NULL so it isn't reused - (*set) = NULL; -} - -/// Generate a PipeInfo object from a file path -struct PipeInfo *get_pipe_info(const char *path) { - struct PipeInfo *info = (struct PipeInfo *)malloc(sizeof(struct PipeInfo)); - if (!info) { - perror("malloc"); - return NULL; - } - - info->path = strdup(path); - - char *filename = basename((char *)path); - - info->name = strdup(filename); - - char *type = strtok(filename, "_"); - if (!type) - goto err_bad_fname; - - switch (*type) { - case 'I': - info->type = PIPE_SRC; - break; - case 'O': - info->type = PIPE_DST; - break; - default: - goto err_bad_fname; - } - - char *handler = strtok(NULL, "_"); - if (!handler) - goto err_bad_fname; - - info->handler = map_handler_to_id(handler); - - char *id = strtok(NULL, "_"); - if (!id) - goto err_bad_fname; - - int len = strlen(id) + 1; - - info->id = (char *)malloc(sizeof(char) * len); - if (!info->id) { - free(info); - perror("malloc"); - return NULL; - } - - strcpy(info->id, id); - - return info; - -err_bad_fname: - printf("Badly formatted filename: '%s'\n", filename); - free(info); - return NULL; -} - -/// Try to reopen a previously closed pipe -int reopen_pipe_set(struct PipeSet *set, struct HopperData *data) { - if (set->status == PIPE_ACTIVE) - return 0; - - if ((set->fd = open(set->info->path, - (set->info->type == PIPE_SRC ? O_RDONLY : O_WRONLY) | - O_NONBLOCK)) < 0) { - if (errno == ENXIO && set->info->type == PIPE_DST) { - pipe_set_status_inactive(set, data); - return 1; - } - - pipe_set_status_inactive(set, data); - perror("open"); - return 1; - } - - pipe_set_status_active(set, data); - - return 0; -} - -/// Open a PipeSet object from a file -struct PipeSet *open_pipe_set(const char *path) { - char *path_copy = strdup(path); - - struct PipeSet *set = (struct PipeSet *)malloc(sizeof(struct PipeSet)); - if (!set) { - perror("malloc"); - return NULL; - } - - struct PipeInfo *info = get_pipe_info(path_copy); - if (!info) - return NULL; - - set->info = info; - set->status = PIPE_INACTIVE; - set->next = NULL; - set->next_output = NULL; - set->fd = -1; - set->rd_ptr = NULL; - - return set; -} - -ssize_t nb_read(int fd, void *buf, ssize_t max) { - if (max == 0) - return 0; - - ssize_t bytes_copied = 0; - - while (bytes_copied < max) { - ssize_t res = read(fd, buf + bytes_copied, max - bytes_copied); - if (res == -1 && (errno == EAGAIN || errno == EINTR)) - return bytes_copied; - else if (res == -1) { - perror("read"); - return -1; - } - else if (res == 0) - break; - - bytes_copied += res; - } - - return bytes_copied; -} - -ssize_t nb_write(int fd, void *buf, ssize_t max) { - if (max == 0) - return 0; - - ssize_t bytes_copied = 0; - - while (bytes_copied < max) { - ssize_t res = write(fd, buf + bytes_copied, max - bytes_copied); - if (res == -1 && (errno == EAGAIN || errno == EINTR)) - return bytes_copied; - else if (res == -1) { - perror("write"); - return -1; - } - - bytes_copied += res; - } - - return bytes_copied; -} - -ssize_t read_fifo(struct HopperData *data, struct PipeSet *src) { - void *high_read_ptr = get_high_read_ptr(data, src->info->handler); - void *low_read_ptr = get_low_read_ptr(data, src->info->handler); - void *wr_ptr = data->buffers[src->info->handler]->wr_ptr; - ssize_t max_read = 0; - - if (!low_read_ptr || !high_read_ptr || wr_ptr >= high_read_ptr) - max_read = data->buffers[src->info->handler]->buf_end - wr_ptr; - else if (wr_ptr < low_read_ptr) - max_read = low_read_ptr - wr_ptr; - - if (max_read == 0) - return 0; - - ssize_t res = nb_read(src->fd, wr_ptr, max_read); - if (res == -1) - return -1; - - wr_ptr += res; - - if (wr_ptr >= data->buffers[src->info->handler]->buf_end) - wr_ptr = data->buffers[src->info->handler]->buf; - - data->buffers[src->info->handler]->last_wr_ptr = data->buffers[src->info->handler]->wr_ptr; - data->buffers[src->info->handler]->wr_ptr = wr_ptr; - - if (res > 0) - printf("%d/%s -> %zd bytes\n", src->info->handler, src->info->id, res); - - return res; -} - -ssize_t write_fifo(struct HopperData *data, struct PipeSet *dst) { - ssize_t max_write = 0; - - if (dst->rd_ptr > data->buffers[dst->info->handler]->wr_ptr) - max_write = data->buffers[dst->info->handler]->buf_end - dst->rd_ptr; - else if (dst->rd_ptr < data->buffers[dst->info->handler]->wr_ptr) - max_write = data->buffers[dst->info->handler]->wr_ptr - dst->rd_ptr; - - if (max_write == 0) - return 0; - - ssize_t res = nb_write(dst->fd, dst->rd_ptr, max_write); - if (res == -1 && errno == EPIPE) - pipe_set_status_inactive(dst, data); - if (res == -1) - return -1; - - dst->rd_ptr += res; - - if (dst->rd_ptr >= data->buffers[dst->info->handler]->buf_end) - dst->rd_ptr = data->buffers[dst->info->handler]->buf; - - if (res > 0) - printf("%d/%s <- %zd bytes\n", dst->info->handler, dst->info->id, res); - - return res; -} diff --git a/hopper/server/pipe.h b/hopper/server/pipe.h deleted file mode 100644 index 8ac5a54..0000000 --- a/hopper/server/pipe.h +++ /dev/null @@ -1,42 +0,0 @@ -#ifndef pipe_h_INCLUDED -#define pipe_h_INCLUDED - -#include - -#include "server.h" - -#define PIPE_SRC 1 -#define PIPE_DST 2 - -#define PIPE_INACTIVE 0 -#define PIPE_ACTIVE 1 - -/// A structure containing file information about a I/O pipe -struct PipeInfo { - short type; - short handler; - char *id; - char *name; - char *path; -}; - -/// A structure for holding I/O pipe data -struct PipeSet { - void *rd_ptr; - int fd; - int inotify_fd; - short status; - struct PipeInfo *info; - struct PipeSet *next; - struct PipeSet *next_output; -}; - -void close_pipe_set(struct PipeSet *set); -void free_pipe_set(struct PipeSet **set); -struct PipeInfo *get_pipe_info(const char *path); -struct PipeSet *open_pipe_set(const char *path); -int reopen_pipe_set(struct PipeSet *set, struct HopperData *data); -ssize_t read_fifo(struct HopperData *data, struct PipeSet *src); -ssize_t write_fifo(struct HopperData *data, struct PipeSet *dst); - -#endif // pipe_h_INCLUDED diff --git a/hopper/server/server.c b/hopper/server/server.c deleted file mode 100644 index 002323f..0000000 --- a/hopper/server/server.c +++ /dev/null @@ -1,499 +0,0 @@ -#define _GNU_SOURCE - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "handler.h" -#include "pipe.h" -#include "server.h" - -// Data value used for inotify in epoll. PipeSet FDs use their pointers, so set -// this to a value that would never be a valid pointer. NULL is used for other -// things, 0x1 is low and probably won't be used by a PipeSet pointer. -#define INOTIFY_DATA 0x1 - -void *get_high_read_ptr(struct HopperData *data, short handler) { - void *rd_ptr = NULL; - - struct PipeSet *dst = data->outputs[handler]; - while (dst) { - if (dst->rd_ptr > rd_ptr) - rd_ptr = dst->rd_ptr; - - dst = dst->next_output; - } - - return rd_ptr; -} - -void *get_low_read_ptr(struct HopperData *data, short handler) { - void *rd_ptr = (void *)((uintptr_t)-1); // This is the maximum value for a ptr - - struct PipeSet *dst = data->outputs[handler]; - while (dst) { - if (dst->rd_ptr < rd_ptr && dst->rd_ptr) - rd_ptr = dst->rd_ptr; - - dst = dst->next_output; - } - - // Set to NULL if unchanged for consistency - if (rd_ptr == (void *)((uintptr_t)-1)) - rd_ptr = NULL; - - return rd_ptr; -} - -void free_pipe_list(struct PipeSet *head) { - struct PipeSet *set; - while (head) { - set = head->next; - free(head); - head = set; - } -} - -void prepend_pipe_list(struct PipeSet **head, struct PipeSet *set) { - set->next = *head; - *head = set; -} - -struct PipeSet *remove_pipe_by_name(struct PipeSet **pipes, struct PipeInfo *info) { - struct PipeSet *tgt; - - struct PipeSet head; - struct PipeInfo head_info; - head_info.name = "_HEAD_"; - head.info = &head_info; - head.next = *pipes; - - tgt = &head; - - while (tgt->next) { - //printf("checking %s\n", tgt->next->info->name); - if (!strcmp(tgt->next->info->name, info->name)) { - struct PipeSet *to_free = tgt->next; - // printf("removing %s, connecting %s to %s\n", to_free->info->name, tgt->info->name, tgt->next->next->info->name); - tgt->next = tgt->next->next; - - // This is redundant if the pipe is not at the start of the list - *pipes = head.next; - return to_free; - } - - tgt = tgt->next; - } - - return NULL; -} - -struct PipeSet *remove_output_pipe_by_name(struct PipeSet **pipes, struct PipeInfo *info) { - struct PipeSet *tgt; - - struct PipeSet head; - struct PipeInfo head_info; - head_info.name = "_HEAD_"; - head.info = &head_info; - head.next_output = *pipes; - - tgt = &head; - - while (tgt->next_output) { - //printf("checking %s\n", tgt->next_output->info->name); - if (!strcmp(tgt->next_output->info->name, info->name)) { - struct PipeSet *to_free = tgt->next_output; - // printf("removing %s, connecting %s to %s\n", to_free->info->name, tgt->info->name, tgt->next_output->next_output->info->name); - tgt->next_output = tgt->next_output->next_output; - - // This is redundant if the pipe is not at the start of the list - *pipes = head.next_output; - return to_free; - } - - tgt = tgt->next_output; - } - - return NULL; -} - -void free_hopper_buffer(struct HopperBuffer *buffer) { - if (!buffer) - return; - - if (buffer->buf) - free(buffer->buf); - - free(buffer); -} - -struct HopperBuffer *alloc_hopper_buffer() { - struct HopperBuffer *buffer = (struct HopperBuffer *)malloc(sizeof(struct HopperBuffer)); - if (!buffer) { - perror("alloc"); - return NULL; - } - - buffer->buf = (void *)malloc(MAX_BUF_SIZE); - if (!buffer->buf) { - perror("alloc"); - free(buffer); - return NULL; - } - - buffer->wr_ptr = buffer->buf; - buffer->last_wr_ptr = buffer->buf; - buffer->buf_len = MAX_BUF_SIZE; - buffer->buf_end = buffer->buf + buffer->buf_len; - - return buffer; -} - -/// Safely free a HopperData structure -void free_hopper_data(struct HopperData *data) { - if (!data) - return; - - free_pipe_list(data->pipes); - - if (data->outputs) - free(data->outputs); - - for (int i = 0; i < MAX_HANDLER_ID + 1; i++) { - if (data->buffers[i]) - free(data->buffers[i]); - } - - if (data->buffers) - free(data->buffers); -} - -void close_hopper_fds(struct HopperData *data) { - if (!data) - return; - - close(data->epoll_fd); - close(data->inotify_fd); - close(data->devnull); - - struct PipeSet *set = data->pipes; - - do { - close(set->fd); - set = set->next; - } while (set); -} - -/// Allocate a new HopperData structure -struct HopperData *alloc_hopper_data() { - struct HopperData *data = - (struct HopperData *)malloc(sizeof(struct HopperData)); - if (!data) - goto err_alloc; - - data->outputs = - (struct PipeSet **)calloc(MAX_HANDLER_ID + 1, sizeof(struct PipeSet *)); - if (!data->outputs) - goto err_alloc; - - data->buffers = (struct HopperBuffer **)calloc(MAX_HANDLER_ID + 1, sizeof(struct HopperData *)); - if (!data->buffers) - goto err_alloc; - - for (int i = 0; i < MAX_HANDLER_ID + 1; i++) { - data->buffers[i] = alloc_hopper_buffer(); - if (!data->buffers[i]) - goto err_alloc; - } - - return data; - -err_alloc: - perror("alloc"); - free_hopper_data(data); - return NULL; -} - -int epoll_add_src_pipe(struct HopperData *data, struct PipeSet *set) { - struct epoll_event ev = {}; - ev.events = EPOLLIN; - ev.data.ptr = (void *)set; - - int res; - if ((res = epoll_ctl(data->epoll_fd, EPOLL_CTL_ADD, set->fd, &ev)) != 0) - perror("epoll_ctl ADD"); - - return res; -} - -int load_new_pipe(struct HopperData *data, const char *path) { - struct PipeSet *set = open_pipe_set(path); - if (!set) - return 1; - - prepend_pipe_list(&data->pipes, set); - - if (set->info->type == PIPE_DST) { - set->next_output = data->outputs[set->info->handler]; - data->outputs[set->info->handler] = set; - - set->rd_ptr = NULL; - } - - printf("added fifo '%s'\n", path); - - reopen_pipe_set(set, data); - - return 0; -} - -void pipe_set_status_inactive(struct PipeSet *set, struct HopperData *data) { - if (set->status == PIPE_INACTIVE) - return; - - if (set->info->type == PIPE_SRC) - if (epoll_ctl(data->epoll_fd, EPOLL_CTL_DEL, set->fd, NULL) != 0) - perror("epoll_ctl DEL"); - - close(set->fd); - set->fd = -1; - set->status = PIPE_INACTIVE; - printf("%d/%s set to INACTIVE\n", set->info->handler, set->info->id); -} - -void pipe_set_status_active(struct PipeSet *set, struct HopperData *data) { - if (set->status == PIPE_ACTIVE) - return; - - if (set->info->type == PIPE_SRC) - epoll_add_src_pipe(data, set); - - if (!set->rd_ptr) // For freshly created pipes - set->rd_ptr = data->buffers[set->info->handler]->wr_ptr; - else // Pipes transitioning from inactive get last message - set->rd_ptr = data->buffers[set->info->handler]->last_wr_ptr; - - set->status = PIPE_ACTIVE; - - printf("%d/%s set to ACTIVE\n", set->info->handler, set->info->id); -} - -int load_pipes_directory(struct HopperData *data) { - struct dirent **entries; - int n; - - if ((n = scandir(data->pipe_dir, &entries, NULL, alphasort)) < 0) { - perror("scandir"); - return 1; - } - - for (int i = 0; i < n; i++) { - struct dirent *entry = entries[i]; - - if (entry->d_type == DT_FIFO || entry->d_type == DT_UNKNOWN) { - - char path[PATH_MAX]; - snprintf(path, sizeof(path), "%s/%s", data->pipe_dir, - entry->d_name); - - load_new_pipe(data, path); - } - free(entry); - } - - free(entries); - - return 0; -} - -int handle_inotify_event(struct HopperData *data) { - struct inotify_event *ev = (struct inotify_event *)malloc( - sizeof(struct inotify_event) + NAME_MAX + 1); - - if (read(data->inotify_fd, ev, - sizeof(struct inotify_event) + NAME_MAX + 1) < 0) { - perror("read"); - return 1; - } - - if (ev->mask & IN_DELETE_SELF) { - // The pipes directory got deleted, hopper can't continue, exit - // immediately. - printf("pipes directory disappeared, exiting...\n"); - _exit(1); - } - - if (ev->mask & IN_CREATE) { - char path[PATH_MAX]; - snprintf(path, sizeof(path), "%s/%s", data->pipe_dir, ev->name); - - if (load_new_pipe(data, path) < 0) { - free(ev); - return 1; - } - } - - if (ev->mask & IN_DELETE) { - char path[PATH_MAX]; - snprintf(path, sizeof(path), "%s/%s", data->pipe_dir, ev->name); - - struct PipeInfo *info = get_pipe_info(path); - - if (!info) - return 1; - - int removed = 0; - - struct PipeSet *to_free = remove_pipe_by_name(&data->pipes, info); - if (to_free && info->type == PIPE_DST) { - // Output pipes need to be removed from a separate linked list - // as well as the main one - to_free = remove_output_pipe_by_name(&data->outputs[info->handler], info); - if (to_free) { - free_pipe_set(&to_free); - removed = 1; - } - } else if (to_free) { - free_pipe_set(&to_free); - removed = 1; - } - - if (removed) - printf("removed %d/%s\n", info->handler, info->name); - else - printf("pipe %d/%s not found\n", info->handler, info->name); - } - - free(ev); - return 0; -} - -int flush_and_scan_pipes(struct HopperData *data) { - struct PipeSet *set = data->pipes; - - while (set) { - if (set->status == PIPE_INACTIVE) - reopen_pipe_set(set, data); - - if (set->status == PIPE_ACTIVE && set->info->type == PIPE_DST) - write_fifo(data, set); - - set = set->next; - } - - return 0; -} - -int run_epoll_cycle(struct HopperData *data) { - struct epoll_event events[MAX_EVENTS]; - int res; - - int n = epoll_wait(data->epoll_fd, events, MAX_EVENTS, 250); - if (n < 0) { - perror("epoll_wait"); - return n; - } - - for (int i = 0; i < n; i++) { - if (events[i].data.u64 == INOTIFY_DATA) { - handle_inotify_event(data); - continue; - } - - struct PipeSet *set = (struct PipeSet *)events[i].data.ptr; - - if (events[i].events & EPOLLIN) - if ((res = read_fifo(data, set)) < 0) - return res; - - if (events[i].events & EPOLLHUP) - pipe_set_status_inactive(set, data); - } - - flush_and_scan_pipes(data); - - return n; -} - -int main(int argc, char *argv[]) { - if (argc < 2) { - printf("Usage: %s \n", argv[0]); - return 1; - } - - int ret = 0; - - // Writing to a closed FIFO gives us a SIGPIPE, this is internally handled - // so ignore it. - signal(SIGPIPE, SIG_IGN); - - struct HopperData *data = alloc_hopper_data(); - if (!data) { - ret = 1; - goto cleanup; - } - - data->pipe_dir = argv[1]; - - if ((data->devnull = open("/dev/null", O_WRONLY)) < 0) { - perror("open"); - ret = 1; - goto cleanup; - } - - if ((data->epoll_fd = epoll_create1(0)) < 0) { - perror("epoll_create"); - ret = 1; - goto cleanup; - } - - if ((data->inotify_fd = inotify_init()) < 0) { - perror("inotify_init"); - ret = 1; - goto cleanup; - } - - struct epoll_event ev = {}; - ev.events = EPOLLIN; - ev.data.u64 = - INOTIFY_DATA; // Ensure u64 is used here, not u32, which could be shared - // with a pointer due to size differences. e.g. ptr could - // be 0x7fffffff{INOTIFY_DATA}, using u64 prevents this!! - - if (epoll_ctl(data->epoll_fd, EPOLL_CTL_ADD, data->inotify_fd, &ev) != 0) { - perror("epoll_ctl ADD"); - ret = 1; - goto cleanup; - } - - if ((data->inotify_root_watch_fd = - inotify_add_watch(data->inotify_fd, data->pipe_dir, - IN_CREATE | IN_DELETE | IN_DELETE_SELF)) < 0) { - perror("inotify_add_watch"); - ret = 1; - goto cleanup; - } - - if (load_pipes_directory(data) != 0) { - ret = 1; - goto cleanup; - } - - int res = 0; - while (res >= 0) { - res = run_epoll_cycle(data); - if (res < 0 && errno == EINTR) - res = 0; - } - -cleanup: - close_hopper_fds(data); - free_hopper_data(data); - return ret; -} diff --git a/hopper/server/server.h b/hopper/server/server.h deleted file mode 100644 index b892157..0000000 --- a/hopper/server/server.h +++ /dev/null @@ -1,35 +0,0 @@ -#ifndef server_h_INCLUDED -#define server_h_INCLUDED - -#include - -#define MAX_EVENTS 64 -#define MAX_COPY_SIZE 1024 * 1024 -#define MAX_BUF_SIZE MAX_COPY_SIZE - -struct HopperBuffer { - void *buf; - void *buf_end; - void *wr_ptr; - void *last_wr_ptr; - ssize_t buf_len; -}; - -struct HopperData { - struct PipeSet *pipes; - struct PipeSet **outputs; - struct HopperBuffer **buffers; - int n_pipes; - int epoll_fd; - int inotify_fd; - int inotify_root_watch_fd; - int devnull; - const char *pipe_dir; -}; - -void pipe_set_status_inactive(struct PipeSet *set, struct HopperData *data); -void pipe_set_status_active(struct PipeSet *set, struct HopperData *data); -void *get_high_read_ptr(struct HopperData *data, short handler); -void *get_low_read_ptr(struct HopperData *data, short handler); - -#endif // server_h_INCLUDED diff --git a/hopper/util/read.py b/hopper/util/read.py deleted file mode 100755 index e8d4a80..0000000 --- a/hopper/util/read.py +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/python3 - -import time, sys, os -from hopper.client import * -from hopper.common import * - -root, name = os.path.split(sys.argv[1]) -pn = PipeName(name, root) -c = HopperClient() -c.open_pipe(pn) - -while True: - buf = c.read(pn) - if buf: - print(buf.decode(), end="") - time.sleep(0.5) - diff --git a/hopper/util/write.py b/hopper/util/write.py deleted file mode 100755 index 909cdb8..0000000 --- a/hopper/util/write.py +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/python3 - -import os, sys -from hopper.client import * -from hopper.common import * - -root, name = os.path.split(sys.argv[1]) -pn = PipeName(name, root) -c = HopperClient() -c.open_pipe(pn) - -while True: - try: - s = input() - b = bytes(s, "utf-8") - c.write(pn, b) - except: - break; \ No newline at end of file diff --git a/include/hopper/daemon/buffer.hpp b/include/hopper/daemon/buffer.hpp new file mode 100644 index 0000000..8e61f52 --- /dev/null +++ b/include/hopper/daemon/buffer.hpp @@ -0,0 +1,41 @@ +#ifndef buffer_hpp_INCLUDED +#define buffer_hpp_INCLUDED + +#include +#include + +#include "hopper/daemon/marker.hpp" + +namespace hopper { + +// Opaque class, see hopper/server/pipe.hpp +class HopperPipe; + +class HopperBuffer { +private: + std::vector m_buf; + std::vector m_markers; + + size_t m_edge; + +public: + HopperBuffer(size_t len = 1024 * 1024); // Use 1 MiB size by default + + BufferMarker *create_marker(); + void delete_marker(BufferMarker *marker); + + size_t write(void *src, size_t len); + size_t write(HopperPipe *pipe); + + size_t read(BufferMarker *marker, void *dst, size_t len); + size_t read(HopperPipe *pipe); + + size_t max_write(); + size_t max_read(BufferMarker *marker); + + size_t edge() { return m_edge; } +}; + +}; // namespace hopper + +#endif // buffer_hpp_INCLUDED diff --git a/include/hopper/daemon/daemon.hpp b/include/hopper/daemon/daemon.hpp new file mode 100644 index 0000000..26859da --- /dev/null +++ b/include/hopper/daemon/daemon.hpp @@ -0,0 +1,66 @@ +#ifndef daemon_hpp_INCLUDED +#define daemon_hpp_INCLUDED + +#include +#include + +#include +#include +#include + +#include "hopper/daemon/endpoint.hpp" +#include "hopper/daemon/pipe.hpp" + +namespace hopper { + +constexpr uint64_t INOTIFY_DATA = 0x1; + +class HopperDaemon { +private: + std::unordered_map m_endpoints; + + // endpoint IDs are 24 bit + uint32_t m_last_endpoint_id = 1; + uint32_t next_endpoint_id() { + if (m_last_endpoint_id > ((1ULL << 24) - 1)) + return 0; + + return m_last_endpoint_id++; + } + + int m_inotify_fd = -1; + int m_inotify_root_watch = -1; + int m_epoll_fd = -1; + + int m_max_events = 64; + int m_timeout = 250; + + std::filesystem::path m_path; + + uint32_t create_endpoint(const std::filesystem::path &path); + void delete_endpoint(const std::filesystem::path &path); + void delete_endpoint(uint32_t id); + + HopperEndpoint *endpoint_by_watch(int watch); + void setup_inotify(); + void handle_inotify(); + void handle_root_inotify(struct inotify_event *ev); + void handle_endpoint_inotify(struct inotify_event *ev, + HopperEndpoint *endpoint); + + void process_events(struct epoll_event *events, int n_events); + void remove_pipe(HopperEndpoint *endpoint, uint64_t pipe_id); + void add_pipe(HopperPipe *pipe); + void refresh_pipes(); + +public: + HopperDaemon(std::filesystem::path path, int max_events = 64, + int m_timeout = 250); + ~HopperDaemon(); + + int run(); +}; + +}; // namespace hopper + +#endif // daemon_hpp_INCLUDED diff --git a/include/hopper/daemon/endpoint.hpp b/include/hopper/daemon/endpoint.hpp new file mode 100644 index 0000000..e11d067 --- /dev/null +++ b/include/hopper/daemon/endpoint.hpp @@ -0,0 +1,75 @@ +#ifndef endpoint_hpp_INCLUDED +#define endpoint_hpp_INCLUDED + +#include + +#include "hopper/daemon/buffer.hpp" +#include "hopper/daemon/pipe.hpp" + +namespace hopper { + +class HopperEndpoint { +private: + std::unordered_map m_inputs; + std::unordered_map m_outputs; + + HopperBuffer m_buffer{}; + + uint64_t m_last_pipe_id = 1; + uint64_t next_pipe_id(uint8_t type) { + if (m_last_pipe_id > ((1ULL << 39) - 1)) + return 0; + + // I can say with high confidence, that we will probably + // never hit this limit. + + // Bit mask for pipe ID (64-bit): + // EEEEEEEEEEEEEEEEEEEEEEEEPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPT + // EEE: Endpoint ID, 24-bit, ~ 16 million endpoints + // PPP: Pipe ID, 39-bit, ~ 550 billion pipes per endpoint + // T: Type, 1 for input, 0 for output + return (((uint64_t)m_id) << 40) | + ((m_last_pipe_id++ << 1) & 0xFFFFFFFFFF) | (type & 0x1); + } + + std::filesystem::path m_path; + std::string m_name; + uint32_t m_id; + int m_watch_fd; + +public: + HopperEndpoint(uint32_t id, int watch_fd, std::filesystem::path path); + ~HopperEndpoint(); + + void on_pipe_readable(uint64_t id); + void flush_pipes(); + + HopperPipe *add_input_pipe(const std::filesystem::path &path); + HopperPipe *add_output_pipe(const std::filesystem::path &path); + void remove_by_id(uint64_t pipe_id); + void remove_input_pipe(const std::filesystem::path &path); + void remove_output_pipe(const std::filesystem::path &path); + + HopperPipe *pipe_by_path(const std::filesystem::path &path); + + const std::filesystem::path &path() { return m_path; } + const std::string &name() { return m_name; } + const std::unordered_map inputs() { + return m_inputs; + } + const std::unordered_map outputs() { + return m_outputs; + } + int id() { return m_id; } + int watch_fd() { return m_watch_fd; } + + friend std::ostream &operator<<(std::ostream &os, + HopperEndpoint &endpoint) { + os << endpoint.m_name << "(" << endpoint.m_id << ")"; + return os; + } +}; + +}; // namespace hopper + +#endif // endpoint_hpp_INCLUDED diff --git a/include/hopper/daemon/marker.hpp b/include/hopper/daemon/marker.hpp new file mode 100644 index 0000000..5cfd0cf --- /dev/null +++ b/include/hopper/daemon/marker.hpp @@ -0,0 +1,28 @@ +#ifndef marker_hpp_INCLUDED +#define marker_hpp_INCLUDED + +#include + +namespace hopper { + +enum SeekDirection { + FORWARD, + REVERSE, +}; + +class BufferMarker { +private: + size_t m_pos; + +public: + BufferMarker(size_t pos = 0) : m_pos(pos) {} + + void seek(size_t offset, size_t max, + SeekDirection dir = SeekDirection::FORWARD); + + size_t pos() { return m_pos; } +}; + +}; // namespace hopper + +#endif // marker_hpp_INCLUDED diff --git a/include/hopper/daemon/pipe.hpp b/include/hopper/daemon/pipe.hpp new file mode 100644 index 0000000..d733a6c --- /dev/null +++ b/include/hopper/daemon/pipe.hpp @@ -0,0 +1,63 @@ +#ifndef pipe_hpp_INCLUDED +#define pipe_hpp_INCLUDED + +#include +#include + +#include "hopper/daemon/marker.hpp" + +namespace hopper { + +enum PipeType { + NONE, + IN, + OUT, +}; + +enum PipeStatus { + ACTIVE, + INACTIVE, +}; + +class HopperPipe { +private: + BufferMarker *m_marker = nullptr; + + std::string m_name; + PipeStatus m_status = PipeStatus::INACTIVE; + PipeType m_type; + std::filesystem::path m_path; + + const std::string &m_endpoint_name; + + int m_fd = -1; + uint64_t m_id; + +public: + HopperPipe(uint64_t id, const std::string &endpoint_name, PipeType type, + std::filesystem::path path, BufferMarker *marker = nullptr); + ~HopperPipe(); + + int open_pipe(); + void close_pipe(); + size_t write_pipe(void *src, size_t len); + size_t read_pipe(void *dst, size_t len); + + const std::filesystem::path &path() { return m_path; } + BufferMarker *marker() { return m_marker; } + PipeStatus status() { return m_status; } + PipeType type() { return m_type; } + const std::string &name() { return m_name; } + uint64_t id() { return m_id; } + int fd() { return m_fd; } + + friend std::ostream &operator<<(std::ostream &os, const HopperPipe &pipe) { + os << (pipe.m_type == PipeType::IN ? "+" : "-") << pipe.m_name << "(" + << pipe.m_endpoint_name << ")"; + return os; + }; +}; + +}; // namespace hopper + +#endif // pipe_hpp_INCLUDED diff --git a/include/hopper/daemon/util.hpp b/include/hopper/daemon/util.hpp new file mode 100644 index 0000000..ef061a1 --- /dev/null +++ b/include/hopper/daemon/util.hpp @@ -0,0 +1,24 @@ +#ifndef util_hpp_INCLUDED +#define util_hpp_INCLUDED + +#include "hopper/daemon/pipe.hpp" +#include +#include +#include +#include +#include +#include + +namespace hopper { + +inline void throw_errno(const std::string &msg) { + std::stringstream ss; + ss << msg << ": " << std::strerror(errno); + throw std::system_error(errno, std::generic_category(), ss.str()); +} + +PipeType detect_pipe_type(const std::filesystem::path &path); + +}; // namespace hopper + +#endif // util_hpp_INCLUDED diff --git a/include/hopper/hopper.h b/include/hopper/hopper.h new file mode 100644 index 0000000..f34bc86 --- /dev/null +++ b/include/hopper/hopper.h @@ -0,0 +1,35 @@ +#ifndef hopper_h_INCLUDED +#define hopper_h_INCLUDED + +#include + +#define HOPPER_IN 1 +#define HOPPER_OUT 2 +#define HOPPER_NONBLOCK 4 + +/// Structure representing a Hopper pipe +struct hopper_pipe { + const char *name; + const char *endpoint; + const char *hopper; + int fd; + int flags; +}; + +/// Open a new Hopper pipe specified by `pipe`. -1 is returned on error, and +/// errno is set. +int hopper_open(struct hopper_pipe *pipe); + +/// Close a Hopper pipe previously opened by `hopper_open_pipe`. +int hopper_close(struct hopper_pipe *pipe); + +/// Read up to `len` bytes from a Hopper pipe. Value returned indicates +/// the number of bytes read. -1 is returned on error and errno is set. +ssize_t hopper_read(struct hopper_pipe *pipe, void *dst, size_t len); + +/// Write up to `len` bytes into a Hopper pipe. Value returned indicates +/// the number of bytes written. -1 is returned on error and errno is set. +ssize_t hopper_write(struct hopper_pipe *pipe, void *src, size_t len); + +#endif // hopper_h_INCLUDED + diff --git a/meson.build b/meson.build new file mode 100644 index 0000000..284dd7e --- /dev/null +++ b/meson.build @@ -0,0 +1,18 @@ +project( + 'hopper', + ['c', 'cpp'], + version: '0.1.0', + default_options: ['optimization=2', 'warning_level=3', 'cpp_std=c++20'], +) + + +inc = include_directories('include') + +install_headers( + 'include/hopper/hopper.h', + subdir: 'hopper', +) + +subdir('client') +subdir('daemon') + diff --git a/nix/package.nix b/nix/package.nix new file mode 100644 index 0000000..1fd5aee --- /dev/null +++ b/nix/package.nix @@ -0,0 +1,14 @@ +{ stdenv, meson, ninja, pkg-config, python313 }: + +stdenv.mkDerivation { + name = "hopper"; + + src = ./..; + + nativeBuildInputs = [ + meson + ninja + pkg-config + python313 + ]; +} diff --git a/setup.py b/setup.py index 8c6f345..5abade5 100644 --- a/setup.py +++ b/setup.py @@ -2,9 +2,9 @@ setup( name="hopper", - version="0.1", + version="0.1.0", packages=["hopper"], - + package_dir={"hopper": "client/py"}, author="Nathan Gill", author_email="nathan.j.gill@outlook.com", -) \ No newline at end of file +)