@@ -12,9 +12,11 @@ import (
1212 "net/http"
1313 "strconv"
1414 "strings"
15+ "sync/atomic"
1516 "time"
1617
1718 "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/core/statejson"
19+ "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/fatalerror"
1820 "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop"
1921 "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore"
2022 "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore/standalone"
@@ -28,6 +30,19 @@ type CustomInteropServer struct {
2830 localStackAdapter * LocalStackAdapter
2931 port string
3032 upstreamEndpoint string
33+ // eventsAPI renders the synthetic START/INIT_REPORT log lines from rapid's lifecycle
34+ // events and records the init outcome (error type, cold-start duration) — see events.go.
35+ eventsAPI * LocalStackEventsAPI
36+ // initErrorPayload stashes the structured error payload the runtime reported via
37+ // /init/error ([]byte), so ReportInitFailure can forward the runtime's own error to
38+ // LocalStack instead of a synthesized one. Written from the runtime API handler flow and
39+ // read from the main flow after init failed, hence atomic.
40+ initErrorPayload atomic.Value
41+ // resetDone signals (non-blocking, buffered-1) that a Reset has fully completed
42+ // — i.e. delegate.Reset returned after the underlying Server.Release. The main flow
43+ // waits on this when a hot-reload reset aborts the init phase, so the ready signal is
44+ // ordered after the reset's Release and cannot cancel the first invoke's reservation.
45+ resetDone chan struct {}
3146}
3247
3348type LocalStackAdapter struct {
@@ -42,23 +57,31 @@ const (
4257 Error LocalStackStatus = "error"
4358)
4459
45- func (l * LocalStackAdapter ) SendStatus (status LocalStackStatus , payload []byte ) error {
46- statusUrl := fmt .Sprintf ("%s/status/%s/%s" , l .UpstreamEndpoint , l .RuntimeId , status )
47- _ , err := http .Post (statusUrl , "application/json" , bytes .NewReader (payload ))
60+ // post sends a JSON payload to the given LocalStack endpoint path and fails on non-2xx
61+ // responses (e.g. LocalStack rejects a duplicate /status/error with 400).
62+ func (l * LocalStackAdapter ) post (path string , payload []byte ) error {
63+ resp , err := http .Post (l .UpstreamEndpoint + path , "application/json" , bytes .NewReader (payload ))
4864 if err != nil {
4965 return err
5066 }
67+ defer resp .Body .Close ()
68+ if resp .StatusCode < 200 || resp .StatusCode >= 300 {
69+ return fmt .Errorf ("POST %s returned status %d" , path , resp .StatusCode )
70+ }
5171 return nil
5272}
5373
74+ func (l * LocalStackAdapter ) SendStatus (status LocalStackStatus , payload []byte ) error {
75+ return l .post (fmt .Sprintf ("/status/%s/%s" , l .RuntimeId , status ), payload )
76+ }
77+
5478// SendLogs posts the captured invocation logs to LocalStack.
5579func (l * LocalStackAdapter ) SendLogs (invokeId string , logs lsapi.LogResponse ) error {
5680 serialized , err := json .Marshal (logs )
5781 if err != nil {
5882 return err
5983 }
60- _ , err = http .Post (l .UpstreamEndpoint + "/invocations/" + invokeId + "/logs" , "application/json" , bytes .NewReader (serialized ))
61- return err
84+ return l .post ("/invocations/" + invokeId + "/logs" , serialized )
6285}
6386
6487// SendResult posts the invocation result body to LocalStack.
@@ -78,11 +101,10 @@ func (l *LocalStackAdapter) SendResult(invokeId string, body []byte, isError boo
78101 } else {
79102 log .Infoln ("Sending to /response" )
80103 }
81- _ , err := http .Post (l .UpstreamEndpoint + endpoint , "application/json" , bytes .NewReader (body ))
82- return err
104+ return l .post (endpoint , body )
83105}
84106
85- func NewCustomInteropServer (lsOpts * LsOpts , delegate interop.Server , logCollector * LogCollector ) (server * CustomInteropServer ) {
107+ func NewCustomInteropServer (lsOpts * LsOpts , delegate interop.Server , logCollector * LogCollector , eventsAPI * LocalStackEventsAPI ) (server * CustomInteropServer ) {
86108 server = & CustomInteropServer {
87109 delegate : delegate .(* rapidcore.Server ),
88110 port : lsOpts .InteropPort ,
@@ -91,6 +113,8 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto
91113 UpstreamEndpoint : lsOpts .RuntimeEndpoint ,
92114 RuntimeId : lsOpts .RuntimeId ,
93115 },
116+ eventsAPI : eventsAPI ,
117+ resetDone : make (chan struct {}, 1 ),
94118 }
95119
96120 // TODO: extract this
@@ -110,8 +134,14 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto
110134 }
111135
112136 invokeResp := & standalone.ResponseWriterProxy {}
113- functionVersion := GetEnvOrDie ("AWS_LAMBDA_FUNCTION_VERSION" ) // default $LATEST
114- _ , _ = fmt .Fprintf (logCollector , "START RequestId: %s Version: %s\n " , invokeR .InvokeId , functionVersion )
137+ // The synthetic START and INIT_REPORT lines are emitted via LocalStackEventsAPI
138+ // from rapid's lifecycle events, so they land at the AWS-faithful points (e.g.
139+ // after an inline suppressed init's own logs) — see events.go.
140+
141+ // First invocation into a successfully initialized on-demand environment: REPORT
142+ // carries the Init phase duration as measured by rapid (take-once; absent on warm
143+ // starts, failed/timed-out inits, and non-on-demand environments).
144+ initDurationMS , hasInitDuration := server .eventsAPI .TakeColdStartInitDuration ()
115145
116146 invokeStart := time .Now ()
117147 err = server .Invoke (invokeResp , & interop.Invoke {
@@ -134,15 +164,18 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto
134164 })
135165 timeout := int (server .delegate .GetInvokeTimeout ().Seconds ())
136166 isErr := false
167+ status := ""
168+ errorType := ""
137169 if err != nil {
138170 switch {
139171 case errors .Is (err , rapidcore .ErrInvokeTimeout ):
140172 log .Debugf ("Got invoke timeout" )
141173 isErr = true
174+ status = "timeout"
142175 errorResponse := lsapi.ErrorResponse {
176+ ErrorType : "Sandbox.Timedout" ,
143177 ErrorMessage : fmt .Sprintf (
144- "%s %s Task timed out after %d.00 seconds" ,
145- time .Now ().Format ("2006-01-02T15:04:05Z" ),
178+ "RequestId: %s Error: Task timed out after %d.00 seconds" ,
146179 invokeR .InvokeId ,
147180 timeout ,
148181 ),
@@ -156,7 +189,15 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto
156189 log .Fatalln ("unable to write to response" )
157190 }
158191 case errors .Is (err , rapidcore .ErrInvokeDoneFailed ):
159- // we can actually just continue here, error message is sent below
192+ // The error response body was already written by rapid and is sent below.
193+ // When an init failure was folded into this invocation (AWS suppressed
194+ // init), the REPORT additionally carries the failure status and the
195+ // scrubbed fatal error type (e.g. Runtime.Unknown).
196+ if errType := server .eventsAPI .InitErrorType (); errType != "" {
197+ isErr = true
198+ status = "error"
199+ errorType = errType
200+ }
160201 default :
161202 log .Fatalln (err )
162203 }
@@ -171,7 +212,7 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto
171212 }
172213 timeoutDuration := time .Duration (timeout ) * time .Second
173214 memorySize := GetEnvOrDie ("AWS_LAMBDA_FUNCTION_MEMORY_SIZE" )
174- PrintEndReports (invokeR .InvokeId , "" , memorySize , invokeStart , timeoutDuration , logCollector )
215+ PrintEndReports (invokeR .InvokeId , initDurationMS , hasInitDuration , status , errorType , memorySize , invokeStart , timeoutDuration , logCollector )
175216
176217 if err2 := server .localStackAdapter .SendLogs (invokeR .InvokeId , logCollector .getLogs ()); err2 != nil {
177218 log .Error ("failed to send logs to LocalStack: " , err2 )
@@ -204,15 +245,74 @@ func (c *CustomInteropServer) SendErrorResponse(invokeID string, resp *interop.E
204245 return c .delegate .SendErrorResponse (invokeID , resp )
205246}
206247
207- // SendInitErrorResponse writes error response during init to a shared memory and sends GIRD FAULT.
248+ // SendInitErrorResponse stashes the init error reported by the runtime (via /init/error) for
249+ // ReportInitFailure and propagates it to the delegate, which caches it so the first invoke
250+ // can surface it. The delegate's error is returned because the /runtime/init/error handler
251+ // renders an interop error to the runtime based on it (e.g. ErrResponseSent during a
252+ // suppressed init).
208253func (c * CustomInteropServer ) SendInitErrorResponse (resp * interop.ErrorInvokeResponse ) error {
209254 log .Traceln ("SendInitErrorResponse called" )
210- if err := c .localStackAdapter .SendStatus (Error , resp .Payload ); err != nil {
211- log .Fatalln ("Failed to send init error to LocalStack " + err .Error () + ". Exiting." )
212- }
255+ c .initErrorPayload .Store (resp .Payload )
213256 return c .delegate .SendInitErrorResponse (resp )
214257}
215258
259+ // ReportInitFailure reports a failed initialization to LocalStack via /status/error, failing
260+ // the environment's startup. It forwards the runtime's own /init/error payload when one was
261+ // reported, and synthesizes a structured error from the given type and message otherwise
262+ // (e.g. when the runtime crashed, called sys.exit, or had an invalid entrypoint).
263+ // Only main.go calls this, and only for environments that fail provisioning-time (extended
264+ // init: provisioned concurrency / Managed Instances); on-demand environments fold init
265+ // failures into the first invocation instead.
266+ func (c * CustomInteropServer ) ReportInitFailure (errType fatalerror.ErrorType , message string ) {
267+ payload , _ := c .initErrorPayload .Load ().([]byte )
268+ if payload == nil {
269+ // Match AWS's fault message format "RequestId: <id> Error: <msg>". No invocation is
270+ // active during the init phase (LocalStack only dispatches invokes after the runtime
271+ // reports ready), so the request id is blank — matching the /init/error path below,
272+ // which forwards AWS's blank init-phase requestId.
273+ body , err := json .Marshal (lsapi.ErrorResponse {
274+ ErrorType : string (errType ),
275+ ErrorMessage : fmt .Sprintf ("RequestId: %s Error: %s" , c .delegate .GetCurrentInvokeID (), message ),
276+ })
277+ if err != nil {
278+ log .WithError (err ).Error ("Failed to marshal init error response" )
279+ return
280+ }
281+ payload = body
282+ } else if adapted := adaptInitErrorPayload (payload , c .delegate .GetCurrentInvokeID ()); adapted != nil {
283+ payload = adapted
284+ }
285+
286+ if err := c .localStackAdapter .SendStatus (Error , payload ); err != nil {
287+ log .WithError (err ).WithField ("runtime-id" , c .localStackAdapter .RuntimeId ).
288+ Error ("Failed to send init error to LocalStack" )
289+ }
290+ }
291+
292+ // adaptInitErrorPayload injects the requestId into the runtime's structured /init/error
293+ // payload, preserving all other fields exactly as the runtime emitted them — in particular an
294+ // empty but present "stackTrace": [] (e.g. Runtime.HandlerNotFound), which a typed struct with
295+ // omitempty would drop on re-marshal.
296+ //
297+ // AWS includes a (blank) "requestId" in runtime-reported init error payloads but NOT in
298+ // platform-synthesized ones (e.g. Runtime.ExitError) — see the lsapi.ErrorResponse doc for the
299+ // AWS-snapshot evidence. The two ReportInitFailure paths intentionally differ in this regard.
300+ // Returns nil if the payload cannot be adapted (it is then forwarded unmodified).
301+ func adaptInitErrorPayload (payload []byte , requestID string ) []byte {
302+ var fields map [string ]any
303+ if err := json .Unmarshal (payload , & fields ); err != nil {
304+ log .WithError (err ).Warn ("Failed to parse init error payload; forwarding raw payload" )
305+ return nil
306+ }
307+ fields ["requestId" ] = requestID
308+ adapted , err := json .Marshal (fields )
309+ if err != nil {
310+ log .WithError (err ).Error ("Failed to marshal adapted init error payload" )
311+ return nil
312+ }
313+ return adapted
314+ }
315+
216316func (c * CustomInteropServer ) GetCurrentInvokeID () string {
217317 log .Traceln ("GetCurrentInvokeID called" )
218318 return c .delegate .GetCurrentInvokeID ()
@@ -245,7 +345,15 @@ func (c *CustomInteropServer) Reserve(id string, traceID, lambdaSegmentID string
245345
246346func (c * CustomInteropServer ) Reset (reason string , timeoutMs int64 ) (* statejson.ResetDescription , error ) {
247347 log .Traceln ("Reset called" )
248- return c .delegate .Reset (reason , timeoutMs )
348+ resp , err := c .delegate .Reset (reason , timeoutMs )
349+ // delegate.Reset has returned, so the reset (including Server.Release) is complete.
350+ // Signal a waiter (the main flow on a hot-reload reset during init) without blocking
351+ // normal post-init reloads, which have no reader.
352+ select {
353+ case c .resetDone <- struct {}{}:
354+ default :
355+ }
356+ return resp , err
249357}
250358
251359func (c * CustomInteropServer ) AwaitRelease () (* statejson.ReleaseResponse , error ) {
0 commit comments