From 31b631a2ea08cb8dcf41f2db162c06dc71763926 Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Fri, 5 Dec 2025 16:03:11 +0100 Subject: [PATCH] Refactoring cloud compute instance creation with external state of banned zones. --- app/lib/task/backend.dart | 104 +++++------- app/lib/task/scheduler.dart | 325 ++++++++++++++++++------------------ 2 files changed, 197 insertions(+), 232 deletions(-) diff --git a/app/lib/task/backend.dart b/app/lib/task/backend.dart index e7a002a80..a1549dc8c 100644 --- a/app/lib/task/backend.dart +++ b/app/lib/task/backend.dart @@ -104,6 +104,7 @@ class TaskBackend { ScanPackagesUpdatedState _scanPackagesUpdatedState = ScanPackagesUpdatedState.init(); DeleteInstancesState _deleteInstancesState = DeleteInstancesState.init(); + CreateInstancesState _createInstanesState = CreateInstancesState.init(); TaskBackend(this._db, this._bucket); @@ -127,28 +128,22 @@ class TaskBackend { final scanLoop = _createLoop( name: 'scan-packages', aborted: aborted, - fn: (claim, aborted) async { - await _scanForPackageUpdates(claim, abort: aborted); - }, + fn: _runOneScanPackagesUpdate, ); final deleteLoop = _createLoop( name: 'delete-instances', aborted: aborted, - fn: (claim, aborted) async { - await _scanForInstanceDeletion(claim, abort: aborted); - }, + fn: _runOneInstanceDeletion, ); - final scheduleLoop = _createLoop( - name: 'schedule', + final createLoop = _createLoop( + name: 'create-instances', aborted: aborted, - fn: (claim, aborted) async { - await schedule(claim, taskWorkerCloudCompute, _db, abort: aborted); - }, + fn: _runOneInstanceCreation, ); scheduleMicrotask(() async { // Wait for background process to finish - await Future.wait([scanLoop, deleteLoop, scheduleLoop]); + await Future.wait([scanLoop, deleteLoop, createLoop]); // Report background processes as stopped stopped.complete(); @@ -158,7 +153,7 @@ class TaskBackend { Future _createLoop({ required String name, required Completer aborted, - required Future Function(GlobalLockClaim claim, Completer aborted) fn, + required Future Function(bool Function() isAbortedFn) fn, }) { return Future.microtask(() async { try { @@ -172,7 +167,19 @@ class TaskBackend { while (!aborted.isCompleted) { try { await lock.withClaim((claim) async { - await fn(claim, aborted); + bool isAbortedFn() => !claim.valid || aborted.isCompleted; + while (!isAbortedFn()) { + final delay = await fn(isAbortedFn); + + if (isAbortedFn()) { + return; + } + // Wait until aborted or [delay] before doing it again! + await aborted.future.timeoutWithClock( + delay, + onTimeout: () => null, + ); + } }, abort: aborted); } catch (e, st) { // Log this as very bad, and then move on. Nothing good can come @@ -289,32 +296,9 @@ class TaskBackend { } } - /// Scan for updates from packages until [abort] is resolved, or [claim] - /// is lost. - Future _scanForPackageUpdates( - GlobalLockClaim claim, { - Completer? abort, - }) async { - abort ??= Completer(); - - bool isAbortedFn() => !claim.valid || abort!.isCompleted; - while (!isAbortedFn()) { - await _runOneScanPackagesUpdate(isAbortedFn: isAbortedFn); - - if (isAbortedFn()) { - return; - } - // Wait until aborted or 10 minutes before scanning again! - await abort.future.timeoutWithClock( - Duration(minutes: 10), - onTimeout: () => null, - ); - } - } - - Future _runOneScanPackagesUpdate({ - required bool Function() isAbortedFn, - }) async { + Future _runOneScanPackagesUpdate( + bool Function() isAbortedFn, + ) async { final next = await calculateScanPackagesUpdatedLoop( _scanPackagesUpdatedState, _db.packages.listUpdatedSince(_scanPackagesUpdatedState.since), @@ -324,45 +308,33 @@ class TaskBackend { for (final p in next.packages) { if (isAbortedFn()) { - return; + return Duration(minutes: 10); } // Check the package await trackPackage(p, updateDependents: true); } - } - - /// Scan for compute instances that can be deleted until [abort] is resolved, - /// or [claim] is lost. - Future _scanForInstanceDeletion( - GlobalLockClaim claim, { - Completer? abort, - }) async { - abort ??= Completer(); - bool isAbortedFn() => !claim.valid || abort!.isCompleted; - while (!isAbortedFn()) { - await _runOneInstanceDeletion(isAbortedFn: isAbortedFn); - - if (isAbortedFn()) { - return; - } - // Wait until aborted or 10 minutes before scanning again! - await abort.future.timeoutWithClock( - Duration(minutes: 10), - onTimeout: () => null, - ); - } + return Duration(minutes: 10); } - Future _runOneInstanceDeletion({ - required bool Function() isAbortedFn, - }) async { + Future _runOneInstanceDeletion(bool Function() isAbortedFn) async { _deleteInstancesState = await scanAndDeleteInstances( _deleteInstancesState, taskWorkerCloudCompute, isAbortedFn, maxTaskRunHours: activeConfiguration.maxTaskRunHours, ); + return Duration(minutes: 10); + } + + Future _runOneInstanceCreation(bool Function() isAbortedFn) async { + final result = await schedule( + taskWorkerCloudCompute, + _db, + state: _createInstanesState, + ); + _createInstanesState = result.$1; + return result.$2; } Future trackPackage( diff --git a/app/lib/task/scheduler.dart b/app/lib/task/scheduler.dart index a2c8f1b39..a1488b30f 100644 --- a/app/lib/task/scheduler.dart +++ b/app/lib/task/scheduler.dart @@ -11,41 +11,33 @@ import 'package:pub_dev/shared/configuration.dart'; import 'package:pub_dev/shared/datastore.dart'; import 'package:pub_dev/shared/utils.dart'; import 'package:pub_dev/task/backend.dart'; -import 'package:pub_dev/task/clock_control.dart'; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; -import 'package:pub_dev/task/global_lock.dart'; import 'package:pub_dev/task/models.dart'; final _log = Logger('pub.task.schedule'); const _maxInstancesPerIteration = 50; // Later consider something like: 50; -/// Schedule tasks from [PackageState] while [claim] is valid, and [abort] have -/// not been resolved. -Future schedule( - GlobalLockClaim claim, +/// The internal state for creating new instances. +final class CreateInstancesState { + final Map zoneBannedUntil; + + CreateInstancesState({required this.zoneBannedUntil}); + + factory CreateInstancesState.init() => + CreateInstancesState(zoneBannedUntil: {}); +} + +/// Schedule tasks from [PackageState]. +Future<(CreateInstancesState, Duration)> schedule( CloudCompute compute, DatastoreDB db, { - required Completer abort, + required CreateInstancesState state, }) async { - /// Sleep [delay] time [since] timestamp, or now if not given. - Future sleepOrAborted(Duration delay, {DateTime? since}) async { - ArgumentError.checkNotNull(delay, 'delay'); - final now = clock.now(); - since ??= now; - - delay = delay - now.difference(since); - if (delay.isNegative) { - // Await a micro task to ensure consistent behavior - await Future.microtask(() {}); - } else { - await abort.future.timeoutWithClock(delay, onTimeout: () => null); - } - } - // Map from zone to DateTime when zone is allowed again final zoneBannedUntil = { for (final zone in compute.zones) zone: DateTime(0), + ...state.zoneBannedUntil, }; void banZone(String zone, {int minutes = 0, int hours = 0, int days = 0}) { if (!zoneBannedUntil.containsKey(zone)) { @@ -63,168 +55,169 @@ Future schedule( // Create a fast RNG with random seed for picking zones. final rng = Random(Random.secure().nextInt(2 << 31)); - bool isAbortedFn() => !claim.valid || abort.isCompleted; - // Run scheduling iterations, so long as we have a valid claim - while (!isAbortedFn()) { - final iterationStart = clock.now(); - _log.info('Starting scheduling cycle'); + _log.info('Starting scheduling cycle'); - final instances = await compute.listInstances().toList(); - if (isAbortedFn()) { - break; - } + final instances = await compute.listInstances().toList(); + _log.info('Found $instances instances'); - _log.info('Found $instances instances'); + // If we are not allowed to create new instances within the allowed quota, + if (activeConfiguration.maxTaskInstances <= instances.length) { + _log.info('Reached instance limit, trying again in 30s'); + return ( + CreateInstancesState(zoneBannedUntil: zoneBannedUntil), + Duration(seconds: 30), + ); + } - // If we are not allowed to create new instances within the allowed quota, - if (activeConfiguration.maxTaskInstances <= instances.length) { - _log.info('Reached instance limit, trying again in 30s'); - // Wait 30 seconds then list instances again, so that we can count them - await sleepOrAborted(Duration(seconds: 30), since: iterationStart); - continue; // skip the rest of the iteration - } + // Determine which zones are not banned + final allowedZones = + zoneBannedUntil.entries + .where((e) => e.value.isBefore(clock.now())) + .map((e) => e.key) + .toList() + ..shuffle(rng); + var nextZoneIndex = 0; + String pickZone() => allowedZones[nextZoneIndex++ % allowedZones.length]; - // Determine which zones are not banned - final allowedZones = - zoneBannedUntil.entries - .where((e) => e.value.isBefore(clock.now())) - .map((e) => e.key) - .toList() - ..shuffle(rng); - var nextZoneIndex = 0; - String pickZone() => allowedZones[nextZoneIndex++ % allowedZones.length]; + // If no zones are available, we sleep and try again later. + if (allowedZones.isEmpty) { + _log.info('All compute-engine zones are banned, trying again in 30s'); + return ( + CreateInstancesState(zoneBannedUntil: zoneBannedUntil), + Duration(seconds: 30), + ); + } - // If no zones are available, we sleep and try again later. - if (allowedZones.isEmpty) { - _log.info('All compute-engine zones are banned, trying again in 30s'); - await sleepOrAborted(Duration(seconds: 30), since: iterationStart); - continue; - } + // Schedule analysis for some packages + var pendingPackagesReviewed = 0; + final selectLimit = min( + _maxInstancesPerIteration, + max(0, activeConfiguration.maxTaskInstances - instances.length), + ); - // Schedule analysis for some packages - var pendingPackagesReviewed = 0; - final selectLimit = min( - _maxInstancesPerIteration, - max(0, activeConfiguration.maxTaskInstances - instances.length), - ); + Future scheduleInstance(({String package}) selected) async { + pendingPackagesReviewed += 1; - Future scheduleInstance(({String package}) selected) async { - pendingPackagesReviewed += 1; + final instanceName = compute.generateInstanceName(); + final zone = pickZone(); - final instanceName = compute.generateInstanceName(); - final zone = pickZone(); + final updated = await updatePackageStateWithPendingVersions( + db, + selected.package, + zone, + instanceName, + ); + final payload = updated?.$1; + if (payload == null) { + return; + } + // Create human readable description for GCP console. + final description = + 'package:${payload.package} analysis of ${payload.versions.length} ' + 'versions.'; - final updated = await updatePackageStateWithPendingVersions( - db, - selected.package, - zone, - instanceName, + var rollbackPackageState = true; + try { + // Purging cache is important for the edge case, where the new upload happens + // on a different runtime version, and the current one's cache is still stale + // and does not have the version yet. + // TODO(https://github.com/dart-lang/pub-dev/issues/7268) remove after it gets fixed. + await purgePackageCache(payload.package); + _log.info( + 'creating instance $instanceName in $zone for ' + 'package:${selected.package}', + ); + await compute.createInstance( + zone: zone, + instanceName: instanceName, + dockerImage: activeConfiguration.taskWorkerImage!, + arguments: [json.encode(payload)], + description: description, + ); + rollbackPackageState = false; + } on ZoneExhaustedException catch (e, st) { + // A zone being exhausted is normal operations, we just use another + // zone for 15 minutes. + _log.info( + 'zone resources exhausted, banning ${e.zone} for 30 minutes', + e, + st, + ); + // Ban usage of zone for 30 minutes + banZone(e.zone, minutes: 30); + } on QuotaExhaustedException catch (e, st) { + // Quota exhausted, this can happen, but it shouldn't. We'll just stop + // doing anything for 10 minutes. Hopefully that'll resolve the issue. + // We log severe, because this is a reason to adjust the quota or + // instance limits. + _log.severe( + 'Quota exhausted trying to create $instanceName, banning all zones ' + 'for 10 minutes', + e, + st, ); - final payload = updated?.$1; - if (payload == null) { - return; - } - // Create human readable description for GCP console. - final description = - 'package:${payload.package} analysis of ${payload.versions.length} ' - 'versions.'; - - var rollbackPackageState = true; - try { - // Purging cache is important for the edge case, where the new upload happens - // on a different runtime version, and the current one's cache is still stale - // and does not have the version yet. - // TODO(https://github.com/dart-lang/pub-dev/issues/7268) remove after it gets fixed. - await purgePackageCache(payload.package); - _log.info( - 'creating instance $instanceName in $zone for ' - 'package:${selected.package}', - ); - await compute.createInstance( - zone: zone, - instanceName: instanceName, - dockerImage: activeConfiguration.taskWorkerImage!, - arguments: [json.encode(payload)], - description: description, - ); - rollbackPackageState = false; - } on ZoneExhaustedException catch (e, st) { - // A zone being exhausted is normal operations, we just use another - // zone for 15 minutes. - _log.info( - 'zone resources exhausted, banning ${e.zone} for 30 minutes', - e, - st, - ); - // Ban usage of zone for 30 minutes - banZone(e.zone, minutes: 30); - } on QuotaExhaustedException catch (e, st) { - // Quota exhausted, this can happen, but it shouldn't. We'll just stop - // doing anything for 10 minutes. Hopefully that'll resolve the issue. - // We log severe, because this is a reason to adjust the quota or - // instance limits. - _log.severe( - 'Quota exhausted trying to create $instanceName, banning all zones ' - 'for 10 minutes', - e, - st, - ); - // Ban all zones for 10 minutes - for (final zone in compute.zones) { - banZone(zone, minutes: 10); - } - } on Exception catch (e, st) { - // No idea what happened, but for robustness we'll stop using the zone - // and shout into the logs - _log.shout( - 'Failed to create instance $instanceName, banning zone "$zone" for ' - '15 minutes', - e, - st, - ); - // Ban usage of zone for 15 minutes - banZone(zone, minutes: 15); - } - if (rollbackPackageState) { - final oldVersionsMap = updated?.$2 ?? const {}; - // Restore the state of the PackageState for versions that were - // suppose to run on the instance we just failed to create. - // If this doesn't work, we'll eventually retry. Hence, correctness - // does not hinge on this transaction being successful. - await db.tasks.restorePreviousVersionsState( - selected.package, - instanceName, - oldVersionsMap, - ); + // Ban all zones for 10 minutes + for (final zone in compute.zones) { + banZone(zone, minutes: 10); } + } on Exception catch (e, st) { + // No idea what happened, but for robustness we'll stop using the zone + // and shout into the logs + _log.shout( + 'Failed to create instance $instanceName, banning zone "$zone" for ' + '15 minutes', + e, + st, + ); + // Ban usage of zone for 15 minutes + banZone(zone, minutes: 15); } - - // Creating an instance can be slow, we want to schedule them concurrently. - await Future.wait( - (await db.tasks.selectSomePending(selectLimit).toList()).map( - scheduleInstance, - ), - ); - - // If there was no pending packages reviewed, and no instances currently - // running, then we can easily sleep 5 minutes before we poll again. - if (instances.isEmpty && pendingPackagesReviewed == 0) { - await sleepOrAborted(Duration(minutes: 5)); - continue; + if (rollbackPackageState) { + final oldVersionsMap = updated?.$2 ?? const {}; + // Restore the state of the PackageState for versions that were + // suppose to run on the instance we just failed to create. + // If this doesn't work, we'll eventually retry. Hence, correctness + // does not hinge on this transaction being successful. + await db.tasks.restorePreviousVersionsState( + selected.package, + instanceName, + oldVersionsMap, + ); } + } - // If more tasks is available and quota wasn't used up, we only sleep 10s - if (pendingPackagesReviewed >= _maxInstancesPerIteration && - activeConfiguration.maxTaskInstances > instances.length) { - await sleepOrAborted(Duration(seconds: 10), since: iterationStart); - continue; - } + // Creating an instance can be slow, we want to schedule them concurrently. + await Future.wait( + (await db.tasks.selectSomePending(selectLimit).toList()).map( + scheduleInstance, + ), + ); - // If we are waiting for quota, then we sleep a minute before checking again - await sleepOrAborted(Duration(minutes: 1), since: iterationStart); + // If there was no pending packages reviewed, and no instances currently + // running, then we can easily sleep 5 minutes before we poll again. + if (instances.isEmpty && pendingPackagesReviewed == 0) { + return ( + CreateInstancesState(zoneBannedUntil: zoneBannedUntil), + Duration(minutes: 5), + ); } + + // If more tasks is available and quota wasn't used up, we only sleep 10s + if (pendingPackagesReviewed >= _maxInstancesPerIteration && + activeConfiguration.maxTaskInstances > instances.length) { + return ( + CreateInstancesState(zoneBannedUntil: zoneBannedUntil), + Duration(seconds: 10), + ); + } + + // If we are waiting for quota, then we sleep a minute before checking again + return ( + CreateInstancesState(zoneBannedUntil: zoneBannedUntil), + Duration(minutes: 1), + ); } /// Updates the package state with versions that are already pending or