diff --git a/sendium-core/src/main/java/gr/cytech/sendium/core/smpp/server/InMemorySmppServerMessageStore.java b/sendium-core/src/main/java/gr/cytech/sendium/core/smpp/server/InMemorySmppServerMessageStore.java index 0598e89..efad50e 100644 --- a/sendium-core/src/main/java/gr/cytech/sendium/core/smpp/server/InMemorySmppServerMessageStore.java +++ b/sendium-core/src/main/java/gr/cytech/sendium/core/smpp/server/InMemorySmppServerMessageStore.java @@ -1,6 +1,7 @@ package gr.cytech.sendium.core.smpp.server; import gr.cytech.sendium.core.message.StandardMessage; +import gr.cytech.sendium.core.worker.InMemoryDlrService; import gr.cytech.sendium.core.worker.MessageState; import jakarta.inject.Inject; import org.slf4j.Logger; @@ -54,8 +55,48 @@ public Future persistMessages(List> eventsQueu @Override public boolean markAsUnpushed(StandardMessage msg) { - logger.info("markAsUnpushed message {}", msg); - return true; + if (msg == null || msg.type != StandardMessage.MSG_DLR) { + return false; + } + + try { + boolean saved = getDlrService().saveUnpushedDlr(msg); + if (!saved) { + logger.warn("Failed to save unpushed DLR: {}", msg); + } + return saved; + } catch (Exception e) { + logger.warn("Exception while saving unpushed DLR: {}", msg, e); + return false; + } + } + + @Override + public void onClientConnected(String systemId) { + InMemoryDlrService dlrService = getDlrService(); + List unpushedDlrs = dlrService.claimUnpushedDlrs(systemId); + if (unpushedDlrs.isEmpty()) { + logger.info("Unpushed DLR(s) not found for systemId:{}", systemId); + return; + } + + logger.info("Re-enqueuing {} unpushed DLR(s) for systemId:{}", unpushedDlrs.size(), systemId); + for (StandardMessage msg : unpushedDlrs) { + try { + if (worker.enqueueNoExceptions(msg)) { + dlrService.removeUnpushedDlr(msg); + } else { + dlrService.releaseUnpushedDlrClaim(msg); + } + } catch (Exception e) { + dlrService.releaseUnpushedDlrClaim(msg); + logger.warn("Failed to re-enqueue unpushed DLR: {}", msg, e); + } + } + } + + private InMemoryDlrService getDlrService() { + return worker.getWorkerResources().getDlrService(); } @Override diff --git a/sendium-core/src/main/java/gr/cytech/sendium/core/smpp/server/SmppServerBindHandler.java b/sendium-core/src/main/java/gr/cytech/sendium/core/smpp/server/SmppServerBindHandler.java index b74f9ef..8eae093 100644 --- a/sendium-core/src/main/java/gr/cytech/sendium/core/smpp/server/SmppServerBindHandler.java +++ b/sendium-core/src/main/java/gr/cytech/sendium/core/smpp/server/SmppServerBindHandler.java @@ -84,6 +84,13 @@ public void sessionCreated( logger.info("Session created for account ID: {}", accountId); session.serverReady(handler); + try { + if (worker.getMessageStore() != null) { + worker.getMessageStore().onClientConnected(handler.getSystemId()); + } + } catch (Exception e) { + logger.warn("Failed to process unpushed DLRs for systemId:{}", handler.getSystemId(), e); + } } public void sessionDestroyed(Long sessionId, SmppServerSession session) { diff --git a/sendium-core/src/main/java/gr/cytech/sendium/core/smpp/server/SmppServerMessageStore.java b/sendium-core/src/main/java/gr/cytech/sendium/core/smpp/server/SmppServerMessageStore.java index 106d429..775099b 100644 --- a/sendium-core/src/main/java/gr/cytech/sendium/core/smpp/server/SmppServerMessageStore.java +++ b/sendium-core/src/main/java/gr/cytech/sendium/core/smpp/server/SmppServerMessageStore.java @@ -28,6 +28,12 @@ public interface SmppServerMessageStore { */ boolean markAsUnpushed(M msg); + /** + * Called when a transmittable SMPP client session becomes available again. + */ + default void onClientConnected(String systemId) { + } + /** * Fetch the maximum allowed attempts for a message. */ diff --git a/sendium-core/src/main/java/gr/cytech/sendium/core/worker/InMemoryDlrService.java b/sendium-core/src/main/java/gr/cytech/sendium/core/worker/InMemoryDlrService.java index 2f20d34..96bdea1 100644 --- a/sendium-core/src/main/java/gr/cytech/sendium/core/worker/InMemoryDlrService.java +++ b/sendium-core/src/main/java/gr/cytech/sendium/core/worker/InMemoryDlrService.java @@ -1,7 +1,10 @@ package gr.cytech.sendium.core.worker; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import gr.cytech.sendium.core.message.StandardMessage; import io.quarkus.runtime.ShutdownEvent; import jakarta.annotation.PostConstruct; import jakarta.enterprise.context.ApplicationScoped; @@ -12,36 +15,63 @@ import org.slf4j.LoggerFactory; import java.io.File; -import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +/** + * Stores DLR correlation state and unpushed SMPP DLRs. + * + *

+ * The service uses H2 MVStore when available and falls back to in-memory maps if the store cannot be opened. + * The primary/correlation maps track submitted messages until operator DLRs arrive. The unpushed-DLR maps + * persist DLRs that could not be delivered to a disconnected SMPP client, then replay them when the matching + * systemId reconnects. + * + *

+ * This is an application-scoped singleton. The primary/correlation state follows the existing model of map-level + * concurrency: each operation is safe to call from worker threads, but multi-step updates are not globally serialized. + * Unpushed DLRs have stronger consistency requirements because each entry is split across payload, timestamp, and + * systemId index maps. Those compound operations are guarded by {@code unpushedDlrLock}. Replay also claims keys + * before returning them so concurrent reconnect callbacks for the same systemId cannot enqueue the same DLR twice. + */ @ApplicationScoped -public class InMemoryDlrService implements Serializable { +public class InMemoryDlrService { private static final Logger logger = LoggerFactory.getLogger(InMemoryDlrService.class); private static final long SEVEN_DAYS_MILLIS = TimeUnit.DAYS.toMillis(7); private static final long THREE_DAYS_MILLIS = TimeUnit.DAYS.toMillis(3); - private static final long serialVersionUID = 1L; private static final long EXPIRY_CHECK_INTERVAL = TimeUnit.HOURS.toMillis(1); private static final String DB_PATH_PROPERTY = "sendium.dlr.db.path"; private static final String DEFAULT_DB_PATH = "data/dlr-mvstore.db"; - private static final ObjectMapper mapper = new ObjectMapper(); + private static final ObjectMapper mapper = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + private static final TypeReference> STRING_LIST_TYPE = new TypeReference<>() { + }; @Inject ForwardDlrService forwardDlrService; - private transient MVStore store; + private final Object unpushedDlrStateLock = new Object(); + private final Set claimedUnpushedDlrKeys = ConcurrentHashMap.newKeySet(); - private transient Map primaryStore; - private transient Map correlationIndex; - private transient Map primaryTimestamps; - private transient Map correlationTimestamps; - private transient volatile long lastExpiryCheck = 0; + private MVStore store; + + private Map primaryStore; + private Map correlationIndex; + private Map primaryTimestamps; + private Map correlationTimestamps; + private Map unpushedDlrStore; + private Map unpushedDlrTimestamps; + private Map unpushedDlrIndex; + + private volatile long lastExpiryCheck = 0; @SuppressWarnings("unused") - private transient volatile boolean initialized = false; + private volatile boolean initialized = false; @PostConstruct void init() { @@ -74,13 +104,16 @@ void init() { correlationIndex = store.openMap("correlationIndex"); primaryTimestamps = store.openMap("primaryTimestamps"); correlationTimestamps = store.openMap("correlationTimestamps"); + unpushedDlrStore = store.openMap("unpushedDlrStore"); + unpushedDlrTimestamps = store.openMap("unpushedDlrTimestamps"); + unpushedDlrIndex = store.openMap("unpushedDlrIndex"); - if (primaryStore == null || correlationIndex == null) { + if (primaryStore == null || correlationIndex == null || unpushedDlrStore == null || unpushedDlrIndex == null) { logger.error("Failed to load maps from DB, falling back to in-memory"); fallbackToInMemory(); } else { - logger.info("Loaded from DB - primaryStore: {}, correlationIndex: {}", - primaryStore.size(), correlationIndex.size()); + logger.info("Loaded from DB - primaryStore: {}, correlationIndex: {}, unpushedDlrStore: {}, unpushedDlrIndex: {}", + primaryStore.size(), correlationIndex.size(), unpushedDlrStore.size(), unpushedDlrIndex.size()); initialized = true; } } catch (Exception e) { @@ -99,6 +132,9 @@ private void fallbackToInMemory() { correlationIndex = new ConcurrentHashMap<>(); primaryTimestamps = new ConcurrentHashMap<>(); correlationTimestamps = new ConcurrentHashMap<>(); + unpushedDlrStore = new ConcurrentHashMap<>(); + unpushedDlrTimestamps = new ConcurrentHashMap<>(); + unpushedDlrIndex = new ConcurrentHashMap<>(); initialized = true; logger.info("Using in-memory mode (no persistence)"); } @@ -275,6 +311,197 @@ public boolean markAsFailed(String gatewayMsgId) { return false; } + /** + * Persist a DLR that could not be pushed to the SMPP client. + */ + public boolean saveUnpushedDlr(StandardMessage msg) { + checkExpiry(); + if (unpushedDlrStore == null || unpushedDlrIndex == null || msg == null || msg.type != StandardMessage.MSG_DLR || + msg.systemId == null || msg.systemId.isBlank()) { + return false; + } + + String key = getUnpushedDlrKey(msg); + synchronized (unpushedDlrStateLock) { + try { + unpushedDlrStore.put(key, mapper.writeValueAsString(UnpushedDlr.fromMessage(msg))); + unpushedDlrTimestamps.put(key, System.currentTimeMillis()); + addKeyToUnpushedDlrIndex(msg.systemId, key); + commitStore(); + logger.info("Saved unpushed DLR key: {}", key); + return true; + } catch (JsonProcessingException e) { + logger.error("Failed to serialize unpushed DLR key: {}", key, e); + return false; + } + } + } + + /** + * Load unpushed DLRs for one SMPP systemId without marking them for replay. + */ + public List getUnpushedDlrs(String systemId) { + return loadUnpushedDlrs(systemId, false); + } + + /** + * Load and claim unpushed DLRs for replay. Claimed entries are hidden from later claims until removed or released. + */ + public List claimUnpushedDlrs(String systemId) { + return loadUnpushedDlrs(systemId, true); + } + + private List loadUnpushedDlrs(String systemId, boolean claimForReplay) { + checkExpiry(); + List messages = new ArrayList<>(); + if (unpushedDlrStore == null || unpushedDlrIndex == null || systemId == null || systemId.isBlank()) { + return messages; + } + + boolean changed = false; + synchronized (unpushedDlrStateLock) { + for (String key : getUnpushedDlrKeys(systemId)) { + if (claimForReplay && claimedUnpushedDlrKeys.contains(key)) { + continue; + } + + String msgJson = unpushedDlrStore.get(key); + if (msgJson == null) { + removeKeyFromUnpushedDlrIndex(systemId, key); + changed = true; + continue; + } + try { + UnpushedDlr dlr = mapper.readValue(msgJson, UnpushedDlr.class); + if (isUnpushedDlrForConnection(dlr, systemId)) { + if (!claimForReplay || claimedUnpushedDlrKeys.add(key)) { + messages.add(dlr.toMessage()); + } + } else { + removeKeyFromUnpushedDlrIndex(systemId, key); + changed = true; + } + } catch (JsonProcessingException e) { + logger.error("Failed to deserialize unpushed DLR key: {}. Removing corrupt entry", key, e); + unpushedDlrStore.remove(key); + unpushedDlrTimestamps.remove(key); + claimedUnpushedDlrKeys.remove(key); + removeKeyFromUnpushedDlrIndex(systemId, key); + changed = true; + } + } + if (changed) { + commitStore(); + } + } + + return messages; + } + + /** + * Remove a replayed DLR from all unpushed-DLR maps. + */ + public boolean removeUnpushedDlr(StandardMessage msg) { + if (unpushedDlrStore == null || unpushedDlrIndex == null || msg == null || msg.systemId == null || msg.systemId.isBlank()) { + return false; + } + + String key = getUnpushedDlrKey(msg); + synchronized (unpushedDlrStateLock) { + final boolean removed = unpushedDlrStore.remove(key) != null; + unpushedDlrTimestamps.remove(key); + claimedUnpushedDlrKeys.remove(key); + removeKeyFromUnpushedDlrIndex(msg.systemId, key); + if (removed) { + commitStore(); + } + return removed; + } + } + + /** + * Make a claimed but not yet removed DLR eligible for a later replay attempt. + */ + public void releaseUnpushedDlrClaim(StandardMessage msg) { + if (msg == null || msg.systemId == null || msg.systemId.isBlank()) { + return; + } + + synchronized (unpushedDlrStateLock) { + claimedUnpushedDlrKeys.remove(getUnpushedDlrKey(msg)); + } + } + + private boolean isUnpushedDlrForConnection(UnpushedDlr dlr, String systemId) { + return dlr != null && dlr.systemId != null && dlr.systemId.equals(systemId); + } + + private String getUnpushedDlrKey(StandardMessage msg) { + return String.join("|", + nullToEmpty(msg.systemId), + nullToEmpty(msg.serial), + String.valueOf(msg.state), + nullToEmpty(msg.errcode), + String.valueOf(msg.msgId)); + } + + private void addKeyToUnpushedDlrIndex(String systemId, String key) throws JsonProcessingException { + List keys = getUnpushedDlrKeys(systemId); + if (!keys.contains(key)) { + keys.add(key); + unpushedDlrIndex.put(systemId, mapper.writeValueAsString(keys)); + } + } + + private List getUnpushedDlrKeys(String systemId) { + String keysJson = unpushedDlrIndex.get(systemId); + if (keysJson == null || keysJson.isBlank()) { + return new ArrayList<>(); + } + try { + return new ArrayList<>(mapper.readValue(keysJson, STRING_LIST_TYPE)); + } catch (JsonProcessingException e) { + logger.error("Failed to deserialize unpushed DLR index for systemId: {}. Clearing corrupt index", systemId, e); + unpushedDlrIndex.remove(systemId); + return new ArrayList<>(); + } + } + + private void removeKeyFromUnpushedDlrIndex(String systemId, String key) { + if (systemId == null || unpushedDlrIndex == null) { + return; + } + List keys = getUnpushedDlrKeys(systemId); + if (!keys.remove(key)) { + return; + } + if (keys.isEmpty()) { + unpushedDlrIndex.remove(systemId); + return; + } + try { + unpushedDlrIndex.put(systemId, mapper.writeValueAsString(keys)); + } catch (JsonProcessingException e) { + logger.error("Failed to serialize unpushed DLR index for systemId: {}. Clearing index", systemId, e); + unpushedDlrIndex.remove(systemId); + } + } + + private String getSystemIdFromUnpushedDlrKey(String key) { + int separator = key.indexOf('|'); + return separator >= 0 ? key.substring(0, separator) : key; + } + + private String nullToEmpty(String value) { + return value == null ? "" : value; + } + + private void commitStore() { + if (store != null && !store.isClosed()) { + store.commit(); + } + } + private synchronized void checkExpiry() { long now = System.currentTimeMillis(); if (now - lastExpiryCheck < EXPIRY_CHECK_INTERVAL) { @@ -305,6 +532,26 @@ private synchronized void checkExpiry() { } } } + + boolean removedExpired = false; + if (unpushedDlrStore != null && unpushedDlrTimestamps != null) { + synchronized (unpushedDlrStateLock) { + for (String key : unpushedDlrTimestamps.keySet()) { + Long ts = unpushedDlrTimestamps.get(key); + if (ts != null && (now - ts) > SEVEN_DAYS_MILLIS) { + unpushedDlrStore.remove(key); + unpushedDlrTimestamps.remove(key); + claimedUnpushedDlrKeys.remove(key); + removeKeyFromUnpushedDlrIndex(getSystemIdFromUnpushedDlrKey(key), key); + removedExpired = true; + logger.debug("Expired unpushed DLR entry: {}", key); + } + } + } + } + if (removedExpired) { + commitStore(); + } } public int getPrimaryStoreSize() { @@ -315,7 +562,15 @@ public int getCorrelationIndexSize() { return correlationIndex != null ? correlationIndex.size() : 0; } + public int getUnpushedDlrStoreSize() { + return unpushedDlrStore != null ? unpushedDlrStore.size() : 0; + } + + public int getUnpushedDlrIndexSize() { + return unpushedDlrIndex != null ? unpushedDlrIndex.size() : 0; + } + public boolean isPersistent() { return store != null && !store.isClosed(); } -} \ No newline at end of file +} diff --git a/sendium-core/src/main/java/gr/cytech/sendium/core/worker/UnpushedDlr.java b/sendium-core/src/main/java/gr/cytech/sendium/core/worker/UnpushedDlr.java new file mode 100644 index 0000000..b9e3d6d --- /dev/null +++ b/sendium-core/src/main/java/gr/cytech/sendium/core/worker/UnpushedDlr.java @@ -0,0 +1,61 @@ +package gr.cytech.sendium.core.worker; + +import gr.cytech.sendium.core.message.StandardMessage; +import io.quarkus.runtime.annotations.RegisterForReflection; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +@RegisterForReflection +public class UnpushedDlr implements Serializable { + private static final long serialVersionUID = 1L; + + public String systemId; + public String accountId; + public String from; + public String to; + public String serial; + public int msgId; + public int state; + public String errcode; + public boolean acked; + public int priority; + public List reassembledParts; + + public UnpushedDlr() { + } + + public static UnpushedDlr fromMessage(StandardMessage msg) { + UnpushedDlr dlr = new UnpushedDlr(); + dlr.systemId = msg.systemId; + dlr.accountId = msg.owner_id; + dlr.from = msg.from; + dlr.to = msg.to; + dlr.serial = msg.serial; + dlr.msgId = msg.msgId; + dlr.state = msg.state; + dlr.errcode = msg.errcode; + dlr.acked = msg.acked; + dlr.priority = msg.priority; + dlr.reassembledParts = msg.reassembledParts == null ? null : new ArrayList<>(msg.reassembledParts); + return dlr; + } + + public StandardMessage toMessage() { + StandardMessage msg = new StandardMessage(); + msg.type = StandardMessage.MSG_DLR; + msg.systemId = systemId; + msg.owner_id = accountId; + msg.from = from; + msg.to = to; + msg.serial = serial; + msg.msgId = msgId; + msg.state = state; + msg.errcode = errcode; + msg.acked = acked; + msg.priority = priority; + msg.reassembledParts = reassembledParts == null ? null : new ArrayList<>(reassembledParts); + return msg; + } +} diff --git a/sendium-core/src/test/java/gr/cytech/sendium/core/smpp/server/InMemorySmppServerMessageStoreTest.java b/sendium-core/src/test/java/gr/cytech/sendium/core/smpp/server/InMemorySmppServerMessageStoreTest.java index 9b800b0..529c377 100644 --- a/sendium-core/src/test/java/gr/cytech/sendium/core/smpp/server/InMemorySmppServerMessageStoreTest.java +++ b/sendium-core/src/test/java/gr/cytech/sendium/core/smpp/server/InMemorySmppServerMessageStoreTest.java @@ -106,4 +106,55 @@ void getMaxAttempts_DefaultsTo3_WhenNoWorker() { assertEquals(3, result); } + + @Test + void markAsUnpushed_Dlr_SavesToDlrService() { + StandardMessage msg = new StandardMessage(); + msg.type = StandardMessage.MSG_DLR; + when(dlrService.saveUnpushedDlr(msg)).thenReturn(true); + + boolean result = messageStore.markAsUnpushed(msg); + + assertTrue(result); + verify(dlrService).saveUnpushedDlr(msg); + } + + @Test + void markAsUnpushed_NonDlr_ReturnsFalse() { + StandardMessage msg = new StandardMessage(); + msg.type = StandardMessage.MSG_TEXT; + + boolean result = messageStore.markAsUnpushed(msg); + + assertFalse(result); + verify(dlrService, never()).saveUnpushedDlr(any()); + } + + @Test + void onClientConnected_ReEnqueuesAndRemovesMatchingDlrs() { + StandardMessage dlr = new StandardMessage(); + dlr.type = StandardMessage.MSG_DLR; + dlr.owner_id = "account1"; + dlr.systemId = "sys1"; + when(dlrService.claimUnpushedDlrs("sys1")).thenReturn(List.of(dlr)); + when(worker.enqueueNoExceptions(dlr)).thenReturn(true); + + messageStore.onClientConnected("sys1"); + + verify(worker).enqueueNoExceptions(dlr); + verify(dlrService).removeUnpushedDlr(dlr); + } + + @Test + void onClientConnected_LeavesDlrStoredWhenReEnqueueFails() { + StandardMessage dlr = new StandardMessage(); + dlr.type = StandardMessage.MSG_DLR; + when(dlrService.claimUnpushedDlrs("sys1")).thenReturn(List.of(dlr)); + when(worker.enqueueNoExceptions(dlr)).thenReturn(false); + + messageStore.onClientConnected("sys1"); + + verify(dlrService, never()).removeUnpushedDlr(any()); + verify(dlrService).releaseUnpushedDlrClaim(dlr); + } } diff --git a/sendium-core/src/test/java/gr/cytech/sendium/core/worker/InMemoryDlrServiceTest.java b/sendium-core/src/test/java/gr/cytech/sendium/core/worker/InMemoryDlrServiceTest.java index 5b2d131..3bd08dc 100644 --- a/sendium-core/src/test/java/gr/cytech/sendium/core/worker/InMemoryDlrServiceTest.java +++ b/sendium-core/src/test/java/gr/cytech/sendium/core/worker/InMemoryDlrServiceTest.java @@ -1,22 +1,50 @@ package gr.cytech.sendium.core.worker; +import gr.cytech.sendium.core.message.StandardMessage; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; +import java.util.UUID; import static org.junit.jupiter.api.Assertions.*; class InMemoryDlrServiceTest { private InMemoryDlrService dlrService; + private Path dbPath; + private String oldDbPath; @BeforeEach - void setUp() { + void setUp() throws Exception { + oldDbPath = System.getProperty("sendium.dlr.db.path"); + dbPath = Files.createTempFile("dlr-service-test", ".db"); + Files.deleteIfExists(dbPath); + System.setProperty("sendium.dlr.db.path", dbPath.toString()); dlrService = new InMemoryDlrService(); dlrService.init(); } + @AfterEach + void tearDown() throws Exception { + if (dlrService != null) { + dlrService.onStop(null); + } + if (oldDbPath == null) { + System.clearProperty("sendium.dlr.db.path"); + } else { + System.setProperty("sendium.dlr.db.path", oldDbPath); + } + if (dbPath != null) { + Files.deleteIfExists(dbPath); + } + } + @Test void saveInitialState_StoresInPrimaryStore() { MessageState state = new MessageState("gw-123", "systemId", "from", "to", null); @@ -117,6 +145,100 @@ void markAsFailed_MissingId_ReturnsFalse() { assertFalse(result); } + @Test + void saveUnpushedDlr_StoresAndReturnsMatchingDlr() { + StandardMessage dlr = createDlr("account1", "sys1"); + + boolean result = dlrService.saveUnpushedDlr(dlr); + List dlrs = dlrService.getUnpushedDlrs("sys1"); + + assertTrue(result); + assertTrue(dlrs.stream().anyMatch(msg -> dlr.serial.equals(msg.serial))); + StandardMessage stored = dlrs.getFirst(); + assertEquals(dlr.state, stored.state); + assertEquals(dlr.errcode, stored.errcode); + assertEquals(dlr.acked, stored.acked); + assertEquals(dlr.priority, stored.priority); + assertEquals(dlr.reassembledParts, stored.reassembledParts); + assertEquals(1, dlrService.getUnpushedDlrIndexSize()); + } + + @Test + void saveUnpushedDlr_BlankSystemIdReturnsFalse() { + StandardMessage dlr = createDlr("account1", null); + + boolean result = dlrService.saveUnpushedDlr(dlr); + + assertFalse(result); + } + + @Test + void getUnpushedDlrs_DifferentSystemIdDoesNotMatch() { + StandardMessage dlr = createDlr("account1", "sys1"); + dlrService.saveUnpushedDlr(dlr); + + List dlrs = dlrService.getUnpushedDlrs("sys2"); + + assertFalse(dlrs.stream().anyMatch(msg -> dlr.serial.equals(msg.serial))); + } + + @Test + void getUnpushedDlrs_UsesSystemIdIndex() { + StandardMessage sys1Dlr = createDlr("account1", "sys1"); + StandardMessage sys2Dlr = createDlr("account2", "sys2"); + dlrService.saveUnpushedDlr(sys1Dlr); + dlrService.saveUnpushedDlr(sys2Dlr); + + List dlrs = dlrService.getUnpushedDlrs("sys1"); + + assertEquals(2, dlrService.getUnpushedDlrIndexSize()); + assertTrue(dlrs.stream().anyMatch(msg -> sys1Dlr.serial.equals(msg.serial))); + assertFalse(dlrs.stream().anyMatch(msg -> sys2Dlr.serial.equals(msg.serial))); + } + + @Test + void removeUnpushedDlr_RemovesStoredDlr() { + StandardMessage dlr = createDlr("account1", "sys1"); + dlrService.saveUnpushedDlr(dlr); + + boolean result = dlrService.removeUnpushedDlr(dlr); + List dlrs = dlrService.getUnpushedDlrs("sys1"); + + assertTrue(result); + assertFalse(dlrs.stream().anyMatch(msg -> dlr.serial.equals(msg.serial))); + assertEquals(0, dlrService.getUnpushedDlrIndexSize()); + } + + @Test + void claimUnpushedDlrs_HidesClaimedDlrUntilReleased() { + StandardMessage dlr = createDlr("account1", "sys1"); + dlrService.saveUnpushedDlr(dlr); + + List firstClaim = dlrService.claimUnpushedDlrs("sys1"); + List secondClaim = dlrService.claimUnpushedDlrs("sys1"); + dlrService.releaseUnpushedDlrClaim(firstClaim.getFirst()); + List afterRelease = dlrService.claimUnpushedDlrs("sys1"); + + assertEquals(1, firstClaim.size()); + assertTrue(secondClaim.isEmpty()); + assertEquals(1, afterRelease.size()); + assertEquals(dlr.serial, afterRelease.getFirst().serial); + } + + @Test + void unpushedDlrs_SurviveRestart() throws Exception { + StandardMessage dlr = createDlr("account-restart", "sys-restart"); + + assertTrue(dlrService.saveUnpushedDlr(dlr)); + dlrService.onStop(null); + + dlrService = new InMemoryDlrService(); + dlrService.init(); + List dlrs = dlrService.getUnpushedDlrs("sys-restart"); + + assertTrue(dlrs.stream().anyMatch(msg -> dlr.serial.equals(msg.serial))); + } + @Test void getPrimaryStoreSize_ReturnsCount() { dlrService.saveInitialState(new MessageState("gw-1", "systemId", "from", "to", null)); @@ -137,7 +259,24 @@ void getCorrelationIndexSize_ReturnsCount() { } @Test - void isPersistent_FalseWhenNoDb() { - assertFalse(dlrService.isPersistent()); + void isPersistent_TrueWhenDbAvailable() { + assertTrue(dlrService.isPersistent()); + } + + private StandardMessage createDlr(String accountId, String systemId) { + StandardMessage dlr = new StandardMessage(); + dlr.type = StandardMessage.MSG_DLR; + dlr.owner_id = accountId; + dlr.systemId = systemId; + dlr.serial = UUID.randomUUID().toString(); + dlr.from = "from"; + dlr.to = "to"; + dlr.state = StandardMessage.DLR_STAT_DELIVRD; + dlr.errcode = "0"; + dlr.acked = true; + dlr.priority = StandardMessage.HIGH_PRIORITY; + dlr.msgId = 123; + dlr.reassembledParts = new ArrayList<>(List.of("part-1", "part-2")); + return dlr; } -} \ No newline at end of file +} diff --git a/sendium-core/src/test/java/utils/NativeE2eSmoke.java b/sendium-core/src/test/java/utils/NativeE2eSmoke.java index 5954b7e..3439736 100644 --- a/sendium-core/src/test/java/utils/NativeE2eSmoke.java +++ b/sendium-core/src/test/java/utils/NativeE2eSmoke.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.sun.net.httpserver.HttpServer; import io.netty.channel.nio.NioEventLoopGroup; +import org.h2.mvstore.MVStore; import java.io.IOException; import java.net.HttpURLConnection; @@ -42,6 +43,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -73,8 +75,9 @@ public static void main(String[] args) throws Exception { require(upstream.awaitSessionBound(), "Sendium native container did not bind to the upstream SMPP server"); Thread.sleep(3_000); - verifySmppSubmitGetsDeliverSm(upstream); - verifyHttpSubmitGetsDlrCallback(upstream, callbackServer); + container = verifyUnpushedDlrSurvivesRestart(containerName, workDir, upstream); + verifySmppSubmitGetsDeliverSm(upstream, 2); + verifyHttpSubmitGetsDlrCallback(upstream, callbackServer, 3); } catch (Throwable t) { printDockerLogs(containerName); throw t; @@ -86,7 +89,7 @@ public static void main(String[] args) throws Exception { } } - private static void verifySmppSubmitGetsDeliverSm(UpstreamSmppServer upstream) throws Exception { + private static void verifySmppSubmitGetsDeliverSm(UpstreamSmppServer upstream, int expectedSubmitCount) throws Exception { try (DownstreamSmppClient client = new DownstreamSmppClient()) { client.start(); SubmitSmResp response = client.sendSms("smpp-sender", "306900000001", "native smpp e2e"); @@ -94,15 +97,17 @@ private static void verifySmppSubmitGetsDeliverSm(UpstreamSmppServer upstream) t "SMPP submit_sm_resp status was " + response.getCommandStatus()); require(response.getMessageId() != null && !response.getMessageId().isBlank(), "SMPP submit_sm_resp did not contain a message id"); - require(upstream.awaitSubmitCount(1), "Upstream SMPP server did not receive the SMPP-originated message"); + require(upstream.awaitSubmitCount(expectedSubmitCount), "Upstream SMPP server did not receive the SMPP-originated message"); DeliverSm deliverSm = client.awaitDeliverSm(); require(deliverSm != null, "Downstream SMPP client did not receive deliver_sm"); String body = new String(deliverSm.getShortMessage(), StandardCharsets.UTF_8); require(body.contains("DELIVRD"), "Downstream deliver_sm was not delivered: " + body); + Thread.sleep(500); } } - private static void verifyHttpSubmitGetsDlrCallback(UpstreamSmppServer upstream, CallbackServer callbackServer) throws Exception { + private static void verifyHttpSubmitGetsDlrCallback(UpstreamSmppServer upstream, CallbackServer callbackServer, + int expectedSubmitCount) throws Exception { String dlrUrl = "http://host.docker.internal:" + callbackServer.port() + "/dlr?status=%d&id=%s"; String query = "username=http-user" + "&password=http-pass" @@ -122,7 +127,7 @@ private static void verifyHttpSubmitGetsDlrCallback(UpstreamSmppServer upstream, String gatewayId = response.body().trim(); require(!gatewayId.isBlank(), "HTTP /sendsms did not return a gateway message id"); - require(upstream.awaitSubmitCount(2), "Upstream SMPP server did not receive the HTTP-originated message"); + require(upstream.awaitSubmitCount(expectedSubmitCount), "Upstream SMPP server did not receive the HTTP-originated message"); String callbackQuery = callbackServer.awaitCallback(); require(callbackQuery != null, "DLR callback URL was not called"); @@ -130,6 +135,49 @@ private static void verifyHttpSubmitGetsDlrCallback(UpstreamSmppServer upstream, require(callbackQuery.contains("id=" + gatewayId), "DLR callback did not contain gateway id " + gatewayId + ": " + callbackQuery); } + private static Process verifyUnpushedDlrSurvivesRestart(String containerName, Path workDir, + UpstreamSmppServer upstream) throws Exception { + upstream.setDeliveryReceiptDelayMillis(2_500); + String gatewayId; + try (DownstreamSmppClient client = new DownstreamSmppClient()) { + client.start(); + SubmitSmResp response = client.sendSms("smpp-sender", "306900000003", "native restart dlr e2e"); + require(response.getCommandStatus() == SmppConstants.STATUS_OK, + "SMPP restart submit_sm_resp status was " + response.getCommandStatus()); + gatewayId = response.getMessageId(); + require(gatewayId != null && !gatewayId.isBlank(), "SMPP restart submit_sm_resp did not contain a message id"); + require(upstream.awaitSubmitCount(1), "Upstream SMPP server did not receive the restart test message"); + } finally { + upstream.setDeliveryReceiptDelayMillis(1_000); + } + + Thread.sleep(6_000); + stopContainer(containerName); + assertUnpushedDlrPersisted(workDir, gatewayId, "before restart"); + + Process replayContainer = startSendiumContainer(containerName, workDir); + waitForPort("localhost", SENDIUM_HTTP_PORT, TIMEOUT); + waitForPort("localhost", SENDIUM_SMPP_PORT, TIMEOUT); + require(upstream.awaitSessionBoundCount(2), "Sendium native container did not rebind to upstream after restart"); + + try (DownstreamSmppClient reconnectedClient = new DownstreamSmppClient()) { + reconnectedClient.start(); + DeliverSm deliverSm = reconnectedClient.awaitDeliverSm(); + require(deliverSm != null, "Reconnected downstream SMPP client did not receive persisted unpushed DLR"); + String body = new String(deliverSm.getShortMessage(), StandardCharsets.UTF_8); + require(body.contains("DELIVRD"), "Persisted unpushed DLR was not delivered: " + body); + } + + stopContainer(containerName); + replayContainer.destroyForcibly(); + assertUnpushedDlrRemoved(workDir, gatewayId, "after replay"); + Process container = startSendiumContainer(containerName, workDir); + waitForPort("localhost", SENDIUM_HTTP_PORT, TIMEOUT); + waitForPort("localhost", SENDIUM_SMPP_PORT, TIMEOUT); + require(upstream.awaitSessionBoundCount(3), "Sendium native container did not rebind to upstream after replay check"); + return container; + } + private static Process startSendiumContainer(String containerName, Path workDir) throws Exception { List command = List.of( "docker", "run", "--rm", "-d", @@ -250,6 +298,31 @@ private static void require(boolean condition, String message) { } } + private static void assertUnpushedDlrPersisted(Path workDir, String gatewayId, String phase) { + requireUnpushedDlrPresence(workDir, gatewayId, true, phase); + } + + private static void assertUnpushedDlrRemoved(Path workDir, String gatewayId, String phase) { + requireUnpushedDlrPresence(workDir, gatewayId, false, phase); + } + + private static void requireUnpushedDlrPresence(Path workDir, String gatewayId, boolean expectedPresent, String phase) { + Path dbPath = workDir.resolve("data").resolve("dlr-mvstore.db"); + require(Files.exists(dbPath), "DLR MVStore does not exist " + phase + ": " + dbPath); + try (MVStore store = new MVStore.Builder().fileName(dbPath.toAbsolutePath().toString()).readOnly().open()) { + Map dlrStore = store.openMap("unpushedDlrStore"); + Map dlrIndex = store.openMap("unpushedDlrIndex"); + boolean present = dlrStore.values().stream().anyMatch(value -> value.contains("\"serial\":\"" + gatewayId + "\"")); + require(present == expectedPresent, + "Unexpected unpushed DLR presence " + phase + " for gatewayId " + gatewayId + + ": " + present + " expected " + expectedPresent + + " storeSize=" + dlrStore.size() + " indexSize=" + dlrIndex.size()); + if (expectedPresent) { + require(dlrIndex.containsKey("smpp-user"), "Unpushed DLR index did not contain smpp-user " + phase); + } + } + } + private static final class CallbackServer implements AutoCloseable { private final CountDownLatch latch = new CountDownLatch(1); private final List queries = Collections.synchronizedList(new ArrayList<>()); @@ -291,10 +364,12 @@ public void close() { private static final class UpstreamSmppServer implements AutoCloseable { private final int port; private final AtomicInteger receivedSubmits = new AtomicInteger(); + private final AtomicInteger boundSessions = new AtomicInteger(); private final CountDownLatch sessionBound = new CountDownLatch(1); private final CountDownLatch firstTwoSubmits = new CountDownLatch(2); private final Set sessions = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final DefaultSmppServer server; + private volatile long deliveryReceiptDelayMillis = 1_000; private UpstreamSmppServer(int port) { this.port = port; @@ -327,8 +402,9 @@ public void sessionBindRequested(Long sessionId, SmppSessionConfiguration sessio @Override public void sessionCreated(Long sessionId, SmppServerSession session, - BaseBindResp preparedBindResponse) throws SmppProcessingException { + BaseBindResp preparedBindResponse) throws SmppProcessingException { sessions.add(session); + boundSessions.incrementAndGet(); sessionBound.countDown(); session.serverReady(sessionHandler); } @@ -370,9 +446,24 @@ private boolean awaitSessionBound() throws InterruptedException { return sessionBound.await(TIMEOUT.toSeconds(), TimeUnit.SECONDS); } + private boolean awaitSessionBoundCount(int expected) throws InterruptedException { + long deadline = System.nanoTime() + TIMEOUT.toNanos(); + while (System.nanoTime() < deadline) { + if (boundSessions.get() >= expected) { + return true; + } + Thread.sleep(500); + } + return false; + } + + private void setDeliveryReceiptDelayMillis(long deliveryReceiptDelayMillis) { + this.deliveryReceiptDelayMillis = deliveryReceiptDelayMillis; + } + private void sendDeliveryReceipt(SubmitSm submitSm, String messageId) { try { - Thread.sleep(1_000); + Thread.sleep(deliveryReceiptDelayMillis); DeliverSm deliverSm = new DeliverSm(); deliverSm.setDataCoding(SmppConstants.DATA_CODING_DEFAULT); deliverSm.setEsmClass(SmppConstants.ESM_CLASS_MT_SMSC_DELIVERY_RECEIPT);