diff --git a/postgres/changelog.d/23640.fixed b/postgres/changelog.d/23640.fixed new file mode 100644 index 0000000000000..92778f0dd54ad --- /dev/null +++ b/postgres/changelog.d/23640.fixed @@ -0,0 +1 @@ +Close dangling connections and break reference cycles on check cancel to reduce memory retention when checks are restarted or rescheduled \ No newline at end of file diff --git a/postgres/datadog_checks/postgres/data_observability.py b/postgres/datadog_checks/postgres/data_observability.py index 6f4107b29e5ab..5dce7eabb5b01 100644 --- a/postgres/datadog_checks/postgres/data_observability.py +++ b/postgres/datadog_checks/postgres/data_observability.py @@ -38,6 +38,9 @@ def __init__(self, check: PostgreSql, config: InstanceConfig): job_name="data-observability", ) + def _shutdown(self): + self._check = None + @property def _do_config(self): return self._config.data_observability diff --git a/postgres/datadog_checks/postgres/metadata.py b/postgres/datadog_checks/postgres/metadata.py index cd341d12b23e5..d8a2959754291 100644 --- a/postgres/datadog_checks/postgres/metadata.py +++ b/postgres/datadog_checks/postgres/metadata.py @@ -108,7 +108,6 @@ def __init__(self, check: PostgreSql, config: InstanceConfig): ) self._check = check self._config = config - self.db_pool = self._check.db_pool self._collect_pg_settings_enabled = config.collect_settings.enabled self._collect_extensions_enabled = self._collect_pg_settings_enabled self._collect_schemas_enabled = config.collect_schemas.enabled @@ -122,6 +121,11 @@ def __init__(self, check: PostgreSql, config: InstanceConfig): self._tags_no_db = None self.tags = None + def _shutdown(self): + self._check = None + self._schema_collector = None + self._compiled_patterns_cache = None + def _dbtags(self, db, *extra_tags): """ Returns the default instance tags with the initial "db" tag replaced with the provided tag diff --git a/postgres/datadog_checks/postgres/postgres.py b/postgres/datadog_checks/postgres/postgres.py index 0d9223662d657..caab366180792 100644 --- a/postgres/datadog_checks/postgres/postgres.py +++ b/postgres/datadog_checks/postgres/postgres.py @@ -17,6 +17,7 @@ from datadog_checks.base.utils.db.core import QueryManager from datadog_checks.base.utils.db.health import HealthEvent, HealthStatus from datadog_checks.base.utils.db.utils import ( + DBMAsyncJob, default_json_event_encoding, tracked_query, ) @@ -301,6 +302,15 @@ def execute_query_raw(self, query, db): rows = cursor.fetchall() return rows + def _close_db(self): + if self._db: + try: + self._db.close() + except Exception: + pass + finally: + self._db = None + @contextlib.contextmanager def db(self): """ @@ -321,12 +331,7 @@ def db(self): self.log.warning( "Connection to the database %s has been interrupted, closing connection", self._config.dbname ) - try: - self._db.close() - except Exception: - pass - finally: - self._db = None + self._close_db() raise except Exception: self.log.exception("Unhandled exception while using database connection %s", self._config.dbname) @@ -468,29 +473,38 @@ def dynamic_queries(self): return self._dynamic_queries + @staticmethod + def _cancel_async_job(job: DBMAsyncJob): + job.cancel() + if job._job_loop_future: + job._job_loop_future.result() + job._job_loop_future = None + job._shutdown() + def cancel(self): """ Cancels and sends cancel signal to all threads. """ if self._config.dbm: - self.statement_samples.cancel() - self.statement_metrics.cancel() - self.metadata_samples.cancel() - if self.statement_metrics._job_loop_future: - self.statement_metrics._job_loop_future.result() - if self.statement_samples._job_loop_future: - self.statement_samples._job_loop_future.result() - if self.metadata_samples._job_loop_future: - self.metadata_samples._job_loop_future.result() + self._cancel_async_job(self.statement_metrics) + self._cancel_async_job(self.statement_samples) + self._cancel_async_job(self.metadata_samples) elif self._config.data_observability.enabled: - self.metadata_samples.cancel() - if self.metadata_samples._job_loop_future: - self.metadata_samples._job_loop_future.result() + self._cancel_async_job(self.metadata_samples) if self._config.data_observability.enabled: - self.data_observability.cancel() - if self.data_observability._job_loop_future: - self.data_observability._job_loop_future.result() + self._cancel_async_job(self.data_observability) + self._clean_state() + self._query_manager = None + self.health = None + self.check_initializations.clear() + # TODO: move diagnosis cleanup into AgentCheck.cancel() in the base class + self._diagnosis = None + self._close_db() self._close_db_pool() + # CheckLoggingAdapter holds self.check until check_id is resolved via + # process(), which only happens after the agent scheduler calls run(). + # If cancel() is called before that, the back-reference is never cleared. + self.log.check = None def _clean_state(self): self.log.debug("Cleaning state") diff --git a/postgres/datadog_checks/postgres/statement_samples.py b/postgres/datadog_checks/postgres/statement_samples.py index e8b45dd1ef891..b4c731a270cc3 100644 --- a/postgres/datadog_checks/postgres/statement_samples.py +++ b/postgres/datadog_checks/postgres/statement_samples.py @@ -152,8 +152,6 @@ def __init__(self, check: PostgreSql, config: InstanceConfig): if not config.query_samples.enabled: collection_interval = config.query_activity.collection_interval - self.db_pool = check.db_pool - super(PostgresStatementSamples, self).__init__( check, rate_limit=1 / collection_interval, @@ -222,6 +220,15 @@ def __init__(self, check: PostgreSql, config: InstanceConfig): self._time_since_last_activity_event = 0 self._pg_stat_activity_cols = None + def _shutdown(self): + self._check = None + self._explain_parameterized_queries = None + self._collection_strategy_cache = None + self._explain_errors_cache = None + self._explained_statements_ratelimiter = None + self._seen_samples_ratelimiter = None + self._raw_statement_text_cache = None + def _dbtags(self, db, *extra_tags): """ Returns the default instance tags with the initial "db" tag replaced with the provided tag @@ -646,7 +653,7 @@ def _can_explain_statement(self, obfuscated_statement): def _get_db_explain_setup_state(self, dbname): # type: (str) -> Tuple[Optional[DBExplainError], Optional[Exception]] try: - self.db_pool.get_connection(dbname) + self._check.db_pool.get_connection(dbname) except psycopg.OperationalError as e: self._log.warning( "cannot collect execution plans due to failed DB connection to dbname=%s: %s", dbname, repr(e) @@ -712,7 +719,7 @@ def _run_explain(self, dbname, statement, obfuscated_statement): start_time = time.time() if self._cancel_event.is_set(): raise Exception("Job loop cancelled. Aborting query.") - with self.db_pool.get_connection(dbname) as conn: + with self._check.db_pool.get_connection(dbname) as conn: # When sending potentially non-ascii data, e.g. UTF8, we need to force # the client encoding to UTF-8 to match Python string encoding if conn.info.encoding.lower() in ["ascii", "sqlascii", "sql_ascii"]: diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 8934812ecc79f..b515b717c599d 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -202,6 +202,13 @@ def __init__(self, check, config: InstanceConfig): ttl=60 * 60 / config.query_metrics.full_statement_text_samples_per_hour_per_query, ) + def _shutdown(self): + self._check = None + self._full_statement_text_cache = None + self._state = None + self._query_calls_cache = None + self._baseline_metrics = None + def _execute_query(self, query, params=(), binary=False, row_factory=None) -> Tuple[list, list]: if self._cancel_event.is_set(): raise Exception("Job loop cancelled. Aborting query.") diff --git a/postgres/tests/test_pg_integration.py b/postgres/tests/test_pg_integration.py index 18aa7a510d4a8..6e27a1f974fa4 100644 --- a/postgres/tests/test_pg_integration.py +++ b/postgres/tests/test_pg_integration.py @@ -857,7 +857,7 @@ def test_database_instance_metadata(aggregator, pg_instance, dbm_enabled, report 'database_instance:{}'.format(expected_database_instance), ] check = integration_check(pg_instance) - run_one_check(check) + run_one_check(check, cancel=False) # These tags are a bit dynamic in value, so we get them from the check and ensure they are present expected_tags.append('postgresql_version:{}'.format(check.raw_version)) diff --git a/postgres/tests/test_statements.py b/postgres/tests/test_statements.py index 10c3c363f7f49..f2983eae8befe 100644 --- a/postgres/tests/test_statements.py +++ b/postgres/tests/test_statements.py @@ -1215,7 +1215,7 @@ def test_activity_reported_hostname( check = integration_check(dbm_instance) check._connect() - run_one_check(check) + run_one_check(check, cancel=False) run_one_check(check) dbm_activity = aggregator.get_event_platform_events("dbm-activity") @@ -1480,7 +1480,7 @@ def test_async_job_enabled( dbm_instance['query_metrics'] = {'enabled': statement_metrics_enabled, 'run_sync': False} check = integration_check(dbm_instance) check._connect() - run_one_check(check) + run_one_check(check, cancel=False) if statement_samples_enabled or statement_activity_enabled: assert check.statement_samples._job_loop_future is not None else: @@ -1713,8 +1713,8 @@ def test_async_job_cancel_cancel(aggregator, integration_check, dbm_instance): check = integration_check(dbm_instance) check._connect() run_one_check(check) - assert not check.statement_samples._job_loop_future.running(), "samples thread should be stopped" - assert not check.statement_metrics._job_loop_future.running(), "metrics thread should be stopped" + assert check.statement_samples._job_loop_future is None, "samples future should be cleaned up after cancel" + assert check.statement_metrics._job_loop_future is None, "metrics future should be cleaned up after cancel" # if the thread doesn't start until after the cancel signal is set then the db connection will never # be created in the first place assert check.db_pool.pools.get(dbm_instance['dbname']) is None, "db connection should be gone" diff --git a/postgres/tests/test_unit.py b/postgres/tests/test_unit.py index ce7d0505880df..decd10285d07d 100644 --- a/postgres/tests/test_unit.py +++ b/postgres/tests/test_unit.py @@ -2,6 +2,8 @@ # All rights reserved # Licensed under Simplified BSD License (see LICENSE) import copy +import gc +import weakref import mock import psycopg @@ -9,7 +11,7 @@ from pytest import fail from semver import VersionInfo -from datadog_checks.postgres import util +from datadog_checks.postgres import PostgreSql, util from datadog_checks.postgres.schemas import PostgresSchemaCollector pytestmark = pytest.mark.unit @@ -315,7 +317,7 @@ def test_run_explain_uses_parameterized_statement(pg_instance, integration_check conn_cm = mock.MagicMock() conn_cm.__enter__.return_value = mock_conn - with mock.patch.object(check.statement_samples.db_pool, 'get_connection', return_value=conn_cm): + with mock.patch.object(check.db_pool, 'get_connection', return_value=conn_cm): with mock.patch.object(check, 'histogram'): check.statement_samples._run_explain('testdb', statement, statement) @@ -336,3 +338,89 @@ def test_new_connection_closes_conn_when_configure_raises(integration_check, pg_ with pytest.raises(psycopg.Error): check._new_connection(check._config.dbname) conn.close.assert_called_once() + + +def test_close_db_closes_open_connection(integration_check, pg_instance): + check = integration_check(pg_instance) + conn = mock.MagicMock() + conn.closed = False + check._db = conn + + check._close_db() + + conn.close.assert_called_once() + assert check._db is None + + +def test_close_db_handles_already_closed_connection(integration_check, pg_instance): + check = integration_check(pg_instance) + conn = mock.MagicMock() + conn.close.side_effect = Exception("already closed") + check._db = conn + + check._close_db() + + assert check._db is None + + +def test_close_db_noop_when_no_connection(integration_check, pg_instance): + check = integration_check(pg_instance) + check._db = None + + check._close_db() + + assert check._db is None + + +def test_cancel_closes_main_db_connection(integration_check, pg_instance): + check = integration_check(pg_instance) + conn = mock.MagicMock() + check._db = conn + + check.cancel() + + conn.close.assert_called_once() + assert check._db is None + + +def test_check_gc_after_cancel(pg_instance): + """Verify cancel() breaks all reference cycles so refcount alone reclaims the check. + + If this test fails, the assertion message lists the types still holding a + reference to the check. To fix it: + + 1. Identify the referrer type in the failure message (e.g. ``QueryManager``). + 2. Find which attribute on that object points back to the check (usually + ``self.check`` or ``self._check``). + 3. Null that attribute in ``cancel()`` or add it to the relevant + ``_shutdown()`` method. + 4. If the referrer is a closure or ``functools.partial``, find the + registration site and null or clear the container that holds it. + """ + pg_instance['dbm'] = True + pg_instance['query_samples'] = {'enabled': True, 'run_sync': True, 'collection_interval': 1} + pg_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'collection_interval': 10} + pg_instance['query_activity'] = {'enabled': True, 'collection_interval': 1} + pg_instance['data_observability'] = {'enabled': True, 'run_sync': True, 'collection_interval': 1} + + check = PostgreSql('postgres', {}, [pg_instance]) + ref = weakref.ref(check) + + check.cancel() + + gc.collect() + gc.disable() + try: + del check + obj = ref() + if obj is not None: + import inspect + + referrers = [ + f"bound method {r.__qualname__}" if inspect.ismethod(r) else type(r).__name__ + for r in gc.get_referrers(obj) + ] + del obj + fail(f"Check still alive after cancel() + del -- pinned by: {referrers}") + finally: + gc.enable() diff --git a/postgres/tests/utils.py b/postgres/tests/utils.py index 30d0181ec3ad1..fd948c4781ba0 100644 --- a/postgres/tests/utils.py +++ b/postgres/tests/utils.py @@ -137,17 +137,11 @@ def run_vacuum_thread(pg_instance, vacuum_query, application_name='test'): def run_one_check(check: AgentCheck, cancel=True): """ Run check and immediately cancel. - Waits for all threads to close before continuing. + cancel() joins all threads and nulls futures, so no extra .result() calls needed. """ check.run() if cancel: check.cancel() - if check.statement_samples._job_loop_future is not None: - check.statement_samples._job_loop_future.result() - if check.statement_metrics._job_loop_future is not None: - check.statement_metrics._job_loop_future.result() - if check.metadata_samples._job_loop_future is not None: - check.metadata_samples._job_loop_future.result() def normalize_object(obj):