Skip to content

Commit 8c62332

Browse files
committed
update file based session service
1 parent 65df715 commit 8c62332

1 file changed

Lines changed: 101 additions & 47 deletions

File tree

session/file/src/main/java/com/javaaidev/adk/session/file/FileBasedSessionService.java

Lines changed: 101 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
import io.reactivex.rxjava3.core.Completable;
1919
import io.reactivex.rxjava3.core.Maybe;
2020
import io.reactivex.rxjava3.core.Single;
21-
import java.io.File;
2221
import java.io.IOException;
2322
import java.nio.file.Files;
2423
import java.nio.file.Path;
24+
import java.nio.file.StandardOpenOption;
2525
import java.time.Instant;
2626
import java.util.ArrayList;
2727
import java.util.List;
@@ -31,6 +31,7 @@
3131
import java.util.UUID;
3232
import java.util.concurrent.ConcurrentHashMap;
3333
import java.util.concurrent.ConcurrentMap;
34+
import java.util.concurrent.atomic.AtomicInteger;
3435
import org.jspecify.annotations.Nullable;
3536
import org.slf4j.Logger;
3637
import org.slf4j.LoggerFactory;
@@ -44,6 +45,8 @@ public class FileBasedSessionService implements BaseSessionService {
4445
.findAndRegisterModules()
4546
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
4647

48+
private final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, Session>>>
49+
sessions;
4750
private final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, Object>>>
4851
userState;
4952
private final ConcurrentMap<String, ConcurrentMap<String, Object>> appState;
@@ -59,8 +62,10 @@ public FileBasedSessionService(Path root) {
5962
}
6063
}
6164
this.root = root.normalize().toAbsolutePath();
65+
this.sessions = new ConcurrentHashMap<>();
6266
this.userState = new ConcurrentHashMap<>();
6367
this.appState = new ConcurrentHashMap<>();
68+
readAllSessions();
6469
readAppState();
6570
readUserState();
6671
LOGGER.info("Session data saved to {}", this.root);
@@ -94,10 +99,15 @@ public Single<Session> createSession(
9499
.lastUpdateTime(Instant.now())
95100
.build();
96101

102+
sessions
103+
.computeIfAbsent(appName, k -> new ConcurrentHashMap<>())
104+
.computeIfAbsent(userId, k -> new ConcurrentHashMap<>())
105+
.put(resolvedSessionId, newSession);
106+
97107
try {
98108
writeSession(appName, userId, resolvedSessionId, newSession);
99109
} catch (IOException e) {
100-
return Single.error(e);
110+
LOGGER.error("Failed to write session {}", sessionId, e);
101111
}
102112

103113
Session returnCopy = copySession(newSession);
@@ -112,12 +122,11 @@ public Maybe<Session> getSession(
112122
Objects.requireNonNull(sessionId, "sessionId cannot be null");
113123
Objects.requireNonNull(configOpt, "configOpt cannot be null");
114124

115-
Session storedSession;
116-
try {
117-
storedSession = getSession(appName, userId, sessionId);
118-
} catch (IOException e) {
119-
return Maybe.empty();
120-
}
125+
Session storedSession =
126+
sessions
127+
.getOrDefault(appName, new ConcurrentHashMap<>())
128+
.getOrDefault(userId, new ConcurrentHashMap<>())
129+
.get(sessionId);
121130

122131
if (storedSession == null) {
123132
return Maybe.empty();
@@ -157,18 +166,34 @@ public Single<ListSessionsResponse> listSessions(String appName, String userId)
157166
Objects.requireNonNull(appName, "appName cannot be null");
158167
Objects.requireNonNull(userId, "userId cannot be null");
159168

160-
var sessions = doListSessions(appName, userId);
161-
if (sessions.isEmpty()) {
169+
Map<String, Session> userSessionsMap =
170+
sessions.getOrDefault(appName, new ConcurrentHashMap<>()).get(userId);
171+
172+
if (userSessionsMap == null || userSessionsMap.isEmpty()) {
162173
return Single.just(ListSessionsResponse.builder().build());
163174
}
175+
164176
List<Session> sessionCopies =
165-
sessions.stream().map(this::copySessionMetadata).collect(toCollection(ArrayList::new));
177+
userSessionsMap.values().stream()
178+
.map(this::copySessionMetadata)
179+
.collect(toCollection(ArrayList::new));
166180

167181
return Single.just(ListSessionsResponse.builder().sessions(sessionCopies).build());
168182
}
169183

170184
@Override
171185
public Completable deleteSession(String appName, String userId, String sessionId) {
186+
Objects.requireNonNull(appName, "appName cannot be null");
187+
Objects.requireNonNull(userId, "userId cannot be null");
188+
Objects.requireNonNull(sessionId, "sessionId cannot be null");
189+
190+
ConcurrentMap<String, Session> userSessionsMap =
191+
sessions.getOrDefault(appName, new ConcurrentHashMap<>()).get(userId);
192+
193+
if (userSessionsMap != null) {
194+
userSessionsMap.remove(sessionId);
195+
}
196+
172197
var sessionPath = filePath(appName, userId, sessionId);
173198
try {
174199
Files.deleteIfExists(sessionPath);
@@ -185,12 +210,11 @@ public Single<ListEventsResponse> listEvents(String appName, String userId, Stri
185210
Objects.requireNonNull(userId, "userId cannot be null");
186211
Objects.requireNonNull(sessionId, "sessionId cannot be null");
187212

188-
Session storedSession = null;
189-
try {
190-
storedSession = getSession(appName, userId, sessionId);
191-
} catch (IOException e) {
192-
LOGGER.error("Failed to get session {}", sessionId, e);
193-
}
213+
Session storedSession =
214+
sessions
215+
.getOrDefault(appName, new ConcurrentHashMap<>())
216+
.getOrDefault(userId, new ConcurrentHashMap<>())
217+
.get(sessionId);
194218

195219
if (storedSession == null) {
196220
return Single.just(ListEventsResponse.builder().build());
@@ -240,16 +264,21 @@ public Single<Event> appendEvent(Session session, Event event) {
240264
BaseSessionService.super.appendEvent(session, event);
241265
session.lastUpdateTime(getInstantFromEvent(event));
242266

267+
sessions
268+
.getOrDefault(appName, new ConcurrentHashMap<>())
269+
.getOrDefault(userId, new ConcurrentHashMap<>())
270+
.put(sessionId, session);
271+
272+
mergeWithGlobalState(appName, userId, session);
273+
243274
try {
244275
writeSession(appName, userId, sessionId, session);
245276
writeAppState();
246277
writeUserState();
247278
} catch (IOException e) {
248-
return Single.error(e);
279+
LOGGER.error("Failed to write session {}", sessionId, e);
249280
}
250281

251-
mergeWithGlobalState(appName, userId, session);
252-
253282
return Single.just(event);
254283
}
255284

@@ -283,7 +312,7 @@ private Session mergeWithGlobalState(String appName, String userId, Session sess
283312
Map<String, Object> sessionState = session.state();
284313

285314
appState
286-
.getOrDefault(appName, new ConcurrentHashMap<String, Object>())
315+
.getOrDefault(appName, new ConcurrentHashMap<>())
287316
.forEach((key, value) -> sessionState.put(State.APP_PREFIX + key, value));
288317

289318
userState
@@ -294,34 +323,55 @@ private Session mergeWithGlobalState(String appName, String userId, Session sess
294323
return session;
295324
}
296325

297-
private List<Session> doListSessions(String appName, String userId) {
298-
var sessions = new ArrayList<Session>();
299-
var userDir = filePath(appName, userId);
300-
if (Files.exists(userDir)) {
301-
var files = userDir.toFile().listFiles();
302-
if (files != null) {
303-
for (var file : files) {
304-
try {
305-
sessions.add(readSession(file));
306-
} catch (IOException e) {
307-
LOGGER.error("Failed to read session file {}", file);
308-
}
309-
}
310-
}
311-
}
312-
return sessions;
326+
private Session readSession(Path path) throws IOException {
327+
return Session.fromJson(Files.readString(path));
313328
}
314329

315-
private Session getSession(String appName, String userId, String sessionId) throws IOException {
316-
var sessionPath = filePath(appName, userId, sessionId);
317-
if (!Files.exists(sessionPath)) {
318-
return null;
330+
private void readAllSessions() {
331+
var sessionsCount = new AtomicInteger(0);
332+
try (var appPaths = Files.list(root)) {
333+
appPaths
334+
.filter(p -> p.toFile().isDirectory())
335+
.forEach(
336+
appPath -> {
337+
var appName = appPath.getFileName().toString();
338+
try (var userPaths = Files.list(appPath)) {
339+
userPaths
340+
.filter(p -> p.toFile().isDirectory())
341+
.forEach(
342+
userPath -> {
343+
var userId = userPath.getFileName().toString();
344+
try (var sessionPaths = Files.list(userPath)) {
345+
sessionPaths
346+
.filter(p -> p.toFile().isFile())
347+
.forEach(
348+
sessionPath -> {
349+
var sessionId = sessionPath.getFileName().toString();
350+
try {
351+
var session = readSession(sessionPath);
352+
sessions
353+
.computeIfAbsent(
354+
appName, k -> new ConcurrentHashMap<>())
355+
.computeIfAbsent(
356+
userId, k -> new ConcurrentHashMap<>())
357+
.put(sessionId, session);
358+
sessionsCount.incrementAndGet();
359+
} catch (IOException e) {
360+
LOGGER.error("Ignore invalid session {}", sessionId);
361+
}
362+
});
363+
} catch (IOException e) {
364+
LOGGER.error("Ignore sessions of user id {}", userId, e);
365+
}
366+
});
367+
} catch (IOException e) {
368+
LOGGER.error("Ignore sessions of app name {}", appName, e);
369+
}
370+
});
371+
} catch (IOException e) {
372+
LOGGER.error("Failed to read all sessions", e);
319373
}
320-
return readSession(sessionPath.toFile());
321-
}
322-
323-
private Session readSession(File file) throws IOException {
324-
return objectMapper.readValue(file, Session.class);
374+
LOGGER.info("Loaded {} sessions", sessionsCount.get());
325375
}
326376

327377
private void readAppState() {
@@ -369,7 +419,11 @@ private void writeSession(String appName, String userId, String sessionId, Sessi
369419
var userDir = filePath(appName, userId);
370420
Files.createDirectories(userDir);
371421
var sessionPath = filePath(appName, userId, sessionId);
372-
objectMapper.writeValue(sessionPath.toFile(), session);
422+
Files.writeString(
423+
sessionPath,
424+
session.toJson(),
425+
StandardOpenOption.CREATE,
426+
StandardOpenOption.TRUNCATE_EXISTING);
373427
}
374428

375429
private Path appStatePath() {

0 commit comments

Comments
 (0)