diff --git a/app/lib/task/scheduler.dart b/app/lib/task/scheduler.dart index e69e87ad7..4ebf26790 100644 --- a/app/lib/task/scheduler.dart +++ b/app/lib/task/scheduler.dart @@ -3,6 +3,7 @@ import 'dart:convert'; import 'dart:math'; import 'package:_pub_shared/data/task_payload.dart'; +import 'package:basics/map_basics.dart'; import 'package:clock/clock.dart'; import 'package:logging/logging.dart' show Logger; import 'package:meta/meta.dart'; @@ -11,7 +12,6 @@ import 'package:pub_dev/shared/configuration.dart'; import 'package:pub_dev/shared/datastore.dart'; import 'package:pub_dev/shared/utils.dart'; import 'package:pub_dev/task/backend.dart'; -import 'package:pub_dev/task/clock_control.dart'; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; import 'package:pub_dev/task/global_lock.dart'; import 'package:pub_dev/task/models.dart'; @@ -22,30 +22,38 @@ const _maxInstancesPerIteration = 50; // Later consider something like: 50; /// Schedule tasks from [PackageState] while [claim] is valid, and [abort] have /// not been resolved. -Future schedule( +/// +/// This will run at-most one iteration of the scheduling loop. +/// It's always safe to call this with two empty maps for [state], but next +/// iteration should run with [state] returned from previous iteration. +/// +/// This returns a `delay` that should be awaited before the next iteration +/// of the scheduling loop is started. +Future< + ({ + ({ + Map zoneBannedUntil, + Map deletionsInProgress, + }) + state, + Duration delay, + }) +> +runSchedulingIteration( GlobalLockClaim claim, CloudCompute compute, DatastoreDB db, { + required ({ + Map zoneBannedUntil, + Map deletionsInProgress, + }) + state, required Completer abort, }) async { - /// Sleep [delay] time [since] timestamp, or now if not given. - Future sleepOrAborted(Duration delay, {DateTime? since}) async { - ArgumentError.checkNotNull(delay, 'delay'); - final now = clock.now(); - since ??= now; - - delay = delay - now.difference(since); - if (delay.isNegative) { - // Await a micro task to ensure consistent behavior - await Future.microtask(() {}); - } else { - await abort.future.timeoutWithClock(delay, onTimeout: () => null); - } - } - // Map from zone to DateTime when zone is allowed again final zoneBannedUntil = { - for (final zone in compute.zones) zone: DateTime(0), + for (final zone in compute.zones) + zone: state.zoneBannedUntil[zone] ?? DateTime(0), }; void banZone(String zone, {int minutes = 0, int hours = 0, int days = 0}) { if (!zoneBannedUntil.containsKey(zone)) { @@ -63,13 +71,20 @@ Future schedule( // Set of `CloudInstance.instanceName`s currently being deleted. // This to avoid deleting instances where the deletion process is still // running. - final deletionInProgress = {}; + final deletionsInProgress = state.deletionsInProgress.whereValue( + (d) => d.isBefore(clock.ago(minutes: 5)), + ); + + final nextState = ( + zoneBannedUntil: zoneBannedUntil, + deletionsInProgress: deletionsInProgress, + ); // Create a fast RNG with random seed for picking zones. final rng = Random(Random.secure().nextInt(2 << 31)); // Run scheduling iterations, so long as we have a valid claim - while (claim.valid && !abort.isCompleted) { + if (claim.valid && !abort.isCompleted) { final iterationStart = clock.now(); _log.info('Starting scheduling cycle'); @@ -85,7 +100,9 @@ Future schedule( .isBefore(clock.now()); // Also check deletionInProgress to prevent multiple calls to delete the // same instance - final isBeingDeleted = deletionInProgress.contains(instance.instanceName); + final isBeingDeleted = deletionsInProgress.containsKey( + instance.instanceName, + ); if ((isTerminated || isTooOld) && !isBeingDeleted) { if (isTooOld) { // This indicates that something is wrong the with the instance, @@ -97,21 +114,19 @@ Future schedule( _log.info('deleting $instance as it has terminated.'); } - deletionInProgress.add(instance.instanceName); - scheduleMicrotask(() async { - final deletionStart = clock.now(); - try { - await compute.delete(instance.zone, instance.instanceName); - } catch (e, st) { - _log.severe('Failed to delete $instance', e, st); - } finally { - // Wait at-least 5 minutes from start of deletion until we remove - // it from [deletionInProgress] that way we give the API some time - // reconcile state. - await sleepOrAborted(Duration(minutes: 5), since: deletionStart); - deletionInProgress.remove(instance.instanceName); - } - }); + deletionsInProgress[instance.instanceName] = clock.now(); + try { + final f = compute.delete(instance.zone, instance.instanceName); + scheduleMicrotask(() async { + try { + await f; + } catch (e, st) { + _log.severe('Failed to delete $instance', e, st); + } + }); + } catch (e, st) { + _log.severe('Failed to delete $instance', e, st); + } } } _log.info('Found $instances instances'); @@ -120,8 +135,10 @@ Future schedule( if (activeConfiguration.maxTaskInstances <= instances) { _log.info('Reached instance limit, trying again in 30s'); // Wait 30 seconds then list instances again, so that we can count them - await sleepOrAborted(Duration(seconds: 30), since: iterationStart); - continue; // skip the rest of the iteration + return ( + state: nextState, + delay: computeDelaySince(Duration(seconds: 30), since: iterationStart), + ); } // Determine which zones are not banned @@ -137,8 +154,10 @@ Future schedule( // If no zones are available, we sleep and try again later. if (allowedZones.isEmpty) { _log.info('All compute-engine zones are banned, trying again in 30s'); - await sleepOrAborted(Duration(seconds: 30), since: iterationStart); - continue; + return ( + state: nextState, + delay: computeDelaySince(Duration(seconds: 30), since: iterationStart), + ); } // Schedule analysis for some packages @@ -253,20 +272,39 @@ Future schedule( // If there was no pending packages reviewed, and no instances currently // running, then we can easily sleep 5 minutes before we poll again. if (instances == 0 && pendingPackagesReviewed == 0) { - await sleepOrAborted(Duration(minutes: 5)); - continue; + return (state: nextState, delay: computeDelaySince(Duration(minutes: 5))); } // If more tasks is available and quota wasn't used up, we only sleep 10s if (pendingPackagesReviewed >= _maxInstancesPerIteration && activeConfiguration.maxTaskInstances > instances) { - await sleepOrAborted(Duration(seconds: 10), since: iterationStart); - continue; + return ( + state: nextState, + delay: computeDelaySince(Duration(seconds: 10), since: iterationStart), + ); } // If we are waiting for quota, then we sleep a minute before checking again - await sleepOrAborted(Duration(minutes: 1), since: iterationStart); + return ( + state: nextState, + delay: computeDelaySince(Duration(minutes: 1), since: iterationStart), + ); + } + + return (state: nextState, delay: Duration.zero); +} + +/// Sleep [delay] time [since] timestamp, or now if not given. +Duration computeDelaySince(Duration delay, {DateTime? since}) { + ArgumentError.checkNotNull(delay, 'delay'); + final now = clock.now(); + since ??= now; + + delay = delay - now.difference(since); + if (delay.isNegative) { + return Duration.zero; } + return delay; } /// Updates the package state with versions that are already pending or