Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions postgres/changelog.d/23640.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Close dangling connections and break reference cycles on check cancel to reduce memory retention when checks are restarted or rescheduled
3 changes: 3 additions & 0 deletions postgres/datadog_checks/postgres/data_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ def __init__(self, check: PostgreSql, config: InstanceConfig):
job_name="data-observability",
)

def _shutdown(self):
    """Break the back-reference to the check after the job is cancelled.

    Nulling ``self._check`` breaks the job<->check reference cycle so the
    check can be reclaimed when it is restarted or rescheduled.
    """
    self._check = None

@property
def _do_config(self):
return self._config.data_observability
Expand Down
6 changes: 5 additions & 1 deletion postgres/datadog_checks/postgres/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ def __init__(self, check: PostgreSql, config: InstanceConfig):
)
self._check = check
self._config = config
self.db_pool = self._check.db_pool
self._collect_pg_settings_enabled = config.collect_settings.enabled
self._collect_extensions_enabled = self._collect_pg_settings_enabled
self._collect_schemas_enabled = config.collect_schemas.enabled
Expand All @@ -122,6 +121,11 @@ def __init__(self, check: PostgreSql, config: InstanceConfig):
self._tags_no_db = None
self.tags = None

def _shutdown(self):
    """Release the check back-reference and cached helpers after cancel.

    Breaking these references lets the check and the (potentially large)
    schema-collector / compiled-pattern caches be garbage collected once
    the job has stopped.
    """
    self._check = None
    self._schema_collector = None
    self._compiled_patterns_cache = None

def _dbtags(self, db, *extra_tags):
"""
Returns the default instance tags with the initial "db" tag replaced with the provided tag
Expand Down
56 changes: 35 additions & 21 deletions postgres/datadog_checks/postgres/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from datadog_checks.base.utils.db.core import QueryManager
from datadog_checks.base.utils.db.health import HealthEvent, HealthStatus
from datadog_checks.base.utils.db.utils import (
DBMAsyncJob,
default_json_event_encoding,
tracked_query,
)
Expand Down Expand Up @@ -301,6 +302,15 @@ def execute_query_raw(self, query, db):
rows = cursor.fetchall()
return rows

def _close_db(self):
if self._db:
try:
self._db.close()
except Exception:
pass
finally:
self._db = None

@contextlib.contextmanager
def db(self):
"""
Expand All @@ -321,12 +331,7 @@ def db(self):
self.log.warning(
"Connection to the database %s has been interrupted, closing connection", self._config.dbname
)
try:
self._db.close()
except Exception:
pass
finally:
self._db = None
self._close_db()
raise
except Exception:
self.log.exception("Unhandled exception while using database connection %s", self._config.dbname)
Expand Down Expand Up @@ -468,29 +473,38 @@ def dynamic_queries(self):

return self._dynamic_queries

@staticmethod
def _cancel_async_job(job: DBMAsyncJob):
job.cancel()
if job._job_loop_future:
job._job_loop_future.result()
job._job_loop_future = None
job._shutdown()

def cancel(self):
"""
Cancels and sends cancel signal to all threads.
"""
if self._config.dbm:
self.statement_samples.cancel()
self.statement_metrics.cancel()
self.metadata_samples.cancel()
if self.statement_metrics._job_loop_future:
self.statement_metrics._job_loop_future.result()
if self.statement_samples._job_loop_future:
self.statement_samples._job_loop_future.result()
if self.metadata_samples._job_loop_future:
self.metadata_samples._job_loop_future.result()
self._cancel_async_job(self.statement_metrics)
self._cancel_async_job(self.statement_samples)
self._cancel_async_job(self.metadata_samples)
elif self._config.data_observability.enabled:
self.metadata_samples.cancel()
if self.metadata_samples._job_loop_future:
self.metadata_samples._job_loop_future.result()
self._cancel_async_job(self.metadata_samples)
if self._config.data_observability.enabled:
self.data_observability.cancel()
if self.data_observability._job_loop_future:
self.data_observability._job_loop_future.result()
self._cancel_async_job(self.data_observability)
self._clean_state()
self._query_manager = None
self.health = None
self.check_initializations.clear()
# TODO: move diagnosis cleanup into AgentCheck.cancel() in the base class
self._diagnosis = None
self._close_db()
self._close_db_pool()
# CheckLoggingAdapter holds self.check until check_id is resolved via
# process(), which only happens after the agent scheduler calls run().
# If cancel() is called before that, the back-reference is never cleared.
self.log.check = None

def _clean_state(self):
self.log.debug("Cleaning state")
Expand Down
15 changes: 11 additions & 4 deletions postgres/datadog_checks/postgres/statement_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,6 @@ def __init__(self, check: PostgreSql, config: InstanceConfig):
if not config.query_samples.enabled:
collection_interval = config.query_activity.collection_interval

self.db_pool = check.db_pool

super(PostgresStatementSamples, self).__init__(
check,
rate_limit=1 / collection_interval,
Expand Down Expand Up @@ -222,6 +220,15 @@ def __init__(self, check: PostgreSql, config: InstanceConfig):
self._time_since_last_activity_event = 0
self._pg_stat_activity_cols = None

def _shutdown(self):
self._check = None
self._explain_parameterized_queries = None
self._collection_strategy_cache = None
self._explain_errors_cache = None
self._explained_statements_ratelimiter = None
self._seen_samples_ratelimiter = None
self._raw_statement_text_cache = None

def _dbtags(self, db, *extra_tags):
"""
Returns the default instance tags with the initial "db" tag replaced with the provided tag
Expand Down Expand Up @@ -646,7 +653,7 @@ def _can_explain_statement(self, obfuscated_statement):
def _get_db_explain_setup_state(self, dbname):
# type: (str) -> Tuple[Optional[DBExplainError], Optional[Exception]]
try:
self.db_pool.get_connection(dbname)
self._check.db_pool.get_connection(dbname)
except psycopg.OperationalError as e:
self._log.warning(
"cannot collect execution plans due to failed DB connection to dbname=%s: %s", dbname, repr(e)
Expand Down Expand Up @@ -712,7 +719,7 @@ def _run_explain(self, dbname, statement, obfuscated_statement):
start_time = time.time()
if self._cancel_event.is_set():
raise Exception("Job loop cancelled. Aborting query.")
with self.db_pool.get_connection(dbname) as conn:
with self._check.db_pool.get_connection(dbname) as conn:
Comment thread
eric-weaver marked this conversation as resolved.
# When sending potentially non-ascii data, e.g. UTF8, we need to force
# the client encoding to UTF-8 to match Python string encoding
if conn.info.encoding.lower() in ["ascii", "sqlascii", "sql_ascii"]:
Expand Down
7 changes: 7 additions & 0 deletions postgres/datadog_checks/postgres/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ def __init__(self, check, config: InstanceConfig):
ttl=60 * 60 / config.query_metrics.full_statement_text_samples_per_hour_per_query,
)

def _shutdown(self):
self._check = None
self._full_statement_text_cache = None
self._state = None
self._query_calls_cache = None
self._baseline_metrics = None

def _execute_query(self, query, params=(), binary=False, row_factory=None) -> Tuple[list, list]:
if self._cancel_event.is_set():
raise Exception("Job loop cancelled. Aborting query.")
Expand Down
2 changes: 1 addition & 1 deletion postgres/tests/test_pg_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ def test_database_instance_metadata(aggregator, pg_instance, dbm_enabled, report
'database_instance:{}'.format(expected_database_instance),
]
check = integration_check(pg_instance)
run_one_check(check)
run_one_check(check, cancel=False)

# These tags are a bit dynamic in value, so we get them from the check and ensure they are present
expected_tags.append('postgresql_version:{}'.format(check.raw_version))
Expand Down
8 changes: 4 additions & 4 deletions postgres/tests/test_statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -1215,7 +1215,7 @@ def test_activity_reported_hostname(
check = integration_check(dbm_instance)
check._connect()

run_one_check(check)
run_one_check(check, cancel=False)
run_one_check(check)

dbm_activity = aggregator.get_event_platform_events("dbm-activity")
Expand Down Expand Up @@ -1480,7 +1480,7 @@ def test_async_job_enabled(
dbm_instance['query_metrics'] = {'enabled': statement_metrics_enabled, 'run_sync': False}
check = integration_check(dbm_instance)
check._connect()
run_one_check(check)
run_one_check(check, cancel=False)
if statement_samples_enabled or statement_activity_enabled:
assert check.statement_samples._job_loop_future is not None
else:
Expand Down Expand Up @@ -1713,8 +1713,8 @@ def test_async_job_cancel_cancel(aggregator, integration_check, dbm_instance):
check = integration_check(dbm_instance)
check._connect()
run_one_check(check)
assert not check.statement_samples._job_loop_future.running(), "samples thread should be stopped"
assert not check.statement_metrics._job_loop_future.running(), "metrics thread should be stopped"
assert check.statement_samples._job_loop_future is None, "samples future should be cleaned up after cancel"
assert check.statement_metrics._job_loop_future is None, "metrics future should be cleaned up after cancel"
# if the thread doesn't start until after the cancel signal is set then the db connection will never
# be created in the first place
assert check.db_pool.pools.get(dbm_instance['dbname']) is None, "db connection should be gone"
Expand Down
92 changes: 90 additions & 2 deletions postgres/tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
import copy
import gc
import weakref

import mock
import psycopg
import pytest
from pytest import fail
from semver import VersionInfo

from datadog_checks.postgres import util
from datadog_checks.postgres import PostgreSql, util
from datadog_checks.postgres.schemas import PostgresSchemaCollector

pytestmark = pytest.mark.unit
Expand Down Expand Up @@ -315,7 +317,7 @@ def test_run_explain_uses_parameterized_statement(pg_instance, integration_check
conn_cm = mock.MagicMock()
conn_cm.__enter__.return_value = mock_conn

with mock.patch.object(check.statement_samples.db_pool, 'get_connection', return_value=conn_cm):
with mock.patch.object(check.db_pool, 'get_connection', return_value=conn_cm):
with mock.patch.object(check, 'histogram'):
check.statement_samples._run_explain('testdb', statement, statement)

Expand All @@ -336,3 +338,89 @@ def test_new_connection_closes_conn_when_configure_raises(integration_check, pg_
with pytest.raises(psycopg.Error):
check._new_connection(check._config.dbname)
conn.close.assert_called_once()


def test_close_db_closes_open_connection(integration_check, pg_instance):
    """_close_db() on an open connection must close it and drop the reference."""
    check = integration_check(pg_instance)
    fake_conn = mock.MagicMock()
    fake_conn.closed = False
    check._db = fake_conn

    check._close_db()

    assert check._db is None
    fake_conn.close.assert_called_once()


def test_close_db_handles_already_closed_connection(integration_check, pg_instance):
    """_close_db() swallows errors raised by close() and still drops the reference."""
    check = integration_check(pg_instance)
    broken_conn = mock.MagicMock()
    broken_conn.close.side_effect = Exception("already closed")
    check._db = broken_conn

    check._close_db()

    assert check._db is None


def test_close_db_noop_when_no_connection(integration_check, pg_instance):
    """_close_db() is a safe no-op when there is no connection to close."""
    check = integration_check(pg_instance)
    check._db = None

    check._close_db()

    assert check._db is None


def test_cancel_closes_main_db_connection(integration_check, pg_instance):
    """cancel() must close the main DB connection and null the reference."""
    check = integration_check(pg_instance)
    main_conn = mock.MagicMock()
    check._db = main_conn

    check.cancel()

    assert check._db is None
    main_conn.close.assert_called_once()


def test_check_gc_after_cancel(pg_instance):
    """Verify cancel() breaks all reference cycles so refcount alone reclaims the check.

    If this test fails, the assertion message lists the types still holding a
    reference to the check. To fix it:

    1. Identify the referrer type in the failure message (e.g. ``QueryManager``).
    2. Find which attribute on that object points back to the check (usually
       ``self.check`` or ``self._check``).
    3. Null that attribute in ``cancel()`` or add it to the relevant
       ``_shutdown()`` method.
    4. If the referrer is a closure or ``functools.partial``, find the
       registration site and null or clear the container that holds it.
    """
    # Enable all the DBM/data-observability features so the check wires up
    # its async jobs and caches before we cancel it.
    # NOTE(review): run_sync presumably keeps the jobs in-process for this
    # unit test -- confirm against the job implementations.
    pg_instance['dbm'] = True
    pg_instance['query_samples'] = {'enabled': True, 'run_sync': True, 'collection_interval': 1}
    pg_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'collection_interval': 10}
    pg_instance['query_activity'] = {'enabled': True, 'collection_interval': 1}
    pg_instance['data_observability'] = {'enabled': True, 'run_sync': True, 'collection_interval': 1}

    check = PostgreSql('postgres', {}, [pg_instance])
    # A weak reference observes reclamation without keeping the check alive.
    ref = weakref.ref(check)

    check.cancel()

    # Collect any pre-existing garbage first, then disable the cycle
    # collector so that from here on ONLY plain refcounting can reclaim the
    # check -- which is exactly what cancel() is supposed to make possible.
    gc.collect()
    gc.disable()
    try:
        # Drop the last strong reference; refcount should now hit zero.
        del check
        obj = ref()
        if obj is not None:
            import inspect

            # Name the objects still pointing at the check so the failure is
            # actionable (see the docstring for how to fix each referrer).
            referrers = [
                f"bound method {r.__qualname__}" if inspect.ismethod(r) else type(r).__name__
                for r in gc.get_referrers(obj)
            ]
            # Release our own strong reference before failing the test.
            del obj
            fail(f"Check still alive after cancel() + del -- pinned by: {referrers}")
    finally:
        gc.enable()
8 changes: 1 addition & 7 deletions postgres/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,11 @@ def run_vacuum_thread(pg_instance, vacuum_query, application_name='test'):
def run_one_check(check: "AgentCheck", cancel=True):
    """Run the check once and (by default) cancel it.

    cancel() joins all job threads and nulls their futures, so no extra
    .result() calls are needed here.
    """
    check.run()
    if cancel:
        check.cancel()


def normalize_object(obj):
Expand Down
Loading