Skip to content

Commit 536e163

Browse files
authored
fix: Emit migrate messages for child relations (#21)
A fix for the scheduler so that migrate messages are emitted for child tables.
1 parent 146c549 commit 536e163

File tree

2 files changed

+22
-11
lines changed

2 files changed

+22
-11
lines changed

cloudquery/sdk/scheduler/scheduler.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,8 @@ def sync(
171171
self, client, resolvers: List[TableResolver], deterministic_cq_id=False
172172
) -> Generator[SyncMessage, None, None]:
173173
res = queue.Queue()
174-
for resolver in resolvers:
175-
yield SyncMigrateTableMessage(table=resolver.table.to_arrow_schema())
174+
yield from self._send_migrate_table_messages(resolvers)
175+
176176
thread = futures.ThreadPoolExecutor()
177177
thread.submit(self._sync, client, resolvers, res, deterministic_cq_id)
178178
total_table_resolvers = 0
@@ -191,3 +191,11 @@ def sync(
191191
continue
192192
yield message
193193
thread.shutdown(wait=True)
194+
195+
def _send_migrate_table_messages(
196+
self, resolvers: List[TableResolver]
197+
) -> Generator[SyncMessage, None, None]:
198+
for resolver in resolvers:
199+
yield SyncMigrateTableMessage(table=resolver.table.to_arrow_schema())
200+
if resolver.child_resolvers:
201+
yield from self._send_migrate_table_messages(resolver.child_resolvers)

tests/scheduler/scheduler.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
from typing import Any, List, Generator
2+
23
import pyarrow as pa
3-
import pytest
4-
from cloudquery.sdk.scheduler import Scheduler, TableResolver
5-
from cloudquery.sdk.schema import Table, Column, Resource
4+
65
from cloudquery.sdk.message import SyncMessage
6+
from cloudquery.sdk.scheduler import Scheduler, TableResolver
7+
from cloudquery.sdk.schema import Column
78
from cloudquery.sdk.schema.table import Table
89

910

@@ -46,10 +47,12 @@ def test_scheduler():
4647
expected_record1 = pa.record_batch([[1]], schema=table1.to_arrow_schema())
4748
table2 = Table("test_child_table", [Column("test_child_column", pa.int64())])
4849
expected_record2 = pa.record_batch([[2]], schema=table2.to_arrow_schema())
49-
resources: List[SyncMessage] = []
50-
for resource in s.sync(client, [SchedulerTestTableResolver()]):
51-
resources.append(resource)
52-
assert len(resources) == 3
53-
assert resources[1].record == expected_record1
54-
assert resources[2].record == expected_record2
50+
messages: List[SyncMessage] = []
51+
for message in s.sync(client, [SchedulerTestTableResolver()]):
52+
messages.append(message)
53+
assert len(messages) == 4
54+
assert Table.from_arrow_schema(messages[0].table).name == "test_table"
55+
assert Table.from_arrow_schema(messages[1].table).name == "test_child_table"
56+
assert messages[2].record == expected_record1
57+
assert messages[3].record == expected_record2
5558
s.shutdown()

0 commit comments

Comments
 (0)