Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 83 additions & 45 deletions app/lib/task/scheduler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:convert';
import 'dart:math';

import 'package:_pub_shared/data/task_payload.dart';
import 'package:basics/map_basics.dart';
import 'package:clock/clock.dart';
import 'package:logging/logging.dart' show Logger;
import 'package:meta/meta.dart';
Expand All @@ -11,7 +12,6 @@ import 'package:pub_dev/shared/configuration.dart';
import 'package:pub_dev/shared/datastore.dart';
import 'package:pub_dev/shared/utils.dart';
import 'package:pub_dev/task/backend.dart';
import 'package:pub_dev/task/clock_control.dart';
import 'package:pub_dev/task/cloudcompute/cloudcompute.dart';
import 'package:pub_dev/task/global_lock.dart';
import 'package:pub_dev/task/models.dart';
Expand All @@ -22,30 +22,38 @@ const _maxInstancesPerIteration = 50; // Later consider something like: 50;

/// Schedule tasks from [PackageState] while [claim] is valid, and [abort] has
/// not been resolved.
Future<void> schedule(
///
/// This will run at-most one iteration of the scheduling loop.
/// It's always safe to call this with two empty maps for [state], but next
/// iteration should run with [state] returned from previous iteration.
///
/// This returns a `delay` that should be awaited before the next iteration
/// of the scheduling loop is started.
Future<
({
({
Map<String, DateTime> zoneBannedUntil,
Map<String, DateTime> deletionsInProgress,
})
state,
Duration delay,
})
>
runSchedulingIteration(
GlobalLockClaim claim,
CloudCompute compute,
DatastoreDB db, {
required ({
Map<String, DateTime> zoneBannedUntil,
Map<String, DateTime> deletionsInProgress,
})
state,
required Completer<void> abort,
}) async {
/// Sleep [delay] time [since] timestamp, or now if not given.
Future<void> sleepOrAborted(Duration delay, {DateTime? since}) async {
ArgumentError.checkNotNull(delay, 'delay');
final now = clock.now();
since ??= now;

delay = delay - now.difference(since);
if (delay.isNegative) {
// Await a micro task to ensure consistent behavior
await Future.microtask(() {});
} else {
await abort.future.timeoutWithClock(delay, onTimeout: () => null);
}
}

// Map from zone to DateTime when zone is allowed again
final zoneBannedUntil = <String, DateTime>{
for (final zone in compute.zones) zone: DateTime(0),
for (final zone in compute.zones)
zone: state.zoneBannedUntil[zone] ?? DateTime(0),
};
void banZone(String zone, {int minutes = 0, int hours = 0, int days = 0}) {
if (!zoneBannedUntil.containsKey(zone)) {
Expand All @@ -63,13 +71,20 @@ Future<void> schedule(
// Set of `CloudInstance.instanceName`s currently being deleted.
// This to avoid deleting instances where the deletion process is still
// running.
final deletionInProgress = <String>{};
final deletionsInProgress = state.deletionsInProgress.whereValue(
(d) => d.isBefore(clock.ago(minutes: 5)),
);

final nextState = (
zoneBannedUntil: zoneBannedUntil,
deletionsInProgress: deletionsInProgress,
);

// Create a fast RNG with random seed for picking zones.
final rng = Random(Random.secure().nextInt(2 << 31));

// Run scheduling iterations, so long as we have a valid claim
while (claim.valid && !abort.isCompleted) {
if (claim.valid && !abort.isCompleted) {
final iterationStart = clock.now();
_log.info('Starting scheduling cycle');

Expand All @@ -85,7 +100,9 @@ Future<void> schedule(
.isBefore(clock.now());
// Also check deletionInProgress to prevent multiple calls to delete the
// same instance
final isBeingDeleted = deletionInProgress.contains(instance.instanceName);
final isBeingDeleted = deletionsInProgress.containsKey(
instance.instanceName,
);
if ((isTerminated || isTooOld) && !isBeingDeleted) {
if (isTooOld) {
// This indicates that something is wrong with the instance,
Expand All @@ -97,21 +114,19 @@ Future<void> schedule(
_log.info('deleting $instance as it has terminated.');
}

deletionInProgress.add(instance.instanceName);
scheduleMicrotask(() async {
final deletionStart = clock.now();
try {
await compute.delete(instance.zone, instance.instanceName);
} catch (e, st) {
_log.severe('Failed to delete $instance', e, st);
} finally {
// Wait at-least 5 minutes from start of deletion until we remove
// it from [deletionInProgress] that way we give the API some time
// reconcile state.
await sleepOrAborted(Duration(minutes: 5), since: deletionStart);
deletionInProgress.remove(instance.instanceName);
}
});
deletionsInProgress[instance.instanceName] = clock.now();
try {
final f = compute.delete(instance.zone, instance.instanceName);
scheduleMicrotask(() async {
try {
await f;
} catch (e, st) {
_log.severe('Failed to delete $instance', e, st);
}
});
} catch (e, st) {
_log.severe('Failed to delete $instance', e, st);
}
}
}
_log.info('Found $instances instances');
Expand All @@ -120,8 +135,10 @@ Future<void> schedule(
if (activeConfiguration.maxTaskInstances <= instances) {
_log.info('Reached instance limit, trying again in 30s');
// Wait 30 seconds then list instances again, so that we can count them
await sleepOrAborted(Duration(seconds: 30), since: iterationStart);
continue; // skip the rest of the iteration
return (
state: nextState,
delay: computeDelaySince(Duration(seconds: 30), since: iterationStart),
);
}

// Determine which zones are not banned
Expand All @@ -137,8 +154,10 @@ Future<void> schedule(
// If no zones are available, we sleep and try again later.
if (allowedZones.isEmpty) {
_log.info('All compute-engine zones are banned, trying again in 30s');
await sleepOrAborted(Duration(seconds: 30), since: iterationStart);
continue;
return (
state: nextState,
delay: computeDelaySince(Duration(seconds: 30), since: iterationStart),
);
}

// Schedule analysis for some packages
Expand Down Expand Up @@ -253,20 +272,39 @@ Future<void> schedule(
// If there was no pending packages reviewed, and no instances currently
// running, then we can easily sleep 5 minutes before we poll again.
if (instances == 0 && pendingPackagesReviewed == 0) {
await sleepOrAborted(Duration(minutes: 5));
continue;
return (state: nextState, delay: computeDelaySince(Duration(minutes: 5)));
}

// If more tasks are available and quota wasn't used up, we only sleep 10s
if (pendingPackagesReviewed >= _maxInstancesPerIteration &&
activeConfiguration.maxTaskInstances > instances) {
await sleepOrAborted(Duration(seconds: 10), since: iterationStart);
continue;
return (
state: nextState,
delay: computeDelaySince(Duration(seconds: 10), since: iterationStart),
);
}

// If we are waiting for quota, then we sleep a minute before checking again
await sleepOrAborted(Duration(minutes: 1), since: iterationStart);
return (
state: nextState,
delay: computeDelaySince(Duration(minutes: 1), since: iterationStart),
);
}

return (state: nextState, delay: Duration.zero);
}

/// Returns how much of [delay] remains when measured from [since].
///
/// The time elapsed between [since] and the current `clock.now()` is
/// subtracted from [delay]. When [since] is omitted it defaults to the current
/// time, so the full [delay] is returned.
///
/// Never returns a negative duration: if at least [delay] has already elapsed
/// (or [delay] itself is negative), [Duration.zero] is returned.
///
/// This function does not sleep; it only computes the duration.
Duration computeDelaySince(Duration delay, {DateTime? since}) {
  // Read the clock once so the default for [since] and the elapsed-time
  // calculation agree on what "now" is.
  final now = clock.now();
  final elapsed = now.difference(since ?? now);
  final remaining = delay - elapsed;
  return remaining.isNegative ? Duration.zero : remaining;
}

/// Updates the package state with versions that are already pending or
Expand Down
Loading