Skip to content

Commit 44ad21d

Browse files
committed
Added Pulse facility on Projector
1 parent aafbe8a commit 44ad21d

5 files changed

Lines changed: 60 additions & 59 deletions

File tree

FunDomain.Persistence.NEventStore.Acceptance/InMemoryStore.fs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ open Uno // Card Builders
44
open Uno.Game // Commands, handle
55

66
open FunDomain // CommandHandler, Evolution.replay
7-
open FunDomain.Persistence.NEventStore.NesGateway // createInMemory, StreamId
8-
open FunDomain.Persistence.NEventStore // NesProjector
7+
open FunDomain.Persistence.NEventStore // Projector, StreamId
98

109
open Xunit
1110
open Swensen.Unquote
@@ -48,23 +47,24 @@ let gameStreamId (GameId no) = {Bucket=None; StreamId=string no }
4847
let [<Fact>] ``Can run a full round using NEventStore's InMemoryPersistence`` () =
4948
let domainHandler = CommandHandler.create replay handle
5049

51-
let store = createInMemory()
52-
let persistingHandler = domainHandler store.read store.append
53-
5450
let monitor = DirectionMonitor()
5551

52+
let store = NesGateway.createInMemory()
53+
let projector = Projector( store, 10, (fun batch ->
54+
batch.chooseOfUnion () |> Seq.iter (fun evt ->
55+
monitor.Post evt
56+
logger.Post evt)))
57+
58+
let persistingHandler = domainHandler store.read store.append
59+
5660
let gameId = GameId 42
5761
let streamId = gameStreamId gameId
5862
for action in fullGameActions gameId do
5963
printfn "Processing %A against Stream %A" action streamId
6064
action |> persistingHandler streamId
65+
projector.Pulse ()
6166

62-
NesProjector.start store 10 (fun batch ->
63-
batch.chooseOfUnion () |> Seq.iter (fun evt ->
64-
monitor.Post evt
65-
logger.Post evt))
66-
67-
Async.AwaitEvent NesProjector.sleeping
67+
Async.AwaitEvent projector.sleeping
6868
|> Async.RunSynchronously
6969
printfn "Projection queue empty"
7070

FunDomain.Persistence.NEventStore/FunDomain.Persistence.NEventStore.fsproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
<Link>Serialization.fs</Link>
5858
</Compile>
5959
<Compile Include="NesGateway.fs" />
60-
<Compile Include="NesProjector.fs" />
60+
<Compile Include="Projector.fs" />
6161
</ItemGroup>
6262
<ItemGroup>
6363
<Reference Include="FSharp.Core, Version=$(TargetFSharpCoreVersion), Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a">

FunDomain.Persistence.NEventStore/NesGateway.fs

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
module FunDomain.Persistence.NEventStore.NesGateway
1+
namespace FunDomain.Persistence.NEventStore
22

33
open FunDomain.Persistence.Serialization
44

@@ -24,7 +24,7 @@ type EncodedEventBatch (events) =
2424
cached |> Seq.choose (fun (ee:EncodedEvent) -> ee.deserializeUnionByCaseItemType<'a> ())
2525

2626
/// Wrapper yielded by create* functions with create/append functions matching FunDomain.CommandHandler requirements
27-
type Streamer private (inner') =
27+
type Store private (inner') =
2828
// Hoop jumping a la C++ pimpl pattern - if we don't do this, we're foisting an NEventStore package reference on all downstream users
2929
let inner : IPersistStreams = unbox inner'
3030
let defaultBucket bucketId = defaultArg bucketId "default"
@@ -89,7 +89,7 @@ type Streamer private (inner') =
8989
Some checkpoint
9090
batch |> Seq.fold dispatchElements None
9191

92-
static member internal wrap persister = Streamer( box persister)
92+
static member internal wrap persister = Store( box persister)
9393

9494
member this.read<'a> stream =
9595
let commits,version,_ =
@@ -110,24 +110,25 @@ type Streamer private (inner') =
110110
appendToStream stream metadata token encodedEvents
111111
|> Async.RunSynchronously
112112

113-
let createFromStore (inner:IStoreEvents) =
114-
inner.Advanced |> Streamer.wrap
115-
116-
let createInMemory () =
117-
Wireup.Init()
118-
.LogToOutputWindow()
119-
.UsingInMemoryPersistence()
120-
.UsingJsonSerialization()
121-
.Build()
122-
|> createFromStore
123-
124-
let createInMsSqlWithPerfCounters (connectionString:string) perfCounterSetName =
125-
Wireup.Init()
126-
.UsingSqlPersistence(connectionString)
127-
.WithDialect(new MsSqlDialect())
128-
.InitializeStorageEngine()
129-
.TrackPerformanceInstance(perfCounterSetName)
130-
.UsingJsonSerialization()
131-
.Compress()
132-
.Build()
133-
|> createFromStore
113+
module NesGateway =
114+
let createFromStore (inner:IStoreEvents) =
115+
inner.Advanced |> Store.wrap
116+
117+
let createInMemory () =
118+
Wireup.Init()
119+
.LogToOutputWindow()
120+
.UsingInMemoryPersistence()
121+
.UsingJsonSerialization()
122+
.Build()
123+
|> createFromStore
124+
125+
let createInMsSqlWithPerfCounters (connectionString:string) perfCounterSetName =
126+
Wireup.Init()
127+
.UsingSqlPersistence(connectionString)
128+
.WithDialect(new MsSqlDialect())
129+
.InitializeStorageEngine()
130+
.TrackPerformanceInstance(perfCounterSetName)
131+
.UsingJsonSerialization()
132+
.Compress()
133+
.Build()
134+
|> createFromStore

FunDomain.Persistence.NEventStore/NesProjector.fs

Lines changed: 0 additions & 23 deletions
This file was deleted.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
namespace FunDomain.Persistence.NEventStore
2+
3+
open System.Threading
4+
5+
type Projector( store:Store, sleepMs, projection ) =
6+
let empty = new Event<unit>()
7+
let wakeEvent = new AutoResetEvent false
8+
let _ =
9+
MailboxProcessor.Start <|
10+
fun inbox ->
11+
let rec loop token = async {
12+
match token |> store.project projection with
13+
| Some token ->
14+
return! loop token
15+
| _ ->
16+
empty.Trigger ()
17+
let! _ = Async.AwaitWaitHandle (wakeEvent,sleepMs)
18+
return! loop token }
19+
20+
async {
21+
return! loop CheckpointToken.initial }
22+
member this.sleeping = empty.Publish
23+
member this.Pulse = wakeEvent.Set >> ignore

0 commit comments

Comments
 (0)