Skip to content

W4ilops/DMS

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

1 Commit
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

DMS - Distributed Message Queue System

A production-grade message broker written in C. Provides named FIFO queues with producer/consumer semantics, acknowledgment-based delivery, optional disk persistence, and a length-prefixed binary protocol.

Architecture

DMS follows a central-broker model. A single server process manages a registry of named queues. Clients connect over TCP, perform a version handshake, and then produce or consume messages. Each message is delivered to exactly one consumer (competing consumers, round-robin style). Messages remain in the queue until the consumer sends an explicit ACK. A NACK returns the message for redelivery. Optional persistence writes queue state to disk so messages survive broker restarts.

Protocol

Binary, length-prefixed. All multi-byte integers in network byte order.

Wire format

Offset  Size   Field
0       4      Magic (0xDEADBEEF)
4       4      Payload length (bytes)
8       1      Message type
9       1      Protocol version
10      2      Status code (responses only)
12      4      Message ID
16      N      Payload (variable, up to 4096 bytes)

Message types

Type Direction Description
0x01 C->S HANDSHAKE
0x02 S->C HANDSHAKE_ACK
0x04 S->C HANDSHAKE_NACK
0x11 C->S CREATE_QUEUE
0x12 S->C CREATE_QUEUE_OK
0x13 C->S DELETE_QUEUE
0x14 S->C DELETE_QUEUE_OK
0x15 C->S LIST_QUEUES
0x16 S->C LIST_QUEUES_OK
0x21 C->S PRODUCE
0x22 S->C PRODUCE_OK
0x31 C->S CONSUME (blocking)
0x34 S->C DELIVER
0x41 C->S ACK
0x42 S->C ACK_OK
0x43 C->S NACK
0x44 S->C NACK_OK
0x51 Any DISCONNECT
0x52 Any DISCONNECT_OK
0xFE S->C ERROR

Error codes

0=OK, 1=QUEUE_FULL, 2=QUEUE_NOT_FOUND, 3=QUEUE_EXISTS, 4=TIMEOUT, 5=PROTOCOL_ERROR, 6=BAD_MAGIC, 7=VERSION_MISMATCH, 8=PAYLOAD_TOO_LARGE, 9=INVALID_TYPE, 10=INTERNAL.

Build

Requires GCC with C11 and pthreads support. No external dependencies.

make

Or manually:

gcc -Wall -Wextra -Werror -pthread -O2 -std=c11 -o server server.c
gcc -Wall -Wextra -Werror -pthread -O2 -std=c11 -o client client.c

Usage

Start the broker

./server                           # defaults: port 9090, no persistence
./server -p 8080 -d 5000 -P       # port 8080, depth 5000, persistence on
./server -l 0                      # debug logging

Options:

  • -p PORT -- listen port (default: 9090)
  • -d DEPTH -- max messages per queue (default: 10000)
  • -t SECONDS -- consume timeout (default: 30)
  • -P -- enable disk persistence
  • -D DIR -- persistence directory (default: ./dms_data)
  • -l LEVEL -- log level: 0=DEBUG, 1=INFO, 2=WARN, 3=ERROR

Produce a message

./client produce -q orders -m "order #42"

Consume a message

./client consume -q orders         # consume one message
./client consume -q orders -c      # continuous consume (Ctrl+C to stop)

Queue management

./client create -q orders
./client delete -q orders
./client list

Stress test

./client stress -q bench -n 1000

Smoke test

make test

Concurrency model

  • One thread per client connection (detached pthreads).
  • Global queue registry protected by registry_lock mutex.
  • Each queue has its own mutex and condition variables (not_empty, not_full).
  • Lock ordering: registry_lock -> queue.lock (never reversed).
  • Blocking consume uses pthread_cond_timedwait instead of polling.
  • Atomic counters for delivery IDs and statistics.
  • SIGPIPE ignored to handle disconnected clients safely.

Persistence

When started with -P, the broker writes queue state to disk on every produce and ACK. On restart, queues are restored from .dmsq files in the data directory. Persistence is best-effort: a failure to write does not fail the produce operation (logged as a warning).

File format per queue: [count:4][delivery_id:4][data_length:4][data:N]...

Known limitations

  • No TLS/authentication. Designed for trusted network environments.
  • Single-server only. No clustering or replication.
  • Persistence is full-rewrite per operation, not append-only WAL. Acceptable for moderate throughput; a WAL would improve write performance.
  • Queue deletion while consumers are blocked may cause those consumers to receive an error on their next operation.
  • Maximum 128 concurrent queues, 256 concurrent clients, 4096-byte payloads. All configurable via constants in protocol.h.
  • No message TTL or dead-letter queue (would be a natural extension).
  • Thread-per-client model limits scalability to hundreds of concurrent connections. An epoll-based event loop would support thousands.

License

This project is licensed under the MIT License.

Contact

PGP Public Key

-----BEGIN PGP PUBLIC KEY BLOCK-----

mDMEaX31qRYJKwYBBAHaRw8BAQdA4NAOMdNRqRdEl5VTzEDgWh9PbzWjMk/QZ7Do
htbIPtS0GHdhaWwgPHdhaWxAdHV0YW1haWwuY29tPoiZBBMWCgBBFiEE2hDNLVN+
cOGq0wcon7QDDWC8ApoFAml99akCGwMFCQWjmoAFCwkIBwICIgIGFQoJCAsCBBYC
AwECHgcCF4AACgkQn7QDDWC8AppwpgEApF5DddWsIJ0Q+rbp+DVax9mw/Bdnj1ip
qKWX5MGqeLMA/3pm620peWjlP113AwPcKrfgPB/BJcNwc8DoYUVAMDMMuDgEaX31
qRIKKwYBBAGXVQEFAQEHQDC5PkpGUHv1Pd0rObwaqHg2rzi6xGoZMTp35PqnPMJJ
AwEIB4h+BBgWCgAmFiEE2hDNLVN+cOGq0wcon7QDDWC8ApoFAml99akCGwwFCQWj
moAACgkQn7QDDWC8ApqXgQEA1eoTiQbFX17t3PeIL1Bxzz2jZfuMEHu0aAkQ+MOF
JCMBAMKcc4AfCxQgaj9nHoW4jYs7VCap4+F2R8nN/7JLbcAG
=Ln+f
-----END PGP PUBLIC KEY BLOCK-----

SSH Public Key

ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIJum3lcA0XQXBf93nSxYZAIOkfdqTmeAIeP4R6HWqfdw

About

Distributed Message Queue System

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors