Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 48 additions & 13 deletions src/filterpath/_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ def get( # noqa: C901, PLR0915
:return:
:rtype: Any | list[Any]
"""
escapable_sequences = frozenset({path_separator, "\\"})
escapable_sequences = frozenset({path_separator, "\\", "["})
sentinel = object()

def _deep_get(_obj: ObjTypes, _path: PathTypes) -> Any | list[Any]:
def _deep_get(_obj: ObjTypes, _path: PathTypes, container: list) -> Any | list[Any]: # noqa: C901
if _obj is sentinel:
# STOP: Run out of objects to traverse
logger.trace("out of objects: raising NoPathExistsError")
Expand All @@ -49,15 +49,50 @@ def _deep_get(_obj: ObjTypes, _path: PathTypes) -> Any | list[Any]:
logger.trace("out of iterables: raising NoPathExistsError")
raise NoPathExistsError(obj, path)

key, _path = _parse_path(_path)
key, _path, has_container = _parse_path(_path)
logger.trace(f"current key '{key}' and remaining path '{_path}'")

if has_container:
logger.trace("encountering container")
# Strip brackets for any filtering key or function
filter_key = key[1:-1]

logger.trace(f"filtering container on '{key}'")
try:
filtered_obj = _deep_get(_obj, filter_key, container)
except KeyError:
logger.trace(f"unable to filter '{_obj}' on '{filter_key}', return empty list")
return container

if isinstance(filtered_obj, dict):
filtered_obj = filtered_obj.values()

logger.trace(f"iterating {filtered_obj}")
try:
filtered_obj = iter(filtered_obj)
except TypeError:
logger.trace(f"{filtered_obj} not iterable, returning {filtered_obj}")
container.append(filtered_obj)
return container

for item in filtered_obj:
logger.trace(f"getting path '{_path}' of '{item}'")
try:
deep_obj = _deep_get(item, _path, container)
if deep_obj is not container:
container.append(deep_obj)
except KeyError:
pass

return container

logger.trace(f"access '{key}' in {_obj}")
return _deep_get(_get_any(_obj, key), _path)
return _deep_get(_get_any(_obj, key), _path, container)

def _parse_path(_path: PathTypes) -> tuple[Any, PathTypes]:
def _parse_path(_path: PathTypes) -> tuple[Any, PathTypes, bool]:
if isinstance(_path, str):
is_escaped = False
has_container = _path.startswith("[")
escape_indexes = []
for idx, char in enumerate(_path):
if not is_escaped:
Expand All @@ -75,18 +110,18 @@ def _parse_path(_path: PathTypes) -> tuple[Any, PathTypes]:
idx += 1

parsed_path = _remove_char_at_index(_path[:idx], escape_indexes)
return parsed_path, _path[idx + 1 :]
return parsed_path, _path[idx + 1 :], has_container and parsed_path.endswith("]")

# Get next from _path, operating on a list/tuple
curr_path = _path[0]
if isinstance(curr_path, str) and path_separator in curr_path:
# Parse the returned key for any unescaped subpaths
curr_path, remaining_path = _parse_path(curr_path)
curr_path, remaining_path, has_container = _parse_path(curr_path)
if remaining_path:
# Prepend the remaining subpath
remaining_path = [remaining_path, *_path[1:]]
return curr_path, remaining_path
return curr_path, _path[1:]
return curr_path, remaining_path, has_container
return curr_path, _path[1:], False

def _remove_char_at_index(string: str, index: int | list[int]) -> str:
if isinstance(index, int):
Expand Down Expand Up @@ -121,12 +156,12 @@ def _get_any(_obj: ObjTypes, key: Any) -> Any:

if isinstance(path, PathTypes):
try:
return _deep_get(obj, path)
except NoPathExistsError as err:
return _deep_get(obj, path, [])
except NoPathExistsError:
if raise_if_unfound:
logger.trace("raise KeyError instead of returning default")
raise KeyError from err
raise
logger.trace(f"return default value: {default}")
return default
else:
raise TypeError from NotPathLikeError(path)
raise NotPathLikeError(path)
Empty file added tests/__init__.py
Empty file.
111 changes: 111 additions & 0 deletions tests/get_test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from collections import defaultdict
from typing import Any, NamedTuple

import pytest

from filterpath import get
from filterpath._exceptions import NotPathLikeError


class SomeNamedTuple(NamedTuple):
Expand All @@ -21,12 +23,26 @@ def __init__(self, **attrs):
[
({"one": {"two": {"three": 4}}}, ("one.two",), {"three": 4}),
({"one": {"two": {"three": 4}}}, ("one.two.three",), 4),
({"one": {"two": {"three": 4}}}, (["one", "two"],), {"three": 4}),
({"one": {"two": {"three": 4}}}, (["one", "two", "three"],), 4),
({"one": {"two": {"three": 4}}}, ("one.four",), None),
({"one": {"two": {"three": 4}}}, ("one.four.three", []), []),
({"one": {"two": {"three": 4}}}, ("one.four.0.a", [{"a": 1}]), [{"a": 1}]),
({"one": {"two": {"three": [{"a": 1}]}}}, ("one.four.three.0.a", []), []),
({"one": {"two": {"three": 4}}}, ("one.four.three",), None),
({"one": {"two": {"three": [{"a": 1}]}}}, ("one.four.three.0.a",), None),
({"one": {"two": {"three": 4}}}, ("one.four.three", 2), 2),
({"one": {"two": {"three": [{"a": 1}]}}}, ("one.four.three.0.a", 2), 2),
({"one": {"two": {"three": 4}}}, ("one.four.three", {"test": "value"}), {"test": "value"}),
(
{"one": {"two": {"three": [{"a": 1}]}}},
("one.four.three.0.a", {"test": "value"}),
{"test": "value"},
),
({"one": {"two": {"three": 4}}}, ("one.four.three", "haha"), "haha"),
({"one": {"two": {"three": [{"a": 1}]}}}, ("one.four.three.0.a", "haha"), "haha"),
({"one": {"two": {"three": 4}}}, ("five",), None),
({"one": ["two", {"three": [4, 5]}]}, (["one", 1, "three", 1],), 5),
({"one": ["two", {"three": [4, 5]}]}, ("one.1.three.1",), 5),
({"one": ["two", {"three": [4, 5]}]}, ("one.1.three",), [4, 5]),
(["one", {"two": {"three": [4, 5]}}], ("1.two.three.0",), 4),
Expand All @@ -38,13 +54,28 @@ def __init__(self, **attrs):
([[[[[[[[[[42]]]]]]]]]], ("0.0.0.0.0.0.0.0.0.0",), 42),
([range(50)], ("0.42",), 42),
({"a": [{"b": range(50)}]}, ("a.0.b.42",), 42),
(
{"lev.el1": {"lev\\el2": {"level3": ["value"]}}},
("lev\\.el1.lev\\el2.level3.0",),
"value",
),
({"a.1": 2, "a\\1": 3, "a\\.1": 4}, ("a\\.1",), 2),
({"a.1": 2, "a\\1": 3, "a\\.1": 4}, ("a\\\\1",), 3),
({"a.1": 2, "a\\1": 3, "a\\.1": 4}, ("a\\\\\\.1",), 4),
({"one": ["hello", "there"]}, ("one.bad.hello", []), []),
({"one": ["hello", None]}, ("one.1.hello",), None),
(SomeNamedTuple(1, 2), ("a",), 1),
(SomeNamedTuple(1, 2), ("0",), 1),
(SomeNamedTuple({"c": {"d": 1}}, 2), ("a.c.d",), 1),
({}, ("update",), None),
([], ("extend",), None),
({(1,): {(2,): 3}}, ([(1,)],), {(2,): 3}),
({(1,): {(2,): 3}}, ([(1,), (2,)],), 3),
({object: 1}, ([object],), 1),
({object: {object: 1}}, ([object, object],), 1),
({object: {object: [1, 2, 3, {"a": "b"}]}}, ([object, object, "3.a"],), "b"),
({1: {"name": "John Doe"}}, ("1.name",), "John Doe"),
(Object(), ("0.field",), None),
(["1", 2, "c"], ("2.0",), None),
(["1", 2, "c"], (".",), None),
(["1", 2, "c"], ("0.",), "1"),
Expand All @@ -54,6 +85,7 @@ def __init__(self, **attrs):
({"": {"": "b"}}, (".",), {"": "b"}),
({"": {"": "b"}}, ("..",), "b"),
({"": {"": "b"}}, ("...",), None),
({None: 1}, ([None],), 1),
(Object(), ("__name__",), None),
(Object(), ("foo.__dict__",), None),
(Object(), ("__len__",), None),
Expand All @@ -65,3 +97,82 @@ def __init__(self, **attrs):
)
def test_get(obj, args, expected):
assert get(obj, *args) == expected


@pytest.mark.parametrize(
("path", "expected"),
[
("a", [1, 2, {"b": [3, 4]}, {"b": [5, 6]}]),
("0", "c"),
("a.0", 1),
("a\\.0", 11),
("a\\\\\\.0", 12),
("a\\\\.0", 13),
("\\[0]", 9),
("\\\\[0]", 10),
("a.[]", [1, 2, {"b": [3, 4]}, {"b": [5, 6]}]),
("a.b", None),
("a.[b]", []),
("a.[4]", []),
("a.4", None),
("a.[z]", []),
("a.z", None),
("a.b.[]", None),
("[]", [[1, 2, {"b": [3, 4]}, {"b": [5, 6]}], "c", 9, 10, 11, 12, [13], {":0": 99}]),
("[].[]", [1, 2, {"b": [3, 4]}, {"b": [5, 6]}, 13, 99]),
("[].[].[]", [[3, 4], [5, 6]]),
("[].[].[].[]", [3, 4, 5, 6]),
("[].[].[].[].[]", []),
("a.[0]", [1]),
("a.[].0", []),
("a.b.0", None),
("a.2.b.0", 3),
("a.3.b.0", 5),
("a.[].b", [[3, 4], [5, 6]]),
("a.[].b.0", [3, 5]),
("a.[].b.[]", [3, 4, 5, 6]),
],
)
def test_get_enhanced(path, expected):
obj = {
"a": [1, 2, {"b": [3, 4]}, {"b": [5, 6]}],
0: "c",
"[0]": 9,
"\\[0]": 10,
"a.0": 11,
"a\\.0": 12,
"a\\": [13],
"x": {":0": 99},
}
assert get(obj, path) == expected


def test_get__should_not_populate_defaultdict():
data = defaultdict(list)
get(data, "a")
assert data == {}


@pytest.mark.parametrize(
("obj", "path"),
[
(Object(), 1),
(Object(), Object()),
],
)
def test_get__raises_type_error_for_non_pathlike(obj, path):
with pytest.raises(TypeError, match="path argument must be one of 'str | list | tuple', not '.*'"):
get(obj, path)


@pytest.mark.parametrize(
("obj", "path"),
[
({"one": {"two": {"three": 4}}}, "one.four"),
({"one": {"two": {"three": 4}}}, "one.four.three"),
({"one": {"two": {"three": [{"a": 1}]}}}, "one.four.three.0.a"),
],
)
def test_get__raises_key_error_for_unfound(obj, path):
with pytest.raises(KeyError, match=".* does not contain path '.*'"):
get(obj, path, raise_if_unfound=True)