Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,25 @@ producers/consumers.
Pipes simply have an input and an output channel; creating and using them is
pretty straightforward:

.. code:: python

import asyncio

from pipekit.pipe import QueuePipe

async def main():
mypipe = QueuePipe('my-pipe')
await mypipe.start()
await mypipe.send('Hello world')
print(await mypipe.receive())

asyncio.run(main())
# Hello world

The current examples are still being worked on:



.. code:: python

from pipekit import ThreadPipe
Expand Down
10 changes: 8 additions & 2 deletions pipekit/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(self, *args, id=None, workflow=None, parent=None, logger=_l, **kwar
self._event_lock = set()
self._debug = {'events'}

self._settings = Box(self.configure(*args, **kwargs) or dict())
self._settings = Box(self.configure(**kwargs) or dict())
if not workflow:
workflow = self
settings = [f'{k}={v}' for k, v in workflow.safe_settings(self._settings).items()]
Expand All @@ -52,6 +52,9 @@ def configure(self, **settings):
def settings(self, **override):
return Box(self._settings, **override)

def safe_settings(self, settings):
return settings

@property
def type(self):
return type(self).__name__
Expand Down Expand Up @@ -175,7 +178,10 @@ def __getattr__(self, name):

def _proxied_logging_method(self, method, *args, **kwargs):
if method == 'debug':
debug = (self.workflow or self).settings().logging.debug
if logging in (self.workflow or self).settings():
debug = (self.workflow or self).settings().logging.debug
else:
debug = []
if not ('all' in debug or self.type in debug or (self.id in debug)):
return lambda *a, **kw: None

Expand Down
3 changes: 2 additions & 1 deletion pipekit/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,9 @@ async def receiver(self):


class QueuePipe(Pipe):
def configure(self, queue=None, maxsize=1):
def configure(self, queue=None, maxsize=1, **kwargs):
self._queue = queue or asyncio.Queue(maxsize=maxsize)
return kwargs

async def send(self, *args, **kwargs):
return await self._queue.put(*args, **kwargs)
Expand Down