Kaldie commented Jul 6, 2021

Solves the issue reported in #172
Another solution was to bound the queue; however, that would be very unbalanced if the writes were not of equal size. The downside of this approach is that each write needs to support len(). However that

Kaldie (Author) commented Jul 13, 2021

@mtth can you have a look?

mtth (Owner) left a comment

Apologies for the late review @Kaldie; thanks for taking a stab at this. I mostly have some concerns about the locking scheme; please have a look at the inline comments.

if buffersize is None:
  return AsyncWriter(consumer)
else:
  return BoundedAsyncWriter(consumer, buffer_size=buffersize)

buffersize is an HDFS argument, better not to overload its use here. Let's introduce a separate argument instead.
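
One way to read this suggestion, as a minimal sketch: keep buffersize reserved for HDFS and select the writer from a separate argument. The classes below are simplified stand-ins rather than the library's code, and both the factory name make_writer and the parameter max_pending_chunks are hypothetical.

```python
class AsyncWriter:
  """Stand-in for the unbounded writer (not the library's class)."""

  def __init__(self, consumer):
    self.consumer = consumer


class BoundedAsyncWriter(AsyncWriter):
  """Stand-in for the bounded writer proposed in the PR."""

  def __init__(self, consumer, buffer_size):
    super().__init__(consumer)
    self.buffer_size = buffer_size


def make_writer(consumer, buffersize=None, max_pending_chunks=None):
  """Pick a writer without overloading the HDFS buffersize argument.

  buffersize keeps its HDFS meaning and is ignored here;
  max_pending_chunks (hypothetical name) alone selects the bounded writer.
  """
  if max_pending_chunks is None:
    return AsyncWriter(consumer)
  return BoundedAsyncWriter(consumer, buffer_size=max_pending_chunks)
```

With this shape, passing buffersize=65536 on its own still yields the unbounded writer, so existing callers would see no change in behavior.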

self.exception = kwargs.get("exception")


def wrapped_consumer(asyncWriter, data):

Let's make this private and use Python naming conventions (recommend shortening asyncWriter to writer).



def wrapped_consumer(asyncWriter, data):
  """Wrapped consumer that lets us get a child's exception."""

This needs to be updated now that the function is top-level.

Comment on lines +189 to +190
except RuntimeError:
  pass

This is a big code smell. Is there a way to avoid this? Locks are best used via a with statement; perhaps that can help us here.
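
If the swallowed RuntimeError comes from releasing a lock that is not held (an assumption, since the surrounding code is not shown; threading.Lock.release() does raise RuntimeError in that case), a with block removes the need for it, because acquire and release are always paired. The sketch below uses a threading.Condition to cap the number of buffered bytes via len(). The class name ByteBoundedBuffer and the max_bytes parameter are hypothetical and are not taken from the PR.

```python
import threading
from collections import deque


class ByteBoundedBuffer:
  """Buffer that blocks producers once max_bytes are pending."""

  def __init__(self, max_bytes):
    self._chunks = deque()
    self._pending = 0  # Total len() of the buffered chunks.
    self._max_bytes = max_bytes
    self._cond = threading.Condition()

  def put(self, chunk):
    with self._cond:  # Lock is released on exit, even if an error occurs.
      # Wait while adding the chunk would exceed the budget. An oversized
      # chunk is admitted once the buffer is empty, so it cannot block forever.
      while self._chunks and self._pending + len(chunk) > self._max_bytes:
        self._cond.wait()
      self._chunks.append(chunk)
      self._pending += len(chunk)
      self._cond.notify_all()

  def get(self):
    with self._cond:
      while not self._chunks:
        self._cond.wait()
      chunk = self._chunks.popleft()
      self._pending -= len(chunk)
      self._cond.notify_all()
      return chunk
```

Both methods re-check their condition in a loop after wait() returns, which is the usual guard against spurious or stale wake-ups.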

"""A Bounded asynchronous publisher-consumer.
:param consumer: Function which takes a single generator as argument.
:param buffer_size: Number of entities that are buffered. When this number is exceeded,

Please stick to 80 chars per line throughout.

_logger.debug('Child terminated without errors.')
self._queue = None

def write(self, chunk):

Much of the logic in this class is identical to the original writer; is there a way to consolidate?
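
One possible consolidation, sketched under assumptions: the base writer builds its queue in an overridable hook, and the bounded variant overrides only that hook. Both classes are simplified stand-ins rather than the library's implementation, and the names _make_queue and max_chunks are hypothetical. This version bounds the number of chunks through queue.Queue(maxsize=...), which the PR description notes is uneven when chunk sizes differ.

```python
import queue
import threading


class AsyncWriter:
  """Simplified stand-in for the original writer."""

  def __init__(self, consumer):
    self._consumer = consumer
    self._queue = None
    self._thread = None
    self._err = None

  def _make_queue(self):
    """Hook that subclasses override to change the buffering policy."""
    return queue.Queue()

  def __enter__(self):
    self._queue = self._make_queue()
    self._err = None
    self._thread = threading.Thread(target=self._run)
    self._thread.start()
    return self

  def _run(self):
    # Yields chunks from the queue until the None sentinel arrives.
    chunks = iter(self._queue.get, None)
    try:
      self._consumer(chunks)
    except Exception as err:
      self._err = err
    finally:
      # Drain leftovers so a bounded queue can never block the producer.
      for _ in chunks:
        pass

  def __exit__(self, exc_type, exc_value, traceback):
    self._queue.put(None)  # Sentinel: no more chunks.
    self._thread.join()
    self._queue = None
    if self._err is not None:
      raise self._err

  def write(self, chunk):
    if chunk:
      self._queue.put(chunk)


class BoundedAsyncWriter(AsyncWriter):
  """Same logic as the base class; only the queue is bounded."""

  def __init__(self, consumer, max_chunks):
    super().__init__(consumer)
    self._max_chunks = max_chunks

  def _make_queue(self):
    return queue.Queue(maxsize=self._max_chunks)
```

A usage example: `with BoundedAsyncWriter(consumer, max_chunks=2) as writer: writer.write(b'data')` blocks in write() whenever two chunks are already waiting for the consumer.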
