Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit. Hold shift + click to select a range.
2a58c2d
CIRCSTORE-642 Implement cache for locations and service-points API calls
roman-barannyk Mar 4, 2026
094efd0
CIRCSTORE-642 Fix logging
roman-barannyk Mar 4, 2026
f8026b6
CIRCSTORE-642 Add tests
roman-barannyk Mar 4, 2026
f726b68
CIRCSTORE-642 Revert unnecessary changes
roman-barannyk Mar 4, 2026
cd518ed
CIRCSTORE-642 update testcontainers dependency
roman-barannyk Mar 5, 2026
15a8d78
CIRCSTORE-642 remove unnecessary dependency
roman-barannyk Mar 6, 2026
01ebcaa
CIRCSTORE-642 fix code smells
roman-barannyk Mar 6, 2026
cd0fd64
CIRCSTORE-642 fix logging
roman-barannyk Mar 6, 2026
852a0c9
CIRCSTORE-642 refactoring
roman-barannyk Mar 6, 2026
37ad88f
CIRCSTORE-642 fix code smell
roman-barannyk Mar 9, 2026
b03cd42
CIRCSTORE-642 move cache to client, update handlers
roman-barannyk Mar 10, 2026
462450d
CIRCSTORE-642 remove unnecessary changes
roman-barannyk Mar 10, 2026
55bb49b
CIRCSTORE-642 add delete location event handler
roman-barannyk Mar 10, 2026
25e5ea6
CIRCSTORE-642 add delete location event handler
roman-barannyk Mar 10, 2026
f49874b
CIRCSTORE-642 update test
roman-barannyk Mar 10, 2026
74e1810
CIRCSTORE-642 update test
roman-barannyk Mar 12, 2026
6d4d135
CIRCSTORE-642 remove cache from the repository
roman-barannyk Mar 12, 2026
f4d2606
Merge branch 'master' into CIRCSTORE-642
roman-barannyk Mar 19, 2026
5ae125a
CIRCSTORE-642 add null check
roman-barannyk Mar 19, 2026
669a1dd
Merge remote-tracking branch 'origin/CIRCSTORE-642' into CIRCSTORE-642
roman-barannyk Mar 19, 2026
a806e46
Fix test suite port binding and initialization issues
roman-barannyk Mar 19, 2026
1125988
CIRCSTORE-642 fix code smell
roman-barannyk Mar 19, 2026
128e955
CIRCSTORE-643 fix logging
roman-barannyk Mar 23, 2026
9b0d7c6
CIRCSTORE-642 fix cache, refactoring
roman-barannyk Mar 23, 2026
5bc394a
CIRCSTORE-642 add null check
roman-barannyk Mar 23, 2026
81107e3
CIRCSTORE-642 refactoring
roman-barannyk Mar 24, 2026
c710a86
Merge branch 'master' into CIRCSTORE-642
roman-barannyk Mar 24, 2026
33ef799
CIRCSTORE-642 add TTL for caches
roman-barannyk Mar 26, 2026
419e112
CIRCSTORE-642 refactoring
roman-barannyk Mar 26, 2026
bd430df
Merge branch 'master' into CIRCSTORE-642
roman-barannyk Mar 26, 2026
ee74404
CIRCSTORE-642 Remove future
alexanderkurash Mar 26, 2026
0b1074b
CIRCSTORE-642 Use otherwise
alexanderkurash Mar 26, 2026
a573251
CIRCSTORE-642 Update cache on update event
alexanderkurash Mar 26, 2026
69fdd70
CIRCSTORE-642 Move cache update
alexanderkurash Mar 26, 2026
b72f1f2
CIRCSTORE-642 fix code smell
roman-barannyk Mar 26, 2026
436858a
Merge branch 'master' into CIRCSTORE-642
roman-barannyk Mar 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-bom</artifactId>
<version>1.21.0</version>
<version>2.0.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down Expand Up @@ -198,7 +198,7 @@
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<artifactId>testcontainers-kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -240,6 +240,11 @@
<version>1.10.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.1.8</version>
</dependency>
</dependencies>

<distributionManagement>
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/org/folio/EventConsumerVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import static org.folio.rest.tools.utils.ModuleName.getModuleName;
import static org.folio.rest.tools.utils.ModuleName.getModuleVersion;
import static org.folio.service.event.InventoryEventType.INVENTORY_ITEM_UPDATED;
import static org.folio.service.event.InventoryEventType.INVENTORY_LOCATION_DELETED;
import static org.folio.service.event.InventoryEventType.INVENTORY_LOCATION_DELETED_ALL;
import static org.folio.service.event.InventoryEventType.INVENTORY_LOCATION_UPDATED;
import static org.folio.service.event.InventoryEventType.INVENTORY_SERVICE_POINT_DELETED;
import static org.folio.service.event.InventoryEventType.INVENTORY_SERVICE_POINT_UPDATED;
Expand All @@ -27,6 +29,7 @@
import org.folio.kafka.services.KafkaTopic;
import org.folio.service.event.InventoryEventType;
import org.folio.service.event.handler.ItemUpdateEventHandler;
import org.folio.service.event.handler.LocationDeleteEventHandler;
import org.folio.service.event.handler.LocationUpdateEventHandler;
import org.folio.service.event.handler.ServicePointDeleteEventHandler;
import org.folio.service.event.handler.ServicePointUpdateEventHandler;
Expand Down Expand Up @@ -77,6 +80,7 @@ private Future<Void> stopConsumers() {

private Future<Void> createConsumers() {
final KafkaConfig config = getKafkaConfig();
var locationDeleteHandler = new LocationDeleteEventHandler();

return createInventoryEventConsumer(INVENTORY_ITEM_UPDATED, config,
new ItemUpdateEventHandler(context))
Expand All @@ -86,6 +90,10 @@ private Future<Void> createConsumers() {
new ServicePointDeleteEventHandler(context)))
.compose(r -> createInventoryEventConsumer(INVENTORY_LOCATION_UPDATED, config,
new LocationUpdateEventHandler(context)))
.compose(r -> createInventoryEventConsumer(INVENTORY_LOCATION_DELETED, config,
locationDeleteHandler))
.compose(r -> createInventoryEventConsumer(INVENTORY_LOCATION_DELETED_ALL, config,
locationDeleteHandler))
.mapEmpty();
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/folio/persist/RequestRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ public RequestRepository(Context context, Map<String, String> okapiHeaders) {
super(postgresClient(context, okapiHeaders), REQUEST_TABLE, REQUEST_CLASS);
}

}
}
120 changes: 118 additions & 2 deletions src/main/java/org/folio/rest/client/InventoryStorageClient.java
Original file line number Diff line number Diff line change
@@ -1,31 +1,147 @@
package org.folio.rest.client;

import static io.vertx.core.Future.succeededFuture;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.rest.jaxrs.model.Location;
import org.folio.rest.jaxrs.model.Servicepoint;

import java.util.concurrent.TimeUnit;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import io.vertx.core.Future;
import io.vertx.core.Vertx;

public class InventoryStorageClient extends OkapiClient {
  private static final Logger log = LogManager.getLogger(InventoryStorageClient.class);

  /** Cache key scoped by tenant so entries from different tenants never collide. */
  public record CacheKey(String tenantId, String recordId) {}

  private static final String SERVICE_POINTS_URL = "/service-points";
  private static final String SERVICE_POINTS_COLLECTION_NAME = "servicepoints";

  private static final String LOCATION_URL = "/locations";
  private static final String LOCATION_COLLECTION_NAME = "locations";

  // Caches are static so every client instance (created per request) shares them.
  // Bounded size plus a TTL guard against unbounded growth and against stale
  // entries whose invalidation event was missed.
  private static final Cache<CacheKey, Location> locationCache = Caffeine.newBuilder()
      .maximumSize(1000)
      .expireAfterWrite(1, TimeUnit.HOURS)
      .build();

  private static final Cache<CacheKey, Servicepoint> servicePointCache = Caffeine.newBuilder()
      .maximumSize(1000)
      .expireAfterWrite(1, TimeUnit.HOURS)
      .build();

  public InventoryStorageClient(Vertx vertx, Map<String, String> okapiHeaders) {
    super(vertx, okapiHeaders);
  }

  /** Exposes the shared location cache (primarily for tests and event handlers). */
  public static Cache<CacheKey, Location> locationCache() {
    return locationCache;
  }

  /** Exposes the shared service-point cache (primarily for tests and event handlers). */
  public static Cache<CacheKey, Servicepoint> servicePointCache() {
    return servicePointCache;
  }

  /** Evicts a single cached location for the given tenant. */
  public static void invalidateLocation(String tenantId, String locationId) {
    log.info("invalidateLocation:: tenantId: {}, locationId: {}", tenantId, locationId);
    locationCache.invalidate(new CacheKey(tenantId, locationId));
  }

  /** Inserts or replaces a cached location for the given tenant. */
  public static void updateLocationCache(String tenantId, String locationId,
    Location location) {

    log.info("updateLocationCache:: tenantId: {}, locationId: {}", tenantId, locationId);
    locationCache.put(new CacheKey(tenantId, locationId), location);
  }

  /** Clears the location cache for ALL tenants (used on DELETE_ALL events). */
  public static void invalidateAllLocations() {
    log.info("invalidateAllLocations:: invalidating all location cache entries");
    locationCache.invalidateAll();
  }

  /** Evicts a single cached service point for the given tenant. */
  public static void invalidateServicePoint(String tenantId, String servicePointId) {
    log.info("invalidateServicePoint:: tenantId: {}, servicePointId: {}", tenantId, servicePointId);
    servicePointCache.invalidate(new CacheKey(tenantId, servicePointId));
  }

  /** Inserts or replaces a cached service point for the given tenant. */
  public static void updateServicePointCache(String tenantId, String servicePointId,
    Servicepoint servicePoint) {

    log.info("updateServicePointCache:: tenantId: {}, servicePointId: {}", tenantId,
      servicePointId);
    servicePointCache.put(new CacheKey(tenantId, servicePointId), servicePoint);
  }

  /** Clears the service-point cache for ALL tenants. */
  public static void invalidateAllServicePoints() {
    log.info("invalidateAllServicePoints:: invalidating all service point cache entries");
    servicePointCache.invalidateAll();
  }

  /**
   * Fetches service points by id, serving from the cache where possible and
   * calling mod-inventory-storage only for the missing ids.
   */
  public Future<Collection<Servicepoint>> getServicePoints(Collection<String> ids) {
    return fetchWithCache(ids, SERVICE_POINTS_URL, SERVICE_POINTS_COLLECTION_NAME,
      Servicepoint.class, servicePointCache, Servicepoint::getId);
  }

  /**
   * Fetches locations by id, serving from the cache where possible and
   * calling mod-inventory-storage only for the missing ids.
   */
  public Future<Collection<Location>> getLocations(Collection<String> ids) {
    return fetchWithCache(ids, LOCATION_URL, LOCATION_COLLECTION_NAME,
      Location.class, locationCache, Location::getId);
  }

  /**
   * Generic cache-aside lookup: partitions {@code ids} into cache hits and
   * misses, fetches the misses in one remote call, caches the fetched records,
   * and returns the merged result. Null ids are skipped with a warning.
   *
   * @param ids         record ids to resolve; null/empty yields an empty result
   * @param url         remote collection endpoint
   * @param collectionName JSON collection property name in the response
   * @param type        record class for deserialization
   * @param cache       tenant-scoped cache to consult and populate
   * @param idExtractor derives the cache key id from a fetched record
   */
  private <T> Future<Collection<T>> fetchWithCache(Collection<String> ids, String url,
    String collectionName, Class<T> type, Cache<CacheKey, T> cache,
    Function<T, String> idExtractor) {

    if (ids == null || ids.isEmpty()) {
      log.info("fetchWithCache:: ids are null or empty, returning empty collection");
      return succeededFuture(new ArrayList<>());
    }

    // Hoisted: the tenant is fixed for this client instance.
    String tenantId = getTenant();
    List<T> cachedResults = new ArrayList<>();
    List<String> missingIds = new ArrayList<>();

    for (String id : ids) {
      if (id == null) {
        log.warn("fetchWithCache:: null id encountered, skipping");
        continue;
      }
      T cached = cache.getIfPresent(new CacheKey(tenantId, id));
      if (cached != null) {
        // DEBUG: per-id hit/miss logging is far too chatty for INFO on this hot path.
        log.debug("fetchWithCache:: Cache hit for tenantId: {}, id: {}", tenantId, id);
        cachedResults.add(cached);
      } else {
        log.debug("fetchWithCache:: Cache miss for tenantId: {}, id: {}", tenantId, id);
        missingIds.add(id);
      }
    }

    if (missingIds.isEmpty()) {
      return succeededFuture(cachedResults);
    }

    return get(url, missingIds, collectionName, type)
      .onSuccess(fetched -> fetched.forEach(item -> {
        String id = idExtractor.apply(item);
        if (id != null) {
          cache.put(new CacheKey(tenantId, id), item);
          log.debug("fetchWithCache:: Cached item with tenantId: {}, id: {}", tenantId, id);
        }
      }))
      .map(fetched -> {
        List<T> merged = new ArrayList<>(cachedResults);
        merged.addAll(fetched);
        return merged;
      });
  }
}
4 changes: 4 additions & 0 deletions src/main/java/org/folio/rest/client/OkapiClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public OkapiClient(Vertx vertx, Map<String, String> okapiHeaders) {
token = okapiHeaders.get(OKAPI_HEADER_TOKEN);
}

/**
 * Returns the Okapi tenant id held by this client instance, so subclasses can
 * build tenant-scoped data (e.g. cache keys).
 * NOTE(review): presumably populated from the X-Okapi-Tenant header in the
 * constructor — the assignment is outside this view; confirm.
 */
protected String getTenant() {
return tenant;
}

Future<HttpResponse<Buffer>> okapiGet(String path) {
log.debug("okapiGet:: path: {}", path);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.folio.service.event;

import static org.folio.service.event.InventoryEventType.PayloadType.DELETE;
import static org.folio.service.event.InventoryEventType.PayloadType.DELETE_ALL;
import static org.folio.service.event.InventoryEventType.PayloadType.UPDATE;
import static org.folio.support.kafka.topic.InventoryKafkaTopic.ITEM;
import static org.folio.support.kafka.topic.InventoryKafkaTopic.LOCATION;
Expand All @@ -17,7 +18,9 @@ public enum InventoryEventType {
INVENTORY_ITEM_UPDATED(ITEM, UPDATE),
INVENTORY_SERVICE_POINT_UPDATED(SERVICE_POINT, UPDATE),
INVENTORY_SERVICE_POINT_DELETED(SERVICE_POINT, DELETE),
INVENTORY_LOCATION_UPDATED(LOCATION, UPDATE);
INVENTORY_LOCATION_UPDATED(LOCATION, UPDATE),
INVENTORY_LOCATION_DELETED(LOCATION, DELETE),
INVENTORY_LOCATION_DELETED_ALL(LOCATION, DELETE_ALL);

private final KafkaTopic kafkaTopic;
private final PayloadType payloadType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

public class ItemUpdateEventHandler implements AsyncRecordHandler<String, String> {
private final Context context;

public ItemUpdateEventHandler(Context context) {
this.context = context;
}
Expand All @@ -24,8 +25,9 @@ public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRe
JsonObject payload = new JsonObject(kafkaConsumerRecord.value());
CaseInsensitiveMap<String, String> headers =
new CaseInsensitiveMap<>(kafkaHeadersToMap(kafkaConsumerRecord.headers()));
ItemUpdateProcessorForRequest itemUpdateProcessorForRequest =
new ItemUpdateProcessorForRequest(new RequestRepository(context, headers), new InventoryStorageClient(context.owner(), headers));

var itemUpdateProcessorForRequest = new ItemUpdateProcessorForRequest(
new RequestRepository(context, headers), new InventoryStorageClient(context.owner(), headers));

return itemUpdateProcessorForRequest.run(kafkaConsumerRecord.key(), payload);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.folio.service.event.handler;

import static io.vertx.core.Future.succeededFuture;
import static org.folio.kafka.KafkaHeaderUtils.kafkaHeadersToMap;
import static org.folio.rest.RestVerticle.OKAPI_HEADER_TENANT;

import org.apache.commons.collections4.map.CaseInsensitiveMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.rest.client.InventoryStorageClient;

import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;

/**
 * Kafka consumer handler for inventory location DELETE and DELETE_ALL events.
 * Its only job is cache maintenance: evicting stale entries from the shared
 * location cache held by {@link InventoryStorageClient}.
 */
public class LocationDeleteEventHandler implements AsyncRecordHandler<String, String> {
  private static final Logger log = LogManager.getLogger(LocationDeleteEventHandler.class);

  /**
   * Inspects the event payload and either flushes the whole location cache
   * (DELETE_ALL) or evicts the single deleted location. Always succeeds with
   * the record key — cache eviction has no failure mode worth retrying.
   */
  @Override
  public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
    var payload = new JsonObject(kafkaConsumerRecord.value());
    var headers = new CaseInsensitiveMap<>(kafkaHeadersToMap(kafkaConsumerRecord.headers()));
    String tenantId = headers.get(OKAPI_HEADER_TENANT);

    if ("DELETE_ALL".equals(payload.getString("type"))) {
      log.info("handle:: Received DELETE_ALL event, invalidating all location cache entries");
      InventoryStorageClient.invalidateAllLocations();
    } else {
      evictDeletedLocation(payload, tenantId);
    }

    return succeededFuture(kafkaConsumerRecord.key());
  }

  // Evicts one cached location when the payload carries the deleted record's id.
  private void evictDeletedLocation(JsonObject payload, String tenantId) {
    JsonObject deletedRecord = payload.getJsonObject("old");
    if (deletedRecord != null && deletedRecord.containsKey("id")) {
      String locationId = deletedRecord.getString("id");
      log.info("handle:: Received DELETE event for location id: {}", locationId);
      InventoryStorageClient.invalidateLocation(tenantId, locationId);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
package org.folio.service.event.handler;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import static org.folio.kafka.KafkaHeaderUtils.kafkaHeadersToMap;
import static org.folio.rest.RestVerticle.OKAPI_HEADER_TENANT;

import org.apache.commons.collections4.map.CaseInsensitiveMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.persist.RequestRepository;
import org.folio.rest.client.InventoryStorageClient;
import org.folio.rest.jaxrs.model.Location;
import org.folio.service.event.handler.processor.ItemLocationUpdateProcessorForRequest;

import static org.folio.kafka.KafkaHeaderUtils.kafkaHeadersToMap;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;

public class LocationUpdateEventHandler implements AsyncRecordHandler<String, String> {
private static final Logger log = LogManager.getLogger();

private final Context context;

public LocationUpdateEventHandler(Context context) {
this.context = context;
}
Expand All @@ -22,10 +31,21 @@ public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRe
JsonObject payload = new JsonObject(kafkaConsumerRecord.value());
CaseInsensitiveMap<String, String> headers =
new CaseInsensitiveMap<>(kafkaHeadersToMap(kafkaConsumerRecord.headers()));
String tenantId = headers.get(OKAPI_HEADER_TENANT);

ItemLocationUpdateProcessorForRequest itemLocationUpdateProcessorForRequest =
new ItemLocationUpdateProcessorForRequest(new RequestRepository(context, headers));
new ItemLocationUpdateProcessorForRequest(new RequestRepository(context, headers));

return itemLocationUpdateProcessorForRequest.run(kafkaConsumerRecord.key(), payload);
return itemLocationUpdateProcessorForRequest.run(kafkaConsumerRecord.key(), payload)
.onComplete(notUsed -> {
// Update location cache
JsonObject newObject = payload.getJsonObject("new");
if (newObject != null && newObject.containsKey("id")) {
log.info("handle:: updating location cache for tenantId: {}, locationId: {}",
tenantId, newObject.getString("id"));
InventoryStorageClient.updateLocationCache(tenantId, newObject.getString("id"),
newObject.mapTo(Location.class));
}
});
}
}
Loading
Loading