Skip to content

Commit ce3bc4d

Browse files
Grouped and packed encoding of task submissions (#25)
* Grouped and packed encoding of task submissions * Variable naming to indicate pointer/value
1 parent bfb44a2 commit ce3bc4d

2 files changed

Lines changed: 71 additions & 17 deletions

File tree

apis/workflows/v1/core.proto

Lines changed: 65 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -182,14 +182,6 @@ message Tasks {
182182

183183
// TaskSubmission is a message of a task that is just about to be submitted, either by submitting a job or as a subtask.
184184
message TaskSubmission {
185-
option (buf.validate.message).oneof = {
186-
fields: [
187-
"input",
188-
"inputs"
189-
]
190-
required: true
191-
};
192-
193185
// The cluster that this task should be run on
194186
string cluster_slug = 1;
195187
// The task identifier
@@ -206,15 +198,72 @@ message TaskSubmission {
206198

207199
// The serialized task instance, if there is only a single instance.
208200
bytes input = 3 [(buf.validate.field).bytes.max_len = 2048];
201+
}
209202

210-
// A list of serialized task instances, all sharing the same task properties. This is useful for cases where we have
211-
// a larger number of very similar subtasks, but only the input parameters vary.
212-
repeated bytes inputs = 7 [(buf.validate.field).repeated = {
213-
items: {
214-
bytes: {max_len: 2048}
215-
}
216-
max_items: 100000 // maximum of 100k subtasks in a single subtask tree
217-
}];
203+
// TaskSubmissions is a structure for representing a set of tasks about to be submitted, either as a job or as subtasks.
204+
// It is optimized for efficient serialization for cases where a large number of very similar tasks are submitted,
205+
// with potentially only the individual input parameters varying.
206+
// To reduce the serialization size, we keep a separate list/lookup table of unique task properties, which can then be
207+
// referenced by their index.
208+
message TaskSubmissions {
209+
// Concrete instantiations of tasks, grouped by their dependencies and dependants. Each group is uniquely defined by
210+
// the set of groups that it depends on (dependencies_on_other_groups) and the set of groups that depend on it
211+
// (which is implicitly given by the inverse of the dependencies on other groups).
212+
repeated TaskSubmissionGroup task_groups = 1;
213+
// Unique values of cluster slugs, referenced by index in the task instantiations.
214+
repeated string cluster_slug_lookup = 2;
215+
// Unique values of task identifiers, referenced by index in the task instantiations.
216+
repeated TaskIdentifier identifier_lookup = 3;
217+
// Unique values of display names, referenced by index in the task instantiations.
218+
repeated string display_lookup = 4 [(buf.validate.field).repeated.items.string.min_len = 1];
219+
}
220+
221+
// TaskSubmissionGroup is a structure for representing a list of submitted tasks that all share the exact same
222+
// dependencies and dependants. Grouping tasks by their dependency edges, and then converting task dependencies to
223+
// group dependencies can help to drastically reduce the number of edges we need to serialize and transmit.
224+
// Dependants are not explicitly specified, since they can be inferred from the dependencies of the other groups
225+
// in the containing TaskSubmissions message. This means that the `dependencies_on_other_groups` field is not unique
226+
// across groups, since there may be two groups sharing the same dependencies but having different dependants.
227+
message TaskSubmissionGroup {
228+
option (buf.validate.message).cel = {
229+
id: "task_submission_group.identifiers_size_match"
230+
message: "The number of inputs must match the number of task identifiers."
231+
expression: "this.inputs.size() == this.identifier_pointers.size()"
232+
};
233+
option (buf.validate.message).cel = {
234+
id: "task_submission_group.cluster_slugs_size_match"
235+
message: "The number of cluster slugs must match the number of inputs."
236+
expression: "this.inputs.size() == this.cluster_slug_pointers.size()"
237+
};
238+
option (buf.validate.message).cel = {
239+
id: "task_submission_group.displays_size_match"
240+
message: "The number of display pointers must match the number of inputs."
241+
expression: "this.inputs.size() == this.display_pointers.size()"
242+
};
243+
option (buf.validate.message).cel = {
244+
id: "task_submission_group.max_retries_size_match"
245+
message: "The number of max_retries_values must match the number of inputs."
246+
expression: "this.inputs.size() == this.max_retries_values.size()"
247+
};
248+
249+
// The indices of the groups that this submission group depends on. Indices refer to the task_groups field of the
250+
// containing TaskSubmissions message.
251+
repeated uint32 dependencies_on_other_groups = 1;
252+
// The input parameters for each task.
253+
// We explicitly don't group the fields into a submessage and then have a single repeated field for that submessage,
254+
// to enable packed encoding of the repeated fields.
255+
repeated bytes inputs = 2 [(buf.validate.field).repeated.items.bytes.max_len = 2048];
256+
// Index of the task identifier in the identifier_lookup field of the containing TaskSubmissions message
257+
// for each task.
258+
repeated uint64 identifier_pointers = 3;
259+
// Index of the cluster slug in the cluster_slug_lookup field of the containing TaskSubmissions message for each task,
260+
// indicating the cluster that the task should be run on.
261+
repeated uint64 cluster_slug_pointers = 4;
262+
// Index of the display name in the display_lookup field of the containing TaskSubmissions message for each task,
263+
// specifying a human-readable description of the task.
264+
repeated uint64 display_pointers = 5;
265+
// The maximum number of retries for each task. Not a pointer to a lookup table, since we just inline the values.
266+
repeated int64 max_retries_values = 6;
218267
}
219268

220269
// A lease for a task.

apis/workflows/v1/task.proto

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,12 @@ message ComputedTask {
3636
// If not set, the display message specified upon task submission will be kept.
3737
string display = 2;
3838
// A list of sub-tasks that the just computed task spawned.
39-
repeated TaskSubmission sub_tasks = 3 [(buf.validate.field).repeated.max_items = 64];
39+
repeated TaskSubmission legacy_sub_tasks = 3 [
40+
deprecated = true,
41+
(buf.validate.field).repeated.max_items = 64
42+
];
43+
// The sub-tasks that the just computed task spawned, in the grouped TaskSubmissions representation.
44+
TaskSubmissions sub_tasks = 5;
4045
// A list of progress updates that the computed task wants to report.
4146
repeated Progress progress_updates = 4;
4247
}

0 commit comments

Comments
 (0)