Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions docs/api/appending-events.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,64 @@ const credentials = {
await client.appendToStream("some-stream", event, {
credentials,
});
```

## Append to multiple streams

::: note
This feature is only available in KurrentDB 25.1 and later.
:::

You can append events to multiple streams in a single atomic operation. Either all streams are updated, or the entire operation fails.

::: warning
Currently, metadata must be valid JSON. Binary metadata will not be supported in
this version. This limitation ensures compatibility with KurrentDB's metadata
handling and will be removed in the next major release.
:::

```ts
import { jsonEvent } from "@kurrent/kurrentdb-client";
import { v4 as uuid } from "uuid";

const metadata = {
timestamp: new Date().toISOString(),
source: "OrderProcessingSystem",
version: 1.0
};

const requests = [
{
streamName: "order-stream-1",
expectedState: "any",
events: [
jsonEvent({
id: uuid(),
type: "OrderCreated",
data: {
orderId: "12345",
amount: 99.99
},
metadata
})
]
},
{
streamName: "inventory-stream-1",
expectedState: "any",
events: [
jsonEvent({
id: uuid(),
type: "ItemReserved",
data: {
itemId: "ABC123",
quantity: 2
},
metadata
})
]
}
];

await client.multiStreamAppend(requests);
```
21 changes: 21 additions & 0 deletions docs/api/persistent-subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,27 @@ The main aim of this strategy is to decrease the likelihood of concurrency and
ordering issues while maintaining load balancing. This is **not a guarantee**,
and you should handle the usual ordering and concurrency issues.

### PinnedByCorrelation

The PinnedByCorrelation strategy is a consumer strategy available for persistent subscriptions
It ensures that events with the same correlation id are consistently delivered to the same
consumer within a subscription group.

:::note
This strategy requires database version 21.10.1 or later. You can only create a persistent subscription
with this strategy. To change the strategy, you must delete the existing subscription and create a
new one with the desired settings.
:::

## Updating a subscription group

You can edit the settings of an existing subscription group while it is running,
you don't need to delete and recreate it to change settings. When you update the
subscription group, it resets itself internally, dropping the connections and
having them reconnect. You must have admin permissions to update a persistent
subscription group.


## Updating a subscription group

You can edit the settings of an existing subscription group while it is running,
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@
"devDependencies": {
"@tsconfig/node18": "^18.2.4",
"@types/node": "18.19.76",
"@types/semver": "^7.7.0",
"@typescript-eslint/eslint-plugin": "^8.10.0",
"@typescript-eslint/parser": "^8.10.0",
"cross-env": "^7.0.3",
"eslint": "^8.56.0",
"eslint-plugin-tsdoc": "^0.2.17",
"nx": "20.1.3",
"prettier": "^2.8.8",
"semver": "^7.6.3",
"semver": "^7.7.2",
"typescript": "^5.6.3"
}
}
1 change: 1 addition & 0 deletions packages/db-client/generated/errors_grpc_pb.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
// GENERATED CODE -- NO SERVICES IN PROTO
199 changes: 199 additions & 0 deletions packages/db-client/generated/errors_pb.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// package: kurrentdb.protocol.v2.streams.errors
// file: kurrentdb/protocols/v2/streams/errors.proto

/* tslint:disable */
/* eslint-disable */

import * as jspb from "google-protobuf";
import * as kurrentdb_protocols_v2_rpc_pb from "../../../../kurrentdb/protocols/v2/rpc_pb";

export class StreamNotFoundErrorDetails extends jspb.Message {
getStream(): string;
setStream(value: string): StreamNotFoundErrorDetails;

serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): StreamNotFoundErrorDetails.AsObject;
static toObject(includeInstance: boolean, msg: StreamNotFoundErrorDetails): StreamNotFoundErrorDetails.AsObject;
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
static serializeBinaryToWriter(message: StreamNotFoundErrorDetails, writer: jspb.BinaryWriter): void;
static deserializeBinary(bytes: Uint8Array): StreamNotFoundErrorDetails;
static deserializeBinaryFromReader(message: StreamNotFoundErrorDetails, reader: jspb.BinaryReader): StreamNotFoundErrorDetails;
}

export namespace StreamNotFoundErrorDetails {
export type AsObject = {
stream: string,
}
}

export class StreamAlreadyExistsErrorDetails extends jspb.Message {
getStream(): string;
setStream(value: string): StreamAlreadyExistsErrorDetails;

serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): StreamAlreadyExistsErrorDetails.AsObject;
static toObject(includeInstance: boolean, msg: StreamAlreadyExistsErrorDetails): StreamAlreadyExistsErrorDetails.AsObject;
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
static serializeBinaryToWriter(message: StreamAlreadyExistsErrorDetails, writer: jspb.BinaryWriter): void;
static deserializeBinary(bytes: Uint8Array): StreamAlreadyExistsErrorDetails;
static deserializeBinaryFromReader(message: StreamAlreadyExistsErrorDetails, reader: jspb.BinaryReader): StreamAlreadyExistsErrorDetails;
}

export namespace StreamAlreadyExistsErrorDetails {
export type AsObject = {
stream: string,
}
}

export class StreamDeletedErrorDetails extends jspb.Message {
getStream(): string;
setStream(value: string): StreamDeletedErrorDetails;

serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): StreamDeletedErrorDetails.AsObject;
static toObject(includeInstance: boolean, msg: StreamDeletedErrorDetails): StreamDeletedErrorDetails.AsObject;
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
static serializeBinaryToWriter(message: StreamDeletedErrorDetails, writer: jspb.BinaryWriter): void;
static deserializeBinary(bytes: Uint8Array): StreamDeletedErrorDetails;
static deserializeBinaryFromReader(message: StreamDeletedErrorDetails, reader: jspb.BinaryReader): StreamDeletedErrorDetails;
}

export namespace StreamDeletedErrorDetails {
export type AsObject = {
stream: string,
}
}

export class StreamTombstonedErrorDetails extends jspb.Message {
getStream(): string;
setStream(value: string): StreamTombstonedErrorDetails;

serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): StreamTombstonedErrorDetails.AsObject;
static toObject(includeInstance: boolean, msg: StreamTombstonedErrorDetails): StreamTombstonedErrorDetails.AsObject;
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
static serializeBinaryToWriter(message: StreamTombstonedErrorDetails, writer: jspb.BinaryWriter): void;
static deserializeBinary(bytes: Uint8Array): StreamTombstonedErrorDetails;
static deserializeBinaryFromReader(message: StreamTombstonedErrorDetails, reader: jspb.BinaryReader): StreamTombstonedErrorDetails;
}

export namespace StreamTombstonedErrorDetails {
export type AsObject = {
stream: string,
}
}

export class StreamRevisionConflictErrorDetails extends jspb.Message {
getStream(): string;
setStream(value: string): StreamRevisionConflictErrorDetails;
getExpectedRevision(): string;
setExpectedRevision(value: string): StreamRevisionConflictErrorDetails;
getActualRevision(): string;
setActualRevision(value: string): StreamRevisionConflictErrorDetails;

serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): StreamRevisionConflictErrorDetails.AsObject;
static toObject(includeInstance: boolean, msg: StreamRevisionConflictErrorDetails): StreamRevisionConflictErrorDetails.AsObject;
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
static serializeBinaryToWriter(message: StreamRevisionConflictErrorDetails, writer: jspb.BinaryWriter): void;
static deserializeBinary(bytes: Uint8Array): StreamRevisionConflictErrorDetails;
static deserializeBinaryFromReader(message: StreamRevisionConflictErrorDetails, reader: jspb.BinaryReader): StreamRevisionConflictErrorDetails;
}

export namespace StreamRevisionConflictErrorDetails {
export type AsObject = {
stream: string,
expectedRevision: string,
actualRevision: string,
}
}

export class AppendRecordSizeExceededErrorDetails extends jspb.Message {
getStream(): string;
setStream(value: string): AppendRecordSizeExceededErrorDetails;
getRecordId(): string;
setRecordId(value: string): AppendRecordSizeExceededErrorDetails;
getSize(): number;
setSize(value: number): AppendRecordSizeExceededErrorDetails;
getMaxSize(): number;
setMaxSize(value: number): AppendRecordSizeExceededErrorDetails;

serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): AppendRecordSizeExceededErrorDetails.AsObject;
static toObject(includeInstance: boolean, msg: AppendRecordSizeExceededErrorDetails): AppendRecordSizeExceededErrorDetails.AsObject;
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
static serializeBinaryToWriter(message: AppendRecordSizeExceededErrorDetails, writer: jspb.BinaryWriter): void;
static deserializeBinary(bytes: Uint8Array): AppendRecordSizeExceededErrorDetails;
static deserializeBinaryFromReader(message: AppendRecordSizeExceededErrorDetails, reader: jspb.BinaryReader): AppendRecordSizeExceededErrorDetails;
}

export namespace AppendRecordSizeExceededErrorDetails {
export type AsObject = {
stream: string,
recordId: string,
size: number,
maxSize: number,
}
}

export class AppendTransactionSizeExceededErrorDetails extends jspb.Message {
getSize(): number;
setSize(value: number): AppendTransactionSizeExceededErrorDetails;
getMaxSize(): number;
setMaxSize(value: number): AppendTransactionSizeExceededErrorDetails;

serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): AppendTransactionSizeExceededErrorDetails.AsObject;
static toObject(includeInstance: boolean, msg: AppendTransactionSizeExceededErrorDetails): AppendTransactionSizeExceededErrorDetails.AsObject;
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
static serializeBinaryToWriter(message: AppendTransactionSizeExceededErrorDetails, writer: jspb.BinaryWriter): void;
static deserializeBinary(bytes: Uint8Array): AppendTransactionSizeExceededErrorDetails;
static deserializeBinaryFromReader(message: AppendTransactionSizeExceededErrorDetails, reader: jspb.BinaryReader): AppendTransactionSizeExceededErrorDetails;
}

export namespace AppendTransactionSizeExceededErrorDetails {
export type AsObject = {
size: number,
maxSize: number,
}
}

export class StreamAlreadyInAppendSessionErrorDetails extends jspb.Message {
getStream(): string;
setStream(value: string): StreamAlreadyInAppendSessionErrorDetails;

serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): StreamAlreadyInAppendSessionErrorDetails.AsObject;
static toObject(includeInstance: boolean, msg: StreamAlreadyInAppendSessionErrorDetails): StreamAlreadyInAppendSessionErrorDetails.AsObject;
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
static serializeBinaryToWriter(message: StreamAlreadyInAppendSessionErrorDetails, writer: jspb.BinaryWriter): void;
static deserializeBinary(bytes: Uint8Array): StreamAlreadyInAppendSessionErrorDetails;
static deserializeBinaryFromReader(message: StreamAlreadyInAppendSessionErrorDetails, reader: jspb.BinaryReader): StreamAlreadyInAppendSessionErrorDetails;
}

export namespace StreamAlreadyInAppendSessionErrorDetails {
export type AsObject = {
stream: string,
}
}

export enum StreamsError {
STREAMS_ERROR_UNSPECIFIED = 0,
STREAMS_ERROR_STREAM_NOT_FOUND = 1,
STREAMS_ERROR_STREAM_ALREADY_EXISTS = 2,
STREAMS_ERROR_STREAM_DELETED = 3,
STREAMS_ERROR_STREAM_TOMBSTONED = 4,
STREAMS_ERROR_STREAM_REVISION_CONFLICT = 5,
STREAMS_ERROR_APPEND_RECORD_SIZE_EXCEEDED = 6,
STREAMS_ERROR_APPEND_TRANSACTION_SIZE_EXCEEDED = 7,
STREAMS_ERROR_STREAM_ALREADY_IN_APPEND_SESSION = 8,
STREAMS_ERROR_APPEND_SESSION_NO_REQUESTS = 9,
}
Loading
Loading