Skip to content

Commit 7964e93

Browse files
google-genai-botcopybara-github
authored andcommitted
fix: Fix race condition and stale session in ADK Runner
PiperOrigin-RevId: 896517211
1 parent 51f4d1f commit 7964e93

File tree

4 files changed

+340
-19
lines changed

4 files changed

+340
-19
lines changed

core/src/main/java/com/google/adk/runner/Runner.java

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import com.google.adk.utils.CollectionUtils;
4646
import com.google.common.base.Preconditions;
4747
import com.google.common.collect.ImmutableList;
48+
import com.google.common.collect.MapMaker;
4849
import com.google.errorprone.annotations.CanIgnoreReturnValue;
4950
import com.google.genai.types.AudioTranscriptionConfig;
5051
import com.google.genai.types.Content;
@@ -57,13 +58,15 @@
5758
import io.reactivex.rxjava3.core.Flowable;
5859
import io.reactivex.rxjava3.core.Maybe;
5960
import io.reactivex.rxjava3.core.Single;
61+
import io.reactivex.rxjava3.subjects.CompletableSubject;
6062
import java.util.ArrayList;
6163
import java.util.Arrays;
6264
import java.util.Collections;
6365
import java.util.List;
6466
import java.util.Map;
6567
import java.util.Optional;
6668
import java.util.concurrent.ConcurrentHashMap;
69+
import java.util.concurrent.ConcurrentMap;
6770
import org.jspecify.annotations.Nullable;
6871

6972
/** The main class for the GenAI Agents runner. */
@@ -76,6 +79,8 @@ public class Runner {
7679
private final PluginManager pluginManager;
7780
@Nullable private final EventsCompactionConfig eventsCompactionConfig;
7881
@Nullable private final ContextCacheConfig contextCacheConfig;
82+
private final ConcurrentMap<String, Completable> activeSessionCompletables =
83+
new MapMaker().weakValues().makeMap();
7984

8085
/** Builder for {@link Runner}. */
8186
public static class Builder {
@@ -380,25 +385,57 @@ public Flowable<Event> runAsync(
380385
Content newMessage,
381386
RunConfig runConfig,
382387
@Nullable Map<String, Object> stateDelta) {
388+
Flowable<Event> result =
389+
Flowable.defer(
390+
() ->
391+
this.sessionService
392+
.getSession(appName, userId, sessionId, Optional.empty())
393+
.switchIfEmpty(
394+
Single.defer(
395+
() -> {
396+
if (runConfig.autoCreateSession()) {
397+
return this.sessionService.createSession(
398+
appName, userId, (Map<String, Object>) null, sessionId);
399+
}
400+
return Single.error(
401+
new IllegalArgumentException(
402+
String.format(
403+
"Session not found: %s for user %s",
404+
sessionId, userId)));
405+
}))
406+
.flatMapPublisher(
407+
session ->
408+
this.runAsyncImpl(session, newMessage, runConfig, stateDelta)))
409+
.compose(Tracing.trace("invocation"));
410+
383411
return Flowable.defer(
384-
() ->
385-
this.sessionService
386-
.getSession(appName, userId, sessionId, Optional.empty())
387-
.switchIfEmpty(
388-
Single.defer(
389-
() -> {
390-
if (runConfig.autoCreateSession()) {
391-
return this.sessionService.createSession(
392-
appName, userId, (Map<String, Object>) null, sessionId);
393-
}
394-
return Single.error(
395-
new IllegalArgumentException(
396-
String.format(
397-
"Session not found: %s for user %s", sessionId, userId)));
398-
}))
399-
.flatMapPublisher(
400-
session -> this.runAsyncImpl(session, newMessage, runConfig, stateDelta)))
401-
.compose(Tracing.trace("invocation"));
412+
() -> {
413+
if (sessionId == null) {
414+
return result;
415+
}
416+
417+
CompletableSubject requestCompletion = CompletableSubject.create();
418+
419+
Completable[] previousHolder = new Completable[1];
420+
421+
activeSessionCompletables.compute(
422+
sessionId,
423+
(key, current) -> {
424+
previousHolder[0] = current;
425+
return requestCompletion;
426+
});
427+
428+
Completable previous = previousHolder[0];
429+
430+
Flowable<Event> sequenced =
431+
(previous == null) ? result : previous.onErrorComplete().andThen(result);
432+
433+
return sequenced.doFinally(
434+
() -> {
435+
requestCompletion.onComplete();
436+
activeSessionCompletables.remove(sessionId, requestCompletion);
437+
});
438+
});
402439
}
403440

404441
/** See {@link #runAsync(String, String, Content, RunConfig, Map)}. */
@@ -740,6 +777,9 @@ private BaseAgent findAgentToRun(Session session, BaseAgent rootAgent) {
740777

741778
for (Event event : events) {
742779
String author = event.author();
780+
if (author == null) {
781+
continue;
782+
}
743783
if (author.equals("user")) {
744784
continue;
745785
}

core/src/main/java/com/google/adk/sessions/Session.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public Builder userId(String userId) {
123123
@CanIgnoreReturnValue
124124
@JsonProperty("events")
125125
public Builder events(List<Event> events) {
126-
this.events = Collections.synchronizedList(events);
126+
this.events = Collections.synchronizedList(new ArrayList<>(events));
127127
return this;
128128
}
129129

0 commit comments

Comments
 (0)