Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions dapr/actor/runtime/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def __init__(
self,
actor_idle_timeout: Optional[timedelta] = timedelta(hours=1),
actor_scan_interval: Optional[timedelta] = timedelta(seconds=30),
drain_ongoing_call_timeout: Optional[timedelta] = timedelta(minutes=1),
drain_ongoing_call_timeout: Optional[timedelta] = None,
drain_rebalanced_actors: Optional[bool] = True,
reentrancy: Optional[ActorReentrancyConfig] = None,
reminders_storage_partitions: Optional[int] = None,
Expand All @@ -130,9 +130,13 @@ def __init__(
actor_scan_interval (datetime.timedelta): The duration which specifies how often to scan
for actors to deactivate idle actors. Actors that have been idle longer than
actor_idle_timeout will be deactivated.
drain_ongoing_call_timeout (datetime.timedelta): The duration which specifies the
timeout for the current active actor method to finish before actor deactivation.
If there is no current actor method call, this is ignored.
drain_ongoing_call_timeout (Optional[datetime.timedelta]): The duration which
specifies the timeout for the current active actor method to finish before
actor deactivation. If there is no current actor method call, this is
ignored. Defaults to None, which omits the field from the configuration
sent to daprd so the runtime applies its own default. An explicit value
must be shorter than the daprd placement dissemination timeout, otherwise
daprd will clamp it.
drain_rebalanced_actors (bool): If true, Dapr will wait for drain_ongoing_call_timeout
to allow a current actor call to complete before trying to deactivate an actor.
reentrancy (ActorReentrancyConfig): Configure the reentrancy behavior for an actor.
Expand Down Expand Up @@ -175,10 +179,12 @@ def as_dict(self) -> Dict[str, Any]:
configDict: Dict[str, Any] = {
'actorIdleTimeout': self._actor_idle_timeout,
'actorScanInterval': self._actor_scan_interval,
'drainOngoingCallTimeout': self._drain_ongoing_call_timeout,
'drainRebalancedActors': self._drain_rebalanced_actors,
}

if self._drain_ongoing_call_timeout is not None:
configDict['drainOngoingCallTimeout'] = self._drain_ongoing_call_timeout

if self._reentrancy:
configDict.update({'reentrancy': self._reentrancy.as_dict()})

Expand Down
2 changes: 1 addition & 1 deletion tests/actor/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def test_actor_config(self):
self.assertTrue(config._drain_rebalanced_actors)
self.assertEqual(timedelta(hours=1), config._actor_idle_timeout)
self.assertEqual(timedelta(seconds=30), config._actor_scan_interval)
self.assertEqual(timedelta(minutes=1), config._drain_ongoing_call_timeout)
self.assertIsNone(config._drain_ongoing_call_timeout)
self.assertEqual(2, len(config._entities))

# apply new config
Expand Down
2 changes: 1 addition & 1 deletion tests/actor/test_actor_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def test_actor_config(self):
self.assertTrue(config._drain_rebalanced_actors)
self.assertEqual(timedelta(hours=1), config._actor_idle_timeout)
self.assertEqual(timedelta(seconds=30), config._actor_scan_interval)
self.assertEqual(timedelta(minutes=1), config._drain_ongoing_call_timeout)
self.assertIsNone(config._drain_ongoing_call_timeout)
self.assertEqual(3, len(config._entities))

# apply new config
Expand Down
19 changes: 13 additions & 6 deletions tests/actor/test_actor_runtime_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,14 @@ def test_default_config(self):

self.assertEqual(config._actor_idle_timeout, timedelta(seconds=3600))
self.assertEqual(config._actor_scan_interval, timedelta(seconds=30))
self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60))
self.assertIsNone(config._drain_ongoing_call_timeout)
self.assertEqual(config._drain_rebalanced_actors, True)
self.assertEqual(config._reentrancy, None)
self.assertEqual(config._entities, set())
self.assertEqual(config._entitiesConfig, [])
self.assertNotIn('reentrancy', config.as_dict().keys())
self.assertNotIn('remindersStoragePartitions', config.as_dict().keys())
self.assertNotIn('drainOngoingCallTimeout', config.as_dict().keys())
self.assertEqual(config.as_dict()['entitiesConfig'], [])

def test_default_config_with_reentrancy(self):
Expand All @@ -84,7 +85,7 @@ def test_default_config_with_reentrancy(self):

self.assertEqual(config._actor_idle_timeout, timedelta(seconds=3600))
self.assertEqual(config._actor_scan_interval, timedelta(seconds=30))
self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60))
self.assertIsNone(config._drain_ongoing_call_timeout)
self.assertEqual(config._drain_rebalanced_actors, True)
self.assertEqual(config._reentrancy, reentrancyConfig)
self.assertEqual(config._entities, set())
Expand All @@ -110,7 +111,8 @@ def test_config_with_actor_type_config(self):
self.assertEqual(config._actor_scan_interval, timedelta(seconds=30))

d = config.as_dict()
self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60))
self.assertIsNone(config._drain_ongoing_call_timeout)
self.assertNotIn('drainOngoingCallTimeout', d)
self.assertEqual(d['entitiesConfig'][0]['entities'], ['testactor1'])
self.assertEqual(d['entitiesConfig'][0]['actorScanInterval'], timedelta(seconds=10))
self.assertEqual(d['entitiesConfig'][0]['reentrancy']['enabled'], True)
Expand All @@ -130,7 +132,7 @@ def test_update_entities(self):

self.assertEqual(config._actor_idle_timeout, timedelta(seconds=3600))
self.assertEqual(config._actor_scan_interval, timedelta(seconds=30))
self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60))
self.assertIsNone(config._drain_ongoing_call_timeout)
self.assertEqual(config._drain_rebalanced_actors, True)
self.assertEqual(config._entities, {'actortype1'})
self.assertEqual(config._entitiesConfig, [])
Expand All @@ -141,7 +143,7 @@ def test_update_entities_two_types(self):
config.update_entities(['actortype1', 'actortype1'])
self.assertEqual(config._actor_idle_timeout, timedelta(seconds=3600))
self.assertEqual(config._actor_scan_interval, timedelta(seconds=30))
self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60))
self.assertIsNone(config._drain_ongoing_call_timeout)
self.assertEqual(config._drain_rebalanced_actors, True)
self.assertEqual(config._entities, {'actortype1', 'actortype1'})
self.assertEqual(config._entitiesConfig, [])
Expand All @@ -164,12 +166,17 @@ def test_set_reminders_storage_partitions(self):
config = ActorRuntimeConfig(reminders_storage_partitions=12)
self.assertEqual(config._actor_idle_timeout, timedelta(seconds=3600))
self.assertEqual(config._actor_scan_interval, timedelta(seconds=30))
self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=60))
self.assertIsNone(config._drain_ongoing_call_timeout)
self.assertEqual(config._drain_rebalanced_actors, True)
self.assertNotIn('reentrancy', config.as_dict().keys())
self.assertEqual(config._reminders_storage_partitions, 12)
self.assertEqual(config.as_dict()['remindersStoragePartitions'], 12)

def test_explicit_drain_ongoing_call_timeout(self):
config = ActorRuntimeConfig(drain_ongoing_call_timeout=timedelta(seconds=10))
self.assertEqual(config._drain_ongoing_call_timeout, timedelta(seconds=10))
self.assertEqual(config.as_dict()['drainOngoingCallTimeout'], timedelta(seconds=10))


if __name__ == '__main__':
unittest.main()
Loading