-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquarantine.ts
More file actions
138 lines (124 loc) · 3.88 KB
/
quarantine.ts
File metadata and controls
138 lines (124 loc) · 3.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import { get, put } from "@tigrisdata/storage";
import { teardownForks } from "@tigrisdata/agent-kit";
import type { TigrisAgentKitConfig } from "@tigrisdata/agent-kit";
import type { QuarantineRecord } from "./types.js";
/** Object key under which the quarantine registry JSON is read from and written to a source bucket. */
const QUARANTINE_KEY = ".tigris-mastra/quarantines.json";
/**
 * JSON-friendly form of a quarantine record as stored in the registry file.
 * Carries the fields copied by `serialize`; the timestamp is held as a string.
 */
interface SerializedRecord {
bucket: string;
reason: string;
/** Timestamp string; `serialize` writes it with `Date.prototype.toISOString()`. */
quarantinedAt: string;
parentBucket: string;
parentCheckpointId?: string;
}
/** Top-level shape of the JSON document stored under `QUARANTINE_KEY`. */
interface QuarantineFile {
/** Quarantine entries in their serialized form. */
records: SerializedRecord[];
}
/**
 * Convert an in-memory quarantine record into its JSON-storable form.
 *
 * Every field is copied as-is except `quarantinedAt`, which is rendered as an
 * ISO string via `toISOString()` (and therefore throws on an invalid `Date`).
 */
function serialize(r: QuarantineRecord): SerializedRecord {
  const { bucket, reason, quarantinedAt, parentBucket, parentCheckpointId } = r;
  const stored: SerializedRecord = {
    bucket,
    reason,
    quarantinedAt: quarantinedAt.toISOString(),
    parentBucket,
    parentCheckpointId,
  };
  return stored;
}
/**
 * Rebuild an in-memory quarantine record from its stored form.
 *
 * `quarantinedAt` is passed to the `Date` constructor; every other field is
 * copied unchanged. An unparseable timestamp yields an invalid `Date` rather
 * than an exception.
 */
function deserialize(s: SerializedRecord): QuarantineRecord {
  const quarantinedAt = new Date(s.quarantinedAt);
  const { bucket, reason, parentBucket, parentCheckpointId } = s;
  return { bucket, reason, quarantinedAt, parentBucket, parentCheckpointId };
}
/**
 * Load the quarantine registry stored under `QUARANTINE_KEY` in the `source` bucket.
 *
 * Best-effort by design: a failed or empty read, unparseable JSON, or a payload
 * without a `records` array all yield an empty registry instead of throwing.
 * Entries that are not objects (e.g. `null` or a bare string) are dropped so a
 * single corrupt entry cannot make callers throw when they access record fields.
 *
 * NOTE(review): because read errors are reported as "empty", a transient storage
 * failure followed by a write overwrites the stored registry. Telling "object not
 * found" apart from other failures needs the storage client's error contract —
 * confirm and tighten.
 *
 * @param source - Bucket that holds the registry object.
 * @param config - Optional agent-kit configuration; its `bucket` is overridden by `source`.
 * @returns The stored registry, or `{ records: [] }` when none could be read.
 */
async function readRegistry(
  source: string,
  config?: TigrisAgentKitConfig,
): Promise<QuarantineFile> {
  const emptyRegistry = (): QuarantineFile => ({ records: [] });

  const result = await get(QUARANTINE_KEY, "string", {
    config: { ...config, bucket: source },
  });
  if (result.error || !result.data) {
    return emptyRegistry();
  }

  // Parse into `unknown` and narrow, rather than asserting the shape up front.
  let parsed: unknown;
  try {
    parsed = JSON.parse(result.data);
  } catch {
    return emptyRegistry();
  }
  if (typeof parsed !== "object" || parsed === null) {
    return emptyRegistry();
  }

  const { records } = parsed as { records?: unknown };
  if (!Array.isArray(records)) {
    return emptyRegistry();
  }

  // Structural minimum only: individual fields are not validated here.
  const usable = records.filter(
    (entry): entry is SerializedRecord => typeof entry === "object" && entry !== null,
  );
  return { records: usable };
}
/**
 * Persist `file` as pretty-printed JSON under `QUARANTINE_KEY` in the `source`
 * bucket, overwriting any existing object.
 *
 * @param source - Bucket that holds the registry object.
 * @param file - Registry contents to store.
 * @param config - Optional agent-kit configuration; its `bucket` is overridden by `source`.
 * @throws Error (with the storage error as `cause`) when the write reports an error.
 */
async function writeRegistry(
  source: string,
  file: QuarantineFile,
  config?: TigrisAgentKitConfig,
): Promise<void> {
  const payload = JSON.stringify(file, null, 2);
  const outcome = await put(QUARANTINE_KEY, payload, {
    contentType: "application/json",
    allowOverwrite: true,
    config: { ...config, bucket: source },
  });

  if (!outcome.error) {
    return;
  }
  throw new Error(`Quarantine registry write failed: ${outcome.error.message}`, {
    cause: outcome.error,
  });
}
/**
 * Add `record` to the quarantine registry kept in the record's parent bucket.
 *
 * The registry is loaded, extended with the serialized record, and written
 * back in full.
 */
export async function recordQuarantine(
  record: QuarantineRecord,
  config?: TigrisAgentKitConfig,
): Promise<void> {
  const { parentBucket } = record;
  const registry = await readRegistry(parentBucket, config);
  const updated: QuarantineFile = {
    ...registry,
    records: [...registry.records, serialize(record)],
  };
  await writeRegistry(parentBucket, updated, config);
}
/** Options accepted by `listQuarantines`. */
export interface ListQuarantinesOptions {
/** Source bucket whose registry to read. Required — quarantine registries are per-source. */
source: string;
/** Optional agent-kit configuration, forwarded to the registry read. */
config?: TigrisAgentKitConfig;
}
/**
 * List the quarantine records registered in a source bucket.
 *
 * @param opts - The source bucket to read from, plus optional configuration.
 * @returns The registry entries, with timestamps converted back to `Date`.
 */
export async function listQuarantines(
  opts: ListQuarantinesOptions,
): Promise<QuarantineRecord[]> {
  const { source, config } = opts;
  const { records } = await readRegistry(source, config);
  return records.map((entry) => deserialize(entry));
}
/** Options accepted by `teardownQuarantine`. */
export interface TeardownQuarantineOptions {
/** Source bucket whose registry holds this entry. */
source: string;
/** Optional agent-kit configuration, forwarded to the registry read/write and to `teardownForks`. */
config?: TigrisAgentKitConfig;
}
/**
 * Delete a quarantined fork bucket and remove its entry from the source
 * bucket's registry.
 *
 * `teardownForks` expects a `Forks` object, but the registry only persists the
 * bucket name, so a minimal `Forks`-shaped value is rebuilt here. Per the
 * original author's note, agent-kit's `teardownForks` only inspects
 * `forks[*].bucket` and `baseBucket` — NOTE(review): confirm against the
 * agent-kit version in use.
 *
 * The registry is rewritten only when an entry for `bucket` was actually
 * removed. `readRegistry` reports a failed read as an empty registry, so an
 * unconditional write would replace the stored registry with an empty list and
 * lose every other entry.
 *
 * NOTE(review): the bucket is torn down even when it is not listed in the
 * registry — confirm this is intended.
 *
 * @param bucket - Name of the quarantined fork bucket to delete.
 * @param opts - Source bucket holding the registry, plus optional configuration.
 * @throws Error when `teardownForks` reports an error or the registry write fails.
 */
export async function teardownQuarantine(
  bucket: string,
  opts: TeardownQuarantineOptions,
): Promise<void> {
  const { source, config } = opts;

  const file = await readRegistry(source, config);
  const remaining = file.records.filter((r) => r.bucket !== bucket);

  // Reconstruct a Forks-shaped object for teardownForks.
  const td = await teardownForks(
    {
      baseBucket: source,
      snapshotId: "",
      forks: [{ bucket }],
    },
    { config },
  );
  if (td.error) {
    throw new Error(`teardownForks failed for quarantined bucket '${bucket}': ${td.error.message}`, {
      cause: td.error,
    });
  }

  // Nothing was removed (unknown bucket or unreadable registry): leave the
  // stored registry untouched instead of overwriting it.
  if (remaining.length === file.records.length) {
    return;
  }
  await writeRegistry(source, { records: remaining }, config);
}