Skip to content

Commit e637ae8

Browse files
committed
fix: add sort guard and stale-event detection in ApiPollerWorker
1 parent a9a1caf commit e637ae8

7 files changed

Lines changed: 97 additions & 12 deletions

File tree

docs/local-setup.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ docker exec -it $(docker ps -q -f ancestor=mcr.microsoft.com/azure-sql-edge) \
2626
-Q "CREATE DATABASE ScanEvents"
2727
```
2828

29-
The worker auto-creates the `ProcessingState` and `ParcelSummary` tables on startup via `DatabaseInitializer`.
29+
The worker auto-creates the `ProcessingState` and `ParcelSummary` tables on startup via `DatabaseInitialiser`.
3030

3131
### 3. Set dummy AWS credentials (LocalStack doesn't validate them)
3232

src/ScanEventWorker/Infrastructure/ApiClient/ApiJsonContext.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
namespace ScanEventWorker.Infrastructure.ApiClient;
55

6-
// CamelCase policy applies to ScanEvent (SQS serialization/deserialization) - symmetrical both ways.
6+
// CamelCase policy applies to ScanEvent (SQS serialisation/deserialisation) - symmetrical both ways.
77
// ScanEventApiResponse/ScanEventDto use explicit [JsonPropertyName] overrides and are unaffected.
88
// Do NOT add [JsonPropertyName] to ScanEvent properties - they inherit camelCase from this policy.
99
[JsonSerializable(typeof(ScanEventApiResponse))]

src/ScanEventWorker/Infrastructure/Persistence/DatabaseInitializer.cs renamed to src/ScanEventWorker/Infrastructure/Persistence/DatabaseInitialiser.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44
namespace ScanEventWorker.Infrastructure.Persistence;
55

66
[DapperAot]
7-
public sealed class DatabaseInitializer(string connectionString, ILogger<DatabaseInitializer> logger)
7+
public sealed class DatabaseInitialiser(string connectionString, ILogger<DatabaseInitialiser> logger)
88
{
9-
public async Task InitializeAsync(CancellationToken ct)
9+
public async Task InitialiseAsync(CancellationToken ct)
1010
{
11-
logger.LogInformation("Initializing database schema");
11+
logger.LogInformation("Initialising database schema");
1212

1313
await using var connection = new SqlConnection(connectionString);
1414
await connection.OpenAsync(ct);
@@ -42,6 +42,6 @@ DeliveredAtUtc DATETIMEOFFSET NULL
4242
END
4343
""", cancellationToken: ct));
4444

45-
logger.LogInformation("Database schema initialized");
45+
logger.LogInformation("Database schema initialised");
4646
}
4747
}

src/ScanEventWorker/Infrastructure/Persistence/ScanEventRepository.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public async Task<long> GetLastEventIdAsync(CancellationToken ct)
2222
{
2323
logger.LogWarning(
2424
"ProcessingState row not found - defaulting LastEventId to 1. " +
25-
"Re-run DatabaseInitializer or INSERT INTO ProcessingState (Id, LastEventId) VALUES (1, 1).");
25+
"Re-run DatabaseInitialiser or INSERT INTO ProcessingState (Id, LastEventId) VALUES (1, 1).");
2626
return 1L;
2727
}
2828

src/ScanEventWorker/Program.cs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,16 @@
77
using ScanEventWorker.Services;
88
using ScanEventWorker.Workers;
99

10+
// Prevent multiple instances from running simultaneously (see Assumptions: single-instance constraint).
11+
// The Mutex is held for the lifetime of the process; released automatically on process exit.
12+
using var instanceMutex = new Mutex(initiallyOwned: true, name: "Global\\ScanEventWorker_Instance", out bool mutexAcquired);
13+
if (!mutexAcquired)
14+
{
15+
// Another instance is already running — log to stderr and exit cleanly.
16+
Console.Error.WriteLine("FATAL: Another ScanEventWorker instance is already running. Exiting.");
17+
return 1;
18+
}
19+
1020
HostApplicationBuilder builder = Host.CreateApplicationBuilder(args);
1121

1222
// Configuration
@@ -30,7 +40,7 @@
3040

3141
// Infrastructure — Persistence
3242
builder.Services.AddSingleton(sp =>
33-
new DatabaseInitializer(connectionString, sp.GetRequiredService<ILogger<DatabaseInitializer>>()));
43+
new DatabaseInitialiser(connectionString, sp.GetRequiredService<ILogger<DatabaseInitialiser>>()));
3444
builder.Services.AddSingleton<IScanEventRepository>(sp =>
3545
new ScanEventRepository(
3646
connectionString,
@@ -69,8 +79,10 @@
6979

7080
IHost host = builder.Build();
7181

72-
// Initialize database schema on startup
73-
DatabaseInitializer dbInitializer = host.Services.GetRequiredService<DatabaseInitializer>();
74-
await dbInitializer.InitializeAsync(CancellationToken.None);
82+
// Initialise database schema on startup
83+
DatabaseInitialiser dbInitialiser = host.Services.GetRequiredService<DatabaseInitialiser>();
84+
await dbInitialiser.InitialiseAsync(CancellationToken.None);
7585

7686
host.Run();
87+
88+
return 0;

src/ScanEventWorker/Workers/ApiPollerWorker.cs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,31 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
4949
continue;
5050
}
5151

52+
// Guard: sort defensively if API returns events out of EventId order (Assumption 1)
53+
if (events.Count > 1 && events.Zip(events.Skip(1)).Any(pair => pair.First.EventId.Value > pair.Second.EventId.Value))
54+
{
55+
logger.LogWarning(
56+
"API returned {Count} events out of EventId order — sorting defensively",
57+
events.Count);
58+
events = [.. events.OrderBy(e => e.EventId.Value)];
59+
}
60+
61+
// Guard: warn if API returns events older than lastEventId (Assumption 2)
62+
int staleCount = events.Count(e => e.EventId.Value < lastEventId);
63+
if (staleCount > 0)
64+
{
65+
logger.LogWarning(
66+
"API returned {StaleCount} stale events with EventId < {FromId} — possible FromEventId contract violation",
67+
staleCount,
68+
lastEventId);
69+
}
70+
5271
foreach (ScanEvent scanEvent in events)
5372
{
5473
await messageQueue.SendAsync(scanEvent, stoppingToken);
5574
}
5675

57-
long maxEventId = events[^1].EventId.Value;
76+
long maxEventId = Math.Max(lastEventId, events[^1].EventId.Value);
5877
await repository.UpdateLastEventIdAsync(maxEventId, stoppingToken);
5978
lastEventId = maxEventId;
6079

tests/ScanEventWorker.Tests/Workers/ApiPollerWorkerTests.cs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,4 +210,58 @@ public async Task WhenApiReturnsEvents_CallsUpdateExactlyOnce()
210210

211211
await _repository.Received(1).UpdateLastEventIdAsync(Arg.Any<long>(), Arg.Any<CancellationToken>());
212212
}
213+
214+
[Fact(Timeout = 5000)]
215+
public async Task WhenApiReturnsOutOfOrderEvents_SortsDefensivelyAndAdvancesCorrectly()
216+
{
217+
var cts = new CancellationTokenSource();
218+
// Deliberately out of order: 10 first, then 5
219+
var events = new List<ScanEvent> { MakeScanEvent(10), MakeScanEvent(5) };
220+
221+
_ = _repository.GetLastEventIdAsync(Arg.Any<CancellationToken>()).Returns(0L);
222+
_ = _apiClient.GetScanEventsAsync(Arg.Any<long>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
223+
.Returns(Result<IReadOnlyList<ScanEvent>>.Success(events));
224+
_ = _repository.UpdateLastEventIdAsync(Arg.Any<long>(), Arg.Any<CancellationToken>())
225+
.Returns(callInfo =>
226+
{
227+
cts.Cancel();
228+
return Task.CompletedTask;
229+
});
230+
231+
ApiPollerWorker worker = CreateWorker();
232+
await worker.StartAsync(cts.Token);
233+
await worker.ExecuteTask!;
234+
await worker.StopAsync(CancellationToken.None);
235+
236+
// After sort [5, 10], events[^1] is EventId=10 — the correct max
237+
await _repository.Received(1).UpdateLastEventIdAsync(10L, Arg.Any<CancellationToken>());
238+
}
239+
240+
[Fact(Timeout = 5000)]
241+
public async Task WhenApiReturnsStaleEvents_ContinuesProcessingNormally()
242+
{
243+
var cts = new CancellationTokenSource();
244+
// Both events are older than lastEventId=20
245+
var events = new List<ScanEvent> { MakeScanEvent(5), MakeScanEvent(10) };
246+
247+
_ = _repository.GetLastEventIdAsync(Arg.Any<CancellationToken>()).Returns(20L);
248+
_ = _apiClient.GetScanEventsAsync(Arg.Any<long>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
249+
.Returns(Result<IReadOnlyList<ScanEvent>>.Success(events));
250+
_ = _repository.UpdateLastEventIdAsync(Arg.Any<long>(), Arg.Any<CancellationToken>())
251+
.Returns(callInfo =>
252+
{
253+
cts.Cancel();
254+
return Task.CompletedTask;
255+
});
256+
257+
ApiPollerWorker worker = CreateWorker();
258+
await worker.StartAsync(cts.Token);
259+
await worker.ExecuteTask!;
260+
await worker.StopAsync(CancellationToken.None);
261+
262+
// Stale events are processed normally — idempotent MERGE handles dedup.
263+
// lastEventId must not regress: advance marker stays at 20, not events[^1]=10.
264+
await _repository.Received(1).UpdateLastEventIdAsync(20L, Arg.Any<CancellationToken>());
265+
await _queue.Received(2).SendAsync(Arg.Any<ScanEvent>(), Arg.Any<CancellationToken>());
266+
}
213267
}

0 commit comments

Comments
 (0)