Skip to content

Commit 746ec92

Browse files
author
Developer
committed
Added more usage samples
1 parent 79ef877 commit 746ec92

File tree

4 files changed

+278
-69
lines changed

4 files changed

+278
-69
lines changed

samples/ByTech.EmbeddedCommitLog.LedgerSample/AccountSink.cs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,15 @@ namespace ByTech.EmbeddedCommitLog.LedgerSample;
1616
/// read-side view rebuilt by replaying a shared, ordered log.
1717
/// </para>
1818
/// <para>
19-
/// <see cref="Record.Header.SeqNo"/> is the record's global position in the pipeline —
19+
/// <see cref="Statement.SeqNo"/> is the record's global position in the pipeline —
2020
/// the same sequence number appears in every consumer's view of the log, making it
2121
/// straightforward to correlate entries across accounts.
2222
/// </para>
23+
/// <para>
24+
/// Pass <paramref name="initialBalance"/> when restarting a pipeline from a previously
25+
/// saved cursor position. The sink will project from that starting balance, receiving
26+
/// only the records that were posted after the cursor was last flushed to disk.
27+
/// </para>
2328
/// </remarks>
2429
public sealed class AccountSink : ISink
2530
{
@@ -31,7 +36,16 @@ public sealed record Statement(ulong SeqNo, LedgerEntry Entry, decimal RunningBa
3136
private decimal _balance;
3237

3338
/// <summary>Initialises the sink for the given account.</summary>
34-
public AccountSink(string accountId) => _accountId = accountId;
39+
/// <param name="accountId">Account ID to project (e.g. "ACC-001").</param>
40+
/// <param name="initialBalance">
41+
/// Starting balance carried forward from a previous pipeline run.
42+
/// Defaults to zero for a brand-new account.
43+
/// </param>
44+
public AccountSink(string accountId, decimal initialBalance = 0m)
45+
{
46+
_accountId = accountId;
47+
_balance = initialBalance;
48+
}
3549

3650
/// <summary>Account identifier this sink projects.</summary>
3751
public string AccountId => _accountId;
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
using ByTech.EmbeddedCommitLog.Consumer;
2+
using ByTech.EmbeddedCommitLog.Sinks;
3+
4+
namespace ByTech.EmbeddedCommitLog.LedgerSample;
5+
6+
/// <summary>
7+
/// Push-mode sink that records every entry in the general ledger without filtering.
8+
/// </summary>
9+
/// <remarks>
10+
/// Unlike <see cref="AccountSink"/>, this sink does not filter by account — it
11+
/// receives the full broadcast and stores every record in arrival order. The result
12+
/// is an immutable, ordered audit trail of the entire log, identical to what a
13+
/// human auditor would see when reading the segment files directly.
14+
/// </remarks>
15+
public sealed class AuditSink : ISink
16+
{
17+
/// <summary>Immutable snapshot of one audit log entry.</summary>
18+
public sealed record Entry(ulong SeqNo, LedgerEntry LedgerEntry);
19+
20+
private readonly List<Entry> _entries = [];
21+
22+
/// <summary>All entries received, in delivery order (which matches SeqNo order).</summary>
23+
public IReadOnlyList<Entry> Entries => _entries;
24+
25+
/// <inheritdoc/>
26+
public Task WriteAsync(IReadOnlyList<LogRecord> batch, CancellationToken ct)
27+
{
28+
foreach (LogRecord record in batch)
29+
{
30+
_entries.Add(new Entry(record.Header.SeqNo, LedgerEntry.Deserialize(record.Payload)));
31+
}
32+
33+
return Task.CompletedTask;
34+
}
35+
}

samples/ByTech.EmbeddedCommitLog.LedgerSample/LedgerEntry.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,12 @@ public sealed record LedgerEntry(
2727
public byte[] Serialize() =>
2828
JsonSerializer.SerializeToUtf8Bytes(this, _jsonOptions);
2929

30-
/// <summary>Deserialises a <see cref="LedgerEntry"/> from a UTF-8 JSON payload byte array.</summary>
31-
public static LedgerEntry Deserialize(byte[] payload) =>
30+
/// <summary>Deserialises a <see cref="LedgerEntry"/> from a UTF-8 JSON payload.</summary>
31+
/// <remarks>
32+
/// Accepts <see cref="byte"/>[] (push-mode <c>LogRecord.Payload</c>) and
33+
/// <see cref="ReadOnlySpan{T}"/> (pull-mode <c>RecordReadResult.Payload.Span</c>)
34+
/// without allocating an intermediate array.
35+
/// </remarks>
36+
public static LedgerEntry Deserialize(ReadOnlySpan<byte> payload) =>
3237
JsonSerializer.Deserialize<LedgerEntry>(payload, _jsonOptions)!;
3338
}

0 commit comments

Comments
 (0)