Skip to content

Commit c3210ce

Browse files
committed
Refactor HTTP2->GRPC forwarding for clarity
1 parent a64b7e0 commit c3210ce

File tree

1 file changed

+42
-50
lines changed

1 file changed

+42
-50
lines changed

typedb/localstack_typedb/utils/h2_proxy.py

Lines changed: 42 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -63,64 +63,56 @@ def apply_http2_patches_for_grpc_support(
6363
Note: this is a very brute-force approach and needs to be fixed/enhanced over time!
6464
"""
6565

66+
class ForwardingBuffer:
67+
"""
68+
A buffer atop the HTTP2 client connection, that will hold
69+
data until the ProxyRequestMatcher tells us whether to send it
70+
to the backend, or leave it to the default handler.
71+
"""
72+
def __init__(self, http_response_stream):
73+
self.http_response_stream = http_response_stream
74+
LOG.debug(f"Starting TCP forwarder to port {target_port} for new HTTP2 connection")
75+
self.backend = TcpForwarder(target_port, host=target_host)
76+
self.buffer = []
77+
self.proxying = False
78+
reactor.getThreadPool().callInThread(self.backend.receive_loop, self.received_from_backend)
79+
80+
def received_from_backend(self, data):
81+
LOG.debug(f"Received {len(data)} bytes from backend")
82+
self.http_response_stream.write(data)
83+
84+
def received_from_http2_client(self, data, default_handler):
85+
if self.proxying:
86+
assert not self.buffer
87+
# Keep sending data to the backend for the lifetime of this connection
88+
self.backend.send(data)
89+
else:
90+
self.buffer.append(data)
91+
if headers := get_headers_from_data_stream(self.buffer):
92+
self.proxying = request_matcher.should_proxy_request(headers)
93+
# Now we know what to do with the buffer
94+
buffered_data = b"".join(self.buffer)
95+
self.buffer = []
96+
if self.proxying:
97+
LOG.debug(f"Forwarding {len(buffered_data)} bytes to backend")
98+
self.backend.send(buffered_data)
99+
else:
100+
return default_handler(buffered_data)
101+
102+
def close(self):
103+
self.backend.close()
104+
66105
@patch(H2Connection.connectionMade)
67106
def _connectionMade(fn, self, *args, **kwargs):
68-
def _process(data):
69-
LOG.debug("Received data (%s bytes) from upstream HTTP2 server", len(data))
70-
self.transport.write(data)
71-
72-
# TODO: make port configurable
73-
self._ls_forwarder = TcpForwarder(target_port, host=target_host)
74-
LOG.debug(
75-
"Starting TCP forwarder to port %s for new HTTP2 connection", target_port
76-
)
77-
reactor.getThreadPool().callInThread(self._ls_forwarder.receive_loop, _process)
107+
self._ls_forwarding_buffer = ForwardingBuffer(self.transport)
78108

79109
@patch(H2Connection.dataReceived)
80110
def _dataReceived(fn, self, data, *args, **kwargs):
81-
forwarder = getattr(self, "_ls_forwarder", None)
82-
should_proxy_request = getattr(self, "_ls_should_proxy_request", None)
83-
if not forwarder or should_proxy_request is False:
84-
return fn(self, data, *args, **kwargs)
85-
86-
if should_proxy_request:
87-
forwarder.send(data)
88-
return
89-
90-
setattr(self, "_data_received", getattr(self, "_data_received", []))
91-
self._data_received.append(data)
92-
93-
# parse headers from request frames received so far
94-
headers = get_headers_from_data_stream(self._data_received)
95-
if not headers:
96-
# if no headers received yet, then return (method will be called again for next chunk of data)
97-
return
98-
99-
# check if the incoming request should be proxies, based on the request headers
100-
self._ls_should_proxy_request = request_matcher.should_proxy_request(headers)
101-
102-
if not self._ls_should_proxy_request:
103-
# if this is not a target request, then call the upstream function
104-
result = None
105-
for chunk in self._data_received:
106-
result = fn(self, chunk, *args, **kwargs)
107-
self._data_received = []
108-
return result
109-
110-
# forward data chunks to the target
111-
for chunk in self._data_received:
112-
LOG.debug(
113-
"Forwarding data (%s bytes) from HTTP2 client to server", len(chunk)
114-
)
115-
forwarder.send(chunk)
116-
self._data_received = []
111+
self._ls_forwarding_buffer.received_from_http2_client(data, lambda d: fn(d, *args, **kwargs))
117112

118113
@patch(H2Connection.connectionLost)
119114
def connectionLost(fn, self, *args, **kwargs):
120-
forwarder = getattr(self, "_ls_forwarder", None)
121-
if not forwarder:
122-
return fn(self, *args, **kwargs)
123-
forwarder.close()
115+
self._ls_forwarding_buffer.close()
124116

125117

126118
def get_headers_from_data_stream(data_list: Iterable[bytes]) -> Headers:

0 commit comments

Comments
 (0)