Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions cantok/tokens/abstract/abstract_token.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from abc import ABC, abstractmethod
from threading import RLock
from typing import Any, Awaitable, Dict, List, Optional, Union
from time import sleep
from typing import Any, Dict, List, Optional, Union

from cantok.errors import CancellationError
from cantok.tokens.abstract.cancel_cause import CancelCause
from cantok.tokens.abstract.coroutine_wrapper import WaitCoroutineWrapper
from cantok.tokens.abstract.report import CancellationReport


Expand Down Expand Up @@ -156,22 +156,18 @@ def wait(
self,
step: Union[int, float] = 0.0001,
timeout: Optional[Union[int, float]] = None,
) -> Awaitable: # type: ignore[type-arg]
) -> None:
"""
Waits until the token is cancelled.

When used with ``await``, runs non-blocking inside an asyncio event loop.
When called without ``await``, blocks the current thread.
Blocks the current thread until the token is cancelled.

:param step: Interval between status checks, in seconds. Defaults to 0.0001.
:param timeout: Maximum time to wait, in seconds. If exceeded,
raises TimeoutCancellationError. Defaults to None (no limit).

>>> import asyncio
>>>
>>> token = TimeoutToken(5)
>>> token.wait() # blocks for ~5 seconds, then returns
>>> asyncio.run(TimeoutToken(5).wait()) # non-blocking, inside an asyncio event loop
"""
if step < 0:
raise ValueError(
Expand All @@ -193,7 +189,12 @@ def wait(

token = TimeoutToken(timeout)

return WaitCoroutineWrapper(step, self + token, token)
token_for_wait = self + token

while token_for_wait:
sleep(step)

token.check()

def cancel(self) -> 'AbstractToken':
"""
Expand Down
67 changes: 0 additions & 67 deletions cantok/tokens/abstract/coroutine_wrapper.py

This file was deleted.

13 changes: 13 additions & 0 deletions docs/what_are_tokens/exceptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,19 @@ token.check()
#> cantok.errors.TimeoutCancellationError: The timeout of 1 second has expired.
```

The `wait()` method can also raise an exception directly when its own waiting timeout expires:

```python
from cantok import SimpleToken, TimeoutCancellationError

token = SimpleToken()

try:
token.wait(timeout=1)
except TimeoutCancellationError:
print('Waiting took too long.')
```

Each type of token (except [`DefaultToken`](../types_of_tokens/DefaultToken.md)) has a corresponding type of exception that can be raised in this case:

- [`SimpleToken`](../types_of_tokens/SimpleToken.md) -> `CancellationError`
Expand Down
24 changes: 12 additions & 12 deletions docs/what_are_tokens/waiting.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Each token has a `wait()` method, which allows you to wait for its cancellation.
Each token has a `wait()` method, which allows you to block the current thread until the token is cancelled.

```python
from cantok import TimeoutToken
Expand All @@ -10,26 +10,26 @@ token.check() # Since the timeout has expired, an exception will be raised.
#> cantok.errors.TimeoutCancellationError: The timeout of 5 seconds has expired.
```

If you add the `await` keyword before calling `wait()`, the method will be automatically used in non-blocking mode:
This is useful when one thread needs to wait for cancellation requested from another thread:

```python
import asyncio
from threading import Thread
from time import sleep
from cantok import SimpleToken

async def do_something(token):
await asyncio.sleep(3) # Imitation of some real async activity.
def do_something(token):
sleep(3) # Imitation of some real activity.
token.cancel()

async def main():
token = SimpleToken()
await asyncio.gather(do_something(token), token.wait())
print('Something has been done!')
token = SimpleToken()
thread = Thread(target=do_something, args=(token,))
thread.start()

asyncio.run(main())
token.wait()
thread.join()
print('Something has been done!')
```

Yes, it looks like magic — it is magic. The method itself finds out how it was used: inside an expression with or without the `await` keyword. In the first case, it runs in CPU-saving mode, in the second — in non-blocking event-loop mode.

In addition to the above, the `wait()` method has two optional arguments:

- **`timeout`** (`int` or `float`) — the maximum waiting time in seconds. If this time is exceeded, a [`TimeoutCancellationError` exception](../what_are_tokens/exceptions.md) will be raised. By default, the `timeout` is not set.
Expand Down
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ build-backend = "setuptools.build_meta"

[project]
name = "cantok"
version = "0.0.37"
version = "0.0.38"
authors = [{ name = "Evgeniy Blinov", email = "zheni-b@yandex.ru" }]
description = 'Implementation of the "Cancellation Token" pattern'
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
'displayhooks>=0.0.6',
'transfunctions>=0.0.12',
'typing_extensions ; python_version <= "3.9"',
]
Expand Down
40 changes: 1 addition & 39 deletions tests/examples/test_examples.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import asyncio
from contextlib import redirect_stdout
from io import StringIO
from random import randint
from threading import Thread

from cantok import ConditionToken, CounterToken, SimpleToken, TimeoutToken
from cantok import ConditionToken, CounterToken, TimeoutToken

counter = 0

Expand All @@ -30,38 +27,3 @@ def test_cancel_simple_token_with_function_and_thread_2():
counter += 1

assert counter


def test_waiting_of_cancelled_token():
async def do_something(token):
await asyncio.sleep(0.1) # Imitation of some real async activity.
token.cancel()

async def main():
token = SimpleToken()
await do_something(token)
await token.wait()
print('Something has been done!') # noqa: T201

buffer = StringIO()
with redirect_stdout(buffer):
asyncio.run(main())

assert buffer.getvalue() == 'Something has been done!\n'


def test_waiting_of_cancelled_token_with_gather():
async def do_something(token):
await asyncio.sleep(0.1) # Imitation of some real async activity.
token.cancel()

async def main():
token = SimpleToken()
await asyncio.gather(do_something(token), token.wait())
print('Something has been done!') # noqa: T201

buffer = StringIO()
with redirect_stdout(buffer):
asyncio.run(main())

assert buffer.getvalue() == 'Something has been done!\n'
Empty file.
Empty file.
Empty file.
44 changes: 44 additions & 0 deletions tests/skipped/sync_and_async_wait/examples/test_examples.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import asyncio
from contextlib import redirect_stdout
from io import StringIO

import pytest

from cantok import SimpleToken


@pytest.mark.skip(reason='The universal sync/async wait() method is no longer supported because it suppressed exceptions raised during wait-time cancellation checks.')
def test_waiting_of_cancelled_token():
async def do_something(token):
await asyncio.sleep(0.1) # Imitation of some real async activity.
token.cancel()

async def main():
token = SimpleToken()
await do_something(token)
await token.wait()
print('Something has been done!') # noqa: T201

buffer = StringIO()
with redirect_stdout(buffer):
asyncio.run(main())

assert buffer.getvalue() == 'Something has been done!\n'


@pytest.mark.skip(reason='The universal sync/async wait() method is no longer supported because it suppressed exceptions raised during wait-time cancellation checks.')
def test_waiting_of_cancelled_token_with_gather():
async def do_something(token):
await asyncio.sleep(0.1) # Imitation of some real async activity.
token.cancel()

async def main():
token = SimpleToken()
await asyncio.gather(do_something(token), token.wait())
print('Something has been done!') # noqa: T201

buffer = StringIO()
with redirect_stdout(buffer):
asyncio.run(main())

assert buffer.getvalue() == 'Something has been done!\n'
Empty file.
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import asyncio
from functools import partial
from time import perf_counter

import pytest

from cantok import (
ConditionToken,
CounterToken,
DefaultToken,
SimpleToken,
TimeoutToken,
)

ALL_TOKEN_CLASSES = [SimpleToken, ConditionToken, TimeoutToken, CounterToken]
ALL_ARGUMENTS_FOR_TOKEN_CLASSES = [tuple(), (lambda: False, ), (15, ), (15, )]
ALL_TOKENS_FABRICS = [partial(token_class, *arguments) for token_class, arguments in zip(ALL_TOKEN_CLASSES, ALL_ARGUMENTS_FOR_TOKEN_CLASSES)]


@pytest.mark.parametrize(
'parameters',
[
{'step': -1},
{'step': -1, 'timeout': -1},
{'step': -1, 'timeout': 0},
{'timeout': -1},
{'step': 1, 'timeout': -1},
{'step': -1, 'timeout': 1},
{'step': 2, 'timeout': 1},
],
)
@pytest.mark.parametrize(
'token_fabric',
[*ALL_TOKENS_FABRICS, DefaultToken],
)
@pytest.mark.parametrize(
'do_await',
[
True,
],
)
@pytest.mark.skip(reason='The universal sync/async wait() method is no longer supported because it suppressed exceptions raised during wait-time cancellation checks.')
def test_wait_wrong_parameters(token_fabric, parameters, do_await):
token = token_fabric()

if do_await:
with pytest.raises(ValueError, match=r'.'):
asyncio.run(token.wait(**parameters))
else:
with pytest.raises(ValueError, match=r'.'):
token.wait(**parameters)


@pytest.mark.parametrize(
'token_fabric',
[*ALL_TOKENS_FABRICS, DefaultToken],
)
@pytest.mark.skip(reason='The universal sync/async wait() method is no longer supported because it suppressed exceptions raised during wait-time cancellation checks.')
def test_async_wait_timeout(token_fabric):
timeout = 0.0001
token = token_fabric()

with pytest.raises(TimeoutToken.exception):
asyncio.run(token.wait(timeout=timeout))


@pytest.mark.parametrize(
'token_fabric',
ALL_TOKENS_FABRICS,
)
@pytest.mark.skip(reason='The universal sync/async wait() method is no longer supported because it suppressed exceptions raised during wait-time cancellation checks.')
def test_async_wait_with_cancel(token_fabric):
timeout = 0.001
token = token_fabric()

async def cancel_with_timeout(token):
await asyncio.sleep(timeout)
token.cancel()

async def runner(token):
coroutines = [cancel_with_timeout(token), token.wait() ]
return await asyncio.gather(*coroutines)

start_time = perf_counter()
asyncio.run(runner(token))
finish_time = perf_counter()

assert not token
assert finish_time - start_time >= timeout
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
(lambda: 'kek', f'{"kek"!r}\n'),
],
)
@pytest.mark.skip(reason='The universal sync/async wait() method is no longer supported because it suppressed exceptions raised during wait-time cancellation checks.')
def test_displayhook_printing_coroutine_wrappers_and_other_objects(create_value, expected_string):
buffer = io.StringIO()
with redirect_stdout(buffer):
Expand Down
Loading
Loading