Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 42 additions & 32 deletions bumble/transport/android_netsim.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,21 +156,26 @@ async def open_android_netsim_controller_transport(
logger.warning("unable to publish gRPC port")

class HciDevice:
def __init__(self, context, on_data_received):
def __init__(self, context, server):
self.context = context
self.on_data_received = on_data_received
self.server = server
self.name = None
self.sink = None
self.loop = asyncio.get_running_loop()
self.done = self.loop.create_future()
self.task = self.loop.create_task(self.pump())

async def pump(self):
try:
await self.pump_loop()
except asyncio.CancelledError:
logger.debug('Pump task canceled')
if not self.done.done():
self.done.set_result(None)
finally:
if self.sink:
logger.debug('Releasing sink')
self.server.release_sink()
self.sink = None

logger.debug('Pump task terminated')

async def pump_loop(self):
while True:
Expand All @@ -186,15 +191,26 @@ async def pump_loop(self):
if request.WhichOneof('request_type') == 'initial_info':
logger.debug(f'Received initial info: {request}')

self.name = request.initial_info.name

# We only accept BLUETOOTH
if request.initial_info.chip.kind != ChipKind.BLUETOOTH:
logger.warning('Unsupported chip type')
error = PacketResponse(error='Unsupported chip type')
await self.context.write(error)
return
# return
continue

# Lease the sink so that no other device can send
self.sink = self.server.lease_sink(self)
if self.sink is None:
logger.warning('Another device is already connected')
error = PacketResponse(error='Device busy')
await self.context.write(error)
# return
continue

self.name = request.initial_info.name
continue
continue

# Expect a data packet
request_type = request.WhichOneof('request_type')
Expand All @@ -205,10 +221,10 @@ async def pump_loop(self):
continue

# Process the packet
data = (
assert self.sink is not None
self.sink(
bytes([request.hci_packet.packet_type]) + request.hci_packet.packet
)
self.on_data_received(data)

async def send_packet(self, data):
return await self.context.write(
Expand All @@ -217,12 +233,6 @@ async def send_packet(self, data):
)
)

def terminate(self):
self.task.cancel()

async def wait_for_termination(self):
await self.done

server_address = f'{server_host}:{server_port}'

class Server(PacketStreamerServicer, ParserSource):
Expand Down Expand Up @@ -258,27 +268,27 @@ async def send_packet(self, packet):

return await self.device.send_packet(packet)

async def StreamPackets(self, _request_iterator, context):
logger.debug('StreamPackets request')

# Check that we don't already have a device
def lease_sink(self, device):
if self.device:
logger.debug('Busy, already serving a device')
return PacketResponse(error='Busy')
return None
self.device = device
return self.parser.feed_data

def release_sink(self):
self.device = None

async def StreamPackets(self, request_iterator, context):
logger.debug('StreamPackets request')

# Instantiate a new device
self.device = HciDevice(context, self.parser.feed_data)
device = HciDevice(context, self)

# Wait for the device to terminate
logger.debug('Waiting for device to terminate')
# Pump packets to/from the device
logger.debug('Pumping device packets')
try:
await self.device.wait_for_termination()
except asyncio.CancelledError:
logger.debug('Request canceled')
self.device.terminate()

logger.debug('Device terminated')
self.device = None
await device.pump()
finally:
logger.debug('Pump terminated')

server = Server()
await server.start()
Expand Down
Loading