Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions crates/core/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ fn array_cat(exprs: Vec<PyExpr>) -> PyExpr {
array_concat(exprs)
}

// Python-facing `make_map`: builds a map expression from a list of key
// expressions and a list of value expressions. Each `PyExpr` is converted with
// `Into` before being handed to `datafusion::functions_nested::map::map`; no
// length comparison between the two lists happens in this wrapper.
#[pyfunction]
fn make_map(keys: Vec<PyExpr>, values: Vec<PyExpr>) -> PyExpr {
    let key_exprs = keys.into_iter().map(Into::into).collect();
    let value_exprs = values.into_iter().map(Into::into).collect();
    datafusion::functions_nested::map::map(key_exprs, value_exprs).into()
}

#[pyfunction]
#[pyo3(signature = (array, element, index=None))]
fn array_position(array: PyExpr, element: PyExpr, index: Option<i64>) -> PyExpr {
Expand Down Expand Up @@ -665,6 +672,12 @@ array_fn!(cardinality, array);
array_fn!(flatten, array);
array_fn!(range, start stop step);

// Map Functions
// Wrappers generated with the same `array_fn!` macro used for the array
// functions just above. Each invocation names the function followed by its
// argument name(s): three take a single `map` argument, `map_extract` takes
// `map` and `key`. `make_map` is not generated here; it is written by hand
// earlier in this file and takes two `Vec<PyExpr>` arguments.
array_fn!(map_keys, map);
array_fn!(map_values, map);
array_fn!(map_extract, map key);
array_fn!(map_entries, map);

aggregate_function!(array_agg);
aggregate_function!(max);
aggregate_function!(min);
Expand Down Expand Up @@ -1124,6 +1137,13 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(flatten))?;
m.add_wrapped(wrap_pyfunction!(cardinality))?;

// Map Functions
m.add_wrapped(wrap_pyfunction!(make_map))?;
m.add_wrapped(wrap_pyfunction!(map_keys))?;
m.add_wrapped(wrap_pyfunction!(map_values))?;
m.add_wrapped(wrap_pyfunction!(map_extract))?;
m.add_wrapped(wrap_pyfunction!(map_entries))?;

// Window Functions
m.add_wrapped(wrap_pyfunction!(lead))?;
m.add_wrapped(wrap_pyfunction!(lag))?;
Expand Down
158 changes: 158 additions & 0 deletions python/datafusion/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
"degrees",
"dense_rank",
"digest",
"element_at",
"empty",
"encode",
"ends_with",
Expand Down Expand Up @@ -200,6 +201,12 @@
"make_array",
"make_date",
"make_list",
"make_map",
"map",
"map_entries",
"map_extract",
"map_keys",
"map_values",
"max",
"md5",
"mean",
Expand Down Expand Up @@ -3338,6 +3345,157 @@ def empty(array: Expr) -> Expr:
return array_empty(array)


# map functions


def map(*args: Any) -> Expr:
    """Returns a map expression.

    Supports three calling conventions:

    - ``map({"a": 1, "b": 2})`` — from a Python dictionary.
    - ``map([keys], [values])`` — from a list of keys and a list of
      their associated values. Both lists must be the same length.
    - ``map(k1, v1, k2, v2, ...)`` — from alternating keys and their
      associated values.

    Keys and values that are not already :py:class:`~datafusion.expr.Expr`
    are automatically converted to literal expressions.

    Args:
        *args: Either a single ``dict``, exactly two ``list`` objects
            (keys, then values), or an even number of alternating keys and
            values. Two ``list`` arguments are always read as the
            keys/values form, never as a single key-value pair.

    Returns:
        An expression that evaluates to a map built from the given entries.

    Raises:
        ValueError: If the arguments match none of the three calling
            conventions, or if the keys and values lists differ in length.

    Examples:
        From a dictionary:

        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"a": [1]})
        >>> result = df.select(
        ...     dfn.functions.map({"a": 1, "b": 2}).alias("m"))
        >>> result.collect_column("m")[0].as_py()
        [('a', 1), ('b', 2)]

        From two lists:

        >>> df = ctx.from_pydict({"key": ["x", "y"], "val": [10, 20]})
        >>> df = df.select(
        ...     dfn.functions.map(
        ...         [dfn.col("key")], [dfn.col("val")]
        ...     ).alias("m"))
        >>> df.collect_column("m")[0].as_py()
        [('x', 10)]

        From alternating keys and values:

        >>> df = ctx.from_pydict({"a": [1]})
        >>> result = df.select(
        ...     dfn.functions.map("x", 1, "y", 2).alias("m"))
        >>> result.collect_column("m")[0].as_py()
        [('x', 1), ('y', 2)]
    """
    if len(args) == 1 and isinstance(args[0], dict):
        key_list = list(args[0].keys())
        value_list = list(args[0].values())
    elif (
        len(args) == 2  # noqa: PLR2004
        and isinstance(args[0], list)
        and isinstance(args[1], list)
    ):
        key_list, value_list = args
        # Fail here with an actionable message instead of deferring a length
        # mismatch to the native layer.
        if len(key_list) != len(value_list):
            msg = (
                "map expects keys and values lists of the same length, "
                f"got {len(key_list)} keys and {len(value_list)} values"
            )
            raise ValueError(msg)
    elif len(args) >= 2 and len(args) % 2 == 0:  # noqa: PLR2004
        # Even positions are keys, odd positions are their values.
        key_list = list(args[0::2])
        value_list = list(args[1::2])
    else:
        msg = "map expects a dict, two lists, or an even number of key-value arguments"
        raise ValueError(msg)

    # Plain Python values are wrapped as literals; existing expressions pass through.
    key_exprs = [k if isinstance(k, Expr) else Expr.literal(k) for k in key_list]
    val_exprs = [v if isinstance(v, Expr) else Expr.literal(v) for v in value_list]
    return Expr(f.make_map([k.expr for k in key_exprs], [v.expr for v in val_exprs]))


def make_map(*args: Any) -> Expr:
    """Builds a map expression; accepts exactly what :py:func:`map` accepts.

    See Also:
        This is an alias for :py:func:`map`.
    """
    map_expr = map(*args)
    return map_expr


def map_keys(map: Expr) -> Expr:
    """Returns the keys of the map as a list.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"a": [1]})
        >>> df = df.select(
        ...     dfn.functions.map({"x": 1, "y": 2}).alias("m"))
        >>> result = df.select(
        ...     dfn.functions.map_keys(dfn.col("m")).alias("keys"))
        >>> result.collect_column("keys")[0].as_py()
        ['x', 'y']
    """
    keys_expr = f.map_keys(map.expr)
    return Expr(keys_expr)


def map_values(map: Expr) -> Expr:
    """Returns the values of the map as a list.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"a": [1]})
        >>> df = df.select(
        ...     dfn.functions.map({"x": 1, "y": 2}).alias("m"))
        >>> result = df.select(
        ...     dfn.functions.map_values(dfn.col("m")).alias("vals"))
        >>> result.collect_column("vals")[0].as_py()
        [1, 2]
    """
    values_expr = f.map_values(map.expr)
    return Expr(values_expr)


def map_extract(map: Expr, key: Expr) -> Expr:
    """Returns a one-element list holding the value stored under ``key``.

    When ``key`` is not present in the map, the list holds a single null
    (``[None]``) rather than being empty.

    Args:
        map: Expression that evaluates to a map.
        key: Expression for the key to look up.

    Returns:
        An expression evaluating to a list with the matching value, or with a
        single null when the key is absent.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"a": [1]})
        >>> df = df.select(
        ...     dfn.functions.map({"x": 1, "y": 2}).alias("m"))
        >>> result = df.select(
        ...     dfn.functions.map_extract(
        ...         dfn.col("m"), dfn.lit("x")
        ...     ).alias("val"))
        >>> result.collect_column("val")[0].as_py()
        [1]

        A key that is not in the map:

        >>> missing = df.select(
        ...     dfn.functions.map_extract(
        ...         dfn.col("m"), dfn.lit("z")
        ...     ).alias("val"))
        >>> missing.collect_column("val")[0].as_py()
        [None]
    """
    return Expr(f.map_extract(map.expr, key.expr))


def map_entries(map: Expr) -> Expr:
    """Returns the map's entries as a list of ``key``/``value`` structs.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"a": [1]})
        >>> df = df.select(
        ...     dfn.functions.map({"x": 1, "y": 2}).alias("m"))
        >>> result = df.select(
        ...     dfn.functions.map_entries(dfn.col("m")).alias("entries"))
        >>> result.collect_column("entries")[0].as_py()
        [{'key': 'x', 'value': 1}, {'key': 'y', 'value': 2}]
    """
    entries_expr = f.map_entries(map.expr)
    return Expr(entries_expr)


def element_at(map: Expr, key: Expr) -> Expr:
    """Returns a one-element list holding the value stored under ``key``.

    This simply forwards to :py:func:`map_extract`, so the result has the same
    shape: a list with the matching value, or a list holding a single null
    (``[None]``) when the key is absent.

    Args:
        map: Expression that evaluates to a map.
        key: Expression for the key to look up.

    Returns:
        The expression produced by :py:func:`map_extract` for ``map`` and ``key``.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"a": [1]})
        >>> df = df.select(
        ...     dfn.functions.map({"a": 10, "b": 20}).alias("m"))
        >>> result = df.select(
        ...     dfn.functions.element_at(
        ...         dfn.col("m"), dfn.lit("b")
        ...     ).alias("val"))
        >>> result.collect_column("val")[0].as_py()
        [20]

    See Also:
        This is an alias for :py:func:`map_extract`.
    """
    return map_extract(map, key)


# aggregate functions
def approx_distinct(
expression: Expr,
Expand Down
148 changes: 148 additions & 0 deletions python/tests/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,154 @@ def test_array_function_obj_tests(stmt, py_expr):
assert a == b


def test_map_from_dict():
    """A plain dict becomes a map whose entries follow the dict's order."""
    single_row = pa.RecordBatch.from_arrays([pa.array([1])], names=["a"])
    frame = SessionContext().create_dataframe([[single_row]])

    map_expr = f.map({"x": 1, "y": 2}).alias("m")
    batches = frame.select(map_expr).collect()
    column_m = batches[0].column(0)

    assert column_m[0].as_py() == [("x", 1), ("y", 2)]


def test_map_from_dict_with_expr_values():
    """Dict values may already be expressions instead of plain Python values."""
    single_row = pa.RecordBatch.from_arrays([pa.array([1])], names=["a"])
    frame = SessionContext().create_dataframe([[single_row]])

    values_as_exprs = {"x": literal(1), "y": literal(2)}
    projected = frame.select(f.map(values_as_exprs).alias("m"))
    column_m = projected.collect()[0].column(0)

    assert column_m[0].as_py() == [("x", 1), ("y", 2)]


def test_map_from_two_lists():
    """The two-list form with column expressions yields one entry per row."""
    expected_keys = ["k1", "k2", "k3"]
    expected_vals = [10, 20, 30]
    batch = pa.RecordBatch.from_arrays(
        [pa.array(expected_keys), pa.array(expected_vals)],
        names=["keys", "vals"],
    )
    frame = SessionContext().create_dataframe([[batch]])

    m = f.map([column("keys")], [column("vals")])

    # Each row's map has exactly one key, taken from that row's "keys" cell.
    keys_col = frame.select(f.map_keys(m).alias("k")).collect()[0].column(0)
    actual_keys = [keys_col[i].as_py() for i in range(len(expected_keys))]
    assert actual_keys == [[k] for k in expected_keys]

    # ...and exactly one value, taken from that row's "vals" cell.
    vals_col = frame.select(f.map_values(m).alias("v")).collect()[0].column(0)
    actual_vals = [vals_col[i].as_py() for i in range(len(expected_vals))]
    assert actual_vals == [[v] for v in expected_vals]


def test_map_from_variadic_pairs():
    """Alternating key/value positional arguments are paired up in order."""
    single_row = pa.RecordBatch.from_arrays([pa.array([1])], names=["a"])
    frame = SessionContext().create_dataframe([[single_row]])

    map_expr = f.map("x", 1, "y", 2).alias("m")
    column_m = frame.select(map_expr).collect()[0].column(0)

    assert column_m[0].as_py() == [("x", 1), ("y", 2)]


def test_map_variadic_with_exprs():
    """The alternating form also accepts keys and values that are expressions."""
    single_row = pa.RecordBatch.from_arrays([pa.array([1])], names=["a"])
    frame = SessionContext().create_dataframe([[single_row]])

    map_expr = f.map(literal("x"), literal(1), literal("y"), literal(2))
    batches = frame.select(map_expr.alias("m")).collect()
    column_m = batches[0].column(0)

    assert column_m[0].as_py() == [("x", 1), ("y", 2)]


def test_map_odd_args_raises():
    """An odd number of positional arguments leaves a key without a value."""
    unpaired_args = ("x", 1, "y")
    with pytest.raises(ValueError, match="map expects"):
        f.map(*unpaired_args)


def test_make_map_is_alias():
    """``make_map`` gives the same result as ``map`` for a dict argument."""
    single_row = pa.RecordBatch.from_arrays([pa.array([1])], names=["a"])
    frame = SessionContext().create_dataframe([[single_row]])

    map_expr = f.make_map({"x": 1, "y": 2}).alias("m")
    column_m = frame.select(map_expr).collect()[0].column(0)

    assert column_m[0].as_py() == [("x", 1), ("y", 2)]


def test_map_keys():
    """``map_keys`` lists the keys in the order the map was built."""
    single_row = pa.RecordBatch.from_arrays([pa.array([1])], names=["a"])
    frame = SessionContext().create_dataframe([[single_row]])

    keys_expr = f.map_keys(f.map({"x": 1, "y": 2})).alias("keys")
    keys_col = frame.select(keys_expr).collect()[0].column(0)

    assert keys_col[0].as_py() == ["x", "y"]


def test_map_values():
    """``map_values`` lists the values in the order the map was built."""
    single_row = pa.RecordBatch.from_arrays([pa.array([1])], names=["a"])
    frame = SessionContext().create_dataframe([[single_row]])

    values_expr = f.map_values(f.map({"x": 1, "y": 2})).alias("vals")
    values_col = frame.select(values_expr).collect()[0].column(0)

    assert values_col[0].as_py() == [1, 2]


def test_map_extract():
    """Looking up a present key returns its value wrapped in a list."""
    single_row = pa.RecordBatch.from_arrays([pa.array([1])], names=["a"])
    frame = SessionContext().create_dataframe([[single_row]])

    lookup = f.map_extract(f.map({"x": 1, "y": 2}), literal("x"))
    batches = frame.select(lookup.alias("val")).collect()
    extracted = batches[0].column(0)

    assert extracted[0].as_py() == [1]


def test_map_extract_missing_key():
    """Looking up an absent key returns a list holding a single null."""
    single_row = pa.RecordBatch.from_arrays([pa.array([1])], names=["a"])
    frame = SessionContext().create_dataframe([[single_row]])

    lookup = f.map_extract(f.map({"x": 1}), literal("z"))
    batches = frame.select(lookup.alias("val")).collect()
    extracted = batches[0].column(0)

    assert extracted[0].as_py() == [None]
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

map_extract is documented as returning an “empty list if absent”, but the new test test_map_extract_missing_key asserts [None] for a missing key. These two behaviors are different (empty list vs list containing null). Please verify the actual DataFusion semantics and update either the docstring or the test expectation so they match.

Suggested change
assert result[0].as_py() == [None]
assert result[0].as_py() == []

Copilot uses AI. Check for mistakes.


def test_map_entries():
    """``map_entries`` returns one ``key``/``value`` struct per map entry."""
    single_row = pa.RecordBatch.from_arrays([pa.array([1])], names=["a"])
    frame = SessionContext().create_dataframe([[single_row]])

    entries_expr = f.map_entries(f.map({"x": 1, "y": 2})).alias("entries")
    entries_col = frame.select(entries_expr).collect()[0].column(0)

    expected_entries = [
        {"key": "x", "value": 1},
        {"key": "y", "value": 2},
    ]
    assert entries_col[0].as_py() == expected_entries


def test_element_at():
    """``element_at`` returns the looked-up value wrapped in a list."""
    single_row = pa.RecordBatch.from_arrays([pa.array([1])], names=["a"])
    frame = SessionContext().create_dataframe([[single_row]])

    lookup = f.element_at(f.map({"a": 10, "b": 20}), literal("b"))
    batches = frame.select(lookup.alias("val")).collect()
    extracted = batches[0].column(0)

    assert extracted[0].as_py() == [20]


@pytest.mark.parametrize(
("function", "expected_result"),
[
Expand Down
Loading