
Commit d74649c

migrate from davidkhala.data.format to davidkhala.data.integration

1 parent 19050e2

13 files changed: 117 additions & 128 deletions


.github/workflows/data.yaml

Lines changed: 12 additions & 3 deletions
@@ -10,8 +10,6 @@ jobs:
           tests: format/tests
           test-entry-point: pytest
           working-directory: format
-        env:
-          PRIVATE_KEY: ${{secrets.PRIVATE_KEY}}
   frame:
     runs-on: ubuntu-latest
     steps:
@@ -30,4 +28,15 @@ jobs:
       - run: export DOCKER_BUILDKIT=1
       - run: docker build -t dash:latest visualization/app/dash
       - run: docker run --name $container -d dash:latest
-      - run: curl https://raw.githubusercontent.com/davidkhala/containers/refs/heads/main/cli/health.sh | bash -s wait-until-healthy $container
+      - run: curl https://raw.githubusercontent.com/davidkhala/containers/refs/heads/main/cli/health.sh | bash -s wait-until-healthy $container
+  integration:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@main
+      - uses: davidkhala/uv-buildpack@main
+        with:
+          tests: integration/tests
+          test-entry-point: pytest
+          working-directory: integration
+        env:
+          PRIVATE_KEY: ${{secrets.PRIVATE_KEY}}

format/davidkhala/data/format/parquet/__init__.py renamed to format/davidkhala/data/format/arrow/parquet.py

Lines changed: 1 addition & 1 deletion
@@ -17,4 +17,4 @@ def read_batch(self) -> Table:
         return self.file.read()

     def read_stream(self) -> Iterator[RecordBatch]:
-        return self.file.iter_batches()
+        return self.file.iter_batches()

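For context on what the renamed module exposes after this commit: the hunk above plus the new ParquetTestCase in format/tests/arrow_test.py suggest a thin wrapper around pyarrow.parquet.ParquetFile. A minimal sketch follows, assuming the constructor takes a file path and that schema simply forwards ParquetFile.schema; only read_batch and read_stream are confirmed by the diff.

from typing import Iterator

from pyarrow import RecordBatch, Table
from pyarrow.parquet import ParquetFile, ParquetSchema


class Parquet:
    def __init__(self, path: str):
        # assumption: the wrapped reader is a pyarrow ParquetFile
        self.file = ParquetFile(path)

    @property
    def schema(self) -> ParquetSchema:
        # assumption: forwards the Parquet (not Arrow) schema,
        # since the new test asserts isinstance(..., ParquetSchema)
        return self.file.schema

    def read_batch(self) -> Table:
        # confirmed by the hunk above: read the whole file as one Table
        return self.file.read()

    def read_stream(self) -> Iterator[RecordBatch]:
        # confirmed by the hunk above: lazily yield RecordBatch chunks
        return self.file.iter_batches()
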
format/pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@ name = "davidkhala.data.format"
 version = "0.0.5"
 description = ""
 authors = [{ name = "David Liu", email = "david-khala@hotmail.com" }]
-requires-python = "~=3.10"
+requires-python = ">=3.10"
 readme = "README.md"

 [project.optional-dependencies]

format/tests/arrow_test.py

Lines changed: 19 additions & 47 deletions
@@ -1,11 +1,9 @@
-import os
 import unittest

-from pyarrow import input_stream
-
 from davidkhala.data.format.arrow.gcp import GCS
-from davidkhala.data.format.arrow.local_fs import LocalFS
-from davidkhala.data.format.parquet import Parquet
+from davidkhala.data.format.arrow.parquet import Parquet
+from pyarrow import input_stream
+from pyarrow.parquet import ColumnSchema, ParquetSchema, ParquetLogicalType


 class Samples(unittest.TestCase):
@@ -31,51 +29,25 @@ def test_GCS(self):
         with fs.open_input_stream(file_list[0]) as f:
             self.assertEqual(f.read(64), b'GROUP = FILE_HEADER\n LANDSAT_SCENE_ID = "LC80010032013082LGN03"')

-    def test_parquet2arrow(self):
-        parquet = Parquet('fixtures/gcp-data-davidkhala.dbt_davidkhala.country_codes.parquet')
-        arrow_batch_path = 'artifacts/gcp-data-davidkhala.dbt_davidkhala.country_codes.batch.arrow'
-
-        fs = LocalFS()
-        fs.overwrite = True
-        fs.write_batch(arrow_batch_path, parquet.read_batch())
-        arrow_stream_path = 'artifacts/gcp-data-davidkhala.dbt_davidkhala.country_codes.stream.arrow'
-        fs.write_stream(arrow_stream_path, parquet.read_stream())
-
-
-class GCSTests(unittest.TestCase):
-    """
-    tests on private bucket
-    """
-    bucket = "davidkhala-data"
-
-    def test_ADC(self):
-        """
-        based on GCP ADC
-        """
-        for file in self.gcs.ls(self.bucket):
-            print(file.path)
-
-    @property
-    def gcs(self):
-        private_key = os.environ.get("PRIVATE_KEY")
-        if private_key:
-            return GCS.from_service_account({
-                'client_email': 'data-integration@gcp-data-davidkhala.iam.gserviceaccount.com',
-                'private_key': private_key,
-            })
-        else:
-            return GCS()

-    def test_service_account(self):
-        GCSTests.gcs.fget(self)
+class ParquetTestCase(unittest.TestCase):

-    def test_parquet2arrow(self):
-        parquet = Parquet('fixtures/gcp-data-davidkhala.dbt_davidkhala.country_codes.parquet')
-        stream_uri = "gs://davidkhala-data/gcp-data-davidkhala.dbt_davidkhala.country_codes.stream.arrow"
-        self.gcs.write_stream(stream_uri, parquet.read_stream())
-        batch_uri = "gs://davidkhala-data/gcp-data-davidkhala.dbt_davidkhala.country_codes.batch.arrow"
-        self.gcs.write_batch(batch_uri, parquet.read_batch())
+    def setUp(self):
+        self.parquet = Parquet('fixtures/gcp-data-davidkhala.dbt_davidkhala.country_codes.parquet')

+    def test_schema(self):
+        self.assertIsInstance(self.parquet.schema, ParquetSchema)
+        for field in self.parquet.schema:
+            self.assertIsInstance(field, ColumnSchema)
+            print(field)
+            self.assertIsInstance(field.logical_type, ParquetLogicalType)
+            self.assertEqual(str(field.logical_type), "String")

+    def test_stream(self):
+        for record_batch in self.parquet.read_stream():
+            print(record_batch)
+            print('<<<<')
+            for column in record_batch.column_names:
+                print(column, record_batch.column(column))
 if __name__ == '__main__':
     unittest.main()

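The parquet-to-arrow round trip and the PRIVATE_KEY credential fallback removed above are not re-added in any hunk visible in this commit; they presumably move to integration/tests, which is why the workflow now passes the PRIVATE_KEY secret to the new integration job instead of the format job. For reference, a hedged sketch of the removed call pattern, reassembled from the deleted lines; whether these GCS methods survive unchanged in the integration package is an assumption.

import os

from davidkhala.data.format.arrow.gcp import GCS
from davidkhala.data.format.arrow.parquet import Parquet

# Credential fallback copied from the deleted GCSTests.gcs helper:
# use the PRIVATE_KEY secret when present, otherwise fall back to GCP ADC.
private_key = os.environ.get("PRIVATE_KEY")
gcs = GCS.from_service_account({
    'client_email': 'data-integration@gcp-data-davidkhala.iam.gserviceaccount.com',
    'private_key': private_key,
}) if private_key else GCS()

# Round trip from the deleted test_parquet2arrow: parquet fixture in, arrow files out on GCS.
parquet = Parquet('fixtures/gcp-data-davidkhala.dbt_davidkhala.country_codes.parquet')
gcs.write_stream("gs://davidkhala-data/gcp-data-davidkhala.dbt_davidkhala.country_codes.stream.arrow",
                 parquet.read_stream())
gcs.write_batch("gs://davidkhala-data/gcp-data-davidkhala.dbt_davidkhala.country_codes.batch.arrow",
                parquet.read_batch())
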
format/tests/avro_test.py

Lines changed: 0 additions & 39 deletions
This file was deleted.

format/tests/parquet_test.py

Lines changed: 0 additions & 29 deletions
This file was deleted.

integration/davidkhala/data/integration/from/__init__.py renamed to integration/davidkhala/data/integration/sink/__init__.py

File renamed without changes.

integration/davidkhala/data/integration/to/__init__.py renamed to integration/davidkhala/data/integration/source/__init__.py

File renamed without changes.

format/davidkhala/data/format/transform/__init__.py renamed to integration/davidkhala/data/integration/source/arrow.py

Lines changed: 7 additions & 8 deletions
@@ -8,11 +8,10 @@
 )


-class Arrow2Avro:
-    arrow: Table
+class ToAvro:

     def __init__(self, table: Table):
-        self.arrow = table
+        self.table = table

     @staticmethod
     def type(arrow_type: DataType):
@@ -34,12 +33,12 @@ def type(arrow_type: DataType):
             return {"type": "int", "logicalType": "date"}
         elif is_list(arrow_type):

-            return {"type": "array", "items": Arrow2Avro.type(cast(ListType, arrow_type).value_type)}
+            return {"type": "array", "items": ToAvro.type(cast(ListType, arrow_type).value_type)}
         elif is_struct(arrow_type):
             return {
                 "type": "record",
                 "name": "struct",
-                "fields": [{"name": field.name, "type": Arrow2Avro.type(field.type)} for field in
+                "fields": [{"name": field.name, "type": ToAvro.type(field.type)} for field in
                            cast(StructType, arrow_type)]
             }
         else:
@@ -52,10 +51,10 @@ def schema(self):
             "name": "Root",
             "fields": list(map(lambda _field: {
                 "name": _field.name,
-                "type": Arrow2Avro.type(_field.type)
-            }, self.arrow.schema))
+                "type": ToAvro.type(_field.type)
+            }, self.table.schema))
         }

     @property
     def records(self):
-        return self.arrow.to_pylist()
+        return self.table.to_pylist()

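A short usage sketch for the renamed ToAvro helper: the class name, constructor, and the schema/records members come from the hunks above. The sample table, the assumption that schema is a property like records (only records shows an @property decorator in the visible context), and the behavior of the unshown else branch for plain string columns are assumptions.

import pyarrow as pa

from davidkhala.data.integration.source.arrow import ToAvro

# hypothetical two-column table standing in for the country_codes fixture
table = pa.table({"code": ["HK", "US"], "name": ["Hong Kong", "United States"]})
converter = ToAvro(table)

# Avro record schema derived from the Arrow schema; the root record is named "Root".
print(converter.schema)
# Row-wise payload ready for an Avro writer, produced via Table.to_pylist().
print(converter.records)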