|
| 1 | +using System.Diagnostics; |
| 2 | +using Docker.DotNet; |
| 3 | +using Docker.DotNet.Models; |
| 4 | +using Microsoft.Extensions.Logging; |
| 5 | +using RockBot.Host; |
| 6 | +using RockBot.Messaging; |
| 7 | + |
| 8 | +namespace RockBot.Scripts.Docker; |
| 9 | + |
| 10 | +/// <summary> |
| 11 | +/// Handles script invocation requests by creating ephemeral Docker containers. |
| 12 | +/// </summary> |
| 13 | +internal sealed class DockerScriptHandler( |
| 14 | + IDockerClient docker, |
| 15 | + IMessagePublisher publisher, |
| 16 | + DockerScriptOptions options, |
| 17 | + AgentIdentity agent, |
| 18 | + ILogger<DockerScriptHandler> logger) : IMessageHandler<ScriptInvokeRequest> |
| 19 | +{ |
| 20 | + public async Task HandleAsync(ScriptInvokeRequest request, MessageHandlerContext context) |
| 21 | + { |
| 22 | + var replyTo = context.Envelope.ReplyTo ?? options.DefaultResultTopic; |
| 23 | + var correlationId = context.Envelope.CorrelationId; |
| 24 | + string? containerId = null; |
| 25 | + |
| 26 | + try |
| 27 | + { |
| 28 | + var createParams = BuildCreateParameters(request); |
| 29 | + var sw = Stopwatch.StartNew(); |
| 30 | + |
| 31 | + logger.LogDebug("Creating script container for call {ToolCallId}", request.ToolCallId); |
| 32 | + |
| 33 | + var createResponse = await docker.Containers.CreateContainerAsync(createParams, context.CancellationToken); |
| 34 | + containerId = createResponse.ID; |
| 35 | + |
| 36 | + await docker.Containers.StartContainerAsync(containerId, new ContainerStartParameters(), context.CancellationToken); |
| 37 | + |
| 38 | + using var cts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken); |
| 39 | + cts.CancelAfter(TimeSpan.FromSeconds(request.TimeoutSeconds + 5)); |
| 40 | + |
| 41 | + string? stdout = null; |
| 42 | + string? stderr = null; |
| 43 | + int exitCode; |
| 44 | + |
| 45 | + try |
| 46 | + { |
| 47 | + var waitResponse = await docker.Containers.WaitContainerAsync(containerId, cts.Token); |
| 48 | + sw.Stop(); |
| 49 | + |
| 50 | + (stdout, stderr) = await ReadLogsAsync(containerId, context.CancellationToken); |
| 51 | + exitCode = (int)waitResponse.StatusCode; |
| 52 | + } |
| 53 | + catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested) |
| 54 | + { |
| 55 | + throw; |
| 56 | + } |
| 57 | + catch (OperationCanceledException) |
| 58 | + { |
| 59 | + sw.Stop(); |
| 60 | + exitCode = -1; |
| 61 | + stderr = $"Container timed out after {request.TimeoutSeconds}s"; |
| 62 | + } |
| 63 | + |
| 64 | + var response = new ScriptInvokeResponse |
| 65 | + { |
| 66 | + ToolCallId = request.ToolCallId, |
| 67 | + Output = stdout, |
| 68 | + Stderr = stderr, |
| 69 | + ExitCode = exitCode, |
| 70 | + ElapsedMs = sw.ElapsedMilliseconds |
| 71 | + }; |
| 72 | + |
| 73 | + var envelope = response.ToEnvelope<ScriptInvokeResponse>( |
| 74 | + source: agent.Name, |
| 75 | + correlationId: correlationId); |
| 76 | + |
| 77 | + await publisher.PublishAsync(replyTo, envelope, context.CancellationToken); |
| 78 | + } |
| 79 | + catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested) |
| 80 | + { |
| 81 | + throw; |
| 82 | + } |
| 83 | + catch (Exception ex) |
| 84 | + { |
| 85 | + logger.LogWarning(ex, "Script execution failed for call {ToolCallId}", request.ToolCallId); |
| 86 | + |
| 87 | + var response = new ScriptInvokeResponse |
| 88 | + { |
| 89 | + ToolCallId = request.ToolCallId, |
| 90 | + Stderr = ex.Message, |
| 91 | + ExitCode = -1, |
| 92 | + ElapsedMs = 0 |
| 93 | + }; |
| 94 | + |
| 95 | + var envelope = response.ToEnvelope<ScriptInvokeResponse>( |
| 96 | + source: agent.Name, |
| 97 | + correlationId: correlationId); |
| 98 | + |
| 99 | + await publisher.PublishAsync(replyTo, envelope, context.CancellationToken); |
| 100 | + } |
| 101 | + finally |
| 102 | + { |
| 103 | + if (containerId is not null) |
| 104 | + { |
| 105 | + try |
| 106 | + { |
| 107 | + await docker.Containers.RemoveContainerAsync(containerId, |
| 108 | + new ContainerRemoveParameters { Force = true, RemoveVolumes = true }); |
| 109 | + } |
| 110 | + catch (Exception ex) |
| 111 | + { |
| 112 | + logger.LogDebug(ex, "Failed to remove script container {ContainerId}", containerId); |
| 113 | + } |
| 114 | + } |
| 115 | + } |
| 116 | + } |
| 117 | + |
| 118 | + private CreateContainerParameters BuildCreateParameters(ScriptInvokeRequest request) |
| 119 | + { |
| 120 | + var scriptCommand = ""; |
| 121 | + if (request.PipPackages is { Count: > 0 }) |
| 122 | + { |
| 123 | + scriptCommand += $"pip install --quiet --target /tmp/pypackages {string.Join(' ', request.PipPackages)} 2>&1 && "; |
| 124 | + scriptCommand += "PYTHONPATH=/tmp/pypackages "; |
| 125 | + } |
| 126 | + scriptCommand += "python -c \"$ROCKBOT_SCRIPT\" 2>&1"; |
| 127 | + |
| 128 | + var env = new List<string> |
| 129 | + { |
| 130 | + $"ROCKBOT_SCRIPT={request.Script}", |
| 131 | + $"ROCKBOT_INPUT={request.InputData}" |
| 132 | + }; |
| 133 | + |
| 134 | + if (!string.IsNullOrEmpty(options.StagingUrl)) |
| 135 | + env.Add($"ROCKBOT_STAGING_URL={options.StagingUrl}"); |
| 136 | + if (!string.IsNullOrEmpty(options.StagingToken)) |
| 137 | + env.Add($"ROCKBOT_STAGING_TOKEN={options.StagingToken}"); |
| 138 | + |
| 139 | + return new CreateContainerParameters |
| 140 | + { |
| 141 | + Image = options.Image, |
| 142 | + Cmd = ["sh", "-c", scriptCommand], |
| 143 | + User = "1000", |
| 144 | + Env = env, |
| 145 | + Labels = new Dictionary<string, string> |
| 146 | + { |
| 147 | + ["app"] = "rockbot-script", |
| 148 | + ["rockbot.dev/tool-call-id"] = request.ToolCallId |
| 149 | + }, |
| 150 | + HostConfig = new HostConfig |
| 151 | + { |
| 152 | + NetworkMode = options.NetworkMode, |
| 153 | + ReadonlyRootfs = true, |
| 154 | + Tmpfs = new Dictionary<string, string> { ["/tmp"] = "" }, |
| 155 | + NanoCPUs = options.GetNanoCpus(), |
| 156 | + Memory = options.GetMemoryBytes(), |
| 157 | + SecurityOpt = ["no-new-privileges"], |
| 158 | + AutoRemove = false, |
| 159 | + RestartPolicy = new RestartPolicy { Name = RestartPolicyKind.No } |
| 160 | + } |
| 161 | + }; |
| 162 | + } |
| 163 | + |
| 164 | + private async Task<(string stdout, string stderr)> ReadLogsAsync(string containerId, CancellationToken ct) |
| 165 | + { |
| 166 | + var logStream = await docker.Containers.GetContainerLogsAsync( |
| 167 | + containerId, |
| 168 | + tty: false, |
| 169 | + new ContainerLogsParameters { ShowStdout = true, ShowStderr = true }, |
| 170 | + ct); |
| 171 | + |
| 172 | + return await logStream.ReadOutputToEndAsync(ct); |
| 173 | + } |
| 174 | +} |
0 commit comments