Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 38 additions & 66 deletions app/lib/task/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class TaskBackend {
ScanPackagesUpdatedState _scanPackagesUpdatedState =
ScanPackagesUpdatedState.init();
DeleteInstancesState _deleteInstancesState = DeleteInstancesState.init();
CreateInstancesState _createInstanesState = CreateInstancesState.init();

TaskBackend(this._db, this._bucket);

Expand All @@ -127,28 +128,22 @@ class TaskBackend {
final scanLoop = _createLoop(
name: 'scan-packages',
aborted: aborted,
fn: (claim, aborted) async {
await _scanForPackageUpdates(claim, abort: aborted);
},
fn: _runOneScanPackagesUpdate,
);
final deleteLoop = _createLoop(
name: 'delete-instances',
aborted: aborted,
fn: (claim, aborted) async {
await _scanForInstanceDeletion(claim, abort: aborted);
},
fn: _runOneInstanceDeletion,
);
final scheduleLoop = _createLoop(
name: 'schedule',
final createLoop = _createLoop(
name: 'create-instances',
aborted: aborted,
fn: (claim, aborted) async {
await schedule(claim, taskWorkerCloudCompute, _db, abort: aborted);
},
fn: _runOneInstanceCreation,
);

scheduleMicrotask(() async {
// Wait for background process to finish
await Future.wait([scanLoop, deleteLoop, scheduleLoop]);
await Future.wait([scanLoop, deleteLoop, createLoop]);

// Report background processes as stopped
stopped.complete();
Expand All @@ -158,7 +153,7 @@ class TaskBackend {
Future<void> _createLoop({
required String name,
required Completer aborted,
required Future<void> Function(GlobalLockClaim claim, Completer aborted) fn,
required Future<Duration> Function(bool Function() isAbortedFn) fn,
}) {
return Future.microtask(() async {
try {
Expand All @@ -172,7 +167,19 @@ class TaskBackend {
while (!aborted.isCompleted) {
try {
await lock.withClaim((claim) async {
await fn(claim, aborted);
bool isAbortedFn() => !claim.valid || aborted.isCompleted;
while (!isAbortedFn()) {
final delay = await fn(isAbortedFn);

if (isAbortedFn()) {
return;
}
// Wait until aborted or [delay] before doing it again!
await aborted.future.timeoutWithClock(
delay,
onTimeout: () => null,
);
}
}, abort: aborted);
} catch (e, st) {
// Log this as very bad, and then move on. Nothing good can come
Expand Down Expand Up @@ -289,32 +296,9 @@ class TaskBackend {
}
}

/// Scan for updates from packages until [abort] is resolved, or [claim]
/// is lost.
Future<void> _scanForPackageUpdates(
GlobalLockClaim claim, {
Completer<void>? abort,
}) async {
abort ??= Completer<void>();

bool isAbortedFn() => !claim.valid || abort!.isCompleted;
while (!isAbortedFn()) {
await _runOneScanPackagesUpdate(isAbortedFn: isAbortedFn);

if (isAbortedFn()) {
return;
}
// Wait until aborted or 10 minutes before scanning again!
await abort.future.timeoutWithClock(
Duration(minutes: 10),
onTimeout: () => null,
);
}
}

Future<void> _runOneScanPackagesUpdate({
required bool Function() isAbortedFn,
}) async {
Future<Duration> _runOneScanPackagesUpdate(
bool Function() isAbortedFn,
) async {
final next = await calculateScanPackagesUpdatedLoop(
_scanPackagesUpdatedState,
_db.packages.listUpdatedSince(_scanPackagesUpdatedState.since),
Expand All @@ -324,45 +308,33 @@ class TaskBackend {

for (final p in next.packages) {
if (isAbortedFn()) {
return;
return Duration(minutes: 10);
}
// Check the package
await trackPackage(p, updateDependents: true);
}
}

/// Scan for compute instances that can be deleted until [abort] is resolved,
/// or [claim] is lost.
Future<void> _scanForInstanceDeletion(
GlobalLockClaim claim, {
Completer<void>? abort,
}) async {
abort ??= Completer<void>();

bool isAbortedFn() => !claim.valid || abort!.isCompleted;
while (!isAbortedFn()) {
await _runOneInstanceDeletion(isAbortedFn: isAbortedFn);

if (isAbortedFn()) {
return;
}
// Wait until aborted or 10 minutes before scanning again!
await abort.future.timeoutWithClock(
Duration(minutes: 10),
onTimeout: () => null,
);
}
return Duration(minutes: 10);
}

Future<void> _runOneInstanceDeletion({
required bool Function() isAbortedFn,
}) async {
Future<Duration> _runOneInstanceDeletion(bool Function() isAbortedFn) async {
_deleteInstancesState = await scanAndDeleteInstances(
_deleteInstancesState,
taskWorkerCloudCompute,
isAbortedFn,
maxTaskRunHours: activeConfiguration.maxTaskRunHours,
);
return Duration(minutes: 10);
}

Future<Duration> _runOneInstanceCreation(bool Function() isAbortedFn) async {
final result = await schedule(
taskWorkerCloudCompute,
_db,
state: _createInstanesState,
);
_createInstanesState = result.$1;
return result.$2;
}

Future<void> trackPackage(
Expand Down
Loading