From a4959f2ec2462aa9c85a6a5f3b585fe8db357534 Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Tue, 3 Feb 2026 10:29:55 -1000 Subject: [PATCH 01/35] Always specify the id as a keyword argument. --- src/mktl/protocol/request.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mktl/protocol/request.py b/src/mktl/protocol/request.py index 3979feb2..4a08da68 100644 --- a/src/mktl/protocol/request.py +++ b/src/mktl/protocol/request.py @@ -127,7 +127,7 @@ def _rep_incoming(self, parts): # allow it to pass, assuming the users know what they're doing. pass - response = message.Message('REP', target, payload, response_id) + response = message.Message('REP', target, payload, id=response_id) pending._complete(response) del self.pending[response_id] @@ -415,7 +415,7 @@ def req_incoming(self, parts): # allow it to pass, assuming the users know what they're doing. pass - request = message.Request(req_type, target, payload, req_id) + request = message.Request(req_type, target, payload, id=req_id) request.prefix = (ident,) payload = None error = None @@ -442,7 +442,7 @@ def req_incoming(self, parts): elif payload.error is None: payload.error = error - response = message.Message('REP', target, payload, req_id) + response = message.Message('REP', target, payload, id=req_id) response.prefix = request.prefix self.send(response) From 257484f9b8de6f51e876bcd4edff272f97f58f05 Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Tue, 3 Feb 2026 13:08:59 -1000 Subject: [PATCH 02/35] Honor a client request to omit the ACK response. --- src/mktl/daemon.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/mktl/daemon.py b/src/mktl/daemon.py index e50a5f4b..f0577118 100644 --- a/src/mktl/daemon.py +++ b/src/mktl/daemon.py @@ -480,7 +480,13 @@ def req_handler(self, request): will be generated. """ - self.req_ack(request) + try: + ack_requested = request.payload.ack + except: + ack_requested = True + + if ack_requested: + self.req_ack(request) type = request.type target = request.target From c7ad4fd8bf2b04afd216665faa7824d3c097085a Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Tue, 3 Feb 2026 13:09:32 -1000 Subject: [PATCH 03/35] Honor a client request to omit the ACK response. --- sbin/mkbrokerd | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sbin/mkbrokerd b/sbin/mkbrokerd index ea28d3cf..f43bd5ac 100755 --- a/sbin/mkbrokerd +++ b/sbin/mkbrokerd @@ -414,7 +414,13 @@ class RequestServer(mktl.protocol.request.Server): will be generated. """ - self.req_ack(request) + try: + ack_requested = request.payload.ack + except: + ack_requested = True + + if ack_requested: + self.req_ack(request) type = request.type target = request.target From f3a58114a680ac33c158fbf2c15796248008123b Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Tue, 3 Feb 2026 13:10:01 -1000 Subject: [PATCH 04/35] Allow arbitrary additional arguments in a payload. Use that capability to signal a lack of interest in an ACK response. --- src/mktl/item.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/mktl/item.py b/src/mktl/item.py index 6cf8119c..a802e64c 100644 --- a/src/mktl/item.py +++ b/src/mktl/item.py @@ -684,7 +684,11 @@ def set(self, new_value, wait=True, formatted=False, quantity=False): else: raise ValueError('formatted+quantity arguments must be boolean') - payload = self.to_payload(new_value) + if wait == False: + payload = self.to_payload(new_value, ack=False) + else: + payload = self.to_payload(new_value) + message = protocol.message.Request('SET', self.full_key, payload) self.req.send(message) @@ -821,7 +825,7 @@ def to_format(self, value): return formatted - def to_payload(self, value=None, timestamp=None): + def to_payload(self, value=None, timestamp=None, **kwargs): """ Interpret the provided arguments into a :class:`mktl.protocol.message.Payload` instance; if the *value* is not specified the current value of this :class:`Item` will be @@ -857,11 +861,11 @@ def to_payload(self, value=None, timestamp=None): bulk = value.tobytes() except AttributeError: bulk = None - payload = protocol.message.Payload(value, timestamp) + payload = protocol.message.Payload(value, timestamp, **kwargs) else: shape = value.shape dtype = str(value.dtype) - payload = protocol.message.Payload(None, timestamp, bulk=bulk, shape=shape, dtype=dtype) + payload = protocol.message.Payload(None, timestamp, bulk=bulk, shape=shape, dtype=dtype, **kwargs) return payload From dec3a212befb5c211fde7e8b1ee2177f062c040c Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Tue, 3 Feb 2026 13:11:03 -1000 Subject: [PATCH 05/35] Disable the high water mark for request/response; it was coming up with a default of 1000, which was good for about 4000 fire-and-forget requests before the buffer ran out. --- src/mktl/protocol/request.py | 44 ++++++++++++++++++++++++++++-------- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/src/mktl/protocol/request.py b/src/mktl/protocol/request.py index 4a08da68..78d96e73 100644 --- a/src/mktl/protocol/request.py +++ b/src/mktl/protocol/request.py @@ -40,6 +40,7 @@ def __init__(self, address, port): self.socket = zmq_context.socket(zmq.DEALER) self.socket.setsockopt(zmq.LINGER, 0) + self.socket.set_hwm(0) self.socket.identity = identity.encode() self.socket.connect(server) @@ -168,18 +169,28 @@ def run(self): https://github.com/zeromq/libzmq/issues/1108 """ - poller = zmq.Poller() - poller.register(self.socket, zmq.POLLIN) - poller.register(self.request_receive, zmq.POLLIN) + incoming = zmq.Poller() + incoming.register(self.socket, zmq.POLLIN) + incoming.register(self.request_receive, zmq.POLLIN) + + outgoing = zmq.Poller() + outgoing.register(self.socket, zmq.POLLOUT) while True: - sockets = poller.poll(10000) # milliseconds - for active, flag in sockets: + inbound_sockets = incoming.poll(10000) # milliseconds + for inbound, flag in inbound_sockets: + + if self.request_receive == inbound: + # Success is assumed on this next polling request. + # A failure will result in lost data on the outbound + # socket; any polling delays here should only occur + # in super high throughput cases, presumably because + # a transmission buffer is full. - if self.request_receive == active: + outbound_sockets = outgoing.poll(1000) self._req_outgoing() - elif self.socket == active: + elif self.socket == inbound: parts = self.socket.recv_multipart() self._rep_incoming(parts) @@ -199,7 +210,15 @@ def send(self, message): self.requests.put(message) self.request_signal.send(b'') - ack = message.wait_ack(self.timeout) + try: + ack_requested = message.payload.ack + except: + ack_requested = True + + if ack_requested: + ack = message.wait_ack(self.timeout) + else: + return if ack == False: error = '%s @ %s:%d: no response received in %.2f sec' @@ -240,6 +259,7 @@ def __init__(self, hostname=None, port=None, avoid=set()): self.hostname = hostname self.socket = zmq_context.socket(zmq.ROUTER) self.socket.setsockopt(zmq.LINGER, 0) + self.socket.set_hwm(0) # If the port is set, use it; otherwise, look for the first available # port within the default range. @@ -358,7 +378,13 @@ def req_handler(self, request): structure of what's happening in the daemon code. """ - self.req_ack(request) + try: + ack_requested = request.payload.ack + except: + ack_requested = True + + if ack_requested: + self.req_ack(request) response = message.Message('REP', target, id=request.id) response.prefix = request.prefix From 9039a28f94ee18f0ceaf39a40718f56b83f8707f Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Tue, 3 Feb 2026 13:15:51 -1000 Subject: [PATCH 06/35] Put the client background thread back the way it was, the changes there weren't necessary. --- src/mktl/protocol/request.py | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/src/mktl/protocol/request.py b/src/mktl/protocol/request.py index 78d96e73..74f3624e 100644 --- a/src/mktl/protocol/request.py +++ b/src/mktl/protocol/request.py @@ -169,28 +169,18 @@ def run(self): https://github.com/zeromq/libzmq/issues/1108 """ - incoming = zmq.Poller() - incoming.register(self.socket, zmq.POLLIN) - incoming.register(self.request_receive, zmq.POLLIN) - - outgoing = zmq.Poller() - outgoing.register(self.socket, zmq.POLLOUT) + poller = zmq.Poller() + poller.register(self.socket, zmq.POLLIN) + poller.register(self.request_receive, zmq.POLLIN) while True: - inbound_sockets = incoming.poll(10000) # milliseconds - for inbound, flag in inbound_sockets: - - if self.request_receive == inbound: - # Success is assumed on this next polling request. - # A failure will result in lost data on the outbound - # socket; any polling delays here should only occur - # in super high throughput cases, presumably because - # a transmission buffer is full. + sockets = poller.poll(10000) # milliseconds + for socket, flag in sockets: - outbound_sockets = outgoing.poll(1000) + if self.request_receive == socket: self._req_outgoing() - elif self.socket == inbound: + elif self.socket == socket: parts = self.socket.recv_multipart() self._rep_incoming(parts) From 99ca2a09795d85cde063ea5fc1b3da79948df696 Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Tue, 3 Feb 2026 14:01:37 -1000 Subject: [PATCH 07/35] Change the new 'ack' argument to set() to be 'response' instead. --- src/mktl/item.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/mktl/item.py b/src/mktl/item.py index a802e64c..90bd3b28 100644 --- a/src/mktl/item.py +++ b/src/mktl/item.py @@ -647,15 +647,18 @@ def req_set(self, request): return payload - def set(self, new_value, wait=True, formatted=False, quantity=False): + def set(self, new_value, wait=True, response=True, formatted=False, quantity=False): """ Set a new value. Set *wait* to True to block until the request completes; this is the default behavior. If *wait* is set to False, the caller will be returned a :class:`mktl.protocol.message.Request` instance, which has a :func:`mktl.protocol.message.Request.wait` method that can optionally be invoked to block until completion of the request; the wait will return immediately once the request is - satisfied. There is no return value for a blocking request; failed - requests will raise exceptions. + satisfied. Set *response* to False to disable all error handling and + acknowledgements for the request (fire and forget); setting + *response* to False implies *wait* is also False. + There is no return value for a blocking request; failed requests + will raise exceptions. The optional *formatted* and *quantity* options enable calling :func:`set` with either the string-formatted representation or @@ -684,10 +687,11 @@ def set(self, new_value, wait=True, formatted=False, quantity=False): else: raise ValueError('formatted+quantity arguments must be boolean') - if wait == False: - payload = self.to_payload(new_value, ack=False) - else: + if response: payload = self.to_payload(new_value) + else: + payload = self.to_payload(new_value, ack=False) + wait = False message = protocol.message.Request('SET', self.full_key, payload) self.req.send(message) From deb8d753b490fb00f206a19b2bbce642a8bde1fc Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Tue, 3 Feb 2026 14:23:56 -1000 Subject: [PATCH 08/35] Trade out the 'ack' field of the payload for 'silent'. --- sbin/mkbrokerd | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sbin/mkbrokerd b/sbin/mkbrokerd index f43bd5ac..86bf0043 100755 --- a/sbin/mkbrokerd +++ b/sbin/mkbrokerd @@ -415,12 +415,14 @@ class RequestServer(mktl.protocol.request.Server): """ try: - ack_requested = request.payload.ack + silent = request.payload.silent except: - ack_requested = True + silent = False - if ack_requested: - self.req_ack(request) + if silent: + return + + self.req_ack(request) type = request.type target = request.target From 1b887423950eacaaf94e7fb9dc426e6d75438c6f Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Tue, 3 Feb 2026 14:24:37 -1000 Subject: [PATCH 09/35] Trade out the 'ack' field of the payload for 'silent'. --- src/mktl/daemon.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/mktl/daemon.py b/src/mktl/daemon.py index f0577118..7874c1ca 100644 --- a/src/mktl/daemon.py +++ b/src/mktl/daemon.py @@ -481,11 +481,11 @@ def req_handler(self, request): """ try: - ack_requested = request.payload.ack + silent = request.payload.silent except: - ack_requested = True + silent = False - if ack_requested: + if silent == False: self.req_ack(request) type = request.type @@ -505,7 +505,10 @@ def req_handler(self, request): else: raise ValueError('unhandled request type: ' + type) - return response + if silent: + return None + else: + return response def req_get(self, request): From dcdec27b88ecb3cfea5d7f1c6519f9187d46365f Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Tue, 3 Feb 2026 14:24:52 -1000 Subject: [PATCH 10/35] Trade out the response argument in set() for silent. --- src/mktl/item.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/mktl/item.py b/src/mktl/item.py index 90bd3b28..ccdf6180 100644 --- a/src/mktl/item.py +++ b/src/mktl/item.py @@ -647,16 +647,16 @@ def req_set(self, request): return payload - def set(self, new_value, wait=True, response=True, formatted=False, quantity=False): + def set(self, new_value, wait=True, silent=False, formatted=False, quantity=False): """ Set a new value. Set *wait* to True to block until the request completes; this is the default behavior. If *wait* is set to False, the caller will be returned a :class:`mktl.protocol.message.Request` instance, which has a :func:`mktl.protocol.message.Request.wait` method that can optionally be invoked to block until completion of the request; the wait will return immediately once the request is - satisfied. Set *response* to False to disable all error handling and + satisfied. Set *silent* to False to disable all error handling and acknowledgements for the request (fire and forget); setting - *response* to False implies *wait* is also False. + *silent* to True implies *wait* is also False. There is no return value for a blocking request; failed requests will raise exceptions. @@ -687,11 +687,11 @@ def set(self, new_value, wait=True, response=True, formatted=False, quantity=Fal else: raise ValueError('formatted+quantity arguments must be boolean') - if response: - payload = self.to_payload(new_value) - else: - payload = self.to_payload(new_value, ack=False) + if silent: + payload = self.to_payload(new_value, silent=True) wait = False + else: + payload = self.to_payload(new_value) message = protocol.message.Request('SET', self.full_key, payload) self.req.send(message) From 8d1a21dda4b1d7fa9ed57f2cd845b8cdd8b7ce3e Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Tue, 3 Feb 2026 14:25:32 -1000 Subject: [PATCH 11/35] Trade out the 'ack' field of the payload for 'silent'. --- src/mktl/protocol/request.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/mktl/protocol/request.py b/src/mktl/protocol/request.py index 74f3624e..eceb743f 100644 --- a/src/mktl/protocol/request.py +++ b/src/mktl/protocol/request.py @@ -201,15 +201,15 @@ def send(self, message): self.request_signal.send(b'') try: - ack_requested = message.payload.ack + silent = message.payload.silent except: - ack_requested = True + silent = False - if ack_requested: - ack = message.wait_ack(self.timeout) - else: + if silent: return + ack = message.wait_ack(self.timeout) + if ack == False: error = '%s @ %s:%d: no response received in %.2f sec' args = (message.type, self.address, self.port, self.timeout) @@ -369,12 +369,14 @@ def req_handler(self, request): """ try: - ack_requested = request.payload.ack + silent = request.payload.silent except: - ack_requested = True + silent = False + + if silent: + return - if ack_requested: - self.req_ack(request) + self.req_ack(request) response = message.Message('REP', target, id=request.id) response.prefix = request.prefix From ca3b193c9e9970dc20eeaeeeaa51a7fb895bde3c Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Tue, 3 Feb 2026 14:31:18 -1000 Subject: [PATCH 12/35] Added a comment adjacent to the call to req_handler() indicating that the absence of a response could also be requested by the client. --- src/mktl/protocol/request.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/mktl/protocol/request.py b/src/mktl/protocol/request.py index eceb743f..bdbb69c4 100644 --- a/src/mktl/protocol/request.py +++ b/src/mktl/protocol/request.py @@ -450,7 +450,8 @@ def req_incoming(self, parts): if payload is None and error is None: # The handler should only return None when no response is # immediately forthcoming-- the handler has invoked some - # other processing chain that will issue a proper response. + # other processing chain that will issue a proper response, + # or the client explicitly requested no response. return if error is not None: From eecd838a260594df940aadc004c42deea9b4058f Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Tue, 3 Feb 2026 15:17:39 -1000 Subject: [PATCH 13/35] Docstring tweak. --- src/mktl/daemon.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mktl/daemon.py b/src/mktl/daemon.py index 7874c1ca..22c7ea29 100644 --- a/src/mktl/daemon.py +++ b/src/mktl/daemon.py @@ -476,8 +476,8 @@ def req_config(self, request): def req_handler(self, request): - """ Inspect the incoming request type and decide how a response - will be generated. + """ Inspect the incoming request type and call an appropriate + method to handle that specific request. """ try: From 4bc95ffb82dc58b1621a4f53bddcb277672f99c7 Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Tue, 3 Feb 2026 15:21:08 -1000 Subject: [PATCH 14/35] Added a comment about potential out-of-order processing for high frequency requests. --- src/mktl/protocol/request.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/mktl/protocol/request.py b/src/mktl/protocol/request.py index bdbb69c4..64441e25 100644 --- a/src/mktl/protocol/request.py +++ b/src/mktl/protocol/request.py @@ -483,6 +483,9 @@ def run(self): elif self.socket == active: parts = self.socket.recv_multipart() # Calling submit() will block if a worker is not available. + # Note that for high frequency operations this can result + # in out-of-order handling of requests, for example, if a + # stream of SET requests are inbound for a single item. self.workers.submit(self.req_incoming, parts) From 65b990b033f99005f5b50fcf3102528d7dee8cc9 Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Wed, 4 Feb 2026 09:41:44 -1000 Subject: [PATCH 15/35] Trade out the 'silent' argument for 'reply' instead. --- sbin/mkbrokerd | 8 +++++--- src/mktl/daemon.py | 12 ++++++------ src/mktl/item.py | 14 +++++++------- src/mktl/protocol/request.py | 16 ++++++++++------ 4 files changed, 28 insertions(+), 22 deletions(-) diff --git a/sbin/mkbrokerd b/sbin/mkbrokerd index 86bf0043..01aef9a5 100755 --- a/sbin/mkbrokerd +++ b/sbin/mkbrokerd @@ -415,11 +415,13 @@ class RequestServer(mktl.protocol.request.Server): """ try: - silent = request.payload.silent + reply = request.payload.reply except: - silent = False + reply = True - if silent: + if reply: + pass + else: return self.req_ack(request) diff --git a/src/mktl/daemon.py b/src/mktl/daemon.py index 22c7ea29..a6ee7b29 100644 --- a/src/mktl/daemon.py +++ b/src/mktl/daemon.py @@ -481,11 +481,11 @@ def req_handler(self, request): """ try: - silent = request.payload.silent + reply = request.payload.reply except: - silent = False + reply = True - if silent == False: + if reply: self.req_ack(request) type = request.type @@ -505,10 +505,10 @@ def req_handler(self, request): else: raise ValueError('unhandled request type: ' + type) - if silent: - return None - else: + if reply: return response + else: + return None def req_get(self, request): diff --git a/src/mktl/item.py b/src/mktl/item.py index ccdf6180..f7e26d5b 100644 --- a/src/mktl/item.py +++ b/src/mktl/item.py @@ -647,16 +647,16 @@ def req_set(self, request): return payload - def set(self, new_value, wait=True, silent=False, formatted=False, quantity=False): + def set(self, new_value, wait=True, reply=True, formatted=False, quantity=False): """ Set a new value. Set *wait* to True to block until the request completes; this is the default behavior. If *wait* is set to False, the caller will be returned a :class:`mktl.protocol.message.Request` instance, which has a :func:`mktl.protocol.message.Request.wait` method that can optionally be invoked to block until completion of the request; the wait will return immediately once the request is - satisfied. Set *silent* to False to disable all error handling and + satisfied. Set *reply* to False to disable all error handling and acknowledgements for the request (fire and forget); setting - *silent* to True implies *wait* is also False. + *reply to False implies *wait* is also False. There is no return value for a blocking request; failed requests will raise exceptions. @@ -687,11 +687,11 @@ def set(self, new_value, wait=True, silent=False, formatted=False, quantity=Fals else: raise ValueError('formatted+quantity arguments must be boolean') - if silent: - payload = self.to_payload(new_value, silent=True) - wait = False - else: + if reply: payload = self.to_payload(new_value) + else: + payload = self.to_payload(new_value, reply=False) + wait = False message = protocol.message.Request('SET', self.full_key, payload) self.req.send(message) diff --git a/src/mktl/protocol/request.py b/src/mktl/protocol/request.py index 64441e25..46ae625f 100644 --- a/src/mktl/protocol/request.py +++ b/src/mktl/protocol/request.py @@ -201,11 +201,13 @@ def send(self, message): self.request_signal.send(b'') try: - silent = message.payload.silent + reply = message.payload.reply except: - silent = False + reply = True - if silent: + if reply: + pass + else: return ack = message.wait_ack(self.timeout) @@ -369,11 +371,13 @@ def req_handler(self, request): """ try: - silent = request.payload.silent + reply = request.payload.reply except: - silent = False + reply = True - if silent: + if reply: + pass + else: return self.req_ack(request) From 22bd3a3ab634af43867a61f4c83df7e96eb76094 Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Fri, 6 Feb 2026 11:44:11 -1000 Subject: [PATCH 16/35] I was trying to realign that background thread with the original contents but I missed a few of the names. --- src/mktl/protocol/request.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mktl/protocol/request.py b/src/mktl/protocol/request.py index 46ae625f..6a3e06ca 100644 --- a/src/mktl/protocol/request.py +++ b/src/mktl/protocol/request.py @@ -175,12 +175,12 @@ def run(self): while True: sockets = poller.poll(10000) # milliseconds - for socket, flag in sockets: + for active, flag in sockets: - if self.request_receive == socket: + if self.request_receive == active: self._req_outgoing() - elif self.socket == socket: + elif self.socket == active: parts = self.socket.recv_multipart() self._rep_incoming(parts) From 3426b8ef88aa26ab233fe57f4e9768a4bddd062e Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Mon, 9 Feb 2026 17:25:54 -1000 Subject: [PATCH 17/35] Use a property for the 'reply' attribute on a Payload. --- src/mktl/protocol/message.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/mktl/protocol/message.py b/src/mktl/protocol/message.py index 6c9bad8c..a2c8ac22 100644 --- a/src/mktl/protocol/message.py +++ b/src/mktl/protocol/message.py @@ -375,6 +375,30 @@ def encapsulate(self): return payload + @property + def reply(self): + """ The reply attribute is generally only set to indicate that a + reply is not necessary. Establishing a property to return the + current value allows the exception handling to be done once, + here, and not everywhere the reply attribute might be inspected. + By a happy coincidence, the existence of this property does not + trigger the inclusion of 'reply' in the output of vars(), which + is how the :func:`encapsulate` method determines which local + attributes to include in the final output. + """ + + try: + return self.__reply + except AttributeError: + # Message replies are enabled by default. + return True + + + @reply.setter + def reply(self, new_value): + self.__reply = new_value + + # end of class Payload From b278fa6038bf3de14f44831ba819b0705f6a1545 Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Mon, 9 Feb 2026 17:33:26 -1000 Subject: [PATCH 18/35] Mirror the Payload.reply attribute as Message.reply to simplify exception handling. --- src/mktl/protocol/message.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/mktl/protocol/message.py b/src/mktl/protocol/message.py index a2c8ac22..d8f63cd2 100644 --- a/src/mktl/protocol/message.py +++ b/src/mktl/protocol/message.py @@ -128,6 +128,21 @@ def _finalize(self): self._parts = parts + @property + def reply(self): + """ The payload reply attribute is mirrored here for the sake of + simplifying exception handling elsewhere in the mKTL code base. + Otherwise, the other code would need to catch the AttributeError + thrown when the local payload is None. + """ + + try: + return self.payload.reply + except AttributeError: + # There is no payload, and message replies are enabled by default. + return True + + # end of class Message From 374da6c8f9c7116062ce5e453679f518a0a2ff7e Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Mon, 9 Feb 2026 17:34:31 -1000 Subject: [PATCH 19/35] Payload.reply is now a property (mirrored to Message.reply) to simplify exception handling. --- sbin/mkbrokerd | 7 +------ src/mktl/daemon.py | 5 +---- src/mktl/protocol/request.py | 14 ++------------ 3 files changed, 4 insertions(+), 22 deletions(-) diff --git a/sbin/mkbrokerd b/sbin/mkbrokerd index 01aef9a5..61833354 100755 --- a/sbin/mkbrokerd +++ b/sbin/mkbrokerd @@ -414,12 +414,7 @@ class RequestServer(mktl.protocol.request.Server): will be generated. """ - try: - reply = request.payload.reply - except: - reply = True - - if reply: + if request.reply: pass else: return diff --git a/src/mktl/daemon.py b/src/mktl/daemon.py index a6ee7b29..1c72abe8 100644 --- a/src/mktl/daemon.py +++ b/src/mktl/daemon.py @@ -480,10 +480,7 @@ def req_handler(self, request): method to handle that specific request. """ - try: - reply = request.payload.reply - except: - reply = True + reply = request.reply if reply: self.req_ack(request) diff --git a/src/mktl/protocol/request.py b/src/mktl/protocol/request.py index 6a3e06ca..4f0bf286 100644 --- a/src/mktl/protocol/request.py +++ b/src/mktl/protocol/request.py @@ -200,12 +200,7 @@ def send(self, message): self.requests.put(message) self.request_signal.send(b'') - try: - reply = message.payload.reply - except: - reply = True - - if reply: + if message.reply: pass else: return @@ -370,12 +365,7 @@ def req_handler(self, request): structure of what's happening in the daemon code. """ - try: - reply = request.payload.reply - except: - reply = True - - if reply: + if request.reply: pass else: return From 9539e02a8a8a05b188f6a93d1d35b7a87411aa46 Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Tue, 21 Apr 2026 16:35:57 -1000 Subject: [PATCH 20/35] Sketching what it might look like to use Message-centric flags for 'no reply' behavior. --- src/mktl/protocol/message.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/src/mktl/protocol/message.py b/src/mktl/protocol/message.py index b6aec67d..1838c974 100644 --- a/src/mktl/protocol/message.py +++ b/src/mktl/protocol/message.py @@ -19,6 +19,11 @@ version = b'a' +# Define any/all optional flags for message handling. + +NO_ACK = 0b1 +NO_REPLY = 0b10 + # The cached origin information is used by the Payload class to (optionally) # provide information used to determine the origin of a message. The call to # os.getlogin() appears to be more expensive than the others. For that reason @@ -77,7 +82,7 @@ class Message: valid_types = set(('ACK', 'REP')) - def __init__(self, type, target=None, payload=None, id=None): + def __init__(self, type, target=None, payload=None, id=None, flags=None): if type in self.valid_types: pass @@ -88,6 +93,7 @@ def __init__(self, type, target=None, payload=None, id=None): # for example, publish messages do not have or need an identification # number or a prefix. + self.flags = flags self.id = id self.type = type self.payload = payload @@ -118,11 +124,17 @@ def _finalize(self): # Once finalized, always finalized. return + flags = self.flags id = self.id type = self.type target = self.target payload = self.payload + if flags: + flags = flags.to_bytes(byteorder='big') + else: + flags = b'\x00' + # It is legal to create a Message with None as the id-- this happens # all the time when a Message is used as a container-- but trying to # send such a message is not permitted. @@ -157,9 +169,9 @@ def _finalize(self): payload = payload.encapsulate() if self.prefix: - parts = self.prefix + (version, id, type, target, payload, bulk) + parts = self.prefix + (version, id, flags, type, target, payload, bulk) else: - parts = (version, id, type, target, payload, bulk) + parts = (version, id, flags, type, target, payload, bulk) self._parts = parts @@ -197,16 +209,10 @@ def log(self, logger=None, level=logging.DEBUG): @property def reply(self): - """ The payload reply attribute is mirrored here for the sake of - simplifying exception handling elsewhere in the mKTL code base. - Otherwise, the other code would need to catch the AttributeError - thrown when the local payload is None. - """ - try: - return self.payload.reply - except AttributeError: - # There is no payload, and message replies are enabled by default. + if self.flags and self.flags & NO_REPLY: + return False + else: return True From 493d44066f86a47159c004da3926f868e4ff70a4 Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Wed, 22 Apr 2026 14:09:45 -1000 Subject: [PATCH 21/35] A couple extra touches to handle the optional 'flags' argument. --- src/mktl/protocol/message.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/mktl/protocol/message.py b/src/mktl/protocol/message.py index 849cf311..99af7655 100644 --- a/src/mktl/protocol/message.py +++ b/src/mktl/protocol/message.py @@ -220,8 +220,8 @@ def reconstruct(cls, parts): quantity = len(parts) - if quantity != 5 and quantity != 6: - raise ValueError("expected 5 or 6 parts, got %d" % (quantity)) + if quantity != 6 and quantity != 7: + raise ValueError("expected 6 or 7 parts, got %d" % (quantity)) their_version = parts[0] @@ -229,15 +229,17 @@ def reconstruct(cls, parts): raise ValueError("version mismatch: expected %s, got %s" % (repr(version), repr(their_version))) message_id = parts[1] - message_type = parts[2] - target = parts[3] - payload = parts[4] + message_flags = parts[2] + message_type = parts[3] + target = parts[4] + payload = parts[5] try: - bulk = parts[5] + bulk = parts[6] except IndexError: bulk = None + message_flags = int.from_bytes(message_flags, byteorder='big') message_type = message_type.decode() target = target.decode() @@ -252,7 +254,7 @@ def reconstruct(cls, parts): # allow it to pass, assuming the users know what they're doing. pass - message = cls(message_type, target, payload, message_id) + message = cls(message_type, target, payload, message_id, message_flags) return message @@ -367,7 +369,7 @@ class Request(Message): valid_types = set(('CONFIG', 'GET', 'HASH', 'SET')) - def __init__(self, type, target=None, payload=None, id=None): + def __init__(self, type, target=None, payload=None, id=None, flags=None): # Requests are generally initiated without an id number, but they're # required to have one. The expectation is that requests will have an From 45a6ce4cef29966c69f647478e27606abb157ddc Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Wed, 22 Apr 2026 14:40:40 -1000 Subject: [PATCH 22/35] Add a Message.ack property to return the 'no ack' status the same way as 'no reply' is handled. --- src/mktl/protocol/message.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/mktl/protocol/message.py b/src/mktl/protocol/message.py index 99af7655..84904787 100644 --- a/src/mktl/protocol/message.py +++ b/src/mktl/protocol/message.py @@ -114,6 +114,15 @@ def __repr__(self): return repr(self._parts) + @property + def ack(self): + + if self.flags and self.flags & NO_ACK: + return False + else: + return True + + def _finalize(self): """ Take the contents of this :class:`Message`, interpet them as bytes, and prepare the tuple that will be used for the multipart From 93f55f5de5f11116ad70c536267c2e7a80f53f89 Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Wed, 22 Apr 2026 14:41:58 -1000 Subject: [PATCH 23/35] Remove 'reply' awareness from the Payload. --- src/mktl/protocol/message.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/src/mktl/protocol/message.py b/src/mktl/protocol/message.py index 84904787..723138de 100644 --- a/src/mktl/protocol/message.py +++ b/src/mktl/protocol/message.py @@ -583,30 +583,6 @@ def encapsulate(self): return payload - @property - def reply(self): - """ The reply attribute is generally only set to indicate that a - reply is not necessary. Establishing a property to return the - current value allows the exception handling to be done once, - here, and not everywhere the reply attribute might be inspected. - By a happy coincidence, the existence of this property does not - trigger the inclusion of 'reply' in the output of vars(), which - is how the :func:`encapsulate` method determines which local - attributes to include in the final output. - """ - - try: - return self.__reply - except AttributeError: - # Message replies are enabled by default. - return True - - - @reply.setter - def reply(self, new_value): - self.__reply = new_value - - # end of class Payload From 8c67d0433b81017ca7ab8676445e7e3800a1873d Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Wed, 22 Apr 2026 15:13:07 -1000 Subject: [PATCH 24/35] Request needs to pass on the flags to the parent init. --- src/mktl/protocol/message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mktl/protocol/message.py b/src/mktl/protocol/message.py index 723138de..5e9aea6d 100644 --- a/src/mktl/protocol/message.py +++ b/src/mktl/protocol/message.py @@ -393,7 +393,7 @@ def __init__(self, type, target=None, payload=None, id=None, flags=None): if id is None: id = _id_next() - Message.__init__(self, type, target, payload, id) + Message.__init__(self, type, target, payload, id, flags) self.response = None From dec9ab13f12aeceeb3731d400ddc8dfd053d4aed Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Wed, 22 Apr 2026 15:14:02 -1000 Subject: [PATCH 25/35] Check request.ack instead of request.reply on whether to ACK. --- src/mktl/protocol/request.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/mktl/protocol/request.py b/src/mktl/protocol/request.py index bfd8fae3..23d0535e 100644 --- a/src/mktl/protocol/request.py +++ b/src/mktl/protocol/request.py @@ -182,7 +182,7 @@ def send(self, message): self.requests.put(message) self.request_signal.send(b'') - if message.reply: + if message.ack: pass else: return @@ -347,13 +347,15 @@ def req_handler(self, request): structure of what's happening in the daemon code. """ + + if request.ack: + self.req_ack(request) + if request.reply: pass else: return - self.req_ack(request) - response = message.Message('REP', target, id=request.id) response.prefix = request.prefix From 457e622cf2f892405b462ce2f9dddcde30328bc0 Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Wed, 22 Apr 2026 15:14:38 -1000 Subject: [PATCH 26/35] Use message envelope flags instead of the payload to indicate no-ack and no-reply. --- src/mktl/item.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/mktl/item.py b/src/mktl/item.py index a9f26d17..8c8d3168 100644 --- a/src/mktl/item.py +++ b/src/mktl/item.py @@ -701,14 +701,17 @@ def set(self, new_value, wait=True, reply=True, formatted=False, quantity=False) else: raise ValueError('formatted+quantity arguments must be boolean') + payload = self.to_payload(new_value) + if reply: - payload = self.to_payload(new_value) + flags = None payload.add_origin() else: - payload = self.to_payload(new_value, reply=False) + flags = protocol.message.NO_ACK | protocol.message.NO_REPLY wait = False - message = protocol.message.Request('SET', self.full_key, payload) + key = self.full_key + message = protocol.message.Request('SET', key, payload, flags=flags) self.req.send(message) if wait == False: From 43c887dfb0c4f2cfbfae68bb32b1a95e0f196c82 Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Wed, 22 Apr 2026 15:15:16 -1000 Subject: [PATCH 27/35] Check request.ack and request.reply separately. --- src/mktl/daemon.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/mktl/daemon.py b/src/mktl/daemon.py index 3bef0dbf..ecd8b54d 100644 --- a/src/mktl/daemon.py +++ b/src/mktl/daemon.py @@ -492,9 +492,7 @@ def req_handler(self, request): method to handle that specific request. """ - reply = request.reply - - if reply: + if request.ack: self.req_ack(request) type = request.type @@ -514,7 +512,7 @@ def req_handler(self, request): else: raise ValueError('unhandled request type: ' + type) - if reply: + if request.reply: return response else: return None From e8f0c161e8a8091c576d1efed700c661245aaf7a Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Thu, 23 Apr 2026 08:22:41 -1000 Subject: [PATCH 28/35] Define a combined flag to request no ACK or REP response. --- src/mktl/item.py | 2 +- src/mktl/protocol/message.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/mktl/item.py b/src/mktl/item.py index 8c8d3168..5bf3f333 100644 --- a/src/mktl/item.py +++ b/src/mktl/item.py @@ -707,7 +707,7 @@ def set(self, new_value, wait=True, reply=True, formatted=False, quantity=False) flags = None payload.add_origin() else: - flags = protocol.message.NO_ACK | protocol.message.NO_REPLY + flags = protocol.message.NO_ACK_OR_REP wait = False key = self.full_key diff --git a/src/mktl/protocol/message.py b/src/mktl/protocol/message.py index 5e9aea6d..e64b74d3 100644 --- a/src/mktl/protocol/message.py +++ b/src/mktl/protocol/message.py @@ -23,6 +23,7 @@ NO_ACK = 0b1 NO_REPLY = 0b10 +NO_ACK_OR_REP = NO_ACK | NO_REPLY # The cached origin information is used by the Payload class to (optionally) # provide information used to determine the origin of a message. The call to From 13bd055a2654d0b210ce05fd5449f7979795f2d6 Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Thu, 23 Apr 2026 08:23:49 -1000 Subject: [PATCH 29/35] Trade out 'NO_REPLY' for 'NO_REP'. --- src/mktl/protocol/message.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mktl/protocol/message.py b/src/mktl/protocol/message.py index e64b74d3..ce9b79f3 100644 --- a/src/mktl/protocol/message.py +++ b/src/mktl/protocol/message.py @@ -22,8 +22,8 @@ # Define any/all optional flags for message handling. NO_ACK = 0b1 -NO_REPLY = 0b10 -NO_ACK_OR_REP = NO_ACK | NO_REPLY +NO_REP = 0b10 +NO_ACK_OR_REP = NO_ACK | NO_REP # The cached origin information is used by the Payload class to (optionally) # provide information used to determine the origin of a message. The call to @@ -271,7 +271,7 @@ def reconstruct(cls, parts): @property def reply(self): - if self.flags and self.flags & NO_REPLY: + if self.flags and self.flags & NO_REP: return False else: return True From 153c1bf3ea30ca81417d80fd284c70797d459bb6 Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Thu, 23 Apr 2026 09:54:09 -1000 Subject: [PATCH 30/35] Rearrange the ordering of the parts to put 'flags' later in the sequence. --- src/mktl/protocol/message.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/mktl/protocol/message.py b/src/mktl/protocol/message.py index ce9b79f3..0f84ce6b 100644 --- a/src/mktl/protocol/message.py +++ b/src/mktl/protocol/message.py @@ -140,11 +140,6 @@ def _finalize(self): target = self.target payload = self.payload - if flags: - flags = flags.to_bytes(byteorder='big') - else: - flags = b'\x00' - # It is legal to create a Message with None as the id-- this happens # all the time when a Message is used as a container-- but trying to # send such a message is not permitted. @@ -169,6 +164,11 @@ def _finalize(self): # Assume it is already bytes. pass + if flags: + flags = flags.to_bytes(byteorder='big') + else: + flags = b'\x00' + if payload is None or payload == '': bulk = None payload = b'' @@ -180,9 +180,9 @@ def _finalize(self): # be represented as None, distinct from being an empty byte sequence. if bulk is None: - parts = (version, id, flags, type, target, payload) + parts = (version, id, type, target, flags, payload) else: - parts = (version, id, flags, type, target, payload, bulk) + parts = (version, id, type, target, flags, payload, bulk) if self.prefix: parts = self.prefix + parts @@ -238,10 +238,10 @@ def reconstruct(cls, parts): if their_version != version: raise ValueError("version mismatch: expected %s, got %s" % (repr(version), repr(their_version))) - message_id = parts[1] - message_flags = parts[2] - message_type = parts[3] - target = parts[4] + id = parts[1] + type = parts[2] + target = parts[3] + flags = parts[4] payload = parts[5] try: @@ -249,8 +249,8 @@ def reconstruct(cls, parts): except IndexError: bulk = None - message_flags = int.from_bytes(message_flags, byteorder='big') - message_type = message_type.decode() + flags = int.from_bytes(flags, byteorder='big') + type = type.decode() target = target.decode() if payload == b'': @@ -264,7 +264,7 @@ def reconstruct(cls, parts): # allow it to pass, assuming the users know what they're doing. pass - message = cls(message_type, target, payload, message_id, message_flags) + message = cls(type, target, payload, id, flags) return message From dab7acf806d6eaa2cfe122f709a5f48641170f06 Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Thu, 23 Apr 2026 10:25:24 -1000 Subject: [PATCH 31/35] Add documentation of the 'flags' message component. --- doc/protocol.rst | 67 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 46 insertions(+), 21 deletions(-) diff --git a/doc/protocol.rst b/doc/protocol.rst index e9d91c57..33167823 100644 --- a/doc/protocol.rst +++ b/doc/protocol.rst @@ -52,12 +52,25 @@ implementation in ZeroMQ, which enforces a strict one request, one response pattern; instead, we use DEALER/ROUTER, which allows any amount of messages in any order, in any direction. +Upon receipt of a request the daemon will immediately issue an ACK response. +The absence of a quick response indicates that the daemon is not available, +and the client should immediately raise an error. After the client receives +the initial ACK it should then look for the full response. There will be no +further messages associated with that id number after the full response is +received. A daemon may choose to forego the ACK response, but should only +do so in circumstances where processing a request requires zero additional +processing time. + +All requests are handled +fully asynchronously; a client could send a thousand requests in quick +succession, but the responses will not be serialized, and the response order +is not guaranteed. Synchronous behavior, if desired, is implemented by client +code and not in the protocol itself. + The request/response interaction between the client and daemon is a multipart -message, where each part is required and has specific meaning. The reference -implementation provides a :class:`mktl.protocol.message.Message` class to -minimize the amount of code that has to be aware about the on-the-wire message -structure. For both ends of the request/response exchange, the message parts -are: +message, where each part is required and has specific meaning. +The message parts are identical for both ends of the request/response +exchange; the message parts are: .. list-table:: @@ -90,6 +103,26 @@ are: is a store or a key, depending on the request; this field will be an empty byte sequence if the target is not specified. + * - **flags** + - A big-endian integer representing boolean flags that modify how this + message is handled. The default value is an integer zero; if this field + is transmitted as an empty byte sequence it must be interpreted as the + integer zero. Each bit in the integer has a specific meaning: + + .. list-table:: + + * - *Bit* + - *Name* + - *Meaning* + + * - 0b0001 + - NO_ACK + - The ACK response to this request should be suppressed. + + * - 0b0010 + - NO_REP + - The REP response to this request should be suppressed. + * - **payload** - The message payload. This is the JSON representation of any additional data required as part of this exchange; if setting a new value, it would @@ -105,21 +138,6 @@ are: to interpret the buffer. This field will be omitted entirely if there is no bulk component. -Upon receipt of a request the daemon will immediately issue an ACK response. -The absence of a quick response indicates that the daemon is not available, -and the client should immediately raise an error. After the client receives -the initial ACK it should then look for the full response. There will be no -further messages associated with that id number after the full response is -received. A daemon may choose to forego the ACK response, but should only -do so in circumstances where processing a request requires zero additional -processing time. - -All requests are handled -fully asynchronously; a client could send a thousand requests in quick -succession, but the responses will not be serialized, and the response order -is not guaranteed. Synchronous behavior, if desired, is implemented by client -code and not in the protocol itself. - Here is an example of what the full exchange on the client side might look like, in this case handling the exchange as a synchronous request:: @@ -137,12 +155,13 @@ like, in this case handling the exchange as a synchronous request:: response = self.socket.recv_multipart() Here is a representation of what the on-the-wire messages might look like -for the simple exchange outlined above:: +for a simple GET request:: b'a' b'00000023' b'GET' b'kpfguide.LASTFILENAME' + b'\x00' b'' b'a' @@ -150,13 +169,19 @@ for the simple exchange outlined above:: b'ACK' b'' b'' + b'' b'a' b'00000023' b'REP' b'' + b'' b'{"value": /sdata1701/kpf1/2025-06-23/image_672.fits', "time": 234.23}' +The reference implementation provides a :class:`mktl.protocol.message.Message` +class to minimize the amount of code that has to be aware about the on-the-wire +message structure. + .. _message_types: From f52af248e5fe50ca2dc2d2194d12a721ad07e2ba Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Thu, 23 Apr 2026 10:36:27 -1000 Subject: [PATCH 32/35] Allow transmission of the flags as the empty byte string, equivalent to the integer zero. --- src/mktl/protocol/message.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/mktl/protocol/message.py b/src/mktl/protocol/message.py index 0f84ce6b..19015f23 100644 --- a/src/mktl/protocol/message.py +++ b/src/mktl/protocol/message.py @@ -167,7 +167,7 @@ def _finalize(self): if flags: flags = flags.to_bytes(byteorder='big') else: - flags = b'\x00' + flags = b'' if payload is None or payload == '': bulk = None @@ -249,10 +249,14 @@ def reconstruct(cls, parts): except IndexError: bulk = None - flags = int.from_bytes(flags, byteorder='big') type = type.decode() target = target.decode() + if flags == b'': + flags = None + else: + flags = int.from_bytes(flags, byteorder='big') + if payload == b'': payload = None else: From fe7361a1b14dec728ccb066bd1c98019b5016e6d Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Thu, 23 Apr 2026 11:22:07 -1000 Subject: [PATCH 33/35] Touching up the request/response section. --- doc/protocol.rst | 58 +++++++++++++++++++++++++----------------------- 1 file changed, 30 insertions(+), 28 deletions(-) diff --git a/doc/protocol.rst b/doc/protocol.rst index 33167823..e3b18a61 100644 --- a/doc/protocol.rst +++ b/doc/protocol.rst @@ -66,11 +66,26 @@ fully asynchronously; a client could send a thousand requests in quick succession, but the responses will not be serialized, and the response order is not guaranteed. Synchronous behavior, if desired, is implemented by client code and not in the protocol itself. +Here is an example of what the full exchange on the client side might look +like, in this case handling the exchange as a synchronous request:: + + self.socket = zmq_context.socket(zmq.DEALER) + self.socket.setsockopt(zmq.LINGER, 0) + self.socket.identity = identity.encode() + self.socket.connect(daemon) + + self.socket.send_multipart(request) + result = self.socket.poll(100) # milliseconds + if result == 0: + raise TimeoutError('no response received in 100 ms') + + ack = self.socket.recv_multipart() + response = self.socket.recv_multipart() The request/response interaction between the client and daemon is a multipart -message, where each part is required and has specific meaning. +message, where each component of the message has a specific meaning. The message parts are identical for both ends of the request/response -exchange; the message parts are: +exchange. .. list-table:: @@ -80,6 +95,7 @@ exchange; the message parts are: * - **version** - A single ASCII character indicating the mKTL protocol version number. The initial release of the mKTL protocol uses the version character 'a'. + The version identifier is always present. * - **identifier** - A unique identifier for the request. The format of this identifier is @@ -89,17 +105,18 @@ exchange; the message parts are: to the original request. Note that this identifier does not necessarily have significance on the daemon side, daemons will use their own internal scheme to uniquely identify requests, but the response will always include - this original identifier. + this original identifier. The request identifier is always present. * - **type** - The message type. This is a short string of characters that identifies what type of request, or response, this message represents. It is one - of the values described in the :ref:`message_types` section below. + of the values described in the :ref:`message_types` section below. The + message type is always present. * - **target** - The target for this request/response, if any. Not all requests have a - target; responses don't need to specify it, since it is the identification - number that ties a response to its request. If a target is specified it + target; responses don't need to specify it, since the identifier field + associates a response with a request. If a target is specified it is a store or a key, depending on the request; this field will be an empty byte sequence if the target is not specified. @@ -117,42 +134,27 @@ exchange; the message parts are: * - 0b0001 - NO_ACK - - The ACK response to this request should be suppressed. + - Suppress the ACK response to this request. * - 0b0010 - NO_REP - - The REP response to this request should be suppressed. + - Suppress the REP response to this request. * - **payload** - The message payload. This is the JSON representation of any additional data required as part of this exchange; if setting a new value, it would contain the value; if it is a response containing additional information - it would go here. This field will be an empty byte sequence if no - additional information is required. See the :ref:`message_payload` section - for a more complete description of the payload contents. + it would go here. See the :ref:`message_payload` section + for a more complete description of the payload contents. This field will + be an empty byte sequence if there is no payload. * - **bulk** - A bulk byte sequence, typically a component of the payload. This is to allow the transmission of information like image data, where the bulk bytes represent the image buffer, and the JSON payload describes how to interpret the buffer. This field will be omitted entirely if there - is no bulk component. - -Here is an example of what the full exchange on the client side might look -like, in this case handling the exchange as a synchronous request:: - - self.socket = zmq_context.socket(zmq.DEALER) - self.socket.setsockopt(zmq.LINGER, 0) - self.socket.identity = identity.encode() - self.socket.connect(daemon) - - self.socket.send_multipart(request) - result = self.socket.poll(100) # milliseconds - if result == 0: - raise zmq.ZMQError('no response received in 100 ms') - - ack = self.socket.recv_multipart() - response = self.socket.recv_multipart() + is no bulk component, which allows a recipient to distinguish between + an empty byte sequence and the complete absence of data. Here is a representation of what the on-the-wire messages might look like for a simple GET request:: From f77eca4f2ecd13efcb26fd7186fea840221c50c7 Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Thu, 23 Apr 2026 11:29:34 -1000 Subject: [PATCH 34/35] Use request.ack to decide whether to issue an ACK response. --- sbin/mkbrokerd | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sbin/mkbrokerd b/sbin/mkbrokerd index ba161d50..212d4f8d 100755 --- a/sbin/mkbrokerd +++ b/sbin/mkbrokerd @@ -400,13 +400,14 @@ class RequestServer(mktl.protocol.request.Server): will be generated. """ + if request.ack: + self.req_ack(request) + if request.reply: pass else: return - self.req_ack(request) - type = request.type target = request.target From c8230fbcca6856787a1a2889ec6a4ce27d2e464f Mon Sep 17 00:00:00 2001 From: Kyle Lanclos Date: Thu, 23 Apr 2026 11:31:38 -1000 Subject: [PATCH 35/35] Execute the request if no reply is requested, just don't reply. --- sbin/mkbrokerd | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sbin/mkbrokerd b/sbin/mkbrokerd index 212d4f8d..f74bea1c 100755 --- a/sbin/mkbrokerd +++ b/sbin/mkbrokerd @@ -403,11 +403,6 @@ class RequestServer(mktl.protocol.request.Server): if request.ack: self.req_ack(request) - if request.reply: - pass - else: - return - type = request.type target = request.target @@ -421,7 +416,8 @@ class RequestServer(mktl.protocol.request.Server): else: raise ValueError('invalid request type: ' + type) - return payload + if request.reply: + return payload def req_hash(self, request):