From 944924956a5a8ed01db1086bcbff5e42b8066101 Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Wed, 13 May 2026 15:40:34 +0000 Subject: [PATCH 1/5] Add test --- tests/unit/common/utils/proto_utils_test.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/unit/common/utils/proto_utils_test.py b/tests/unit/common/utils/proto_utils_test.py index 1e8dd54a9..3017835f9 100644 --- a/tests/unit/common/utils/proto_utils_test.py +++ b/tests/unit/common/utils/proto_utils_test.py @@ -69,5 +69,21 @@ def test_can_read_gbml_config_from_yaml(self): ) + def test_read_proto_from_yaml_raises_typeerror_when_root_is_not_a_mapping(self): + list_yaml = "- a\n- b\n- c\n" + tmp_file = NamedTemporaryFile(delete=False) + tmp_file.write(list_yaml.encode()) + tmp_file.close() + try: + with self.assertRaises(TypeError) as ctx: + self.proto_utils.read_proto_from_yaml( + uri=LocalUri(tmp_file.name), + proto_cls=gbml_config_pb2.GbmlConfig, + ) + self.assertIn("expected a mapping at the YAML root", str(ctx.exception)) + finally: + os.remove(tmp_file.name) + + if __name__ == "__main__": absltest.main() From 1551a551ca9c866402d63b0f3d19560d67244ff7 Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Wed, 13 May 2026 15:50:57 +0000 Subject: [PATCH 2/5] fix(proto_utils): narrow ParseDict js_dict to dict[str, Any] types-protobuf v7 tightened ParseDict's js_dict parameter from an Any-compatible alias to dict[str, Any]. Validate that the YAML root is a mapping and cast before passing to ParseDict so mypy is clean under the new stubs. --- gigl/common/utils/proto_utils.py | 9 +++++++-- tests/unit/common/utils/proto_utils_test.py | 4 +--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/gigl/common/utils/proto_utils.py b/gigl/common/utils/proto_utils.py index 56b78b312..8cd09dc28 100644 --- a/gigl/common/utils/proto_utils.py +++ b/gigl/common/utils/proto_utils.py @@ -1,5 +1,5 @@ from tempfile import NamedTemporaryFile -from typing import Optional, Type, TypeVar +from typing import Any, Optional, Type, TypeVar, cast import yaml from google.protobuf import message @@ -28,7 +28,12 @@ def read_proto_from_yaml(self, uri: Uri, proto_cls: Type[T]) -> T: omega_conf_obj = OmegaConf.create(raw_data) tfh.close() obj_dict = OmegaConf.to_object(omega_conf_obj) - proto = ParseDict(js_dict=obj_dict, message=proto_cls()) + if not isinstance(obj_dict, dict): + raise TypeError( + f"ProtoUtils.read_proto_from_yaml expected a mapping at the YAML root for " + f"{uri}, got {type(obj_dict).__name__}." + ) + proto = ParseDict(js_dict=cast(dict[str, Any], obj_dict), message=proto_cls()) return proto def read_proto_from_binary(self, uri: Uri, proto_cls: Type[T]) -> T: diff --git a/tests/unit/common/utils/proto_utils_test.py b/tests/unit/common/utils/proto_utils_test.py index 3017835f9..793e0e7a6 100644 --- a/tests/unit/common/utils/proto_utils_test.py +++ b/tests/unit/common/utils/proto_utils_test.py @@ -68,19 +68,17 @@ def test_can_read_gbml_config_from_yaml(self): f"{expected_positive_label_date_range_start}:{expected_positive_label_date_range_end}", ) - def test_read_proto_from_yaml_raises_typeerror_when_root_is_not_a_mapping(self): list_yaml = "- a\n- b\n- c\n" tmp_file = NamedTemporaryFile(delete=False) tmp_file.write(list_yaml.encode()) tmp_file.close() try: - with self.assertRaises(TypeError) as ctx: + with self.assertRaises(TypeError): self.proto_utils.read_proto_from_yaml( uri=LocalUri(tmp_file.name), proto_cls=gbml_config_pb2.GbmlConfig, ) - self.assertIn("expected a mapping at the YAML root", str(ctx.exception)) finally: os.remove(tmp_file.name) From 2bac65587d9f86173c1daae04326f18145518439 Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Wed, 13 May 2026 15:55:57 +0000 Subject: [PATCH 3/5] fix(kge config): narrow ParseDict js_dict to dict[str, Any] types-protobuf v7 tightened ParseDict's js_dict parameter. Validate that dataset.metadata resolves to a mapping and cast before handing it to ParseDict so mypy is clean under the new stubs. --- .../knowledge_graph_embedding/lib/config/__init__.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/gigl/experimental/knowledge_graph_embedding/lib/config/__init__.py b/gigl/experimental/knowledge_graph_embedding/lib/config/__init__.py index 094f1375b..9caf7fe38 100644 --- a/gigl/experimental/knowledge_graph_embedding/lib/config/__init__.py +++ b/gigl/experimental/knowledge_graph_embedding/lib/config/__init__.py @@ -1,7 +1,7 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import cast +from typing import Any, cast import torch from google.protobuf.json_format import ParseDict @@ -70,8 +70,15 @@ def from_omegaconf(config: DictConfig) -> HeterogeneousGraphSparseEmbeddingConfi assert graph_metadata is not None, "Graph metadata is required in the config." graph_metadata_dict = OmegaConf.to_container(graph_metadata, resolve=True) + if not isinstance(graph_metadata_dict, dict): + raise TypeError( + f"HeterogeneousGraphSparseEmbeddingConfig.from_omegaconf expected " + f"dataset.metadata to resolve to a mapping, got " + f"{type(graph_metadata_dict).__name__}." + ) pb = ParseDict( - js_dict=graph_metadata_dict, message=graph_schema_pb2.GraphMetadata() + js_dict=cast(dict[str, Any], graph_metadata_dict), + message=graph_schema_pb2.GraphMetadata(), ) graph_metadata = GraphMetadataPbWrapper(graph_metadata_pb=pb) From cbdb086ad0c49e55bdca07fb3077df398be4c2e8 Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Thu, 14 May 2026 14:21:45 +0000 Subject: [PATCH 4/5] update type stub --- gigl/common/utils/proto_utils.py | 4 +- gigl/utils/dev/__init__.py | 5 + gigl/utils/dev/submit_smoke_job.py | 258 +++++++++++++++++++++++++++++ gigl/utils/dev/tb_smoke_main.py | 72 ++++++++ 4 files changed, 337 insertions(+), 2 deletions(-) create mode 100644 gigl/utils/dev/__init__.py create mode 100644 gigl/utils/dev/submit_smoke_job.py create mode 100644 gigl/utils/dev/tb_smoke_main.py diff --git a/gigl/common/utils/proto_utils.py b/gigl/common/utils/proto_utils.py index 8cd09dc28..e61258756 100644 --- a/gigl/common/utils/proto_utils.py +++ b/gigl/common/utils/proto_utils.py @@ -1,5 +1,5 @@ from tempfile import NamedTemporaryFile -from typing import Any, Optional, Type, TypeVar, cast +from typing import Optional, Type, TypeVar, cast import yaml from google.protobuf import message @@ -33,7 +33,7 @@ def read_proto_from_yaml(self, uri: Uri, proto_cls: Type[T]) -> T: f"ProtoUtils.read_proto_from_yaml expected a mapping at the YAML root for " f"{uri}, got {type(obj_dict).__name__}." ) - proto = ParseDict(js_dict=cast(dict[str, Any], obj_dict), message=proto_cls()) + proto = ParseDict(js_dict=cast(dict, obj_dict), message=proto_cls()) return proto def read_proto_from_binary(self, uri: Uri, proto_cls: Type[T]) -> T: diff --git a/gigl/utils/dev/__init__.py b/gigl/utils/dev/__init__.py new file mode 100644 index 000000000..9c1bf25ab --- /dev/null +++ b/gigl/utils/dev/__init__.py @@ -0,0 +1,5 @@ +"""Developer utilities (smoke entrypoints, ad-hoc test helpers). + +Modules under this package are intended for short, ad-hoc test jobs and +developer iteration. They are NOT part of GiGL's stable public API. +""" diff --git a/gigl/utils/dev/submit_smoke_job.py b/gigl/utils/dev/submit_smoke_job.py new file mode 100644 index 000000000..e1eab73ef --- /dev/null +++ b/gigl/utils/dev/submit_smoke_job.py @@ -0,0 +1,258 @@ +"""Submit a tiny Vertex AI CustomJob that exercises GiGL's TensorBoard wiring. + +Goal: <2 min from "I changed launcher / writer code" to "I see whether TB +shows up." Bypasses ConfigPopulator and the full pipeline; uses the +production launcher path (``launch_single_pool_job``) so the same submit +logic runs as in real training. + +Required CLI flags: + --project GCP project (e.g. ``external-snap-ci-github-gigl``). + --region Vertex AI region (e.g. ``us-central1``). + --service-account Service account email used by the CustomJob. + --staging-bucket Regional GCS bucket Vertex stages artifacts under. + --tensorboard Full TensorBoard resource name + (``projects/.../locations/.../tensorboards/...``). + --experiment-name Vertex AI ``TensorboardExperiment`` name. The + tb_smoke_main entry point will pass this and the + --tensorboard value to ``TensorBoardWriter.create``. + --container-uri Container image to use. REQUIRED — must contain the + branch under test. + +Optional: + --job-name CustomJob display name. Defaults to a timestamped + ``gigl-tb-smoke-...``. + --dry-run Print the constructed submission parameters and + exit without submitting. + +Verification: + After the CustomJob completes the script polls the TensorBoard API + surface and asserts the user-named ``TensorboardExperiment`` exists + with at least one ``TensorboardRun`` containing time series data. + + The TB UI URL is printed for manual inspection. +""" + +from __future__ import annotations + +import argparse +import datetime +import re +import sys +import time + +from google.cloud import aiplatform + +from gigl.common import Uri +from gigl.common.logger import Logger +from gigl.src.common.constants.components import GiGLComponents +from gigl.src.common.types.pb_wrappers.gigl_resource_config import ( + GiglResourceConfigWrapper, +) +from gigl.src.common.vertex_ai_launcher import launch_single_pool_job +from snapchat.research.gbml import gigl_resource_config_pb2 + +logger = Logger() + +_TENSORBOARD_RESOURCE_NAME_PATTERN = re.compile( + r"^projects/(?P[^/]+)" + r"/locations/(?P[^/]+)" + r"/tensorboards/(?P[^/]+)$" +) + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--project", required=True) + parser.add_argument("--region", required=True) + parser.add_argument("--service-account", required=True) + parser.add_argument( + "--staging-bucket", + required=True, + help="Regional GCS bucket (e.g. gs://gigl-cicd-temp).", + ) + parser.add_argument( + "--tensorboard", + required=True, + help="Full TensorBoard resource name.", + ) + parser.add_argument( + "--experiment-name", + required=True, + help=( + "TensorboardExperiment name. Passed to tb_smoke_main, which " + "creates the run under this experiment." + ), + ) + parser.add_argument( + "--container-uri", + required=True, + help=( + "Container image with the branch code. Required; pointing at a " + "released image would test stale code." + ), + ) + parser.add_argument("--job-name", default=None) + parser.add_argument("--dry-run", action="store_true") + return parser.parse_args() + + +def _build_resource_config( + *, + project: str, + region: str, + service_account: str, + staging_bucket: str, +) -> gigl_resource_config_pb2.GiglResourceConfig: + """Minimal GiglResourceConfig wired for a 1-replica CPU CustomJob.""" + common = gigl_resource_config_pb2.SharedResourceConfig.CommonComputeConfig( + project=project, + region=region, + temp_regional_assets_bucket=staging_bucket, + temp_assets_bucket=staging_bucket, + perm_assets_bucket=staging_bucket, + temp_assets_bq_dataset_name="not_used_by_smoke", + embedding_bq_dataset_name="not_used_by_smoke", + gcp_service_account_email=service_account, + dataflow_runner="DataflowRunner", + ) + shared = gigl_resource_config_pb2.SharedResourceConfig( + common_compute_config=common, + resource_labels={"cost_resource_group": "gigl_dev_smoke"}, + ) + trainer = gigl_resource_config_pb2.VertexAiResourceConfig( + # n1-standard-2 is rejected by Vertex AI; n1-standard-16 is the + # smallest spec we've confirmed accepted in dev. + machine_type="n1-standard-16", + gpu_type="ACCELERATOR_TYPE_UNSPECIFIED", + gpu_limit=0, + num_replicas=1, + timeout=600, + ) + return gigl_resource_config_pb2.GiglResourceConfig( + shared_resource_config=shared, + trainer_resource_config=gigl_resource_config_pb2.TrainerResourceConfig( + vertex_ai_trainer_config=trainer, + ), + ) + + +def _verify_named_experiment( + *, + tensorboard_resource_name: str, + experiment_name: str, +) -> None: + """Confirm the chief-rank writer ingested events into the named experiment.""" + experiment_resource_name = ( + f"{tensorboard_resource_name}/experiments/{experiment_name}" + ) + runs = aiplatform.TensorboardRun.list( + tensorboard_experiment_name=experiment_resource_name, + ) + if not runs: + raise RuntimeError( + f"Named TensorboardExperiment {experiment_resource_name} has no " + "TensorboardRuns; the writer did not ingest events." + ) + for run in runs: + time_series = aiplatform.TensorboardTimeSeries.list( + tensorboard_run_name=run.resource_name, + ) + if not time_series: + raise RuntimeError( + f"Run {run.resource_name} has no TensorboardTimeSeries; " + "events did not reach the API." + ) + run_names = sorted(r.display_name for r in runs) + logger.info( + f"Named experiment OK: {len(runs)} run(s) under {experiment_resource_name}: " + f"{run_names}" + ) + + +def _print_tb_url( + *, + region: str, + project: str, + tensorboard_id: str, + experiment_name: str, +) -> None: + base = f"https://{region}.tensorboard.googleusercontent.com/experiment" + qualifier = f"projects+{project}+locations+{region}+tensorboards+{tensorboard_id}" + named = f"{base}/{qualifier}+experiments+{experiment_name}" + logger.info(f"Named TB URL: {named}") + + +def main() -> int: + args = _parse_args() + + tb_match = _TENSORBOARD_RESOURCE_NAME_PATTERN.match(args.tensorboard) + if not tb_match: + logger.error( + f"--tensorboard must be projects/.../locations/.../tensorboards/...; " + f"got {args.tensorboard!r}." + ) + return 2 + + timestamp = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%S") + job_name = args.job_name or f"gigl-tb-smoke-{timestamp}" + + resource_config = _build_resource_config( + project=args.project, + region=args.region, + service_account=args.service_account, + staging_bucket=args.staging_bucket, + ) + resource_wrapper = GiglResourceConfigWrapper(resource_config=resource_config) + + process_runtime_args = { + "tensorboard_resource_name": args.tensorboard, + "tensorboard_experiment_name": args.experiment_name, + } + + if args.dry_run: + logger.info( + "Dry run — would submit a CustomJob with:\n" + f" job_name = {job_name}\n" + f" container_uri = {args.container_uri}\n" + f" tensorboard_resource = {args.tensorboard}\n" + f" experiment_name = {args.experiment_name!r}\n" + f" process_runtime_args = {process_runtime_args}\n" + ) + return 0 + + aiplatform.init(project=args.project, location=args.region) + launch_single_pool_job( + vertex_ai_resource_config=resource_config.trainer_resource_config.vertex_ai_trainer_config, + job_name=job_name, + task_config_uri=Uri("gs://unused/by/smoke.yaml"), + resource_config_uri=Uri("gs://unused/by/smoke.yaml"), + process_command="python -m gigl.utils.dev.tb_smoke_main", + process_runtime_args=process_runtime_args, + resource_config_wrapper=resource_wrapper, + cpu_docker_uri=args.container_uri, + cuda_docker_uri=args.container_uri, + component=GiGLComponents.Trainer, + vertex_ai_region=args.region, + ) + logger.info(f"Submitted CustomJob: {job_name}") + + # CustomJob.submit blocks until completion inside launch_single_pool_job + # (see VertexAIService._submit_job: job.wait_for_completion). Give the + # backing TensorboardExperiment a short grace period for any final RPCs. + time.sleep(5) + + _verify_named_experiment( + tensorboard_resource_name=args.tensorboard, + experiment_name=args.experiment_name, + ) + _print_tb_url( + region=args.region, + project=args.project, + tensorboard_id=tb_match["tensorboard_id"], + experiment_name=args.experiment_name, + ) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/gigl/utils/dev/tb_smoke_main.py b/gigl/utils/dev/tb_smoke_main.py new file mode 100644 index 000000000..2d4a35807 --- /dev/null +++ b/gigl/utils/dev/tb_smoke_main.py @@ -0,0 +1,72 @@ +"""Tiny smoke-test entrypoint that exercises GiGL's TensorBoard pipeline. + +Submitted as the container command by ``submit_smoke_job.py``. Constructs a +``TensorBoardWriter`` with ``enabled=True`` (single-process smoke = always +chief), writes a few scalar events, and exits. + +Configuration is plumbed via CLI flags injected by the launcher from the +smoke script's ``process_runtime_args`` map. All three are required: + + --job_name= + --tensorboard_resource_name= + --tensorboard_experiment_name= + +This entrypoint deliberately mirrors the production trainer/inferencer call +sites in ``examples/link_prediction/`` so the smoke test exercises the same +``TensorBoardWriter.create()`` code path. +""" + +from __future__ import annotations + +import argparse + +from gigl.common.logger import Logger +from gigl.utils.tensorboard_writer import TensorBoardWriter + +logger = Logger() + +_NUM_STEPS = 3 + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--job_name", + required=True, + help="Used as the TensorboardRun ID (must be unique per launch).", + ) + parser.add_argument( + "--tensorboard_resource_name", + required=True, + help="Full Vertex AI Tensorboard resource name.", + ) + parser.add_argument( + "--tensorboard_experiment_name", + required=True, + help="TensorboardExperiment ID under the resource above.", + ) + # The launcher's _build_job_config always appends --task_config_uri, + # --resource_config_uri, and (on GPU) --use_cuda. The smoke entrypoint + # doesn't need them; use parse_known_args so they don't blow up argparse. + args, _unrecognized = parser.parse_known_args() + return args + + +def main() -> None: + """Write a handful of scalar events and exit.""" + args = _parse_args() + logger.info(f"Starting tb_smoke_main; job_name={args.job_name!r}") + with TensorBoardWriter.create( + resource_name=args.tensorboard_resource_name, + experiment_name=args.tensorboard_experiment_name, + experiment_run_name=args.job_name, + enabled=True, + ) as writer: + for step in range(_NUM_STEPS): + writer.log({"smoke/value": float(step)}, step=step) + logger.info(f"Wrote smoke/value={step} at step {step}") + logger.info("tb_smoke_main complete") + + +if __name__ == "__main__": + main() From ffc7015e206715d6ab1af550c674854649be1afd Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Thu, 14 May 2026 15:07:53 +0000 Subject: [PATCH 5/5] update --- gigl/utils/dev/__init__.py | 5 - gigl/utils/dev/submit_smoke_job.py | 258 ----------------------------- gigl/utils/dev/tb_smoke_main.py | 72 -------- 3 files changed, 335 deletions(-) delete mode 100644 gigl/utils/dev/__init__.py delete mode 100644 gigl/utils/dev/submit_smoke_job.py delete mode 100644 gigl/utils/dev/tb_smoke_main.py diff --git a/gigl/utils/dev/__init__.py b/gigl/utils/dev/__init__.py deleted file mode 100644 index 9c1bf25ab..000000000 --- a/gigl/utils/dev/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Developer utilities (smoke entrypoints, ad-hoc test helpers). - -Modules under this package are intended for short, ad-hoc test jobs and -developer iteration. They are NOT part of GiGL's stable public API. -""" diff --git a/gigl/utils/dev/submit_smoke_job.py b/gigl/utils/dev/submit_smoke_job.py deleted file mode 100644 index e1eab73ef..000000000 --- a/gigl/utils/dev/submit_smoke_job.py +++ /dev/null @@ -1,258 +0,0 @@ -"""Submit a tiny Vertex AI CustomJob that exercises GiGL's TensorBoard wiring. - -Goal: <2 min from "I changed launcher / writer code" to "I see whether TB -shows up." Bypasses ConfigPopulator and the full pipeline; uses the -production launcher path (``launch_single_pool_job``) so the same submit -logic runs as in real training. - -Required CLI flags: - --project GCP project (e.g. ``external-snap-ci-github-gigl``). - --region Vertex AI region (e.g. ``us-central1``). - --service-account Service account email used by the CustomJob. - --staging-bucket Regional GCS bucket Vertex stages artifacts under. - --tensorboard Full TensorBoard resource name - (``projects/.../locations/.../tensorboards/...``). - --experiment-name Vertex AI ``TensorboardExperiment`` name. The - tb_smoke_main entry point will pass this and the - --tensorboard value to ``TensorBoardWriter.create``. - --container-uri Container image to use. REQUIRED — must contain the - branch under test. - -Optional: - --job-name CustomJob display name. Defaults to a timestamped - ``gigl-tb-smoke-...``. - --dry-run Print the constructed submission parameters and - exit without submitting. - -Verification: - After the CustomJob completes the script polls the TensorBoard API - surface and asserts the user-named ``TensorboardExperiment`` exists - with at least one ``TensorboardRun`` containing time series data. - - The TB UI URL is printed for manual inspection. -""" - -from __future__ import annotations - -import argparse -import datetime -import re -import sys -import time - -from google.cloud import aiplatform - -from gigl.common import Uri -from gigl.common.logger import Logger -from gigl.src.common.constants.components import GiGLComponents -from gigl.src.common.types.pb_wrappers.gigl_resource_config import ( - GiglResourceConfigWrapper, -) -from gigl.src.common.vertex_ai_launcher import launch_single_pool_job -from snapchat.research.gbml import gigl_resource_config_pb2 - -logger = Logger() - -_TENSORBOARD_RESOURCE_NAME_PATTERN = re.compile( - r"^projects/(?P[^/]+)" - r"/locations/(?P[^/]+)" - r"/tensorboards/(?P[^/]+)$" -) - - -def _parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument("--project", required=True) - parser.add_argument("--region", required=True) - parser.add_argument("--service-account", required=True) - parser.add_argument( - "--staging-bucket", - required=True, - help="Regional GCS bucket (e.g. gs://gigl-cicd-temp).", - ) - parser.add_argument( - "--tensorboard", - required=True, - help="Full TensorBoard resource name.", - ) - parser.add_argument( - "--experiment-name", - required=True, - help=( - "TensorboardExperiment name. Passed to tb_smoke_main, which " - "creates the run under this experiment." - ), - ) - parser.add_argument( - "--container-uri", - required=True, - help=( - "Container image with the branch code. Required; pointing at a " - "released image would test stale code." - ), - ) - parser.add_argument("--job-name", default=None) - parser.add_argument("--dry-run", action="store_true") - return parser.parse_args() - - -def _build_resource_config( - *, - project: str, - region: str, - service_account: str, - staging_bucket: str, -) -> gigl_resource_config_pb2.GiglResourceConfig: - """Minimal GiglResourceConfig wired for a 1-replica CPU CustomJob.""" - common = gigl_resource_config_pb2.SharedResourceConfig.CommonComputeConfig( - project=project, - region=region, - temp_regional_assets_bucket=staging_bucket, - temp_assets_bucket=staging_bucket, - perm_assets_bucket=staging_bucket, - temp_assets_bq_dataset_name="not_used_by_smoke", - embedding_bq_dataset_name="not_used_by_smoke", - gcp_service_account_email=service_account, - dataflow_runner="DataflowRunner", - ) - shared = gigl_resource_config_pb2.SharedResourceConfig( - common_compute_config=common, - resource_labels={"cost_resource_group": "gigl_dev_smoke"}, - ) - trainer = gigl_resource_config_pb2.VertexAiResourceConfig( - # n1-standard-2 is rejected by Vertex AI; n1-standard-16 is the - # smallest spec we've confirmed accepted in dev. - machine_type="n1-standard-16", - gpu_type="ACCELERATOR_TYPE_UNSPECIFIED", - gpu_limit=0, - num_replicas=1, - timeout=600, - ) - return gigl_resource_config_pb2.GiglResourceConfig( - shared_resource_config=shared, - trainer_resource_config=gigl_resource_config_pb2.TrainerResourceConfig( - vertex_ai_trainer_config=trainer, - ), - ) - - -def _verify_named_experiment( - *, - tensorboard_resource_name: str, - experiment_name: str, -) -> None: - """Confirm the chief-rank writer ingested events into the named experiment.""" - experiment_resource_name = ( - f"{tensorboard_resource_name}/experiments/{experiment_name}" - ) - runs = aiplatform.TensorboardRun.list( - tensorboard_experiment_name=experiment_resource_name, - ) - if not runs: - raise RuntimeError( - f"Named TensorboardExperiment {experiment_resource_name} has no " - "TensorboardRuns; the writer did not ingest events." - ) - for run in runs: - time_series = aiplatform.TensorboardTimeSeries.list( - tensorboard_run_name=run.resource_name, - ) - if not time_series: - raise RuntimeError( - f"Run {run.resource_name} has no TensorboardTimeSeries; " - "events did not reach the API." - ) - run_names = sorted(r.display_name for r in runs) - logger.info( - f"Named experiment OK: {len(runs)} run(s) under {experiment_resource_name}: " - f"{run_names}" - ) - - -def _print_tb_url( - *, - region: str, - project: str, - tensorboard_id: str, - experiment_name: str, -) -> None: - base = f"https://{region}.tensorboard.googleusercontent.com/experiment" - qualifier = f"projects+{project}+locations+{region}+tensorboards+{tensorboard_id}" - named = f"{base}/{qualifier}+experiments+{experiment_name}" - logger.info(f"Named TB URL: {named}") - - -def main() -> int: - args = _parse_args() - - tb_match = _TENSORBOARD_RESOURCE_NAME_PATTERN.match(args.tensorboard) - if not tb_match: - logger.error( - f"--tensorboard must be projects/.../locations/.../tensorboards/...; " - f"got {args.tensorboard!r}." - ) - return 2 - - timestamp = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%S") - job_name = args.job_name or f"gigl-tb-smoke-{timestamp}" - - resource_config = _build_resource_config( - project=args.project, - region=args.region, - service_account=args.service_account, - staging_bucket=args.staging_bucket, - ) - resource_wrapper = GiglResourceConfigWrapper(resource_config=resource_config) - - process_runtime_args = { - "tensorboard_resource_name": args.tensorboard, - "tensorboard_experiment_name": args.experiment_name, - } - - if args.dry_run: - logger.info( - "Dry run — would submit a CustomJob with:\n" - f" job_name = {job_name}\n" - f" container_uri = {args.container_uri}\n" - f" tensorboard_resource = {args.tensorboard}\n" - f" experiment_name = {args.experiment_name!r}\n" - f" process_runtime_args = {process_runtime_args}\n" - ) - return 0 - - aiplatform.init(project=args.project, location=args.region) - launch_single_pool_job( - vertex_ai_resource_config=resource_config.trainer_resource_config.vertex_ai_trainer_config, - job_name=job_name, - task_config_uri=Uri("gs://unused/by/smoke.yaml"), - resource_config_uri=Uri("gs://unused/by/smoke.yaml"), - process_command="python -m gigl.utils.dev.tb_smoke_main", - process_runtime_args=process_runtime_args, - resource_config_wrapper=resource_wrapper, - cpu_docker_uri=args.container_uri, - cuda_docker_uri=args.container_uri, - component=GiGLComponents.Trainer, - vertex_ai_region=args.region, - ) - logger.info(f"Submitted CustomJob: {job_name}") - - # CustomJob.submit blocks until completion inside launch_single_pool_job - # (see VertexAIService._submit_job: job.wait_for_completion). Give the - # backing TensorboardExperiment a short grace period for any final RPCs. - time.sleep(5) - - _verify_named_experiment( - tensorboard_resource_name=args.tensorboard, - experiment_name=args.experiment_name, - ) - _print_tb_url( - region=args.region, - project=args.project, - tensorboard_id=tb_match["tensorboard_id"], - experiment_name=args.experiment_name, - ) - return 0 - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/gigl/utils/dev/tb_smoke_main.py b/gigl/utils/dev/tb_smoke_main.py deleted file mode 100644 index 2d4a35807..000000000 --- a/gigl/utils/dev/tb_smoke_main.py +++ /dev/null @@ -1,72 +0,0 @@ -"""Tiny smoke-test entrypoint that exercises GiGL's TensorBoard pipeline. - -Submitted as the container command by ``submit_smoke_job.py``. Constructs a -``TensorBoardWriter`` with ``enabled=True`` (single-process smoke = always -chief), writes a few scalar events, and exits. - -Configuration is plumbed via CLI flags injected by the launcher from the -smoke script's ``process_runtime_args`` map. All three are required: - - --job_name= - --tensorboard_resource_name= - --tensorboard_experiment_name= - -This entrypoint deliberately mirrors the production trainer/inferencer call -sites in ``examples/link_prediction/`` so the smoke test exercises the same -``TensorBoardWriter.create()`` code path. -""" - -from __future__ import annotations - -import argparse - -from gigl.common.logger import Logger -from gigl.utils.tensorboard_writer import TensorBoardWriter - -logger = Logger() - -_NUM_STEPS = 3 - - -def _parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument( - "--job_name", - required=True, - help="Used as the TensorboardRun ID (must be unique per launch).", - ) - parser.add_argument( - "--tensorboard_resource_name", - required=True, - help="Full Vertex AI Tensorboard resource name.", - ) - parser.add_argument( - "--tensorboard_experiment_name", - required=True, - help="TensorboardExperiment ID under the resource above.", - ) - # The launcher's _build_job_config always appends --task_config_uri, - # --resource_config_uri, and (on GPU) --use_cuda. The smoke entrypoint - # doesn't need them; use parse_known_args so they don't blow up argparse. - args, _unrecognized = parser.parse_known_args() - return args - - -def main() -> None: - """Write a handful of scalar events and exit.""" - args = _parse_args() - logger.info(f"Starting tb_smoke_main; job_name={args.job_name!r}") - with TensorBoardWriter.create( - resource_name=args.tensorboard_resource_name, - experiment_name=args.tensorboard_experiment_name, - experiment_run_name=args.job_name, - enabled=True, - ) as writer: - for step in range(_NUM_STEPS): - writer.log({"smoke/value": float(step)}, step=step) - logger.info(f"Wrote smoke/value={step} at step {step}") - logger.info("tb_smoke_main complete") - - -if __name__ == "__main__": - main()