Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/bigframes/bigframes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from bigframes._config.bigquery_options import BigQueryOptions # noqa: E402
from bigframes.core.global_session import ( # noqa: E402
close_session,
execution_history,
get_global_session,
)
from bigframes.session import Session, connect # noqa: E402
Expand All @@ -69,6 +70,7 @@ def load_ipython_extension(ipython):
"BigQueryOptions",
"get_global_session",
"close_session",
"execution_history",
"enums",
"exceptions",
"connect",
Expand Down
10 changes: 10 additions & 0 deletions packages/bigframes/bigframes/core/global_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import bigframes.exceptions as bfe

if TYPE_CHECKING:
import pandas

import bigframes.session

_global_session: Optional[bigframes.session.Session] = None
Expand Down Expand Up @@ -124,6 +126,14 @@ def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T:
return func_(get_global_session(), *args, **kwargs)


def execution_history() -> "pandas.DataFrame":
    """Return a pandas DataFrame describing the BigQuery executions run by
    BigFrames in the current global session.

    Uses the default (global) session, creating one if none exists yet.

    Returns:
        pandas.DataFrame: One row per recorded job; see
        ``Session.execution_history`` for the columns.
    """
    # Imported lazily to avoid import cycles at module load time. pandas is
    # not needed at runtime here: the return annotation is a string and is
    # never evaluated, so the previous `import pandas  # noqa: F401` was dead.
    import bigframes.session

    return with_default_session(bigframes.session.Session.execution_history)


class _GlobalSessionContext:
"""
Context manager for testing that sets global session.
Expand Down
38 changes: 38 additions & 0 deletions packages/bigframes/bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,39 @@
logger = logging.getLogger(__name__)


class _ExecutionHistory(pandas.DataFrame):
    """pandas DataFrame subclass that renders a session's job history as a
    compact HTML table in notebook front-ends.

    Rows are built from per-job metadata records (see
    ``Session.execution_history``); only the display behavior differs from a
    plain DataFrame.
    """

    @property
    def _constructor(self):
        # Keep pandas operations (slicing, copy, etc.) returning
        # _ExecutionHistory instances rather than plain DataFrames.
        return _ExecutionHistory

    def _repr_html_(self) -> str | None:
        try:
            # Handle the no-jobs case first; it needs no formatting helpers.
            if self.empty:
                return "<div>No executions found.</div>"

            import bigframes.formatting_helpers as formatter

            cols = ["job_id", "status", "total_bytes_processed", "job_url"]
            df_display = self[cols].copy()
            df_display["total_bytes_processed"] = df_display[
                "total_bytes_processed"
            ].apply(formatter.get_formatted_bytes)

            def format_url(url):
                return f'<a target="_blank" href="{url}">Open Job</a>' if url else ""

            df_display["job_url"] = df_display["job_url"].apply(format_url)

            # Rename job_id to query_id to match user expectations
            df_display = df_display.rename(columns={"job_id": "query_id"})

            return df_display.to_html(escape=False, index=False)
        except Exception as exc:
            # Fall back to the stock pandas HTML repr, but surface the
            # failure instead of silently hiding formatting bugs.
            logging.getLogger(__name__).warning(
                "Failed to generate custom HTML representation for "
                "execution history: %s",
                exc,
            )
            return super()._repr_html_()  # type: ignore
Comment on lines +141 to +142
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using a broad except Exception: can hide bugs in the HTML representation logic. It's better to catch specific exceptions or at least log the caught exception to aid in debugging if the formatting fails. This will provide visibility into any issues without breaking the user's interactive session.

Suggested change
except Exception:
return super()._repr_html_() # type: ignore
except Exception as e:
logger.warning("Failed to generate custom HTML representation for execution history: %s", e)
return super()._repr_html_() # type: ignore



@log_adapter.class_logger
class Session(
third_party_pandas_gbq.GBQIOMixin,
Expand Down Expand Up @@ -233,6 +266,7 @@ def __init__(
)

self._metrics = metrics.ExecutionMetrics()
self._publisher.subscribe(self._metrics.on_event)
self._function_session = bff_session.FunctionSession()
self._anon_dataset_manager = anonymous_dataset.AnonymousDatasetManager(
self._clients_provider.bqclient,
Expand Down Expand Up @@ -371,6 +405,10 @@ def slot_millis_sum(self):
"""The sum of all slot time used by bigquery jobs in this session."""
return self._metrics.slot_millis

def execution_history(self) -> pandas.DataFrame:
    """Return the BigQuery executions initiated by BigFrames in this session.

    Returns:
        pandas.DataFrame: An ``_ExecutionHistory`` frame with one row per
        recorded job, columns taken from each job-metadata record's fields.
    """
    job_records = [vars(job_record) for job_record in self._metrics.jobs]
    return _ExecutionHistory(job_records)

@property
def _allows_ambiguity(self) -> bool:
return self._allow_ambiguity
Expand Down
5 changes: 5 additions & 0 deletions packages/bigframes/bigframes/session/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import pandas
import pyarrow as pa
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery.job.load import LoadJob
from google.cloud.bigquery.job.query import QueryJob
from google.cloud.bigquery_storage_v1 import types as bq_storage_types

import bigframes._tools
Expand Down Expand Up @@ -605,6 +607,9 @@ def _start_generic_job(self, job: formatting_helpers.GenericJob):
else:
job.result()

if self._metrics is not None and isinstance(job, (QueryJob, LoadJob)):
self._metrics.count_job_stats(query_job=job)

@overload
def read_gbq_table( # type: ignore[overload-overlap]
self,
Expand Down
210 changes: 187 additions & 23 deletions packages/bigframes/bigframes/session/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,143 @@
from __future__ import annotations

import dataclasses
import datetime
import os
from typing import Optional, Tuple
from typing import Any, Mapping, Optional, Tuple, Union

import google.cloud.bigquery as bigquery
import google.cloud.bigquery.job as bq_job
import google.cloud.bigquery.table as bq_table
from google.cloud.bigquery.job.load import LoadJob
from google.cloud.bigquery.job.query import QueryJob

LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"


@dataclasses.dataclass
class JobMetadata:
    """Snapshot of a single BigQuery job (or jobless query) run by BigFrames.

    One record is appended to ``ExecutionMetrics.jobs`` per execution; the
    fields mirror the subset of job statistics surfaced to users via
    ``Session.execution_history``.
    """

    job_id: Optional[str] = None
    query_id: Optional[str] = None
    location: Optional[str] = None
    project: Optional[str] = None
    creation_time: Optional[datetime.datetime] = None
    start_time: Optional[datetime.datetime] = None
    end_time: Optional[datetime.datetime] = None
    duration_seconds: Optional[float] = None
    status: Optional[str] = None
    total_bytes_processed: Optional[int] = None
    total_slot_ms: Optional[int] = None
    job_type: Optional[str] = None
    error_result: Optional[Mapping[str, Any]] = None
    cached: Optional[bool] = None
    job_url: Optional[str] = None
    query: Optional[str] = None
    destination_table: Optional[str] = None
    source_uris: Optional[list[str]] = None
    input_files: Optional[int] = None
    input_bytes: Optional[int] = None
    output_rows: Optional[int] = None
    source_format: Optional[str] = None

    @staticmethod
    def _truncate_query(query_text: Optional[str]) -> Optional[str]:
        """Cap stored SQL text at 1024 chars, marking truncation with '...'."""
        if query_text and len(query_text) > 1024:
            return query_text[:1021] + "..."
        return query_text

    @staticmethod
    def _console_url(project, location, job_id) -> str:
        """Build the Cloud Console results URL for a job."""
        return f"https://console.cloud.google.com/bigquery?project={project}&j=bq:{location}:{job_id}&page=queryresults"

    @classmethod
    def from_job(
        cls, query_job: "Union[QueryJob, LoadJob]", exec_seconds: Optional[float] = None
    ) -> "JobMetadata":
        """Build a JobMetadata from a finished QueryJob or LoadJob.

        Args:
            query_job: The completed BigQuery job.
            exec_seconds: Wall-clock execution time measured by the caller,
                if available.
        """
        query_text = cls._truncate_query(getattr(query_job, "query", None))

        job_id = getattr(query_job, "job_id", None)
        job_url = (
            cls._console_url(query_job.project, query_job.location, job_id)
            if job_id
            else None
        )

        metadata = cls(
            job_id=job_id,
            location=query_job.location,
            project=query_job.project,
            creation_time=query_job.created,
            start_time=query_job.started,
            end_time=query_job.ended,
            duration_seconds=exec_seconds,
            status=query_job.state,
            job_type=query_job.job_type,
            error_result=query_job.error_result,
            query=query_text,
            job_url=job_url,
        )
        # Query- and load-specific statistics live on different job classes.
        if isinstance(query_job, QueryJob):
            metadata.cached = getattr(query_job, "cache_hit", None)
            metadata.destination_table = (
                str(query_job.destination) if query_job.destination else None
            )
            metadata.total_bytes_processed = getattr(
                query_job, "total_bytes_processed", None
            )
            metadata.total_slot_ms = getattr(query_job, "slot_millis", None)
        elif isinstance(query_job, LoadJob):
            metadata.output_rows = getattr(query_job, "output_rows", None)
            metadata.input_files = getattr(query_job, "input_files", None)
            metadata.input_bytes = getattr(query_job, "input_bytes", None)
            metadata.destination_table = (
                str(query_job.destination)
                if getattr(query_job, "destination", None)
                else None
            )
            if getattr(query_job, "source_uris", None):
                metadata.source_uris = list(query_job.source_uris)
            if query_job.configuration and hasattr(
                query_job.configuration, "source_format"
            ):
                metadata.source_format = query_job.configuration.source_format

        return metadata

    @classmethod
    def from_row_iterator(
        cls, row_iterator: "bq_table.RowIterator", exec_seconds: Optional[float] = None
    ) -> "JobMetadata":
        """Build a JobMetadata from a RowIterator (jobless / short-query path).

        All attributes are read defensively with ``getattr`` because a
        RowIterator may lack job-level statistics entirely.
        """
        query_text = cls._truncate_query(getattr(row_iterator, "query", None))

        job_id = getattr(row_iterator, "job_id", None)
        job_url = None
        if job_id:
            job_url = cls._console_url(
                getattr(row_iterator, "project", ""),
                getattr(row_iterator, "location", ""),
                job_id,
            )

        return cls(
            job_id=job_id,
            query_id=getattr(row_iterator, "query_id", None),
            location=getattr(row_iterator, "location", None),
            project=getattr(row_iterator, "project", None),
            creation_time=getattr(row_iterator, "created", None),
            start_time=getattr(row_iterator, "started", None),
            end_time=getattr(row_iterator, "ended", None),
            duration_seconds=exec_seconds,
            # A RowIterator is only handed back for completed queries.
            status="DONE",
            total_bytes_processed=getattr(row_iterator, "total_bytes_processed", None),
            total_slot_ms=getattr(row_iterator, "slot_millis", None),
            job_type="query",
            cached=getattr(row_iterator, "cache_hit", None),
            query=query_text,
            job_url=job_url,
        )


@dataclasses.dataclass
class ExecutionMetrics:
execution_count: int = 0
slot_millis: int = 0
bytes_processed: int = 0
execution_secs: float = 0
query_char_count: int = 0
jobs: list[JobMetadata] = dataclasses.field(default_factory=list)

def count_job_stats(
self,
query_job: Optional[bq_job.QueryJob] = None,
query_job: Optional[Union[QueryJob, LoadJob]] = None,
row_iterator: Optional[bq_table.RowIterator] = None,
):
if query_job is None:
Expand All @@ -57,41 +173,89 @@ def count_job_stats(
self.slot_millis += slot_millis
self.execution_secs += exec_seconds

elif query_job.configuration.dry_run:
query_char_count = len(query_job.query)
self.jobs.append(
JobMetadata.from_row_iterator(row_iterator, exec_seconds=exec_seconds)
)

elif isinstance(query_job, QueryJob) and query_job.configuration.dry_run:
query_char_count = len(getattr(query_job, "query", ""))

# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
bytes_processed = 0
slot_millis = 0
exec_seconds = 0.0

elif (stats := get_performance_stats(query_job)) is not None:
query_char_count, bytes_processed, slot_millis, exec_seconds = stats
elif isinstance(query_job, bigquery.QueryJob):
if (stats := get_performance_stats(query_job)) is not None:
query_char_count, bytes_processed, slot_millis, exec_seconds = stats
self.execution_count += 1
self.query_char_count += query_char_count or 0
self.bytes_processed += bytes_processed or 0
self.slot_millis += slot_millis or 0
self.execution_secs += exec_seconds or 0

metadata = JobMetadata.from_job(query_job, exec_seconds=exec_seconds)
metadata.total_bytes_processed = bytes_processed
metadata.total_slot_ms = slot_millis
Comment on lines +198 to +199
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

These assignments are redundant because JobMetadata.from_job already populates total_bytes_processed and total_slot_ms from the query_job object when the job is a QueryJob. The values from get_performance_stats are sourced from the same attributes on the job object. Removing these lines will make the code cleaner.

self.jobs.append(metadata)

else:
self.execution_count += 1
self.query_char_count += query_char_count or 0
self.bytes_processed += bytes_processed or 0
self.slot_millis += slot_millis or 0
self.execution_secs += exec_seconds or 0
duration = (
(query_job.ended - query_job.created).total_seconds()
if query_job.ended and query_job.created
else None
)
self.jobs.append(JobMetadata.from_job(query_job, exec_seconds=duration))

# For pytest runs only, log information about the query job
# to a file in order to create a performance report.
if (
isinstance(query_job, bigquery.QueryJob)
and not query_job.configuration.dry_run
):
stats = get_performance_stats(query_job)
if stats:
write_stats_to_disk(
query_char_count=stats[0],
bytes_processed=stats[1],
slot_millis=stats[2],
exec_seconds=stats[3],
)
elif row_iterator is not None:
bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0
query_char_count = len(getattr(row_iterator, "query", "") or "")
slot_millis = getattr(row_iterator, "slot_millis", 0) or 0
created = getattr(row_iterator, "created", None)
ended = getattr(row_iterator, "ended", None)
exec_seconds = (
(ended - created).total_seconds() if created and ended else 0.0
)
write_stats_to_disk(
query_char_count=query_char_count,
bytes_processed=bytes_processed,
slot_millis=slot_millis,
exec_seconds=exec_seconds,
)

else:
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
bytes_processed = 0
query_char_count = 0
slot_millis = 0
exec_seconds = 0
def on_event(self, event: Any):
    """Event-bus callback recording executions that bypassed BigQuery
    (e.g. local Polars execution).

    Silently ignores events when the optional events/executor modules are
    unavailable, and ignores event types it does not recognize.
    """
    try:
        import bigframes.core.events
        from bigframes.session.executor import LocalExecuteResult
    except ImportError:
        # Events infrastructure not available; nothing to record.
        return

    if isinstance(event, bigframes.core.events.ExecutionFinished):
        if event.result and isinstance(event.result, LocalExecuteResult):
            self.execution_count += 1
            bytes_processed = event.result.total_bytes_processed or 0
            # Keep the aggregate byte counter consistent with how
            # BigQuery-backed jobs are tallied in count_job_stats, so
            # session.bytes_processed_sum stays comprehensive.
            self.bytes_processed += bytes_processed

            metadata = JobMetadata(
                job_type="polars",
                status="DONE",
                total_bytes_processed=bytes_processed,
            )
            self.jobs.append(metadata)


def get_performance_stats(
Expand Down
Loading
Loading