Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- run: curl -sSL https://install.python-poetry.org | python - -y
- run: curl -sSL https://install.python-poetry.org | POETRY_VERSION=1.8.4 python - -y
- run: poetry config virtualenvs.in-project true
- run: make test
- run: make test-dep-versions
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
steps:
- uses: actions/checkout@v3

Expand All @@ -50,7 +50,7 @@ jobs:
python-version: ${{ matrix.python-version }}

- name: Bootstrap poetry
run: curl -sSL https://install.python-poetry.org | python - -y
run: curl -sSL https://install.python-poetry.org | POETRY_VERSION=1.8.4 python - -y

- name: Configure poetry
run: poetry config virtualenvs.in-project true
Expand Down
3 changes: 0 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ test: prepare

.PHONY: test-python-versions
test-python-versions:
poetry env use python3.8
make test

poetry env use python3.9
make test

Expand Down
735 changes: 362 additions & 373 deletions poetry.lock

Large diffs are not rendered by default.

22 changes: 6 additions & 16 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "pipedata"
version = "0.3"
version = "0.4"
description = "Framework for building pipelines for data processing"
authors = ["Simon Wicks <simw@users.noreply.github.com>"]
readme = "README.md"
Expand All @@ -22,35 +22,25 @@ classifiers = [
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Topic :: Software Development :: Libraries :: Python Modules",
"Typing :: Typed",
]
packages = [{include = "pipedata", from = "src"}]

[tool.poetry.dependencies]
python = "^3.8"
python = "^3.9"

[tool.poetry.group.ops.dependencies]
fsspec = [
{ version = ">=0.9.0", python = "<3.12" },
{ version = ">=2022.1.0", python = ">=3.12,<3.13"},
]
fsspec = ">=2022.1.0"
ijson = "^3.0.0"
pyarrow = [
{ version = ">=9.0.0", python = "<3.11" },
{ version = ">=11.0.0", python = ">=3.11,<3.12" },
{ version = ">=14.0.0", python = ">=3.12,<=3.13" },
]
# We don't have a direct numpy dependency, but pyarrow depends on numpy
# and numpy has python version constraints with python 3.12
numpy = [
{ version = "<1.25.0", python = "<3.9" },
{ version = "^1.26.0", python = ">=3.12,<3.13" }
{ version = ">=16.0.0", python = "<3.13" },
{ version = ">=18.0.0", python = ">=3.13" },
]

[tool.poetry.group.lint.dependencies]
Expand Down
24 changes: 2 additions & 22 deletions scripts/test_dependency_versions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,11 @@ main() {
PYTHON_MINOR_VERSION=$(poetry run python -c 'import sys; version=sys.version_info[:3]; print("{1}".format(*version))')
echo "Python minor version: $PYTHON_MINOR_VERSION"

# The errors are mostly / all installation errors,
# about building from source. Could lower
# the requirements if able to build from source.
if (( $PYTHON_MINOR_VERSION < "11" )); then
poetry run pip install pyarrow==9.0.0
poetry run python -m pytest

poetry run pip install pyarrow==10.0.0
if (( $PYTHON_MINOR_VERSION < "13" )); then
poetry run pip install pyarrow==16.0.0
poetry run python -m pytest
fi

if (( $PYTHON_MINOR_VERSION < "12" )); then
poetry run pip install pyarrow==11.0.0
poetry run python -m pytest

poetry run pip install pyarrow==13.0.0
poetry run python -m pytest

poetry run pip install fsspec==0.9.0
poetry run python -m pytest
fi

poetry run pip install pyarrow==14.0.0
poetry run python -m pytest

poetry run pip install ijson==3.0.0
poetry run python -m pytest

Expand Down
2 changes: 1 addition & 1 deletion src/pipedata/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.3"
__version__ = "0.4"

__all__ = [
"__version__",
Expand Down
8 changes: 5 additions & 3 deletions src/pipedata/core/links.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ def __iter__(self) -> Iterator[TStart]:
def __next__(self) -> TStart:
self._count += 1
try:
return next(self._iterator)
next_value = next(self._iterator)
return next_value
except StopIteration as err:
self._count -= 1
raise StopIteration from err
Expand All @@ -45,9 +46,10 @@ def __init__(
def __name__(self) -> str: # noqa: A003
return self._func.__name__

def __call__(self, input_iterator: Iterator[TStart]) -> Iterator[TEnd]:
def __call__(self, input_iterator: Iterator[TStart]) -> CountingIterator[TEnd]:
self._input = CountingIterator(input_iterator)
self._output = CountingIterator(self._func(self._input))
result = self._func(self._input)
self._output = CountingIterator(result)
return self._output

def get_counts(self) -> Tuple[int, int]:
Expand Down
34 changes: 34 additions & 0 deletions src/pipedata/core/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,37 @@ def new_action(previous_step: Iterator[TEnd]) -> Iterator[TOther]:
return (func(elements) for elements in _batched(previous_step, n))

super().__init__(new_action)


class chain_iterables(ChainLink[Iterator[TEnd], TEnd]):  # noqa: N801
    """Chain link that flattens an iterator of iterators by one level."""

    def __init__(self) -> None:
        # The inner function's name is what the link exposes as its
        # ``__name__``, so it is kept distinct from the class name.
        def chain_iterables_(previous_step: Iterator[Iterator[TEnd]]) -> Iterator[TEnd]:
            """Lazily yield every item of every inner iterator, in order."""
            flattened = itertools.chain.from_iterable(previous_step)
            return flattened

        super().__init__(chain_iterables_)


class grouper(ChainLink[TEnd, list[TEnd]]):  # noqa: N801
    """Chain link that collects consecutive elements into lists.

    Args:
        starter: Optional predicate. An element for which it returns true
            begins a new group, so anything collected so far is emitted
            before that element is added.
        ender: Optional predicate. An element for which it returns true is
            the last member of its group, which is emitted immediately.

    Elements still pending when the input is exhausted are emitted as a
    final group. Empty groups are never emitted.
    """

    def __init__(
        self,
        *,
        starter: Optional[Callable[[TEnd], bool]] = None,
        ender: Optional[Callable[[TEnd], bool]] = None,
    ) -> None:
        def grouper_(previous_step: Iterator[TEnd]) -> Iterator[list[TEnd]]:
            """Yield lists of consecutive elements split by the predicates."""
            group: list[TEnd] = []
            for element in previous_step:
                # A starter element closes whatever was collected before it.
                if starter is not None and starter(element) and group:
                    yield group
                    group = []

                group.append(element)

                # The ender is checked independently of the starter, so an
                # element matching both predicates always closes its group,
                # whether or not an earlier group was still open.
                if ender is not None and ender(element):
                    yield group
                    group = []

            # Flush the trailing, unterminated group.
            if group:
                yield group

        super().__init__(grouper_)
67 changes: 67 additions & 0 deletions tests/core/test_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,70 @@ def add_values(values: Tuple[int, ...]) -> int:
"outputs": 3,
},
]


def test_chain_iterables() -> None:
    """Flattening nested iterators keeps item order and records counts."""
    # TODO: make typing work with chain_iterables
    flatten = Chain[int]().then(ops.chain_iterables())  # type: ignore
    nested = iter([iter([0, 1]), iter([2, 3])])
    flattened = list(flatten(nested))  # type: ignore

    # Two inner iterators go in; four individual items come out.
    expected_counts = [
        {"name": "_identity", "inputs": 2, "outputs": 2},
        {"name": "chain_iterables_", "inputs": 2, "outputs": 4},
    ]
    assert flattened == [0, 1, 2, 3]
    assert flatten.get_counts() == expected_counts


def test_chain_grouper() -> None:
    """Grouping with both a starter and an ender splits at each marker."""

    def starts_group(val: int) -> bool:
        return val == 1

    def ends_group(val: int) -> bool:
        return val == 3  # noqa: PLR2004

    link = ops.grouper(starter=starts_group, ender=ends_group)
    chain = Chain[int]().then(link)
    values = [1, 2, 3, 4, 1, 2, 3, 4]
    groups = list(chain(iter(values)))

    # Eight elements in, four groups out.
    expected_counts = [
        {"name": "_identity", "inputs": 8, "outputs": 8},
        {"name": "grouper_", "inputs": 8, "outputs": 4},
    ]
    assert groups == [[1, 2, 3], [4], [1, 2, 3], [4]]
    assert chain.get_counts() == expected_counts


def test_chain_grouper_no_end() -> None:
    """Grouping with only an ender closes a group on each matching element."""

    def ends_group(val: int) -> bool:
        return val == 4  # noqa: PLR2004

    chain = Chain[int]().then(ops.grouper(ender=ends_group))
    values = [1, 2, 3, 4, 1, 2, 3, 4]
    groups = list(chain(iter(values)))

    # Eight elements in, two groups out.
    expected_counts = [
        {"name": "_identity", "inputs": 8, "outputs": 8},
        {"name": "grouper_", "inputs": 8, "outputs": 2},
    ]
    assert groups == [[1, 2, 3, 4], [1, 2, 3, 4]]
    assert chain.get_counts() == expected_counts
41 changes: 40 additions & 1 deletion tests/ops/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import pyarrow.parquet as pq # type: ignore

from pipedata.core import Stream, ops
from pipedata.core import Chain, Stream, ops
from pipedata.ops import json_records, parquet_writer, zipped_files


Expand Down Expand Up @@ -52,3 +52,42 @@ def test_zipped_files() -> None:
"a": [1, 3, 5, 7, 9, 11],
"b": [2, 4, 6, 8, 10, 12],
}


def test_zipped_file_contents() -> None:
    """Lines from three zipped files are regrouped into six ``<xml>`` blocks."""
    contents = """
<xml>
<name>John</name>
<age>30</age>
</xml>
<xml>
<name>Smith</name>
<age>40</age>
</xml>
"""

    # Note: can't do type checking with lambdas
    with tempfile.TemporaryDirectory() as temp_dir:
        zip_path = Path(temp_dir) / "test.zip"

        # Three members with identical contents, two <xml> blocks each.
        with zipfile.ZipFile(zip_path, "w") as archive:
            for member_name in ("test.txt", "test2.txt", "test3.txt"):
                archive.writestr(member_name, contents)

        # Sub-chain: split on lines opening an <xml> element, join each
        # group back into one bytes blob, and drop blank blobs.
        split_on_xml = ops.grouper(starter=lambda line: line.strip().startswith(b"<xml"))  # type: ignore
        join_lines = ops.mapping(lambda x: b"\n".join(x))  # type: ignore
        drop_blank = ops.filtering(lambda x: x.strip() != b"")  # type: ignore
        extract_xmls = Chain().then(split_on_xml).then(join_lines).then(drop_blank)

        stream = Stream([str(zip_path)])
        stream = stream.then(zipped_files)
        stream = stream.then(ops.mapping(lambda x: x.contents))  # type: ignore
        stream = stream.then(ops.chain_iterables())
        stream = stream.then(extract_xmls)
        result = stream.to_list()
        assert len(result) == 6  # noqa: PLR2004
Loading