Skip to content

Commit 9f8a25f

Browse files
authored
Store Benchmark - Part 4 (#356)
1 parent 3082ac0 commit 9f8a25f

File tree

27 files changed

+2032
-362
lines changed

27 files changed

+2032
-362
lines changed

.github/workflows/benchmark.yml

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ jobs:
127127
sleep 1
128128
done
129129
echo "Store did not become ready in time" >&2
130+
docker compose -f "$COMPOSE_FILE" logs app
130131
exit 1
131132
132133
- name: Prepare artifact directory
@@ -177,6 +178,9 @@ jobs:
177178
if [ -d docker/data/prometheus ]; then
178179
tar -C docker/data -czf "$ARTIFACT_DIR/prometheus-${SCENARIO_ID}-${BACKEND_ID}.tar.gz" prometheus
179180
fi
181+
if docker compose -f "$COMPOSE_FILE" ps --format '{{.Name}}' >/dev/null 2>&1; then
182+
docker compose -f "$COMPOSE_FILE" logs app > "$ARTIFACT_DIR/docker-${SCENARIO_ID}-${BACKEND_ID}.log" || true
183+
fi
180184
181185
- name: Upload benchmark artifacts
182186
if: ${{ always() }}
@@ -185,3 +189,146 @@ jobs:
185189
name: benchmark-${{ matrix.scenario.id }}-${{ matrix.backend.id }}
186190
path: ${{ env.ARTIFACT_DIR }}
187191
if-no-files-found: error
192+
193+
micro-benchmark:
  # Runs one micro-benchmark per (backend, mode) combination against a store
  # started via Docker Compose, then uploads the benchmark output, the
  # analysis report, the Prometheus data directory and the app container log.
  name: Micro-benchmark (${{ matrix.backend.id }}, ${{ matrix.mode.display }})
  runs-on: ubuntu-latest
  timeout-minutes: 30
  strategy:
    # Let every backend/mode combination finish even when another one fails.
    fail-fast: false
    matrix:
      backend:
        - id: memory
          compose_file: compose.prometheus-memory-store.yml
        - id: mongo
          compose_file: compose.prometheus-mongo-store.yml
      mode:
        # `cli` is the positional sub-command handed to
        # `python -m tests.benchmark.micro_benchmark`.
        - id: worker
          display: Update worker throughput
          cli: worker
        - id: dequeue-empty
          display: Dequeue empty throughput
          cli: dequeue-empty
        - id: rollout
          display: Rollout + span throughput
          cli: rollout
  env:
    STORE_URL: http://localhost:4747
    STORE_API_URL: http://localhost:4747/v1/agl
    PROM_URL: http://localhost:9090
    BACKEND_ID: ${{ matrix.backend.id }}
    MODE_ID: ${{ matrix.mode.id }}
    # Relative to the repository root; steps that `cd docker` must not use it
    # without accounting for the changed working directory.
    ARTIFACT_DIR: artifacts/micro-${{ matrix.mode.id }}-${{ matrix.backend.id }}
    # File name only; the compose files are addressed from inside `docker/`.
    COMPOSE_FILE: ${{ matrix.backend.compose_file }}
    AGL_STORE_N_WORKERS: 8
  steps:
    - uses: actions/checkout@v4

    - uses: astral-sh/setup-uv@v7
      with:
        enable-cache: true
        python-version: '3.12'

    - name: Sync dependencies
      run: uv sync --frozen --extra mongo --group core-stable --group dev

    - name: Reset benchmark data directories
      run: |
        set -euo pipefail
        cd docker
        rm -rf data
        bash setup.sh

    - name: Launch ${{ matrix.backend.id }} Prometheus stack
      run: |
        set -euo pipefail
        cd docker
        # Best-effort cleanup of a previous stack before starting a fresh one.
        docker compose -f "$COMPOSE_FILE" down -v || true
        docker compose -f "$COMPOSE_FILE" up -d --quiet-pull

    - name: Wait for store readiness
      run: |
        set -euo pipefail
        # Poll the health endpoint once per second, for at most 60 attempts.
        for attempt in {1..60}; do
          if curl -fsS "$STORE_API_URL/health" >/dev/null 2>&1; then
            exit 0
          fi
          sleep 1
        done
        echo "Store did not become ready in time" >&2
        # Print the app log so the failure can be diagnosed from the job output.
        cd docker && docker compose -f "$COMPOSE_FILE" logs app
        exit 1

    - name: Prepare artifact directory
      run: mkdir -p "$ARTIFACT_DIR"

    - name: Record micro benchmark start
      run: echo "BENCHMARK_START=$(date -u +%FT%TZ)" >> "$GITHUB_ENV"

    - name: Run ${{ matrix.mode.display }}
      run: |
        set -euo pipefail
        mkdir -p "$ARTIFACT_DIR"
        uv run --locked --no-sync python -m tests.benchmark.micro_benchmark \
          --store-url "$STORE_URL" \
          --summary-file "$ARTIFACT_DIR/summary-${MODE_ID}.txt" \
          "${{ matrix.mode.cli }}" | tee "$ARTIFACT_DIR/micro-${MODE_ID}.txt"

    - name: Record micro benchmark end
      if: ${{ always() }}
      run: echo "BENCHMARK_END=$(date -u +%FT%TZ)" >> "$GITHUB_ENV"

    - name: Run micro benchmark analysis
      if: ${{ always() }}
      run: |
        set -euo pipefail
        mkdir -p "$ARTIFACT_DIR"
        # Without both timestamps there is no window to analyse; leave a note
        # in the artifact directory and fail the step.
        if [ -z "${BENCHMARK_START:-}" ] || [ -z "${BENCHMARK_END:-}" ]; then
          echo "Analysis skipped: benchmark window not recorded." > "$ARTIFACT_DIR/analysis-${MODE_ID}.txt"
          exit 1
        fi
        uv run --locked --no-sync python -m tests.benchmark.analysis \
          --prom-url "$PROM_URL" \
          --store-url "$STORE_API_URL" \
          --start "$BENCHMARK_START" \
          --end "$BENCHMARK_END" \
          | tee "$ARTIFACT_DIR/analysis-${MODE_ID}.txt"

    - name: Show micro benchmark summary
      if: ${{ always() }}
      run: |
        set -euo pipefail
        summary_file="$ARTIFACT_DIR/summary-${MODE_ID}.txt"
        if [ -f "$summary_file" ]; then
          echo "Micro benchmark summary ($MODE_ID/$BACKEND_ID):"
          cat "$summary_file"
        else
          echo "Summary file not found: $summary_file"
        fi

    - name: Collect app container logs
      if: ${{ always() }}
      run: |
        set -euo pipefail
        mkdir -p "$ARTIFACT_DIR"
        # Must run BEFORE the stack is stopped: `down -v` removes the
        # containers, after which there is no app log left to read.
        # The compose commands run in a subshell inside `docker/` (where the
        # other steps address "$COMPOSE_FILE"); the output redirection is
        # resolved in the outer shell, so "$ARTIFACT_DIR" stays relative to
        # the repository root.
        if (cd docker && docker compose -f "$COMPOSE_FILE" ps --format '{{.Name}}' >/dev/null 2>&1); then
          (cd docker && docker compose -f "$COMPOSE_FILE" logs app) > "$ARTIFACT_DIR/docker-micro-${MODE_ID}-${BACKEND_ID}.log" || true
        fi

    - name: Stop ${{ matrix.backend.id }} Prometheus stack
      if: ${{ always() }}
      run: |
        set -euo pipefail
        cd docker
        docker compose -f "$COMPOSE_FILE" down -v || true

    - name: Archive Prometheus metrics
      if: ${{ always() }}
      run: |
        set -euo pipefail
        mkdir -p "$ARTIFACT_DIR"
        # Archived after the stack has been stopped, as in the original
        # step order.
        if [ -d docker/data/prometheus ]; then
          tar -C docker/data -czf "$ARTIFACT_DIR/prometheus-micro-${MODE_ID}-${BACKEND_ID}.tar.gz" prometheus
        fi

    - name: Upload micro benchmark artifacts
      if: ${{ always() }}
      uses: actions/upload-artifact@v4
      with:
        name: micro-benchmark-${{ matrix.mode.id }}-${{ matrix.backend.id }}
        path: ${{ env.ARTIFACT_DIR }}
        if-no-files-found: error

agentlightning/cli/store.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ def main(argv: Iterable[str] | None = None) -> int:
6464
setup_logging(args.log_level)
6565

6666
if args.backend == "memory":
67-
store = InMemoryLightningStore(prometheus=args.prometheus)
67+
store = InMemoryLightningStore(
68+
prometheus=args.prometheus, thread_safe=True
69+
) # Using thread_safe store for server
6870
elif args.backend == "mongo":
6971
from agentlightning.store.mongo import MongoLightningStore
7072

agentlightning/runner/agent.py

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def __init__(
7373
max_rollouts: Optional[int] = None,
7474
poll_interval: float = 5.0,
7575
heartbeat_interval: float = 10.0,
76-
interval_jitter: float = 0.1,
76+
interval_jitter: float = 0.5,
7777
heartbeat_launch_mode: Literal["asyncio", "thread"] = "asyncio",
7878
) -> None:
7979
"""Initialize the agent runner.
@@ -577,16 +577,6 @@ async def iter(self, *, event: Optional[ExecutionEvent] = None) -> None:
577577
if next_rollout is None:
578578
return
579579

580-
try:
581-
# Claim the rollout by updating the current worker id
582-
await store.update_attempt(
583-
next_rollout.rollout_id, next_rollout.attempt.attempt_id, worker_id=self.get_worker_id()
584-
)
585-
except Exception:
586-
# This exception could happen if the rollout is dequeued and the other end died for some reason
587-
logger.exception(f"{self._log_prefix()} Exception during update_attempt, giving up the rollout.")
588-
continue
589-
590580
# Execute the step
591581
await self._step_impl(next_rollout)
592582

@@ -640,12 +630,8 @@ async def step(
640630
else:
641631
resources_id = None
642632

643-
attempted_rollout = await self.get_store().start_rollout(input=input, mode=mode, resources_id=resources_id)
644-
# Register the attempt as running by the current worker
645-
await self.get_store().update_attempt(
646-
attempted_rollout.rollout_id,
647-
attempted_rollout.attempt.attempt_id,
648-
worker_id=self.get_worker_id(),
633+
attempted_rollout = await self.get_store().start_rollout(
634+
input=input, mode=mode, resources_id=resources_id, worker_id=self.get_worker_id()
649635
)
650636
rollout_id = await self._step_impl(attempted_rollout, raise_on_exception=True)
651637

agentlightning/store/base.py

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
Attempt,
1111
AttemptedRollout,
1212
AttemptStatus,
13+
EnqueueRolloutRequest,
1314
NamedResources,
1415
ResourcesUpdate,
1516
Rollout,
@@ -100,19 +101,6 @@ class LightningStoreStatistics(TypedDict, total=False):
100101
"""Memory capacity of the store in bytes."""
101102

102103

103-
class _EnqueueRolloutRequestRequired(TypedDict):
104-
input: TaskInput
105-
106-
107-
class EnqueueRolloutRequest(_EnqueueRolloutRequestRequired, total=False):
108-
"""Payload describing a rollout to be queued via `enqueue_rollout`."""
109-
110-
mode: Optional[RolloutMode]
111-
resources_id: Optional[str]
112-
config: Optional[RolloutConfig]
113-
metadata: Optional[Dict[str, Any]]
114-
115-
116104
class LightningStore:
117105
"""Contract for the persistent control-plane that coordinates training rollouts.
118106
@@ -174,6 +162,7 @@ async def start_rollout(
174162
resources_id: str | None = None,
175163
config: RolloutConfig | None = None,
176164
metadata: Dict[str, Any] | None = None,
165+
worker_id: str | None = None,
177166
) -> AttemptedRollout:
178167
"""Register a rollout and immediately create its first attempt.
179168
@@ -196,6 +185,7 @@ async def start_rollout(
196185
resources_id: Concrete resource snapshot to execute against; defaults to the latest stored snapshot.
197186
config: Rollout retry/timeout policy. Should default to a fresh [`RolloutConfig`][agentlightning.RolloutConfig].
198187
metadata: Free-form metadata persisted verbatim with the rollout.
188+
worker_id: Optional worker identifier to associate the new attempt with.
199189
200190
Returns:
201191
The fully-populated [`AttemptedRollout`][agentlightning.AttemptedRollout] including
@@ -241,19 +231,19 @@ async def enqueue_rollout(
241231
"""
242232
raise NotImplementedError()
243233

244-
async def enqueue_many_rollouts(self, inputs: Sequence[EnqueueRolloutRequest]) -> Sequence[Rollout]:
234+
async def enqueue_many_rollouts(self, rollouts: Sequence[EnqueueRolloutRequest]) -> Sequence[Rollout]:
245235
"""Persist multiple rollouts in `queuing` state.
246236
247237
The implementation can delegate to [`enqueue_rollout()`][agentlightning.LightningStore.enqueue_rollout]
248238
per request and preserves the input ordering. Subclasses can override to provide
249239
more efficient bulk enqueue semantics.
250240
251241
Args:
252-
inputs: Rollout submission payloads mirroring [`enqueue_rollout()`][agentlightning.LightningStore.enqueue_rollout]'s
242+
rollouts: Rollout submission payloads mirroring [`enqueue_rollout()`][agentlightning.LightningStore.enqueue_rollout]'s
253243
parameters. Each entry requires `input` and can optionally include other fields.
254244
255245
Returns:
256-
Rollouts enqueued in the same order as `inputs`.
246+
Rollouts enqueued in the same order as `rollouts`.
257247
"""
258248
raise NotImplementedError()
259249

@@ -273,6 +263,9 @@ async def dequeue_rollout(self, worker_id: Optional[str] = None) -> Optional[Att
273263
* Optionally refresh the caller's [`Worker`][agentlightning.Worker] telemetry
274264
(e.g., `last_dequeue_time`) when `worker_id` is provided.
275265
266+
Args:
267+
worker_id: Optional worker identifier to associate the claimed attempt with.
268+
276269
Returns:
277270
The next attempt to execute, or `None` when no eligible rollouts are queued.
278271
@@ -304,7 +297,7 @@ async def dequeue_many_rollouts(
304297
"""
305298
raise NotImplementedError()
306299

307-
async def start_attempt(self, rollout_id: str) -> AttemptedRollout:
300+
async def start_attempt(self, rollout_id: str, worker_id: Optional[str] = None) -> AttemptedRollout:
308301
"""Create a manual retry attempt for an existing rollout.
309302
310303
This is typically invoked by runners that wish to retry outside of the
@@ -315,6 +308,7 @@ async def start_attempt(self, rollout_id: str) -> AttemptedRollout:
315308
316309
Args:
317310
rollout_id: Unique identifier of the rollout receiving a new attempt.
311+
worker_id: Optional worker identifier to associate the new attempt with.
318312
319313
Returns:
320314
The rollout paired with its newly-created attempt.

0 commit comments

Comments
 (0)