From 302e496890e90678fcd593f2af23f9a939833cae Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Sat, 13 Dec 2025 16:27:04 -0800 Subject: [PATCH] android-netsim transport enhancements --- bumble/transport/android_netsim.py | 74 +++++++++++++++++------------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/bumble/transport/android_netsim.py b/bumble/transport/android_netsim.py index 35653806..4a6a210c 100644 --- a/bumble/transport/android_netsim.py +++ b/bumble/transport/android_netsim.py @@ -156,21 +156,26 @@ async def open_android_netsim_controller_transport( logger.warning("unable to publish gRPC port") class HciDevice: - def __init__(self, context, on_data_received): + def __init__(self, context, server): self.context = context - self.on_data_received = on_data_received + self.server = server self.name = None + self.sink = None self.loop = asyncio.get_running_loop() self.done = self.loop.create_future() - self.task = self.loop.create_task(self.pump()) async def pump(self): try: await self.pump_loop() except asyncio.CancelledError: logger.debug('Pump task canceled') - if not self.done.done(): - self.done.set_result(None) + finally: + if self.sink: + logger.debug('Releasing sink') + self.server.release_sink() + self.sink = None + + logger.debug('Pump task terminated') async def pump_loop(self): while True: @@ -186,15 +191,26 @@ async def pump_loop(self): if request.WhichOneof('request_type') == 'initial_info': logger.debug(f'Received initial info: {request}') + self.name = request.initial_info.name + # We only accept BLUETOOTH if request.initial_info.chip.kind != ChipKind.BLUETOOTH: logger.warning('Unsupported chip type') error = PacketResponse(error='Unsupported chip type') await self.context.write(error) - return + # return + continue + + # Lease the sink so that no other device can send + self.sink = self.server.lease_sink(self) + if self.sink is None: + logger.warning('Another device is already connected') + error = PacketResponse(error='Device busy') + await self.context.write(error) + # return + continue - self.name = request.initial_info.name - continue + continue # Expect a data packet request_type = request.WhichOneof('request_type') @@ -205,10 +221,10 @@ async def pump_loop(self): continue # Process the packet - data = ( + assert self.sink is not None + self.sink( bytes([request.hci_packet.packet_type]) + request.hci_packet.packet ) - self.on_data_received(data) async def send_packet(self, data): return await self.context.write( @@ -217,12 +233,6 @@ async def send_packet(self, data): ) ) - def terminate(self): - self.task.cancel() - - async def wait_for_termination(self): - await self.done - server_address = f'{server_host}:{server_port}' class Server(PacketStreamerServicer, ParserSource): @@ -258,27 +268,27 @@ async def send_packet(self, packet): return await self.device.send_packet(packet) - async def StreamPackets(self, _request_iterator, context): - logger.debug('StreamPackets request') - - # Check that we don't already have a device + def lease_sink(self, device): if self.device: - logger.debug('Busy, already serving a device') - return PacketResponse(error='Busy') + return None + self.device = device + return self.parser.feed_data + + def release_sink(self): + self.device = None + + async def StreamPackets(self, request_iterator, context): + logger.debug('StreamPackets request') # Instantiate a new device - self.device = HciDevice(context, self.parser.feed_data) + device = HciDevice(context, self) - # Wait for the device to terminate - logger.debug('Waiting for device to terminate') + # Pump packets to/from the device + logger.debug('Pumping device packets') try: - await self.device.wait_for_termination() - except asyncio.CancelledError: - logger.debug('Request canceled') - self.device.terminate() - - logger.debug('Device terminated') - self.device = None + await device.pump() + finally: + logger.debug('Pump terminated') server = Server() await server.start()