Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion packages/google-cloud-spanner/noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ def unit(session, protobuf_implementation):
CURRENT_DIRECTORY / "testing" / f"constraints-{session.python}.txt"
)
install_unittest_dependencies(session, "-c", constraints_path)
session.install("pytest-xdist")
Comment thread
chalmerlowe marked this conversation as resolved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be added to UNIT_TEST_STANDARD_DEPENDENCIES?


# TODO(https://github.com/googleapis/synthtool/issues/1976):
# Remove the 'cpp' implementation once support for Protobuf 3.x is dropped.
Expand All @@ -240,6 +241,8 @@ def unit(session, protobuf_implementation):
# Run py.test against the unit tests.
args = [
"py.test",
"-n",
"auto",
"-s",
f"--junitxml=unit_{session.python}_sponge_log.xml",
"--cov=google",
Expand Down Expand Up @@ -754,7 +757,6 @@ def prerelease_deps(session, protobuf_implementation, database_dialect):
def mypy(session):
"""Run the type checker."""
session.skip("Mypy is not yet supported")

# TODO(https://github.com/googleapis/gapic-generator-python/issues/2579):
# use the latest version of mypy
session.install(
Expand Down Expand Up @@ -832,12 +834,15 @@ def core_deps_from_source(session, protobuf_implementation):
dep_paths = [str(deps_dir / dep) for dep in core_dependencies_from_source]

session.install(*dep_paths, "--no-deps", "--ignore-installed")
session.install("pytest-xdist")
print(
f"Installed {', '.join(core_dependencies_from_source)} locally from {deps_dir}"
)

session.run(
"py.test",
"-n",
"auto",
"tests/unit",
env={
"PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION": protobuf_implementation,
Expand Down
12 changes: 11 additions & 1 deletion packages/google-cloud-spanner/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,17 @@
"google-cloud-monitoring >= 2.16.0",
"mmh3 >= 4.1.0",
]
extras = {"libcst": "libcst >= 0.2.5"}
extras = {
"libcst": "libcst >= 0.2.5",
"test": [
"pytest",
"mock",
"asyncmock",
"pytest-cov",
"pytest-asyncio",
"pytest-xdist",
],
}

url = "https://github.com/googleapis/google-cloud-python/tree/main/packages/google-cloud-spanner"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,10 @@ async def _unit_of_work(transaction):
transaction.insert_or_update(sd.TABLE, sd.COLUMNS, sd.ROW_DATA)

await shared_database.run_in_transaction(_unit_of_work)
assert attempts == 2
# Expect at least 2 attempts due to our simulated manual abort on first try.
# We use >= 2 rather than == 2 because the live Spanner server can also
# trigger transient abort retries depending on real-world GCP resource contention.
assert attempts >= 2


@pytest.mark.asyncio
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,21 @@ async def test_sync_branches_admin_apis(self):
self.assertIsNotNone(ia_api)
self.assertIsNotNone(da_api)

def test_initialize_metrics_double_check(self):
# Safety shield mocks: We intercept the OpenTelemetry metric classes at the client module namespace level
# to prevent instantiating real exporter objects. This prevents spawning live background worker threads
# that periodically wake up and trigger 401 credential errors inside unauthenticated unit test runs.
@mock.patch("google.cloud.spanner_v1._async.client.CloudMonitoringMetricsExporter")
@mock.patch("google.cloud.spanner_v1._async.client.PeriodicExportingMetricReader")
@mock.patch("google.cloud.spanner_v1._async.client.MeterProvider")
# Global state reset: Temporarily override the module's process-wide global boolean _metrics_monitor_initialized
# to False so that the client enters the initialization logic instead of returning early.
@mock.patch(
"google.cloud.spanner_v1._async.client._metrics_monitor_initialized",
False,
)
def test_initialize_metrics_double_check(
self, mock_provider, mock_reader, mock_exporter
):
# coverage for line 143->exit
from google.cloud.spanner_v1._async import client as MUT

Expand All @@ -147,15 +161,17 @@ def __enter__(self):
def __exit__(self, *args):
return original_lock.__exit__(*args)

# Concurrency race condition simulator: Replace the process synchronization lock with our custom SettingLock.
# When this lock enters, it toggles _metrics_monitor_initialized to True to simulate another thread
# completing metrics setup while this thread was waiting for the lock.
with mock.patch(
"google.cloud.spanner_v1._async.client._metrics_monitor_initialized", False
"google.cloud.spanner_v1._async.client._metrics_monitor_lock",
SettingLock(),
):
with mock.patch(
"google.cloud.spanner_v1._async.client._metrics_monitor_lock",
SettingLock(),
):
MUT._initialize_metrics("project", self.credentials)
self.assertTrue(MUT._metrics_monitor_initialized)
# Trigger the initialization function and verify Spanner's double-checked lock safely
# checks the flag again and aborts cleanly to prevent dual-registration.
MUT._initialize_metrics("project", self.credentials)
self.assertTrue(MUT._metrics_monitor_initialized)

def test_default_transaction_options_validation(self):
# coverage for line 344
Expand Down
26 changes: 20 additions & 6 deletions packages/google-cloud-spanner/tests/unit/_async/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1800,11 +1800,18 @@ async def unit_of_work(txn, *args, **kw):
called_with.append((txn, args, kw))
txn.insert(TABLE_NAME, COLUMNS, VALUES)

import threading
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be moved to to top of the file?


main_thread = threading.current_thread()
_results = [1, 1.5]

# retry once w/ timeout_secs=1
def _time(_results=[1, 1.5]):
if len(_results) > 1:
return _results.pop(0)
return _results[0]
def _time():
if threading.current_thread() is main_thread:
if len(_results) > 1:
return _results.pop(0)
return _results[0]
return 1.0

with mock.patch("time.time", _time):
with mock.patch(
Expand Down Expand Up @@ -1877,9 +1884,16 @@ async def unit_of_work(txn, *args, **kw):
called_with.append((txn, args, kw))
txn.insert(TABLE_NAME, COLUMNS, VALUES)

import threading
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we import threading globally, this shouldnt be needed


main_thread = threading.current_thread()
_results = [1] * 100

# retry several times to check backoff
def _time(_results=[1] * 100):
return _results.pop(0)
def _time():
if threading.current_thread() is main_thread:
return _results.pop(0)
return 1.0

with (
mock.patch("time.time", _time),
Expand Down
18 changes: 18 additions & 0 deletions packages/google-cloud-spanner/tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,23 @@

import os

import pytest

from google.cloud.spanner_v1.metrics.spanner_metrics_tracer_factory import (
SpannerMetricsTracerFactory,
)

# Disable builtin metrics to avoid background thread noise and 401 errors in unit tests
os.environ["SPANNER_DISABLE_BUILTIN_METRICS"] = "true"


@pytest.fixture(autouse=True)
def reset_metrics_singletons(monkeypatch):
# Reset singletons and env var before test to avoid state pollution
monkeypatch.setenv("SPANNER_DISABLE_BUILTIN_METRICS", "true")
SpannerMetricsTracerFactory._metrics_tracer_factory = None
SpannerMetricsTracerFactory._current_metrics_tracer_ctx.set(None)
yield
# Reset singletons after test to ensure no leakage
SpannerMetricsTracerFactory._metrics_tracer_factory = None
SpannerMetricsTracerFactory._current_metrics_tracer_ctx.set(None)
12 changes: 8 additions & 4 deletions packages/google-cloud-spanner/tests/unit/gapic/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ def provide_loop_to_sync_grpc_tests():
If no global loop exists, `grpc.aio` engine crashes during initialization.
"""
try:
loop = asyncio.get_event_loop()
asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

yield
# No close here, just ensure existance
try:
yield
finally:
loop.close()
asyncio.set_event_loop(None)
else:
yield
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def collect_protobufs(val):

registered_classes = set(partition_helper._PROTO_CLASS_MAP.values())
for cls in discovered_protobuf_classes:
with self.subTest(cls=cls):
with self.subTest(cls_name=cls.__name__):
self.assertIn(
cls,
registered_classes,
Expand Down
4 changes: 2 additions & 2 deletions packages/google-cloud-spanner/tests/unit/test__helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ def test_w_numeric_precision_and_scale_valid(self):
decimal.Decimal("1E-9"),
]
for value in cases:
with self.subTest(value=value):
with self.subTest(value=str(value)):
value_pb = self._callFUT(value)
self.assertIsInstance(value_pb, Value)
self.assertEqual(value_pb.string_value, str(value))
Expand Down Expand Up @@ -371,7 +371,7 @@ def test_w_numeric_precision_and_scale_invalid(self):
]

for value, err_msg in cases:
with self.subTest(value=value, err_msg=err_msg):
with self.subTest(value=str(value), err_msg=err_msg):
self.assertRaisesRegex(
ValueError,
err_msg,
Expand Down
6 changes: 2 additions & 4 deletions packages/google-cloud-spanner/tests/unit/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,8 @@ def patched_client(monkeypatch):

with (
patch("google.cloud.spanner_v1.metrics.metrics_exporter.MetricServiceClient"),
patch(
"google.cloud.spanner_v1.metrics.metrics_exporter.CloudMonitoringMetricsExporter"
),
patch("opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader"),
patch("google.cloud.spanner_v1.client.CloudMonitoringMetricsExporter"),
patch("google.cloud.spanner_v1.client.PeriodicExportingMetricReader"),
):
client = Client(
project="test",
Expand Down
26 changes: 22 additions & 4 deletions packages/google-cloud-spanner/tests/unit/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1714,9 +1714,18 @@ def unit_of_work(txn, *args, **kw):
called_with.append((txn, args, kw))
txn.insert(TABLE_NAME, COLUMNS, VALUES)

import threading

main_thread = threading.current_thread()
_results = [1, 1.5]

# retry once w/ timeout_secs=1
def _time(_results=[1, 1.5]):
return _results.pop(0)
def _time():
if threading.current_thread() is main_thread:
if len(_results) > 1:
return _results.pop(0)
return _results[0]
return 1.0

with mock.patch("time.time", _time):
with mock.patch("time.sleep") as sleep_mock:
Expand Down Expand Up @@ -1783,9 +1792,18 @@ def unit_of_work(txn, *args, **kw):
called_with.append((txn, args, kw))
txn.insert(TABLE_NAME, COLUMNS, VALUES)

import threading
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comments about imports


main_thread = threading.current_thread()
_results = [1, 2, 4, 8]

# retry several times to check backoff
def _time(_results=[1, 2, 4, 8]):
return _results.pop(0)
def _time():
if threading.current_thread() is main_thread:
if len(_results) > 1:
return _results.pop(0)
return _results[0]
return 1.0

with (
mock.patch("time.time", _time),
Expand Down
Loading
Loading