Binary file modified docs/server/ongoing-tasks/queue-sink/assets/info-hub.png
Binary file not shown.
Binary file not shown.
638 changes: 638 additions & 0 deletions docs/server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx

Large diffs are not rendered by default.

273 changes: 181 additions & 92 deletions docs/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx
@@ -6,6 +6,8 @@ sidebar_position: 1
---

import Admonition from '@theme/Admonition';
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
import CodeBlock from '@theme/CodeBlock';
import Panel from "@site/src/components/Panel";
import ContentFrame from "@site/src/components/ContentFrame";
@@ -26,24 +28,26 @@ import ContentFrame from "@site/src/components/ContentFrame";
of enqueued JSON-formatted messages from Kafka topics, construct documents using
user-defined scripts, and store the documents in RavenDB collections.

* In this page:
* In this article:
* [The Queue Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#the-queue-sink-task)
* [Connecting a Kafka broker](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#connecting-a-kafka-broker)
* [Retrieving enqueued messages from selected Kafka topics](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#retrieving-enqueued-messages-from-selected-kafka-topics)
* [Running user-defined scripts](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#running-user-defined-scripts)
* [Storing documents in RavenDB collections](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#storing-documents-in-ravendb-collections)
* [Client API](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#client-api)
* [Add a Kafka Connection String](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string)
* [Add a Kafka Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-sink-task)
* [Adding a Kafka connection string](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#adding-a-kafka-connection-string)
* [Adding a Kafka sink task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#adding-a-kafka-sink-task)
* [Configuration Options](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#configuration-options)
* [Syntax](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#syntax)
* [Classes](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#classes)

</Admonition>

<Panel heading="The Queue Sink Task">

<ContentFrame>

## Connecting a Kafka broker
### Connecting a Kafka broker

Users of RavenDB 6.0 and later can create an ongoing Sink task that connects
to a Kafka broker, retrieves enqueued messages from selected Kafka topics,
@@ -54,7 +58,7 @@ In the message broker architecture, RavenDB sinks take the role of data consumer
A sink connects to a Kafka broker using a connection string, and retrieves messages
from the broker's topics.

Read [below](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string)
Read [below](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#adding-a-kafka-connection-string)
about adding a connection string via API.
Read [here](../../../studio/database/tasks/ongoing-tasks/kafka-queue-sink.mdx#define-a-kafka-sink-task)
about adding a connection string using Studio.
@@ -74,11 +78,9 @@ e.g. from node `A` to node `B` as a result of node `A` down time:

</ContentFrame>

---

<ContentFrame>

## Retrieving enqueued messages from selected Kafka topics
### Retrieving enqueued messages from selected Kafka topics

<Admonition type="note" title="">
While a queue sink task is running, it prevents the host database from being unloaded due to idle operations.
@@ -91,11 +93,9 @@ up the queue until it reaches its head and can be consumed by RavenDB's sink.

</ContentFrame>

---

<ContentFrame>

## Running user-defined scripts
### Running user-defined scripts

A sink task's script is a JavaScript segment. Its basic role is to retrieve
selected Kafka messages or message properties, and construct documents that
@@ -158,11 +158,9 @@ and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript

</ContentFrame>

---

<ContentFrame>

## Storing documents in RavenDB collections
### Storing documents in RavenDB collections

The sink task consumes batches of queued messages and stores them in RavenDB
in a transactional manner, processing either the entire batch or none of it.
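
The all-or-nothing behavior described above can be illustrated with a small, self-contained
JavaScript simulation. This is only a sketch of the idea, not RavenDB's implementation:
`processBatch`, the in-memory `store` map, and the message shape are hypothetical names
introduced for illustration.

```javascript
// Hypothetical simulation of all-or-nothing batch processing.
// Nothing here is RavenDB API; `store` is a plain in-memory Map.
function processBatch(store, messages, script) {
  // Stage writes in a copy so that a failure leaves `store` untouched.
  const staged = new Map(store);
  for (const message of messages) {
    // The script receives the message and a `put` function that stages a document.
    script(message, (id, doc) => staged.set(id, doc));
  }
  // Commit only after every message in the batch was processed successfully.
  for (const [id, doc] of staged) store.set(id, doc);
  return store;
}

const store = new Map();
const script = (msg, put) => {
  if (msg.Id === undefined) throw new Error("message has no Id");
  put(String(msg.Id), { ...msg, "@metadata": { "@collection": "Orders" } });
};

// Both messages are valid, so both documents are stored.
processBatch(store, [{ Id: 1 }, { Id: 2 }], script);

try {
  // The second message is invalid, so the batch fails as a whole.
  processBatch(store, [{ Id: 3 }, { Name: "no id" }], script);
} catch (e) {
  // The failed batch stored nothing, not even the valid message with Id 3.
}
```

In this sketch the staging copy plays the role of the transaction: documents become visible
only when the whole batch has been processed.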
@@ -198,22 +196,14 @@ one copy of it will remain.

<Panel heading="Client API">

<ContentFrame>

## Add a Kafka Connection String
### Adding a Kafka connection string

Prior to defining a Kafka sink task, add a **Kafka connection string**
that the task will use to connect the message broker's bootstrap servers.

To create the connection string:

* Create a `QueueConnectionString` instance with the connection string configuration.
Pass it to the `PutConnectionStringOperation` store operation to add the connection string.

`QueueConnectionString`:
Before defining a sink task, add a Kafka connection string for the task to use.
Create a `QueueConnectionString` object configured for Kafka, and pass it to
`PutConnectionStringOperation`.

```csharp
// Add Kafka connection string
// Add a Kafka connection string
var res = store.Maintenance.Send(
new PutConnectionStringOperation<QueueConnectionString>(
new QueueConnectionString
@@ -227,77 +217,27 @@ var res = store.Maintenance.Send(

<br />

`QueueBrokerType`:

```csharp
public enum QueueBrokerType
{
None,
Kafka,
RabbitMq
}
```

<br />

| Property | Type | Description |
|---|---|---|
| **Name** | `string` | Connection string name |
| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` for a Kafka connection string |
| **KafkaConnectionSettings** | `KafkaConnectionSettings[]` | A list of comma-separated host:port URLs to Kafka brokers |

</ContentFrame>
For the full property reference, see the
[Connection string](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#connection-string)
class in the Syntax section.

---

<ContentFrame>

## Add a Kafka Sink Task

To create the Sink task:

* Create `QueueSinkScript` instances to define scripts with which the
task can process retrieved messages, apply JavaScript commands, construct
documents and store them in RavenDB.
### Adding a Kafka sink task

```csharp
// Define a Sink script
QueueSinkScript queueSinkScript = new QueueSinkScript
{
// Script name
Name = "orders",
// A list of Kafka topics to connect
Queues = new List<string>() { "orders" },
// Apply this script
Script = @"this['@metadata']['@collection'] = 'Orders';
put(this.Id.toString(), this)"
};
```
To define the sink task, prepare a `QueueSinkConfiguration` object and pass it to
`AddQueueSinkOperation`.
The configuration needs to name the connection string to use, set the broker type to Kafka,
and hold one or more `QueueSinkScript` objects.

<br />

* Prepare a `QueueSinkConfiguration` object with the sink task configuration.
* Each script lists the Kafka topics to consume from in its `Queues` property, where each
entry is a topic name.
* Each script's `Script` property holds the JavaScript that turns the consumed messages into
RavenDB documents.

`QueueSinkConfiguration` properties:

| Property | Type | Description |
|---|---|---|
| **Name** | `string` | The sink task name |
| **ConnectionStringName** | `string` | The registered connection string name |
| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` to define a Kafka sink task |
| **Scripts** | `List<QueueSinkScript>` | A list of scripts |

* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task.

`QueueSinkScript` properties:

| Property | Type | Description |
|---|---|---|
| **Name** | `string` | Script name |
| **Queues** | `List<string>` | A list of Kafka topics to consume messages from |
| **Script** | `string` | The script contents |
<ContentFrame>

#### Example:
#### Example: Adding a Kafka sink task

```csharp
// Add Kafka connection string
@@ -316,7 +256,7 @@ QueueSinkScript queueSinkScript = new QueueSinkScript
{
// Script name
Name = "orders",
// A list of Kafka topics to connect
// A list of Kafka topics to consume from
Queues = new List<string>() { "orders" },
// Apply this script
Script = @"this['@metadata']['@collection'] = 'Orders';
@@ -342,6 +282,10 @@ AddQueueSinkOperationResult addQueueSinkOperationResult =

</ContentFrame>

For the `QueueSinkConfiguration` and `QueueSinkScript` property reference, see the
[Sink task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#sink-task)
classes in the Syntax section.
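
To see what the example script above does to a single message, it can be run outside RavenDB
with a small harness. This is a rough sketch under stated assumptions: `runSinkScript` is a
hypothetical helper, `put` is an in-memory stand-in for the script's `put` call, and the
message is assumed to expose an `@metadata` object as the example script expects.

```javascript
// Hypothetical harness: runs a sink script's text against one message outside RavenDB.
function runSinkScript(scriptText, message, documents) {
  // Assumption: the message exposes an '@metadata' object, as the example script expects.
  const doc = { ...message, "@metadata": {} };
  // Stand-in for the script's put(id, document) call; it writes to an in-memory Map.
  const put = (id, document) => documents.set(id, document);
  // Evaluate the script with `this` bound to the message and `put` in scope.
  new Function("put", scriptText).call(doc, put);
  return documents;
}

// The script text from the example above.
const scriptText = `this['@metadata']['@collection'] = 'Orders';
put(this.Id.toString(), this)`;

const documents = new Map();
runSinkScript(scriptText, { Id: 13, Product: "Milk" }, documents);
// Running the same message again overwrites the document with the same ID.
runSinkScript(scriptText, { Id: 13, Product: "Milk" }, documents);
```

Because the script derives the document ID from the message's `Id`, processing the same
message twice in this sketch leaves a single document under that ID.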

</Panel>

<Panel heading="Configuration Options">
@@ -355,3 +299,148 @@ Use these configuration options to gain more control over queue sink tasks.
mode (i.e. suspending the process) after a connection failure.

</Panel>

<Panel heading="Syntax">

## Classes

### Connection string

<Tabs>
<TabItem value="QueueConnectionString" label="QueueConnectionString" default>

**The connection string a sink task uses to reach a Kafka broker.**

```csharp
class QueueConnectionString
{
string Name
QueueBrokerType BrokerType
KafkaConnectionSettings KafkaConnectionSettings
}
```
<br />

| Property | Type | Description |
|---|---|---|
| **Name** | `string` | The connection string name |
| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` for a Kafka connection string |
| **KafkaConnectionSettings** | `KafkaConnectionSettings` | The broker's bootstrap servers and connection options |

`QueueConnectionString` is shared by all queue brokers, so it also defines settings for the other
broker types (`RabbitMqConnectionSettings`, `AzureQueueStorageConnectionSettings`,
`AmazonSqsConnectionSettings`, `AzureServiceBusConnectionSettings`).
Set only the one matching `BrokerType`.
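
The rule above can be expressed as a simple client-side check. The following JavaScript sketch
is illustrative only: `checkConnectionString` is a hypothetical helper, and pairing each broker
type with the settings property of the same name is an assumption based on the names listed above.

```javascript
// Assumed pairing of each broker type with the settings property named in the text above.
const SETTINGS_BY_BROKER = {
  Kafka: "KafkaConnectionSettings",
  RabbitMq: "RabbitMqConnectionSettings",
  AzureQueueStorage: "AzureQueueStorageConnectionSettings",
  AmazonSqs: "AmazonSqsConnectionSettings",
  AzureServiceBus: "AzureServiceBusConnectionSettings",
};

// Hypothetical check (not a RavenDB API); returns a list of problems, empty when valid.
function checkConnectionString(cs) {
  const expected = SETTINGS_BY_BROKER[cs.BrokerType];
  if (!expected) return [`Unknown or unset BrokerType: ${cs.BrokerType}`];
  const problems = [];
  if (cs[expected] == null) problems.push(`${expected} is required`);
  for (const name of Object.values(SETTINGS_BY_BROKER)) {
    if (name !== expected && cs[name] != null) {
      problems.push(`${name} does not match BrokerType ${cs.BrokerType}`);
    }
  }
  return problems;
}

// "KafkaConStr" and "Mixed" are placeholder names.
const valid = checkConnectionString({
  Name: "KafkaConStr",
  BrokerType: "Kafka",
  KafkaConnectionSettings: { BootstrapServers: "localhost:9092" },
});
const invalid = checkConnectionString({
  Name: "Mixed",
  BrokerType: "Kafka",
  RabbitMqConnectionSettings: {},
});
```

Here `valid` is an empty list, while `invalid` reports both the missing Kafka settings and the
stray RabbitMQ settings.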

</TabItem>
<TabItem value="KafkaConnectionSettings" label="KafkaConnectionSettings">

**The connection details for a Kafka broker.**

```csharp
class KafkaConnectionSettings
{
string BootstrapServers
Dictionary<string, string> ConnectionOptions
bool UseRavenCertificate
}
```
<br />

| Property | Type | Description |
|---|---|---|
| **BootstrapServers** | `string` | A comma-separated list of the broker's bootstrap servers, in `host:port` form |
| **ConnectionOptions** | `Dictionary<string, string>` | Additional Kafka client configuration options, as key-value pairs |
| **UseRavenCertificate** | `bool` | Whether to authenticate the connection with RavenDB's own certificate |
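
As an illustration of the `BootstrapServers` format, the JavaScript sketch below splits a
comma-separated `host:port` list into its entries. `parseBootstrapServers` is a hypothetical
helper written for this example, and the host names are placeholders. It is not part of the
RavenDB client.

```javascript
// Hypothetical helper: splits a BootstrapServers value into host/port pairs.
function parseBootstrapServers(value) {
  return value.split(",").map((entry) => {
    const trimmed = entry.trim();
    // Use the last colon so that the port is always the final segment.
    const separator = trimmed.lastIndexOf(":");
    const host = trimmed.slice(0, separator);
    const port = Number(trimmed.slice(separator + 1));
    if (separator <= 0 || !Number.isInteger(port) || port < 1 || port > 65535) {
      throw new Error(`Invalid bootstrap server entry: "${trimmed}"`);
    }
    return { host, port };
  });
}

// Two placeholder brokers, separated by a comma.
const servers = parseBootstrapServers("localhost:9092, broker2.example.com:9093");
```

An entry without a port, such as `localhost`, makes this helper throw instead of silently
producing an unusable address.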

</TabItem>
</Tabs>

### Sink task

<Tabs>
<TabItem value="QueueSinkConfiguration" label="QueueSinkConfiguration" default>

**The configuration of a Kafka sink task.**

```csharp
class QueueSinkConfiguration
{
string Name
string ConnectionStringName
QueueBrokerType BrokerType
List<QueueSinkScript> Scripts
bool Disabled
string MentorNode
bool PinToMentorNode
long TaskId
}
```
<br />

| Property | Type | Description |
|---|---|---|
| **Name** | `string` | The sink task name |
| **ConnectionStringName** | `string` | The name of the Kafka connection string the task uses |
| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` (must match the connection string's broker type) |
| **Scripts** | `List<QueueSinkScript>` | The scripts the task runs |
| **Disabled** | `bool` | Whether the task is created in a disabled state |
| **MentorNode** | `string` | The preferred responsible node for the task, if any |
| **PinToMentorNode** | `bool` | Whether to pin the task to its mentor node |
| **TaskId** | `long` | The task's identifier, assigned by the server |
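
The relationships between these properties can be checked before the task is submitted. The
JavaScript sketch below is a hypothetical pre-flight check, not a RavenDB API:
`checkSinkConfiguration` and the `connectionStrings` lookup object are assumptions made for
illustration, and the task and connection string names are placeholders.

```javascript
// Hypothetical pre-flight check for a sink task configuration (not a RavenDB API).
// `connectionStrings` maps a connection string name to its definition.
function checkSinkConfiguration(config, connectionStrings) {
  const problems = [];
  const cs = connectionStrings[config.ConnectionStringName];
  if (!cs) {
    problems.push(`Connection string "${config.ConnectionStringName}" is not defined`);
  } else if (cs.BrokerType !== config.BrokerType) {
    problems.push(`BrokerType ${config.BrokerType} does not match the connection string (${cs.BrokerType})`);
  }
  const scripts = config.Scripts ?? [];
  if (scripts.length === 0) problems.push("At least one script is expected");
  for (const s of scripts) {
    if (!s.Queues || s.Queues.length === 0) problems.push(`Script "${s.Name}" lists no topics`);
    if (!s.Script || s.Script.trim() === "") problems.push(`Script "${s.Name}" is empty`);
  }
  return problems;
}

const connectionStrings = { KafkaConStr: { BrokerType: "Kafka" } };

// A consistent configuration: matching broker type, one script with one topic.
const ok = checkSinkConfiguration({
  Name: "KafkaSinkTask",
  ConnectionStringName: "KafkaConStr",
  BrokerType: "Kafka",
  Scripts: [{ Name: "orders", Queues: ["orders"], Script: "put(this.Id.toString(), this)" }],
}, connectionStrings);

// An inconsistent configuration: wrong broker type and no scripts.
const mismatched = checkSinkConfiguration({
  Name: "KafkaSinkTask",
  ConnectionStringName: "KafkaConStr",
  BrokerType: "RabbitMq",
  Scripts: [],
}, connectionStrings);
```
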

</TabItem>
<TabItem value="QueueSinkScript" label="QueueSinkScript">

**A script that turns the messages from its topics into RavenDB documents.**

```csharp
class QueueSinkScript
{
string Name
List<string> Queues
string Script
bool Disabled
}
```
<br />

| Property | Type | Description |
|---|---|---|
| **Name** | `string` | The script name |
| **Queues** | `List<string>` | The Kafka topics to consume from |
| **Script** | `string` | The JavaScript that processes each message |
| **Disabled** | `bool` | Whether the script is disabled |

</TabItem>
</Tabs>

### Enums

<Tabs>
<TabItem value="QueueBrokerType" label="QueueBrokerType" default>

**Identifies the message broker that a connection string and sink task use.**

```csharp
enum QueueBrokerType
{
None,
Kafka,
RabbitMq,
AzureQueueStorage,
AmazonSqs,
AzureServiceBus
}
```
<br />

| Value | Description |
|---|---|
| **Kafka** | Selects Apache Kafka. Use this value for a Kafka connection string and sink task. |
| **None, RabbitMq, AzureQueueStorage, AmazonSqs, AzureServiceBus** | The other broker types, used by the remaining Queue Sink and Queue ETL brokers. |

</TabItem>
</Tabs>

</Panel>