diff --git a/dapr/actor/runtime/config.py b/dapr/actor/runtime/config.py index 8d42ebda4..4e64e5fdb 100644 --- a/dapr/actor/runtime/config.py +++ b/dapr/actor/runtime/config.py @@ -117,7 +117,7 @@ def __init__( self, actor_idle_timeout: Optional[timedelta] = timedelta(hours=1), actor_scan_interval: Optional[timedelta] = timedelta(seconds=30), - drain_ongoing_call_timeout: Optional[timedelta] = timedelta(minutes=1), + drain_ongoing_call_timeout: Optional[timedelta] = None, drain_rebalanced_actors: Optional[bool] = True, reentrancy: Optional[ActorReentrancyConfig] = None, reminders_storage_partitions: Optional[int] = None, @@ -130,9 +130,13 @@ def __init__( actor_scan_interval (datetime.timedelta): The duration which specifies how often to scan for actors to deactivate idle actors. Actors that have been idle longer than actor_idle_timeout will be deactivated. - drain_ongoing_call_timeout (datetime.timedelta): The duration which specifies the - timeout for the current active actor method to finish before actor deactivation. - If there is no current actor method call, this is ignored. + drain_ongoing_call_timeout (Optional[datetime.timedelta]): The duration which + specifies the timeout for the current active actor method to finish before + actor deactivation. If there is no current actor method call, this is + ignored. Defaults to None, which omits the field from the configuration + sent to daprd so the runtime applies its own default. An explicit value + must be shorter than the daprd placement dissemination timeout, otherwise + daprd will clamp it. drain_rebalanced_actors (bool): If true, Dapr will wait for drain_ongoing_call_timeout to allow a current actor call to complete before trying to deactivate an actor. reentrancy (ActorReentrancyConfig): Configure the reentrancy behavior for an actor. @@ -175,10 +179,12 @@ def as_dict(self) -> Dict[str, Any]: configDict: Dict[str, Any] = { 'actorIdleTimeout': self._actor_idle_timeout, 'actorScanInterval': self._actor_scan_interval, - 'drainOngoingCallTimeout': self._drain_ongoing_call_timeout, 'drainRebalancedActors': self._drain_rebalanced_actors, } + if self._drain_ongoing_call_timeout is not None: + configDict['drainOngoingCallTimeout'] = self._drain_ongoing_call_timeout + if self._reentrancy: configDict.update({'reentrancy': self._reentrancy.as_dict()}) diff --git a/tests/actor/test_actor.py b/tests/actor/test_actor.py index 7a7bee2d2..3fb1c46b0 100644 --- a/tests/actor/test_actor.py +++ b/tests/actor/test_actor.py @@ -64,7 +64,7 @@ def test_actor_config(self): self.assertTrue(config._drain_rebalanced_actors) self.assertEqual(timedelta(hours=1), config._actor_idle_timeout) self.assertEqual(timedelta(seconds=30), config._actor_scan_interval) - self.assertEqual(timedelta(minutes=1), config._drain_ongoing_call_timeout) + self.assertIsNone(config._drain_ongoing_call_timeout) self.assertEqual(2, len(config._entities)) # apply new config diff --git a/tests/actor/test_actor_runtime.py b/tests/actor/test_actor_runtime.py index 7725c3728..c07085ef1 100644 --- a/tests/actor/test_actor_runtime.py +++ b/tests/actor/test_actor_runtime.py @@ -60,7 +60,7 @@ def test_actor_config(self): self.assertTrue(config._drain_rebalanced_actors) self.assertEqual(timedelta(hours=1), config._actor_idle_timeout) self.assertEqual(timedelta(seconds=30), config._actor_scan_interval) - self.assertEqual(timedelta(minutes=1), config._drain_ongoing_call_timeout) + self.assertIsNone(config._drain_ongoing_call_timeout) self.assertEqual(3, len(config._entities)) # apply new config diff --git a/tests/actor/test_actor_runtime_config.py b/tests/actor/test_actor_runtime_config.py index e39894c77..534d0ce6f 100644 --- a/tests/actor/test_actor_runtime_config.py +++ b/tests/actor/test_actor_runtime_config.py @@ -69,13 +69,14 @@ def test_default_config(self): self.assertEqual(config._actor_idle_timeout, timedelta(seconds=3600)) self.assertEqual(config._actor_scan_interval, timedelta(seconds=30)) - self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60)) + self.assertIsNone(config._drain_ongoing_call_timeout) self.assertEqual(config._drain_rebalanced_actors, True) self.assertEqual(config._reentrancy, None) self.assertEqual(config._entities, set()) self.assertEqual(config._entitiesConfig, []) self.assertNotIn('reentrancy', config.as_dict().keys()) self.assertNotIn('remindersStoragePartitions', config.as_dict().keys()) + self.assertNotIn('drainOngoingCallTimeout', config.as_dict().keys()) self.assertEqual(config.as_dict()['entitiesConfig'], []) def test_default_config_with_reentrancy(self): @@ -84,7 +85,7 @@ def test_default_config_with_reentrancy(self): self.assertEqual(config._actor_idle_timeout, timedelta(seconds=3600)) self.assertEqual(config._actor_scan_interval, timedelta(seconds=30)) - self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60)) + self.assertIsNone(config._drain_ongoing_call_timeout) self.assertEqual(config._drain_rebalanced_actors, True) self.assertEqual(config._reentrancy, reentrancyConfig) self.assertEqual(config._entities, set()) @@ -110,7 +111,8 @@ def test_config_with_actor_type_config(self): self.assertEqual(config._actor_scan_interval, timedelta(seconds=30)) d = config.as_dict() - self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60)) + self.assertIsNone(config._drain_ongoing_call_timeout) + self.assertNotIn('drainOngoingCallTimeout', d) self.assertEqual(d['entitiesConfig'][0]['entities'], ['testactor1']) self.assertEqual(d['entitiesConfig'][0]['actorScanInterval'], timedelta(seconds=10)) self.assertEqual(d['entitiesConfig'][0]['reentrancy']['enabled'], True) @@ -130,7 +132,7 @@ def test_update_entities(self): self.assertEqual(config._actor_idle_timeout, timedelta(seconds=3600)) self.assertEqual(config._actor_scan_interval, timedelta(seconds=30)) - self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60)) + self.assertIsNone(config._drain_ongoing_call_timeout) self.assertEqual(config._drain_rebalanced_actors, True) self.assertEqual(config._entities, {'actortype1'}) self.assertEqual(config._entitiesConfig, []) @@ -141,7 +143,7 @@ def test_update_entities_two_types(self): config.update_entities(['actortype1', 'actortype1']) self.assertEqual(config._actor_idle_timeout, timedelta(seconds=3600)) self.assertEqual(config._actor_scan_interval, timedelta(seconds=30)) - self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60)) + self.assertIsNone(config._drain_ongoing_call_timeout) self.assertEqual(config._drain_rebalanced_actors, True) self.assertEqual(config._entities, {'actortype1', 'actortype1'}) self.assertEqual(config._entitiesConfig, []) @@ -164,12 +166,17 @@ def test_set_reminders_storage_partitions(self): config = ActorRuntimeConfig(reminders_storage_partitions=12) self.assertEqual(config._actor_idle_timeout, timedelta(seconds=3600)) self.assertEqual(config._actor_scan_interval, timedelta(seconds=30)) - self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60)) + self.assertIsNone(config._drain_ongoing_call_timeout) self.assertEqual(config._drain_rebalanced_actors, True) self.assertNotIn('reentrancy', config.as_dict().keys()) self.assertEqual(config._reminders_storage_partitions, 12) self.assertEqual(config.as_dict()['remindersStoragePartitions'], 12) + def test_explicit_drain_ongoing_call_timeout(self): + config = ActorRuntimeConfig(drain_ongoing_call_timeout=timedelta(seconds=10)) + self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=10)) + self.assertEqual(config.as_dict()['drainOngoingCallTimeout'], timedelta(seconds=10)) + if __name__ == '__main__': unittest.main()