Skip to content

Commit bfab089

Browse files
authored
Add NewStreamMessage wrapper and functions (#18)
1 parent 76a986d commit bfab089

File tree

6 files changed

+104
-56
lines changed

6 files changed

+104
-56
lines changed

src/SqlStreamStore.FSharp/Append.fs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ namespace SqlStreamStore.FSharp
22

33
open FSharp.Prelude
44
open System.Threading
5-
open SqlStreamStore.Streams
5+
open SqlStreamStore
66

77
[<RequireQualifiedAccess>]
88
type AppendOption =
@@ -13,9 +13,10 @@ module Append =
1313
let streamMessages'
1414
(messages: NewStreamMessage list)
1515
(appendOptions: AppendOption list)
16-
: Stream -> AsyncResult<AppendResult, exn> =
16+
(Stream stream: Stream)
17+
: AsyncResult<Streams.AppendResult, exn> =
1718

18-
let mutable expectedVersion = ExpectedVersion.Any
19+
let mutable expectedVersion = Streams.ExpectedVersion.Any
1920
let mutable cancellationToken = Unchecked.defaultof<CancellationToken>
2021

2122
appendOptions
@@ -24,8 +25,10 @@ module Append =
2425
| AppendOption.CancellationToken token -> cancellationToken <- token
2526
| AppendOption.ExpectedVersion version -> expectedVersion <- version)
2627

27-
fun (Stream stream) ->
28-
stream.store.AppendToStream(stream.streamId, expectedVersion, List.toArray messages, cancellationToken)
28+
let messages' =
29+
List.map NewStreamMessage.toOriginalNewStreamMessage messages
2930

30-
let streamMessages (messages: NewStreamMessage list) : Stream -> AsyncResult<AppendResult, exn> =
31+
stream.store.AppendToStream(stream.streamId, expectedVersion, List.toArray messages', cancellationToken)
32+
33+
let streamMessages (messages: NewStreamMessage list) : Stream -> AsyncResult<Streams.AppendResult, exn> =
3134
streamMessages' messages []
File renamed without changes.
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
namespace SqlStreamStore.FSharp
2+
3+
type NewStreamMessageInternal =
4+
{
5+
messageId: System.Guid option
6+
messageType: string
7+
jsonData: string
8+
jsonMetadata: string option
9+
}
10+
11+
type NewStreamMessage = private NewStreamMessage of NewStreamMessageInternal
12+
13+
module NewStreamMessage =
14+
15+
let create (messageType: string) (jsonData: string) : NewStreamMessage =
16+
NewStreamMessage
17+
{
18+
messageId = None
19+
messageType = messageType
20+
jsonData = jsonData
21+
jsonMetadata = None
22+
}
23+
24+
let withMessageId (messageId: System.Guid) (NewStreamMessage msg: NewStreamMessage) : NewStreamMessage =
25+
NewStreamMessage { msg with messageId = Some messageId }
26+
27+
let withJsonMetadata (jsonMetadata: string) (NewStreamMessage msg: NewStreamMessage) : NewStreamMessage =
28+
NewStreamMessage
29+
{ msg with
30+
jsonMetadata = Some jsonMetadata
31+
}
32+
33+
let internal toOriginalNewStreamMessage
34+
(NewStreamMessage msg: NewStreamMessage)
35+
: SqlStreamStore.Streams.NewStreamMessage =
36+
let id =
37+
match msg.messageId with
38+
| Some id -> id
39+
| None -> System.Guid.NewGuid()
40+
41+
match msg.jsonMetadata with
42+
| Some metadata -> SqlStreamStore.Streams.NewStreamMessage(id, msg.messageType, msg.jsonData, metadata)
43+
| None -> SqlStreamStore.Streams.NewStreamMessage(id, msg.messageType, msg.jsonData)
44+

src/SqlStreamStore.FSharp/Read.fs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type ReadAllOption =
3333

3434
module Read =
3535

36-
let partial' (readOptions: ReadPartialOption list) : Stream -> AsyncResult<ReadStreamPage, exn> =
36+
let partial' (readOptions: ReadPartialOption list) (Stream stream: Stream) : AsyncResult<ReadStreamPage, exn> =
3737

3838
let mutable cancellationToken = Unchecked.defaultof<CancellationToken>
3939
let mutable fromVersionInclusive : int option = None
@@ -59,25 +59,25 @@ module Read =
5959
| ReadDirection.Forward, Some index -> index
6060
| _ -> failwith "Illegal ReadDirection enum."
6161

62-
fun (Stream stream) ->
63-
match readDirection with
64-
| ReadDirection.Forward ->
65-
stream.store.ReadStreamForwards(
66-
stream.streamId,
67-
fromVersionInclusive',
68-
messageCount,
69-
prefetch,
70-
cancellationToken
71-
)
72-
| ReadDirection.Backward ->
73-
stream.store.ReadStreamBackwards(
74-
stream.streamId,
75-
fromVersionInclusive',
76-
messageCount,
77-
prefetch,
78-
cancellationToken
79-
)
80-
| _ -> failwith "Illegal ReadDirection enum."
62+
63+
match readDirection with
64+
| ReadDirection.Forward ->
65+
stream.store.ReadStreamForwards(
66+
stream.streamId,
67+
fromVersionInclusive',
68+
messageCount,
69+
prefetch,
70+
cancellationToken
71+
)
72+
| ReadDirection.Backward ->
73+
stream.store.ReadStreamBackwards(
74+
stream.streamId,
75+
fromVersionInclusive',
76+
messageCount,
77+
prefetch,
78+
cancellationToken
79+
)
80+
| _ -> failwith "Illegal ReadDirection enum."
8181

8282
let partial : Stream -> AsyncResult<ReadStreamPage, exn> = partial' []
8383

src/SqlStreamStore.FSharp/SqlStreamStore.FSharp.fsproj

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,22 @@
1212
</PropertyGroup>
1313

1414
<ItemGroup>
15-
<PackageReference Update="FSharp.Core" Version="5.0.0"/>
16-
<PackageReference Include="FSharp.Core" Version="5.0.0"/>
17-
<PackageReference Include="FSharp.Prelude" Version="4.0.3"/>
18-
<PackageReference Include="Npgsql" Version="5.0.4"/>
19-
<PackageReference Include="SqlStreamStore" Version="1.2.0-beta.8"/>
15+
<PackageReference Update="FSharp.Core" Version="5.0.0" />
16+
<PackageReference Include="FSharp.Core" Version="5.0.0" />
17+
<PackageReference Include="FSharp.Prelude" Version="4.0.3" />
18+
<PackageReference Include="Npgsql" Version="5.0.4" />
19+
<PackageReference Include="SqlStreamStore" Version="1.2.0-beta.8" />
2020
</ItemGroup>
2121

2222
<ItemGroup>
23-
<Compile Include="SqlStreamStoreExtensions.fs"/>
24-
<Compile Include="Connect.fs"/>
25-
<Compile Include="Read.fs"/>
26-
<Compile Include="Get.fs"/>
27-
<Compile Include="Subscribe.fs"/>
28-
<Folder Include="SqlStreamStore.FSharp"/>
29-
<Compile Include="Append.fs"/>
23+
<Compile Include="Extensions.fs" />
24+
<Compile Include="NewStreamMessage.fs" />
25+
<Compile Include="Connect.fs" />
26+
<Compile Include="Read.fs" />
27+
<Compile Include="Get.fs" />
28+
<Compile Include="Subscribe.fs" />
29+
<Folder Include="SqlStreamStore.FSharp" />
30+
<Compile Include="Append.fs" />
3031
</ItemGroup>
3132

3233
</Project>

src/SqlStreamStore.FSharp/Subscribe.fs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ module Subscribe =
2626
(subscriptionName: string)
2727
(continueAfterVersion: int)
2828
(streamMessageReceived: IStreamSubscription -> StreamMessage -> CancellationToken -> AsyncResult<_, _>)
29-
(streamSubOption: StreamSubOption list)
30-
: Stream -> IStreamSubscription =
29+
(streamSubOptions: StreamSubOption list)
30+
(Stream stream: Stream)
31+
: IStreamSubscription =
3132

3233
let mutable hasCaughtUp = null
3334
let mutable maxCountPerRead = 10
@@ -43,28 +44,27 @@ module Subscribe =
4344

4445
StreamMessageReceived subs
4546

46-
streamSubOption
47+
streamSubOptions
4748
|> List.iter
4849
(function
4950
| StreamSubOption.HasCaughtUp f -> hasCaughtUp <- HasCaughtUp f
5051
| StreamSubOption.MaxCountPerRead n -> maxCountPerRead <- n
5152
| StreamSubOption.NoPrefetch -> prefetch <- false
5253
| StreamSubOption.SubscriptionDropped f -> subscriptionDropped <- SubscriptionDropped f)
5354

54-
fun (Stream stream) ->
55-
let sub =
56-
stream.store.SubscribeToStream(
57-
streamId = StreamId stream.streamId,
58-
continueAfterVersion = System.Nullable continueAfterVersion,
59-
streamMessageReceived = streamMessageReceived',
60-
subscriptionDropped = subscriptionDropped,
61-
hasCaughtUp = hasCaughtUp,
62-
prefetchJsonData = prefetch,
63-
name = subscriptionName
64-
)
55+
let sub =
56+
stream.store.SubscribeToStream(
57+
streamId = StreamId stream.streamId,
58+
continueAfterVersion = System.Nullable continueAfterVersion,
59+
streamMessageReceived = streamMessageReceived',
60+
subscriptionDropped = subscriptionDropped,
61+
hasCaughtUp = hasCaughtUp,
62+
prefetchJsonData = prefetch,
63+
name = subscriptionName
64+
)
6565

66-
sub.MaxCountPerRead <- maxCountPerRead
67-
sub
66+
sub.MaxCountPerRead <- maxCountPerRead
67+
sub
6868

6969
let toStreamMessages
7070
(subscriptionName: string)
@@ -77,7 +77,7 @@ module Subscribe =
7777
(subscriptionName: string)
7878
(continueAfterPosition: int64)
7979
(streamMessageReceived: IAllStreamSubscription -> StreamMessage -> CancellationToken -> AsyncResult<_, _>)
80-
(streamSubOption: AllStreamSubOption list)
80+
(streamSubOptions: AllStreamSubOption list)
8181
: IStreamStore -> IAllStreamSubscription =
8282

8383
let mutable hasCaughtUp = null
@@ -94,7 +94,7 @@ module Subscribe =
9494

9595
AllStreamMessageReceived subs
9696

97-
streamSubOption
97+
streamSubOptions
9898
|> List.iter
9999
(function
100100
| AllStreamSubOption.HasCaughtUp f -> hasCaughtUp <- HasCaughtUp f

0 commit comments

Comments
 (0)