diff --git a/pom.xml b/pom.xml
index c840c00..e217902 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,6 +32,7 @@
3.6.0
13.4.2
3.5.5
+ 0.8.13
false
${skipTests}
diff --git a/sendium-core/pom.xml b/sendium-core/pom.xml
index 87bf4d4..94155a0 100644
--- a/sendium-core/pom.xml
+++ b/sendium-core/pom.xml
@@ -103,6 +103,26 @@
true
+
+ org.jacoco
+ jacoco-maven-plugin
+ ${jacoco-plugin.version}
+
+
+ prepare-agent
+
+ prepare-agent
+
+
+
+ report
+ verify
+
+ report
+
+
+
+
maven-surefire-plugin
${surefire-plugin.version}
@@ -160,4 +180,4 @@
-
\ No newline at end of file
+
diff --git a/sendium-core/src/main/java/gr/cytech/sendium/routing/AbstractRoutingManager.java b/sendium-core/src/main/java/gr/cytech/sendium/routing/AbstractRoutingManager.java
index a16b266..c8a2265 100644
--- a/sendium-core/src/main/java/gr/cytech/sendium/routing/AbstractRoutingManager.java
+++ b/sendium-core/src/main/java/gr/cytech/sendium/routing/AbstractRoutingManager.java
@@ -150,6 +150,7 @@ public void getNextMessageInQueueAndRoute() {
if (pause) {
reEnqueueMessage(msg);
+ msg = null;
return;
}
RoutingLookupResult result = lookupRoutingForMessage(msg, targets.defaultTable);
@@ -284,4 +285,4 @@ public void run() {
logger.debug("_STOP_ PROCESSING....");
}
}
-}
\ No newline at end of file
+}
diff --git a/sendium-core/src/test/java/gr/cytech/sendium/routing/AbstractRoutingManagerTest.java b/sendium-core/src/test/java/gr/cytech/sendium/routing/AbstractRoutingManagerTest.java
new file mode 100644
index 0000000..4273294
--- /dev/null
+++ b/sendium-core/src/test/java/gr/cytech/sendium/routing/AbstractRoutingManagerTest.java
@@ -0,0 +1,168 @@
+package gr.cytech.sendium.routing;
+
+import gr.cytech.sendium.core.AbstractOutWorker;
+import gr.cytech.sendium.core.message.StandardMessage;
+import gr.cytech.sendium.external.filter.FilterException;
+import gr.cytech.sendium.external.filter.FilterStatusCodes;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class AbstractRoutingManagerTest {
+
+ @Test
+ void getNextMessageInQueueAndRoute_WhenPausedReEnqueuesWithoutRouting() {
+ TestRoutingManager manager = new TestRoutingManager();
+ StandardMessage msg = new StandardMessage();
+ manager.pause = true;
+ manager.incoming.add(msg);
+
+ manager.getNextMessageInQueueAndRoute();
+
+ assertEquals(0, manager.lookupCalls);
+ assertEquals(List.of(msg), manager.enqueuedToRouter);
+ }
+
+ @Test
+ void getNextMessageInQueueAndRoute_WhenNoRouteReEnqueuesMessage() {
+ TestRoutingManager manager = new TestRoutingManager();
+ StandardMessage msg = new StandardMessage();
+ manager.incoming.add(msg);
+ manager.lookupResult = new RoutingLookupResult(new ArrayList<>(), false);
+
+ manager.getNextMessageInQueueAndRoute();
+
+ assertEquals(1, manager.lookupCalls);
+ assertEquals(List.of(msg), manager.enqueuedToRouter);
+ }
+
+ @Test
+ void getNextMessageInQueueAndRoute_WhenFilterDropsMessageDoesNotReEnqueue() {
+ TestRoutingManager manager = new TestRoutingManager();
+ StandardMessage msg = new StandardMessage();
+ manager.incoming.add(msg);
+ manager.lookupThrowable = filterException(FilterStatusCodes.DROP, msg);
+
+ manager.getNextMessageInQueueAndRoute();
+
+ assertTrue(manager.enqueuedToRouter.isEmpty());
+ assertEquals(0, msg.rtxCnt);
+ }
+
+ @Test
+ void getNextMessageInQueueAndRoute_WhenFilterRequestsRetryIncrementsAndReEnqueues() {
+ TestRoutingManager manager = new TestRoutingManager();
+ StandardMessage msg = new StandardMessage();
+ manager.incoming.add(msg);
+ manager.lookupThrowable = filterException(FilterStatusCodes.RETRY, msg);
+
+ manager.getNextMessageInQueueAndRoute();
+
+ assertEquals(1, msg.rtxCnt);
+ assertEquals(List.of(msg), manager.enqueuedToRouter);
+ }
+
+ @Test
+ void getNextMessageInQueueAndRoute_WhenGenericExceptionExceedsRetryLimitMovesToUnexpectedFailure() {
+ TestRoutingManager manager = new TestRoutingManager();
+ StandardMessage msg = new StandardMessage();
+ msg.rtxCnt = 50;
+ manager.incoming.add(msg);
+ manager.lookupThrowable = new IOException("routing failed");
+
+ manager.getNextMessageInQueueAndRoute();
+
+ assertEquals(51, msg.rtxCnt);
+ assertEquals(List.of(msg), manager.unexpectedFailures);
+ assertTrue(manager.enqueuedToRouter.isEmpty());
+ }
+
+ @Test
+ void enqueueFailedToQueueRetriesAfterTransientEnqueueFailure() {
+ TestRoutingManager manager = new TestRoutingManager();
+ StandardMessage msg = new StandardMessage();
+ manager.failedq.add(msg);
+ manager.enqueueFailuresRemaining = 1;
+
+ manager.enqueueFailedToQueue();
+
+ assertEquals(2, manager.enqueueAttempts);
+ assertEquals(List.of(msg), manager.enqueuedToRouter);
+ assertTrue(manager.failedq.isEmpty());
+ }
+
+ private FilterException filterException(FilterStatusCodes statusCode, StandardMessage msg) {
+ AbstractOutWorker filter = mock(AbstractOutWorker.class);
+ when(filter.getFullName()).thenReturn("filter.test");
+ return new FilterException(filter, statusCode, msg, "filter result");
+ }
+
+ private static class TestRoutingManager extends AbstractRoutingManager {
+ private final ArrayDeque incoming = new ArrayDeque<>();
+ private final List enqueuedToRouter = new ArrayList<>();
+ private final List unexpectedFailures = new ArrayList<>();
+ private RoutingLookupResult lookupResult = RoutingLookupResult.EMPTY_RESULT;
+ private Throwable lookupThrowable;
+ private int lookupCalls;
+ private int enqueueFailuresRemaining;
+ private int enqueueAttempts;
+
+ private TestRoutingManager() {
+ RoutingTable defaultTable = new RoutingTable(RoutingFileParser.DEFAULT_ROUTING_TABLE_NAME, RoutingTable.TargetFunction.NORMAL);
+ targets = new RoutingTargets(java.util.Map.of(RoutingFileParser.DEFAULT_ROUTING_TABLE_NAME, defaultTable), java.util.Map.of());
+ }
+
+ @Override
+ protected StandardMessage getNextMessageToRoute() {
+ return incoming.poll();
+ }
+
+ @Override
+ protected void enqueueToRouterQueue(StandardMessage msg) throws InterruptedException {
+ enqueueAttempts++;
+ if (enqueueFailuresRemaining > 0) {
+ enqueueFailuresRemaining--;
+ throw new InterruptedException("transient enqueue failure");
+ }
+ enqueuedToRouter.add(msg);
+ }
+
+ @Override
+ protected RoutingLookupResult lookupRoutingForMessage(StandardMessage pMsg, RoutingTable table) throws IOException {
+ lookupCalls++;
+ if (lookupThrowable instanceof FilterException filterException) {
+ throw filterException;
+ }
+ if (lookupThrowable instanceof IOException ioException) {
+ throw ioException;
+ }
+ if (lookupThrowable instanceof RuntimeException runtimeException) {
+ throw runtimeException;
+ }
+ return lookupResult;
+ }
+
+ @Override
+ protected void handleMessageUnexpectedFailure(StandardMessage msg, Throwable e) {
+ unexpectedFailures.add(msg);
+ }
+
+ @Override
+ protected boolean getConfigBoolean(String[] prop) {
+ return false;
+ }
+
+ @Override
+ protected int getConfigInt(String[] prop) {
+ return 1;
+ }
+ }
+}
diff --git a/sendium-core/src/test/java/gr/cytech/sendium/routing/RoutingFileParserTest.java b/sendium-core/src/test/java/gr/cytech/sendium/routing/RoutingFileParserTest.java
new file mode 100644
index 0000000..c24174e
--- /dev/null
+++ b/sendium-core/src/test/java/gr/cytech/sendium/routing/RoutingFileParserTest.java
@@ -0,0 +1,79 @@
+package gr.cytech.sendium.routing;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class RoutingFileParserTest {
+
+ @Test
+ void parseRoutingTable_CreatesDefaultTableWithoutExplicitHeader() {
+ Map routes = RoutingFileParser.parseRoutingTable(List.of(
+ "worker:from:equals:sender"
+ ));
+
+ assertTrue(routes.containsKey(RoutingFileParser.DEFAULT_ROUTING_TABLE_NAME));
+ assertEquals(1, routes.get(RoutingFileParser.DEFAULT_ROUTING_TABLE_NAME).getRules().size());
+ assertEquals("worker", routes.get(RoutingFileParser.DEFAULT_ROUTING_TABLE_NAME).getRules().getFirst().getTarget());
+ }
+
+ @Test
+ void parseRoutingTable_AttachesCommentsToNextRuleOrTable() {
+ Map routes = RoutingFileParser.parseRoutingTable(List.of(
+ "#default route label",
+ "worker:from:equals:sender",
+ "#first line",
+ "#second line",
+ "[marketing]"
+ ));
+
+ RoutingRule rule = routes.get(RoutingFileParser.DEFAULT_ROUTING_TABLE_NAME).getRules().getFirst();
+ RoutingTable table = routes.get("marketing");
+
+ assertEquals("default route label", rule.getLabel());
+ assertEquals("first line\nsecond line", table.getLabel());
+ }
+
+ @Test
+ void parseRoutingTable_ParsesFunctionTables() {
+ Map routes = RoutingFileParser.parseRoutingTable(List.of(
+ "[leastCost->function(LCR)]",
+ "worker::default:"
+ ));
+
+ RoutingTable table = routes.get("leastCost");
+
+ assertEquals(RoutingTable.TargetFunction.LCR, table.getTargetFunction());
+ assertEquals(1, table.getRules().size());
+ }
+
+ @Test
+ void parseRoutingTable_IgnoresBlankAndInvalidLines() {
+ Map routes = RoutingFileParser.parseRoutingTable(List.of(
+ "",
+ "not-a-route",
+ "worker::default:"
+ ));
+
+ RoutingTable defaultTable = routes.get(RoutingFileParser.DEFAULT_ROUTING_TABLE_NAME);
+
+ assertEquals(1, defaultTable.getRules().size());
+ assertFalse(defaultTable.getRules().stream().anyMatch(rule -> "not-a-route".equals(rule.toString())));
+ }
+
+ @Test
+ void parseRoutingTable_PreservesColonsInsideRuleValue() {
+ Map routes = RoutingFileParser.parseRoutingTable(List.of(
+ "worker:body:equals:https://example.test/dlr?id=1"
+ ));
+
+ RoutingRule rule = routes.get(RoutingFileParser.DEFAULT_ROUTING_TABLE_NAME).getRules().getFirst();
+
+ assertEquals("https://example.test/dlr?id=1", rule.getConditions().getFirst().getValue());
+ }
+}
diff --git a/sendium-core/src/test/java/gr/cytech/sendium/routing/RoutingRuleTest.java b/sendium-core/src/test/java/gr/cytech/sendium/routing/RoutingRuleTest.java
index 2afb7e9..bf5fa6b 100644
--- a/sendium-core/src/test/java/gr/cytech/sendium/routing/RoutingRuleTest.java
+++ b/sendium-core/src/test/java/gr/cytech/sendium/routing/RoutingRuleTest.java
@@ -43,4 +43,66 @@ void marginPercentageGreaterThanRuleMatchesAboveThreshold() {
assertTrue(rule.matches(msg, null));
}
+
+ @Test
+ void copiedRoutePrefixMarksRuleAsCopiedAndStripsTargetPrefix() {
+ RoutingRule rule = new RoutingRule("+copyTarget:from:equals:sender", "copy label");
+
+ assertEquals("copyTarget", rule.getTarget());
+ assertEquals("+copyTarget:from:equals:sender", rule.toString());
+ assertEquals("copy label", rule.getLabel());
+ }
+
+ @Test
+ void multiConditionRuleRequiresEveryConditionToMatch() {
+ StandardMessage msg = new StandardMessage();
+ msg.from = "sender";
+ msg.to = "306900000000";
+
+ RoutingRule rule = new RoutingRule("target:from~~to:equals~~startsWith:sender~~3069", null);
+
+ assertTrue(rule.matches(msg, null));
+
+ msg.to = "447700000000";
+
+ assertFalse(rule.matches(msg, null));
+ }
+
+ @Test
+ void negatedStringPolicyInvertsMatch() {
+ StandardMessage msg = new StandardMessage();
+ msg.owner_id = "account-a";
+
+ RoutingRule rule = new RoutingRule("owner_id", "!equals", "account-b", "target", null);
+
+ assertTrue(rule.matches(msg, null));
+ }
+
+ @Test
+ void stringPoliciesCoverRegexPrefixSuffixAndNullChecks() {
+ StandardMessage msg = new StandardMessage();
+ msg.body = "Sendium route check";
+ msg.message_center = null;
+
+ assertTrue(new RoutingRule("body", "matches", "Sendium.*check", "target", null).matches(msg, null));
+ assertTrue(new RoutingRule("body", "startsWith", "Sendium", "target", null).matches(msg, null));
+ assertTrue(new RoutingRule("body", "endsWith", "check", "target", null).matches(msg, null));
+ assertTrue(new RoutingRule("message_center", "isNull", "", "target", null).matches(msg, null));
+ }
+
+ @Test
+ void ruleValueCanContainColonCharacters() {
+ StandardMessage msg = new StandardMessage();
+ msg.body = "https://example.test/dlr?id=1";
+
+ RoutingRule rule = new RoutingRule("target:body:equals:https://example.test/dlr?id=1", null);
+
+ assertTrue(rule.matches(msg, null));
+ }
+
+ @Test
+ void unknownFieldOrPolicyFailsFast() {
+ assertThrows(IllegalArgumentException.class, () -> new RoutingRule("unknown", "equals", "x", "target", null));
+ assertThrows(IllegalArgumentException.class, () -> new RoutingRule("body", "unknownPolicy", "x", "target", null));
+ }
}
diff --git a/sendium-core/src/test/java/gr/cytech/sendium/routing/StandardRoutingManagerTest.java b/sendium-core/src/test/java/gr/cytech/sendium/routing/StandardRoutingManagerTest.java
new file mode 100644
index 0000000..f20e6bc
--- /dev/null
+++ b/sendium-core/src/test/java/gr/cytech/sendium/routing/StandardRoutingManagerTest.java
@@ -0,0 +1,85 @@
+package gr.cytech.sendium.routing;
+
+import gr.cytech.sendium.core.AbstractOutWorker;
+import gr.cytech.sendium.core.message.StandardMessage;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class StandardRoutingManagerTest {
+
+ @Test
+ void parseNewRoutingTableIndexesWorkersByFullAndInstanceName() throws Exception {
+ StandardRoutingManager manager = new StandardRoutingManager();
+ AbstractOutWorker worker = acceptingWorker("smpp.route", "route");
+ Map routes = parseRoutes("route::default:");
+
+ manager.parseNewRoutingTable(routes, List.of(worker));
+
+ assertSame(worker, manager.getTargets().getWorker("smpp.route"));
+ assertSame(worker, manager.getTargets().getWorker("route"));
+ }
+
+ @Test
+ void parseNewRoutingTableKeepsPreviousTargetsWhenDefaultTableIsMissing() {
+ StandardRoutingManager manager = new StandardRoutingManager();
+ AbstractOutWorker worker = acceptingWorker("smpp.route", "route");
+ Map validRoutes = parseRoutes("route::default:");
+
+ manager.parseNewRoutingTable(validRoutes, List.of(worker));
+ AbstractRoutingManager.RoutingTargets previousTargets = manager.getTargets();
+
+ manager.parseNewRoutingTable(Map.of("other", new RoutingTable("other", RoutingTable.TargetFunction.NORMAL)), List.of(worker));
+
+ assertSame(previousTargets, manager.getTargets());
+ }
+
+ @Test
+ void lookupRoutingForMessageReturnsEmptyWhenTargetDoesNotExist() throws Exception {
+ StandardRoutingManager manager = new StandardRoutingManager();
+ manager.parseNewRoutingTable(parseRoutes("missing::default:"), List.of());
+
+ RoutingLookupResult result = manager.lookupRoutingForMessage(new StandardMessage(), manager.getTargets().defaultTable);
+
+ assertTrue(result.getDestinations().isEmpty());
+ assertTrue(!result.hasReachedLast());
+ }
+
+ @Test
+ void lookupRoutingForMessageResolvesRecursiveTableToWorker() throws Exception {
+ StandardRoutingManager manager = new StandardRoutingManager();
+ AbstractOutWorker worker = acceptingWorker("smpp.route", "route");
+ Map routes = parseRoutes(
+ "secondary::default:",
+ "[secondary]",
+ "route::default:"
+ );
+ manager.parseNewRoutingTable(routes, List.of(worker));
+
+ RoutingLookupResult result = manager.lookupRoutingForMessage(new StandardMessage(), manager.getTargets().defaultTable);
+
+ assertEquals(1, result.getDestinations().size());
+ assertSame(worker, result.getDestinations().getFirst());
+ assertTrue(result.hasReachedLast());
+ }
+
+ private Map parseRoutes(String... lines) {
+ return RoutingFileParser.parseRoutingTable(List.of(lines));
+ }
+
+ private AbstractOutWorker acceptingWorker(String fullName, String instanceName) {
+ AbstractOutWorker worker = mock(AbstractOutWorker.class);
+ when(worker.getFullName()).thenReturn(fullName);
+ when(worker.getInstanceName()).thenReturn(instanceName);
+ when(worker.isFilter()).thenReturn(false);
+ when(worker.acceptsMessages()).thenReturn(true);
+ return worker;
+ }
+}