Skip to content

Commit 810819f

Browse files
authored
Fix possible race condition in topic common (#769)
1 parent 78af122 commit 810819f

10 files changed

Lines changed: 25 additions & 16 deletions

File tree

examples/coordination/example.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def run(endpoint, database):
4343
threads = []
4444

4545
for i in range(4):
46-
worker_name = f"worker {i+1}"
46+
worker_name = f"worker {i + 1}"
4747
if i < 2:
4848
thread = threading.Thread(target=linear_workload, args=(driver.coordination_client, worker_name))
4949
else:

examples/topic/writer_example.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import concurrent.futures
22
import datetime
33
from typing import Dict, List
4-
from concurrent.futures import Future, wait
4+
from concurrent.futures import Future, wait # noqa: F401
55

66
import ydb
77
from ydb import TopicWriterMessage

test-requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ zipp==3.19.1
3838
aiohttp>=3.9.0
3939
pytest-pep8
4040
pytest-flake8
41-
flake8==3.9.2
41+
flake8==6.1.0
4242
sqlalchemy==1.4.26
4343
pylint-protobuf
4444
cython

tests/iam/test_auth.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def __init__(self, account_id, key_id, private_key, iam_endpoint=None, iam_chann
1414
self.iam_channel_credentials = iam_channel_credentials
1515

1616
def __eq__(self, other):
17-
return self.__dict__ == other.__dict__ if type(self) == type(other) else False
17+
return self.__dict__ == other.__dict__ if type(self) is type(other) else False
1818

1919

2020
@patch("builtins.open", new_callable=mock_open, read_data=CONTENT1)

tests/scheme/scheme_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import typing
1+
import typing # noqa: F401
22
import pytest
33

44
import ydb

tests/test_errors.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ def test_scheme_error(driver_sync, database):
1111

1212
server_code = ydb.issues.StatusCode.SCHEME_ERROR
1313

14-
assert type(exc.value) == ydb.issues.SchemeError
14+
assert type(exc.value) is ydb.issues.SchemeError
1515
assert exc.value.status == server_code
1616
assert f"server_code: {server_code}" in str(exc.value)
1717
assert "Path does not exist" in str(exc.value)

tests/topics/test_topic_writer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4-
from typing import List
4+
from typing import List # noqa: F401
55

66
import pytest
77

ydb/_topic_common/common.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,26 @@ def wrapper(rpc_state, response_pb, driver=None):
3636

3737

3838
def _get_shared_event_loop() -> asyncio.AbstractEventLoop:
39-
global _shared_event_loop
40-
4139
if _shared_event_loop is not None:
4240
return _shared_event_loop
4341

4442
with _shared_event_loop_lock:
4543
if _shared_event_loop is not None:
4644
return _shared_event_loop
4745

48-
event_loop_set_done: concurrent.futures.Future[asyncio.AbstractEventLoop] = concurrent.futures.Future()
46+
loop_ready: threading.Event = threading.Event()
4947

5048
def start_event_loop():
5149
event_loop = asyncio.new_event_loop()
52-
event_loop_set_done.set_result(event_loop)
5350
asyncio.set_event_loop(event_loop)
51+
52+
def on_loop_started():
53+
# Set global only when loop is actually running
54+
global _shared_event_loop
55+
_shared_event_loop = event_loop
56+
loop_ready.set()
57+
58+
event_loop.call_soon(on_loop_started)
5459
event_loop.run_forever()
5560

5661
t = threading.Thread(
@@ -60,7 +65,11 @@ def start_event_loop():
6065
)
6166
t.start()
6267

63-
_shared_event_loop = event_loop_set_done.result()
68+
loop_ready.wait()
69+
70+
if _shared_event_loop is None:
71+
raise RuntimeError("Event loop was not properly initialized")
72+
6473
return _shared_event_loop
6574

6675

ydb/retries.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def __init__(self, timeout: float) -> None:
7777

7878
def __eq__(self, other: object) -> bool:
7979
return (
80-
type(self) == type(other) and isinstance(other, YdbRetryOperationSleepOpt) and self.timeout == other.timeout
80+
type(self) is type(other) and isinstance(other, YdbRetryOperationSleepOpt) and self.timeout == other.timeout
8181
)
8282

8383
def __repr__(self) -> str:
@@ -91,7 +91,7 @@ def __init__(self, result: Any) -> None:
9191

9292
def __eq__(self, other: object) -> bool:
9393
return (
94-
type(self) == type(other)
94+
type(self) is type(other)
9595
and isinstance(other, YdbRetryOperationFinalResult)
9696
and self.result == other.result
9797
and self.exc == other.exc

ydb/table_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def test_retry_operation_impl(monkeypatch):
1414
monkeypatch.setattr(
1515
issues.Error,
1616
"__eq__",
17-
lambda self, other: type(self) == type(other) and self.message == other.message,
17+
lambda self, other: type(self) is type(other) and self.message == other.message,
1818
)
1919

2020
retry_once_settings = RetrySettings(
@@ -43,7 +43,7 @@ def __init__(self, message):
4343
self.message = message
4444

4545
def __eq__(self, other):
46-
return type(self) == type(other) and self.message == other.message
46+
return type(self) is type(other) and self.message == other.message
4747

4848
def check_unretriable_error(err_type, call_ydb_handler):
4949
retry_once_settings.on_ydb_error_callback.reset_mock()

0 commit comments

Comments
 (0)