Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bigframes/core/compile/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ def project_window_op(
for column in inputs:
clauses.append((column.isnull(), ibis_types.null()))
if window_spec.min_periods and len(inputs) > 0:
if expression.op.skips_nulls:
if not expression.op.nulls_count_for_min_values:
# Most operations do not count NULL values towards min_periods
per_col_does_count = (column.notnull() for column in inputs)
# All inputs must be non-null for observation to count
Expand Down
42 changes: 42 additions & 0 deletions bigframes/core/groupby/dataframe_group_by.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,48 @@ def kurt(

kurtosis = kurt

@validations.requires_ordering()
def first(self, numeric_only: bool = False, min_count: int = -1) -> df.DataFrame:
    """Compute the first non-null entry of each column within each group.

    Args:
        numeric_only: If True, restrict the aggregation to numeric columns.
        min_count: Required number of valid (non-null) values per group;
            groups with fewer valid values yield NA. A negative value
            means no minimum is enforced.

    Returns:
        df.DataFrame: First value of each aggregated column per group.
    """
    # Unbounded per-group window; min_periods implements min_count
    # (FirstNonNullOp does not count nulls toward min_periods).
    window_spec = window_specs.unbound(
        grouping_keys=tuple(self._by_col_ids),
        min_periods=min_count if min_count >= 0 else 0,
    )
    target_cols, index = self._aggregated_columns(numeric_only)
    block, firsts_ids = self._block.multi_apply_window_op(
        target_cols,
        agg_ops.FirstNonNullOp(),
        window_spec=window_spec,
    )
    # Every row in a group carries the same windowed value, so AnyValueOp
    # collapses each group down to a single row.
    block, _ = block.aggregate(
        self._by_col_ids,
        tuple(
            aggs.agg(firsts_id, agg_ops.AnyValueOp()) for firsts_id in firsts_ids
        ),
        dropna=self._dropna,
        column_labels=index,
    )
    return df.DataFrame(block)

@validations.requires_ordering()
def last(self, numeric_only: bool = False, min_count: int = -1) -> df.DataFrame:
    """Compute the last non-null entry of each column within each group.

    Args:
        numeric_only: If True, restrict the aggregation to numeric columns.
        min_count: Required number of valid (non-null) values per group;
            groups with fewer valid values yield NA. A negative value
            means no minimum is enforced.

    Returns:
        df.DataFrame: Last value of each aggregated column per group.
    """
    # Unbounded per-group window; min_periods implements min_count
    # (LastNonNullOp does not count nulls toward min_periods).
    window_spec = window_specs.unbound(
        grouping_keys=tuple(self._by_col_ids),
        min_periods=min_count if min_count >= 0 else 0,
    )
    target_cols, index = self._aggregated_columns(numeric_only)
    block, lasts_ids = self._block.multi_apply_window_op(
        target_cols,
        agg_ops.LastNonNullOp(),
        window_spec=window_spec,
    )
    # Every row in a group carries the same windowed value, so AnyValueOp
    # collapses each group down to a single row.
    block, _ = block.aggregate(
        self._by_col_ids,
        tuple(aggs.agg(lasts_id, agg_ops.AnyValueOp()) for lasts_id in lasts_ids),
        dropna=self._dropna,
        column_labels=index,
    )
    return df.DataFrame(block)

def all(self) -> df.DataFrame:
return self._aggregate_all(agg_ops.all_op)

Expand Down
51 changes: 50 additions & 1 deletion bigframes/core/groupby/series_group_by.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import bigframes.core.window as windows
import bigframes.core.window_spec as window_specs
import bigframes.dataframe as df
import bigframes.dtypes
import bigframes.operations.aggregations as agg_ops
import bigframes.series as series

Expand Down Expand Up @@ -162,6 +163,54 @@ def kurt(self, *args, **kwargs) -> series.Series:

kurtosis = kurt

@validations.requires_ordering()
def first(self, numeric_only: bool = False, min_count: int = -1) -> series.Series:
    """Return the first non-null value within each group as a Series."""
    # Guard clause: numeric_only is only meaningful for numeric dtypes.
    col_dtype = self._block.expr.get_column_type(self._value_column)
    if numeric_only and not bigframes.dtypes.is_numeric(col_dtype):
        raise TypeError(
            f"Cannot use 'numeric_only' with non-numeric column {self._value_name}."
        )
    spec = window_specs.unbound(
        grouping_keys=tuple(self._by_col_ids),
        min_periods=max(min_count, 0),
    )
    block, first_col = self._block.apply_window_op(
        self._value_column,
        agg_ops.FirstNonNullOp(),
        window_spec=spec,
    )
    aggregations = (aggs.agg(first_col, agg_ops.AnyValueOp()),)
    block, _ = block.aggregate(self._by_col_ids, aggregations, dropna=self._dropna)
    labeled = block.with_column_labels([self._value_name])
    return series.Series(labeled)

@validations.requires_ordering()
def last(self, numeric_only: bool = False, min_count: int = -1) -> series.Series:
    """Compute the last non-null entry within each group.

    Args:
        numeric_only: If True, raise unless the series is numeric.
        min_count: Required number of valid (non-null) values per group;
            groups with fewer valid values yield NA. A negative value
            means no minimum is enforced.

    Returns:
        series.Series: Last value per group.
    """
    if numeric_only and not bigframes.dtypes.is_numeric(
        self._block.expr.get_column_type(self._value_column)
    ):
        raise TypeError(
            f"Cannot use 'numeric_only' with non-numeric column {self._value_name}."
        )
    window_spec = window_specs.unbound(
        grouping_keys=tuple(self._by_col_ids),
        min_periods=min_count if min_count >= 0 else 0,
    )
    # Renamed from copy-pasted "firsts_id": this column holds the LAST
    # non-null value over the group window.
    block, lasts_id = self._block.apply_window_op(
        self._value_column,
        agg_ops.LastNonNullOp(),
        window_spec=window_spec,
    )
    # All rows in a group share the windowed value; AnyValueOp picks one.
    block, _ = block.aggregate(
        self._by_col_ids,
        (aggs.agg(lasts_id, agg_ops.AnyValueOp()),),
        dropna=self._dropna,
    )
    return series.Series(block.with_column_labels([self._value_name]))

def prod(self, *args) -> series.Series:
return self._aggregate(agg_ops.product_op)

Expand Down Expand Up @@ -314,7 +363,7 @@ def _apply_window_op(
discard_name=False,
window: typing.Optional[window_specs.WindowSpec] = None,
never_skip_nulls: bool = False,
):
) -> series.Series:
"""Apply window op to groupby. Defaults to grouped cumulative window."""
window_spec = window or window_specs.cumulative_rows(
grouping_keys=tuple(self._by_col_ids)
Expand Down
13 changes: 13 additions & 0 deletions bigframes/operations/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ def skips_nulls(self):
"""Whether the window op skips null rows."""
return True

@property
def nulls_count_for_min_values(self) -> bool:
    """Whether null input rows count toward a window's min_periods.

    Defaults to the inverse of ``skips_nulls``: ops that skip nulls do
    not count them toward the minimum-observation threshold.
    """
    return not self.skips_nulls

@property
def implicitly_inherits_order(self):
"""
Expand Down Expand Up @@ -480,6 +485,10 @@ class FirstNonNullOp(UnaryWindowOp):
def skips_nulls(self):
return False

@property
def nulls_count_for_min_values(self) -> bool:
    # Although skips_nulls is False for this op (nulls stay in the window
    # frame), null rows still must not count toward min_periods.
    return False


@dataclasses.dataclass(frozen=True)
class LastOp(UnaryWindowOp):
Expand All @@ -492,6 +501,10 @@ class LastNonNullOp(UnaryWindowOp):
def skips_nulls(self):
return False

@property
def nulls_count_for_min_values(self) -> bool:
    # Although skips_nulls is False for this op (nulls stay in the window
    # frame), null rows still must not count toward min_periods.
    return False


@dataclasses.dataclass(frozen=True)
class ShiftOp(UnaryWindowOp):
Expand Down
98 changes: 98 additions & 0 deletions tests/system/small/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -768,3 +768,101 @@ def test_series_groupby_quantile(scalars_df_index, scalars_pandas_df_index, q):
pd.testing.assert_series_equal(
pd_result, bf_result, check_dtype=False, check_index_type=False
)


@pytest.mark.parametrize(
    ("numeric_only", "min_count"),
    [
        (True, 2),
        (False, -1),
    ],
)
def test_series_groupby_first(
    scalars_df_index, scalars_pandas_df_index, numeric_only, min_count
):
    # Compare SeriesGroupBy.first against pandas' reference behavior.
    bf_grouped = scalars_df_index.groupby("string_col")["int64_col"]
    pd_grouped = scalars_pandas_df_index.groupby("string_col")["int64_col"]

    bf_result = bf_grouped.first(
        numeric_only=numeric_only, min_count=min_count
    ).to_pandas()
    pd_result = pd_grouped.first(numeric_only=numeric_only, min_count=min_count)

    pd.testing.assert_series_equal(pd_result, bf_result)


@pytest.mark.parametrize(
    ("numeric_only", "min_count"),
    [
        (False, 4),
        (True, 0),
    ],
)
def test_series_groupby_last(
    scalars_df_index, scalars_pandas_df_index, numeric_only, min_count
):
    # Compare SeriesGroupBy.last against pandas' reference behavior.
    bf_grouped = scalars_df_index.groupby("string_col")["int64_col"]
    pd_grouped = scalars_pandas_df_index.groupby("string_col")["int64_col"]

    bf_result = bf_grouped.last(
        numeric_only=numeric_only, min_count=min_count
    ).to_pandas()
    pd_result = pd_grouped.last(numeric_only=numeric_only, min_count=min_count)

    pd.testing.assert_series_equal(pd_result, bf_result)


@pytest.mark.parametrize(
    ("numeric_only", "min_count"),
    [
        (False, 4),
        (True, 0),
    ],
)
def test_dataframe_groupby_first(
    scalars_df_index, scalars_pandas_df_index, numeric_only, min_count
):
    # min_count seems to not work properly on older pandas
    pytest.importorskip("pandas", minversion="2.0.0")
    # bytes, dates not handling min_count properly in pandas
    excluded = ["bytes_col", "date_col"]

    bf_grouped = scalars_df_index.drop(columns=excluded).groupby(
        scalars_df_index.int64_col % 2
    )
    pd_grouped = scalars_pandas_df_index.drop(columns=excluded).groupby(
        scalars_pandas_df_index.int64_col % 2
    )

    bf_result = bf_grouped.first(
        numeric_only=numeric_only, min_count=min_count
    ).to_pandas()
    pd_result = pd_grouped.first(numeric_only=numeric_only, min_count=min_count)

    pd.testing.assert_frame_equal(pd_result, bf_result)


@pytest.mark.parametrize(
    ("numeric_only", "min_count"),
    [
        (True, 2),
        (False, -1),
    ],
)
def test_dataframe_groupby_last(
    scalars_df_index, scalars_pandas_df_index, numeric_only, min_count
):
    # Compare DataFrameGroupBy.last against pandas' reference behavior,
    # grouping by the parity of int64_col.
    bf_grouped = scalars_df_index.groupby(scalars_df_index.int64_col % 2)
    pd_grouped = scalars_pandas_df_index.groupby(
        scalars_pandas_df_index.int64_col % 2
    )

    bf_result = bf_grouped.last(
        numeric_only=numeric_only, min_count=min_count
    ).to_pandas()
    pd_result = pd_grouped.last(numeric_only=numeric_only, min_count=min_count)

    pd.testing.assert_frame_equal(pd_result, bf_result)
74 changes: 74 additions & 0 deletions third_party/bigframes_vendored/pandas/core/groupby/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,80 @@ def kurtosis(
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def first(self, numeric_only: bool = False, min_count: int = -1):
    """
    Compute the first entry of each column within each group.

    Defaults to skipping NA elements.

    **Examples:**

        >>> import bigframes.pandas as bpd
        >>> bpd.options.display.progress_bar = None

        >>> df = bpd.DataFrame(dict(A=[1, 1, 3], B=[None, 5, 6], C=[1, 2, 3]))
        >>> df.groupby("A").first()
             B  C
        A
        1  5.0  1
        3  6.0  3
        <BLANKLINE>
        [2 rows x 2 columns]

        >>> df.groupby("A").first(min_count=2)
              B     C
        A
        1  <NA>     1
        3  <NA>  <NA>
        <BLANKLINE>
        [2 rows x 2 columns]

    Args:
        numeric_only (bool, default False):
            Include only float, int, boolean columns. If None, will attempt to use
            everything, then use only numeric data.
        min_count (int, default -1):
            The required number of valid values to perform the operation. If fewer
            than ``min_count`` valid values are present the result will be NA.

    Returns:
        bigframes.pandas.DataFrame or bigframes.pandas.Series:
            First of values within each group.
    """
    raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def last(self, numeric_only: bool = False, min_count: int = -1):
    """
    Compute the last entry of each column within each group.

    Defaults to skipping NA elements.

    **Examples:**

        >>> import bigframes.pandas as bpd
        >>> bpd.options.display.progress_bar = None

        >>> df = bpd.DataFrame(dict(A=[1, 1, 3], B=[5, None, 6], C=[1, 2, 3]))
        >>> df.groupby("A").last()
             B  C
        A
        1  5.0  2
        3  6.0  3
        <BLANKLINE>
        [2 rows x 2 columns]

    Args:
        numeric_only (bool, default False):
            Include only float, int, boolean columns. If None, will attempt to use
            everything, then use only numeric data.
        min_count (int, default -1):
            The required number of valid values to perform the operation. If fewer
            than ``min_count`` valid values are present the result will be NA.

    Returns:
        bigframes.pandas.DataFrame or bigframes.pandas.Series:
            Last of values within each group.
    """
    raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def sum(
self,
numeric_only: bool = False,
Expand Down