diff --git a/packages/bigframes/bigframes/__init__.py b/packages/bigframes/bigframes/__init__.py
index 29a27e4b6f90..7061300b5cc5 100644
--- a/packages/bigframes/bigframes/__init__.py
+++ b/packages/bigframes/bigframes/__init__.py
@@ -45,6 +45,7 @@
from bigframes._config.bigquery_options import BigQueryOptions # noqa: E402
from bigframes.core.global_session import ( # noqa: E402
close_session,
+ execution_history,
get_global_session,
)
from bigframes.session import Session, connect # noqa: E402
@@ -69,6 +70,7 @@ def load_ipython_extension(ipython):
"BigQueryOptions",
"get_global_session",
"close_session",
+ "execution_history",
"enums",
"exceptions",
"connect",
diff --git a/packages/bigframes/bigframes/core/global_session.py b/packages/bigframes/bigframes/core/global_session.py
index ce3b16d041e3..472255894534 100644
--- a/packages/bigframes/bigframes/core/global_session.py
+++ b/packages/bigframes/bigframes/core/global_session.py
@@ -26,6 +26,8 @@
import bigframes.exceptions as bfe
if TYPE_CHECKING:
+ import pandas
+
import bigframes.session
_global_session: Optional[bigframes.session.Session] = None
@@ -124,6 +126,14 @@ def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T:
return func_(get_global_session(), *args, **kwargs)
+def execution_history() -> "pandas.DataFrame":
+ import pandas # noqa: F401
+
+ import bigframes.session
+
+ return with_default_session(bigframes.session.Session.execution_history)
+
+
class _GlobalSessionContext:
"""
Context manager for testing that sets global session.
diff --git a/packages/bigframes/bigframes/session/__init__.py b/packages/bigframes/bigframes/session/__init__.py
index a6bb3041764c..9ea5ded46b22 100644
--- a/packages/bigframes/bigframes/session/__init__.py
+++ b/packages/bigframes/bigframes/session/__init__.py
@@ -109,6 +109,39 @@
logger = logging.getLogger(__name__)
+class _ExecutionHistory(pandas.DataFrame):
+ @property
+ def _constructor(self):
+ return _ExecutionHistory
+
+ def _repr_html_(self) -> str | None:
+ try:
+ import bigframes.formatting_helpers as formatter
+
+ if self.empty:
+                return "<p>No executions found.</p>"
+
+ cols = ["job_id", "status", "total_bytes_processed", "job_url"]
+ df_display = self[cols].copy()
+ df_display["total_bytes_processed"] = df_display[
+ "total_bytes_processed"
+ ].apply(formatter.get_formatted_bytes)
+
+ def format_url(url):
+            return f'<a href="{url}" target="_blank">Open Job</a>' if url else ""
+
+ df_display["job_url"] = df_display["job_url"].apply(format_url)
+
+ # Rename job_id to query_id to match user expectations
+ df_display = df_display.rename(columns={"job_id": "query_id"})
+
+ compact_html = df_display.to_html(escape=False, index=False)
+
+ return compact_html
+ except Exception:
+ return super()._repr_html_() # type: ignore
+
+
@log_adapter.class_logger
class Session(
third_party_pandas_gbq.GBQIOMixin,
@@ -233,6 +266,7 @@ def __init__(
)
self._metrics = metrics.ExecutionMetrics()
+ self._publisher.subscribe(self._metrics.on_event)
self._function_session = bff_session.FunctionSession()
self._anon_dataset_manager = anonymous_dataset.AnonymousDatasetManager(
self._clients_provider.bqclient,
@@ -371,6 +405,10 @@ def slot_millis_sum(self):
"""The sum of all slot time used by bigquery jobs in this session."""
return self._metrics.slot_millis
+ def execution_history(self) -> pandas.DataFrame:
+ """Returns a list of underlying BigQuery executions initiated by BigFrames in the current session."""
+ return _ExecutionHistory([job.__dict__ for job in self._metrics.jobs])
+
@property
def _allows_ambiguity(self) -> bool:
return self._allow_ambiguity
diff --git a/packages/bigframes/bigframes/session/loader.py b/packages/bigframes/bigframes/session/loader.py
index b0a9e0a1ed31..c1c8f0eda409 100644
--- a/packages/bigframes/bigframes/session/loader.py
+++ b/packages/bigframes/bigframes/session/loader.py
@@ -52,6 +52,8 @@
import pandas
import pyarrow as pa
from google.cloud import bigquery_storage_v1
+from google.cloud.bigquery.job.load import LoadJob
+from google.cloud.bigquery.job.query import QueryJob
from google.cloud.bigquery_storage_v1 import types as bq_storage_types
import bigframes._tools
@@ -605,6 +607,9 @@ def _start_generic_job(self, job: formatting_helpers.GenericJob):
else:
job.result()
+ if self._metrics is not None and isinstance(job, (QueryJob, LoadJob)):
+ self._metrics.count_job_stats(query_job=job)
+
@overload
def read_gbq_table( # type: ignore[overload-overlap]
self,
diff --git a/packages/bigframes/bigframes/session/metrics.py b/packages/bigframes/bigframes/session/metrics.py
index 8d43a83d7309..cf34a620e099 100644
--- a/packages/bigframes/bigframes/session/metrics.py
+++ b/packages/bigframes/bigframes/session/metrics.py
@@ -15,16 +15,131 @@
from __future__ import annotations
import dataclasses
+import datetime
import os
-from typing import Optional, Tuple
+from typing import Any, Mapping, Optional, Tuple, Union
import google.cloud.bigquery as bigquery
-import google.cloud.bigquery.job as bq_job
import google.cloud.bigquery.table as bq_table
+from google.cloud.bigquery.job.load import LoadJob
+from google.cloud.bigquery.job.query import QueryJob
LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"
+@dataclasses.dataclass
+class JobMetadata:
+ job_id: Optional[str] = None
+ query_id: Optional[str] = None
+ location: Optional[str] = None
+ project: Optional[str] = None
+ creation_time: Optional[datetime.datetime] = None
+ start_time: Optional[datetime.datetime] = None
+ end_time: Optional[datetime.datetime] = None
+ duration_seconds: Optional[float] = None
+ status: Optional[str] = None
+ total_bytes_processed: Optional[int] = None
+ total_slot_ms: Optional[int] = None
+ job_type: Optional[str] = None
+ error_result: Optional[Mapping[str, Any]] = None
+ cached: Optional[bool] = None
+ job_url: Optional[str] = None
+ query: Optional[str] = None
+ destination_table: Optional[str] = None
+ source_uris: Optional[list[str]] = None
+ input_files: Optional[int] = None
+ input_bytes: Optional[int] = None
+ output_rows: Optional[int] = None
+ source_format: Optional[str] = None
+
+ @classmethod
+ def from_job(
+ cls, query_job: Union[QueryJob, LoadJob], exec_seconds: Optional[float] = None
+ ) -> "JobMetadata":
+ query_text = getattr(query_job, "query", None)
+ if query_text and len(query_text) > 1024:
+ query_text = query_text[:1021] + "..."
+
+ job_id = getattr(query_job, "job_id", None)
+ job_url = None
+ if job_id:
+ job_url = f"https://console.cloud.google.com/bigquery?project={query_job.project}&j=bq:{query_job.location}:{job_id}&page=queryresults"
+
+ metadata = cls(
+ job_id=query_job.job_id,
+ location=query_job.location,
+ project=query_job.project,
+ creation_time=query_job.created,
+ start_time=query_job.started,
+ end_time=query_job.ended,
+ duration_seconds=exec_seconds,
+ status=query_job.state,
+ job_type=query_job.job_type,
+ error_result=query_job.error_result,
+ query=query_text,
+ job_url=job_url,
+ )
+ if isinstance(query_job, QueryJob):
+ metadata.cached = getattr(query_job, "cache_hit", None)
+ metadata.destination_table = (
+ str(query_job.destination) if query_job.destination else None
+ )
+ metadata.total_bytes_processed = getattr(
+ query_job, "total_bytes_processed", None
+ )
+ metadata.total_slot_ms = getattr(query_job, "slot_millis", None)
+ elif isinstance(query_job, LoadJob):
+ metadata.output_rows = getattr(query_job, "output_rows", None)
+ metadata.input_files = getattr(query_job, "input_files", None)
+ metadata.input_bytes = getattr(query_job, "input_bytes", None)
+ metadata.destination_table = (
+ str(query_job.destination)
+ if getattr(query_job, "destination", None)
+ else None
+ )
+ if getattr(query_job, "source_uris", None):
+ metadata.source_uris = list(query_job.source_uris)
+ if query_job.configuration and hasattr(
+ query_job.configuration, "source_format"
+ ):
+ metadata.source_format = query_job.configuration.source_format
+
+ return metadata
+
+ @classmethod
+ def from_row_iterator(
+ cls, row_iterator: bq_table.RowIterator, exec_seconds: Optional[float] = None
+ ) -> "JobMetadata":
+ query_text = getattr(row_iterator, "query", None)
+ if query_text and len(query_text) > 1024:
+ query_text = query_text[:1021] + "..."
+
+ job_id = getattr(row_iterator, "job_id", None)
+ job_url = None
+ if job_id:
+ project = getattr(row_iterator, "project", "")
+ location = getattr(row_iterator, "location", "")
+ job_url = f"https://console.cloud.google.com/bigquery?project={project}&j=bq:{location}:{job_id}&page=queryresults"
+
+ return cls(
+ job_id=job_id,
+ query_id=getattr(row_iterator, "query_id", None),
+ location=getattr(row_iterator, "location", None),
+ project=getattr(row_iterator, "project", None),
+ creation_time=getattr(row_iterator, "created", None),
+ start_time=getattr(row_iterator, "started", None),
+ end_time=getattr(row_iterator, "ended", None),
+ duration_seconds=exec_seconds,
+ status="DONE",
+ total_bytes_processed=getattr(row_iterator, "total_bytes_processed", None),
+ total_slot_ms=getattr(row_iterator, "slot_millis", None),
+ job_type="query",
+ cached=getattr(row_iterator, "cache_hit", None),
+ query=query_text,
+ job_url=job_url,
+ )
+
+
@dataclasses.dataclass
class ExecutionMetrics:
execution_count: int = 0
@@ -32,10 +147,11 @@ class ExecutionMetrics:
bytes_processed: int = 0
execution_secs: float = 0
query_char_count: int = 0
+ jobs: list[JobMetadata] = dataclasses.field(default_factory=list)
def count_job_stats(
self,
- query_job: Optional[bq_job.QueryJob] = None,
+ query_job: Optional[Union[QueryJob, LoadJob]] = None,
row_iterator: Optional[bq_table.RowIterator] = None,
):
if query_job is None:
@@ -57,21 +173,64 @@ def count_job_stats(
self.slot_millis += slot_millis
self.execution_secs += exec_seconds
- elif query_job.configuration.dry_run:
- query_char_count = len(query_job.query)
+ self.jobs.append(
+ JobMetadata.from_row_iterator(row_iterator, exec_seconds=exec_seconds)
+ )
+
+ elif isinstance(query_job, QueryJob) and query_job.configuration.dry_run:
+ query_char_count = len(getattr(query_job, "query", ""))
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
bytes_processed = 0
slot_millis = 0
exec_seconds = 0.0
- elif (stats := get_performance_stats(query_job)) is not None:
- query_char_count, bytes_processed, slot_millis, exec_seconds = stats
+ elif isinstance(query_job, bigquery.QueryJob):
+ if (stats := get_performance_stats(query_job)) is not None:
+ query_char_count, bytes_processed, slot_millis, exec_seconds = stats
+ self.execution_count += 1
+ self.query_char_count += query_char_count or 0
+ self.bytes_processed += bytes_processed or 0
+ self.slot_millis += slot_millis or 0
+ self.execution_secs += exec_seconds or 0
+
+ metadata = JobMetadata.from_job(query_job, exec_seconds=exec_seconds)
+ metadata.total_bytes_processed = bytes_processed
+ metadata.total_slot_ms = slot_millis
+ self.jobs.append(metadata)
+
+ else:
self.execution_count += 1
- self.query_char_count += query_char_count or 0
- self.bytes_processed += bytes_processed or 0
- self.slot_millis += slot_millis or 0
- self.execution_secs += exec_seconds or 0
+ duration = (
+ (query_job.ended - query_job.created).total_seconds()
+ if query_job.ended and query_job.created
+ else None
+ )
+ self.jobs.append(JobMetadata.from_job(query_job, exec_seconds=duration))
+
+ # For pytest runs only, log information about the query job
+ # to a file in order to create a performance report.
+ if (
+ isinstance(query_job, bigquery.QueryJob)
+ and not query_job.configuration.dry_run
+ ):
+ stats = get_performance_stats(query_job)
+ if stats:
+ write_stats_to_disk(
+ query_char_count=stats[0],
+ bytes_processed=stats[1],
+ slot_millis=stats[2],
+ exec_seconds=stats[3],
+ )
+ elif row_iterator is not None:
+ bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0
+ query_char_count = len(getattr(row_iterator, "query", "") or "")
+ slot_millis = getattr(row_iterator, "slot_millis", 0) or 0
+ created = getattr(row_iterator, "created", None)
+ ended = getattr(row_iterator, "ended", None)
+ exec_seconds = (
+ (ended - created).total_seconds() if created and ended else 0.0
+ )
write_stats_to_disk(
query_char_count=query_char_count,
bytes_processed=bytes_processed,
@@ -79,19 +238,24 @@ def count_job_stats(
exec_seconds=exec_seconds,
)
- else:
- # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
- bytes_processed = 0
- query_char_count = 0
- slot_millis = 0
- exec_seconds = 0
+ def on_event(self, event: Any):
+ try:
+ import bigframes.core.events
+ from bigframes.session.executor import LocalExecuteResult
+ except ImportError:
+ return
- write_stats_to_disk(
- query_char_count=query_char_count,
- bytes_processed=bytes_processed,
- slot_millis=slot_millis,
- exec_seconds=exec_seconds,
- )
+ if isinstance(event, bigframes.core.events.ExecutionFinished):
+ if event.result and isinstance(event.result, LocalExecuteResult):
+ self.execution_count += 1
+ bytes_processed = event.result.total_bytes_processed or 0
+
+ metadata = JobMetadata(
+ job_type="polars",
+ status="DONE",
+ total_bytes_processed=bytes_processed,
+ )
+ self.jobs.append(metadata)
def get_performance_stats(
diff --git a/packages/bigframes/tests/system/small/test_polars_execution.py b/packages/bigframes/tests/system/small/test_polars_execution.py
index 1b58dc9d12b1..0b986becd3e6 100644
--- a/packages/bigframes/tests/system/small/test_polars_execution.py
+++ b/packages/bigframes/tests/system/small/test_polars_execution.py
@@ -72,3 +72,26 @@ def test_polar_execution_unsupported_sql_fallback(
# geo fns not supported by polar engine yet, so falls back to bq execution
assert session_w_polars._metrics.execution_count == (execution_count_before + 1)
assert math.isclose(bf_result.geo_area.sum(), 70.52332050, rel_tol=0.00001)
+
+
+def test_polars_execution_history(session_w_polars):
+ import pandas as pd
+
+ # Create a small local DataFrame
+ pdf = pd.DataFrame({"col_a": [1, 2, 3], "col_b": ["x", "y", "z"]})
+
+ # Read simple local data
+ df = session_w_polars.read_pandas(pdf)
+
+ # Trigger execution
+ _ = df.to_pandas()
+
+ # Verify the execution history captured the local job
+ history = session_w_polars.execution_history()
+
+ # Verify we have at least one job and logged as polars
+ assert len(history) > 0
+ last_job = history.iloc[-1]
+
+ assert last_job["job_type"] == "polars"
+ assert last_job["status"] == "DONE"
diff --git a/packages/bigframes/tests/unit/session/test_metrics.py b/packages/bigframes/tests/unit/session/test_metrics.py
index 7c2f01c5b98f..296c6e96c5af 100644
--- a/packages/bigframes/tests/unit/session/test_metrics.py
+++ b/packages/bigframes/tests/unit/session/test_metrics.py
@@ -245,3 +245,21 @@ def test_write_stats_to_disk_no_env_var(tmp_path, monkeypatch):
exec_seconds=1.23,
)
assert len(list(tmp_path.iterdir())) == 0
+
+
+def test_on_event_with_local_execute_result():
+ import bigframes.core.events
+ from bigframes.session.executor import LocalExecuteResult
+
+ local_result = unittest.mock.create_autospec(LocalExecuteResult, instance=True)
+ local_result.total_bytes_processed = 1024
+
+ event = bigframes.core.events.ExecutionFinished(result=local_result)
+ execution_metrics = metrics.ExecutionMetrics()
+ execution_metrics.on_event(event)
+
+ assert execution_metrics.execution_count == 1
+ assert len(execution_metrics.jobs) == 1
+ assert execution_metrics.jobs[0].job_type == "polars"
+ assert execution_metrics.jobs[0].status == "DONE"
+ assert execution_metrics.jobs[0].total_bytes_processed == 1024
diff --git a/packages/bigframes/tests/unit/test_col.py b/packages/bigframes/tests/unit/test_col.py
index cf9aa5c4b86a..9f5bbca5d9bc 100644
--- a/packages/bigframes/tests/unit/test_col.py
+++ b/packages/bigframes/tests/unit/test_col.py
@@ -16,13 +16,13 @@
import pathlib
from typing import Generator
+import numpy as np
import pandas as pd
import pytest
import bigframes
import bigframes.pandas as bpd
from bigframes.testing.utils import assert_frame_equal, convert_pandas_dtypes
-import numpy as np
pytest.importorskip("polars")
pytest.importorskip("pandas", minversion="3.0.0")
diff --git a/packages/bigframes/tests/unit/test_pandas.py b/packages/bigframes/tests/unit/test_pandas.py
index e1e713697db1..a79d7a059bb9 100644
--- a/packages/bigframes/tests/unit/test_pandas.py
+++ b/packages/bigframes/tests/unit/test_pandas.py
@@ -37,6 +37,8 @@ def all_session_methods():
session_attributes.remove("close")
# streaming isn't in pandas
session_attributes.remove("read_gbq_table_streaming")
+ # execution_history is in base namespace, not pandas
+ session_attributes.remove("execution_history")
for attribute in sorted(session_attributes):
session_method = getattr(bigframes.session.Session, attribute)