Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
<checkstyle-plugin.version>3.6.0</checkstyle-plugin.version>
<checkstyle-tools.version>13.4.2</checkstyle-tools.version>
<surefire-plugin.version>3.5.5</surefire-plugin.version>
<jacoco-plugin.version>0.8.13</jacoco-plugin.version>

<skipTests>false</skipTests>
<skipUT>${skipTests}</skipUT>
Expand Down
22 changes: 21 additions & 1 deletion sendium-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,26 @@
<parameters>true</parameters>
</configuration>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>${jacoco-plugin.version}</version>
<executions>
<execution>
<id>prepare-agent</id>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
<execution>
<id>report</id>
<phase>verify</phase>
<goals>
<goal>report</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire-plugin.version}</version>
Expand Down Expand Up @@ -160,4 +180,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public void getNextMessageInQueueAndRoute() {

if (pause) {
reEnqueueMessage(msg);
msg = null;
return;
}
RoutingLookupResult result = lookupRoutingForMessage(msg, targets.defaultTable);
Expand Down Expand Up @@ -284,4 +285,4 @@ public void run() {
logger.debug("_STOP_ PROCESSING....");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package gr.cytech.sendium.routing;

import gr.cytech.sendium.core.AbstractOutWorker;
import gr.cytech.sendium.core.message.StandardMessage;
import gr.cytech.sendium.external.filter.FilterException;
import gr.cytech.sendium.external.filter.FilterStatusCodes;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class AbstractRoutingManagerTest {

@Test
void getNextMessageInQueueAndRoute_WhenPausedReEnqueuesWithoutRouting() {
TestRoutingManager manager = new TestRoutingManager();
StandardMessage msg = new StandardMessage();
manager.pause = true;
manager.incoming.add(msg);

manager.getNextMessageInQueueAndRoute();

assertEquals(0, manager.lookupCalls);
assertEquals(List.of(msg), manager.enqueuedToRouter);
}

@Test
void getNextMessageInQueueAndRoute_WhenNoRouteReEnqueuesMessage() {
TestRoutingManager manager = new TestRoutingManager();
StandardMessage msg = new StandardMessage();
manager.incoming.add(msg);
manager.lookupResult = new RoutingLookupResult(new ArrayList<>(), false);

manager.getNextMessageInQueueAndRoute();

assertEquals(1, manager.lookupCalls);
assertEquals(List.of(msg), manager.enqueuedToRouter);
}

@Test
void getNextMessageInQueueAndRoute_WhenFilterDropsMessageDoesNotReEnqueue() {
TestRoutingManager manager = new TestRoutingManager();
StandardMessage msg = new StandardMessage();
manager.incoming.add(msg);
manager.lookupThrowable = filterException(FilterStatusCodes.DROP, msg);

manager.getNextMessageInQueueAndRoute();

assertTrue(manager.enqueuedToRouter.isEmpty());
assertEquals(0, msg.rtxCnt);
}

@Test
void getNextMessageInQueueAndRoute_WhenFilterRequestsRetryIncrementsAndReEnqueues() {
TestRoutingManager manager = new TestRoutingManager();
StandardMessage msg = new StandardMessage();
manager.incoming.add(msg);
manager.lookupThrowable = filterException(FilterStatusCodes.RETRY, msg);

manager.getNextMessageInQueueAndRoute();

assertEquals(1, msg.rtxCnt);
assertEquals(List.of(msg), manager.enqueuedToRouter);
}

@Test
void getNextMessageInQueueAndRoute_WhenGenericExceptionExceedsRetryLimitMovesToUnexpectedFailure() {
TestRoutingManager manager = new TestRoutingManager();
StandardMessage msg = new StandardMessage();
msg.rtxCnt = 50;
manager.incoming.add(msg);
manager.lookupThrowable = new IOException("routing failed");

manager.getNextMessageInQueueAndRoute();

assertEquals(51, msg.rtxCnt);
assertEquals(List.of(msg), manager.unexpectedFailures);
assertTrue(manager.enqueuedToRouter.isEmpty());
}

@Test
void enqueueFailedToQueueRetriesAfterTransientEnqueueFailure() {
TestRoutingManager manager = new TestRoutingManager();
StandardMessage msg = new StandardMessage();
manager.failedq.add(msg);
manager.enqueueFailuresRemaining = 1;

manager.enqueueFailedToQueue();

assertEquals(2, manager.enqueueAttempts);
assertEquals(List.of(msg), manager.enqueuedToRouter);
assertTrue(manager.failedq.isEmpty());
}

private FilterException filterException(FilterStatusCodes statusCode, StandardMessage msg) {
AbstractOutWorker<StandardMessage> filter = mock(AbstractOutWorker.class);
when(filter.getFullName()).thenReturn("filter.test");
return new FilterException(filter, statusCode, msg, "filter result");
}

private static class TestRoutingManager extends AbstractRoutingManager<StandardMessage> {
private final ArrayDeque<StandardMessage> incoming = new ArrayDeque<>();
private final List<StandardMessage> enqueuedToRouter = new ArrayList<>();
private final List<StandardMessage> unexpectedFailures = new ArrayList<>();
private RoutingLookupResult lookupResult = RoutingLookupResult.EMPTY_RESULT;
private Throwable lookupThrowable;
private int lookupCalls;
private int enqueueFailuresRemaining;
private int enqueueAttempts;

private TestRoutingManager() {
RoutingTable defaultTable = new RoutingTable(RoutingFileParser.DEFAULT_ROUTING_TABLE_NAME, RoutingTable.TargetFunction.NORMAL);
targets = new RoutingTargets(java.util.Map.of(RoutingFileParser.DEFAULT_ROUTING_TABLE_NAME, defaultTable), java.util.Map.of());
}

@Override
protected StandardMessage getNextMessageToRoute() {
return incoming.poll();
}

@Override
protected void enqueueToRouterQueue(StandardMessage msg) throws InterruptedException {
enqueueAttempts++;
if (enqueueFailuresRemaining > 0) {
enqueueFailuresRemaining--;
throw new InterruptedException("transient enqueue failure");
}
enqueuedToRouter.add(msg);
}

@Override
protected RoutingLookupResult lookupRoutingForMessage(StandardMessage pMsg, RoutingTable table) throws IOException {
lookupCalls++;
if (lookupThrowable instanceof FilterException filterException) {
throw filterException;
}
if (lookupThrowable instanceof IOException ioException) {
throw ioException;
}
if (lookupThrowable instanceof RuntimeException runtimeException) {
throw runtimeException;
}
return lookupResult;
}

@Override
protected void handleMessageUnexpectedFailure(StandardMessage msg, Throwable e) {
unexpectedFailures.add(msg);
}

@Override
protected boolean getConfigBoolean(String[] prop) {
return false;
}

@Override
protected int getConfigInt(String[] prop) {
return 1;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package gr.cytech.sendium.routing;

import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class RoutingFileParserTest {

@Test
void parseRoutingTable_CreatesDefaultTableWithoutExplicitHeader() {
Map<String, RoutingTable> routes = RoutingFileParser.parseRoutingTable(List.of(
"worker:from:equals:sender"
));

assertTrue(routes.containsKey(RoutingFileParser.DEFAULT_ROUTING_TABLE_NAME));
assertEquals(1, routes.get(RoutingFileParser.DEFAULT_ROUTING_TABLE_NAME).getRules().size());
assertEquals("worker", routes.get(RoutingFileParser.DEFAULT_ROUTING_TABLE_NAME).getRules().getFirst().getTarget());
}

@Test
void parseRoutingTable_AttachesCommentsToNextRuleOrTable() {
Map<String, RoutingTable> routes = RoutingFileParser.parseRoutingTable(List.of(
"#default route label",
"worker:from:equals:sender",
"#first line",
"#second line",
"[marketing]"
));

RoutingRule rule = routes.get(RoutingFileParser.DEFAULT_ROUTING_TABLE_NAME).getRules().getFirst();
RoutingTable table = routes.get("marketing");

assertEquals("default route label", rule.getLabel());
assertEquals("first line\nsecond line", table.getLabel());
}

@Test
void parseRoutingTable_ParsesFunctionTables() {
Map<String, RoutingTable> routes = RoutingFileParser.parseRoutingTable(List.of(
"[leastCost->function(LCR)]",
"worker::default:"
));

RoutingTable table = routes.get("leastCost");

assertEquals(RoutingTable.TargetFunction.LCR, table.getTargetFunction());
assertEquals(1, table.getRules().size());
}

@Test
void parseRoutingTable_IgnoresBlankAndInvalidLines() {
Map<String, RoutingTable> routes = RoutingFileParser.parseRoutingTable(List.of(
"",
"not-a-route",
"worker::default:"
));

RoutingTable defaultTable = routes.get(RoutingFileParser.DEFAULT_ROUTING_TABLE_NAME);

assertEquals(1, defaultTable.getRules().size());
assertFalse(defaultTable.getRules().stream().anyMatch(rule -> "not-a-route".equals(rule.toString())));
}

@Test
void parseRoutingTable_PreservesColonsInsideRuleValue() {
Map<String, RoutingTable> routes = RoutingFileParser.parseRoutingTable(List.of(
"worker:body:equals:https://example.test/dlr?id=1"
));

RoutingRule rule = routes.get(RoutingFileParser.DEFAULT_ROUTING_TABLE_NAME).getRules().getFirst();

assertEquals("https://example.test/dlr?id=1", rule.getConditions().getFirst().getValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,66 @@ void marginPercentageGreaterThanRuleMatchesAboveThreshold() {

assertTrue(rule.matches(msg, null));
}

@Test
void copiedRoutePrefixMarksRuleAsCopiedAndStripsTargetPrefix() {
RoutingRule rule = new RoutingRule("+copyTarget:from:equals:sender", "copy label");

assertEquals("copyTarget", rule.getTarget());
assertEquals("+copyTarget:from:equals:sender", rule.toString());
assertEquals("copy label", rule.getLabel());
}

@Test
void multiConditionRuleRequiresEveryConditionToMatch() {
StandardMessage msg = new StandardMessage();
msg.from = "sender";
msg.to = "306900000000";

RoutingRule rule = new RoutingRule("target:from~~to:equals~~startsWith:sender~~3069", null);

assertTrue(rule.matches(msg, null));

msg.to = "447700000000";

assertFalse(rule.matches(msg, null));
}

@Test
void negatedStringPolicyInvertsMatch() {
StandardMessage msg = new StandardMessage();
msg.owner_id = "account-a";

RoutingRule rule = new RoutingRule("owner_id", "!equals", "account-b", "target", null);

assertTrue(rule.matches(msg, null));
}

@Test
void stringPoliciesCoverRegexPrefixSuffixAndNullChecks() {
StandardMessage msg = new StandardMessage();
msg.body = "Sendium route check";
msg.message_center = null;

assertTrue(new RoutingRule("body", "matches", "Sendium.*check", "target", null).matches(msg, null));
assertTrue(new RoutingRule("body", "startsWith", "Sendium", "target", null).matches(msg, null));
assertTrue(new RoutingRule("body", "endsWith", "check", "target", null).matches(msg, null));
assertTrue(new RoutingRule("message_center", "isNull", "", "target", null).matches(msg, null));
}

@Test
void ruleValueCanContainColonCharacters() {
StandardMessage msg = new StandardMessage();
msg.body = "https://example.test/dlr?id=1";

RoutingRule rule = new RoutingRule("target:body:equals:https://example.test/dlr?id=1", null);

assertTrue(rule.matches(msg, null));
}

@Test
void unknownFieldOrPolicyFailsFast() {
assertThrows(IllegalArgumentException.class, () -> new RoutingRule("unknown", "equals", "x", "target", null));
assertThrows(IllegalArgumentException.class, () -> new RoutingRule("body", "unknownPolicy", "x", "target", null));
}
}
Loading
Loading