diff --git a/docs/server/ongoing-tasks/queue-sink/assets/info-hub.png b/docs/server/ongoing-tasks/queue-sink/assets/info-hub.png index 3682a6d7e8..12e923a92c 100644 Binary files a/docs/server/ongoing-tasks/queue-sink/assets/info-hub.png and b/docs/server/ongoing-tasks/queue-sink/assets/info-hub.png differ diff --git a/docs/server/ongoing-tasks/queue-sink/assets/overview_ongoing-tasks.png b/docs/server/ongoing-tasks/queue-sink/assets/overview_ongoing-tasks.png index 277fb6a976..f32eb3d8ae 100644 Binary files a/docs/server/ongoing-tasks/queue-sink/assets/overview_ongoing-tasks.png and b/docs/server/ongoing-tasks/queue-sink/assets/overview_ongoing-tasks.png differ diff --git a/docs/server/ongoing-tasks/queue-sink/assets/overview_task-selection.png b/docs/server/ongoing-tasks/queue-sink/assets/overview_task-selection.png index 167215bc88..4d45de985a 100644 Binary files a/docs/server/ongoing-tasks/queue-sink/assets/overview_task-selection.png and b/docs/server/ongoing-tasks/queue-sink/assets/overview_task-selection.png differ diff --git a/docs/server/ongoing-tasks/queue-sink/assets/snagit/overview_ongoing-tasks.snagx b/docs/server/ongoing-tasks/queue-sink/assets/snagit/overview_ongoing-tasks.snagx new file mode 100644 index 0000000000..f33762b213 Binary files /dev/null and b/docs/server/ongoing-tasks/queue-sink/assets/snagit/overview_ongoing-tasks.snagx differ diff --git a/docs/server/ongoing-tasks/queue-sink/assets/snagit/overview_task-selection.snagx b/docs/server/ongoing-tasks/queue-sink/assets/snagit/overview_task-selection.snagx new file mode 100644 index 0000000000..a32ce3767b Binary files /dev/null and b/docs/server/ongoing-tasks/queue-sink/assets/snagit/overview_task-selection.snagx differ diff --git a/docs/server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx b/docs/server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx new file mode 100644 index 0000000000..6087adc4be --- /dev/null +++ b/docs/server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx @@ -0,0 +1,638 @@ +--- +title: "Queue Sink: Azure Service Bus" +sidebar_label: "Azure Service Bus Queue Sink" +description: "Consume messages from Azure Service Bus queues and topic subscriptions and write them into RavenDB documents using queue sink tasks with JavaScript processing scripts." +sidebar_position: 3 +--- + +import Admonition from '@theme/Admonition'; +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; +import CodeBlock from '@theme/CodeBlock'; +import Panel from "@site/src/components/Panel"; +import ContentFrame from "@site/src/components/ContentFrame"; + +# Queue Sink: Azure Service Bus + + + +* **Azure Service Bus** is Microsoft Azure's fully managed message broker. + The broker carries messages between applications (like microservices, + or a web app and its background workers) through queues or topics + with subscriptions. + +* RavenDB can consume messages from Azure Service Bus by running an ongoing + **sink task**. + The task reads JSON-formatted messages from a queue or a topic subscription, + runs a user-defined script over each message, and stores the documents the + script produces in RavenDB collections. + +* The sink task connects to an Azure Service Bus + [namespace](https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview#namespaces) + using one of three authentication methods: + * a SAS (Shared Access Signature) connection string + * Entra ID application credentials + * passwordless authentication (using a managed identity) + +* This page explains how to define an Azure Service Bus sink task using the client API. + To configure this task in Studio, see the + [Azure Service Bus Queue Sink Task](../../../studio/database/tasks/ongoing-tasks/azure-service-bus-queue-sink.mdx) page. + To learn about RavenDB queue sinks in general, see the + [Queue Sink overview](../../../server/ongoing-tasks/queue-sink/overview.mdx). + +* In this article: + * [The Azure Service Bus Sink Task](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#the-azure-service-bus-sink-task) + * [Connecting an Azure Service Bus namespace](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#connecting-an-azure-service-bus-namespace) + * [Selecting message sources](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#selecting-message-sources) + * [Retrieving enqueued messages](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#retrieving-enqueued-messages) + * [Running user-defined scripts](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#running-user-defined-scripts) + * [Storing documents in RavenDB collections](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#storing-documents-in-ravendb-collections) + * [Client API](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#client-api) + * [Adding an Azure Service Bus connection string](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#adding-an-azure-service-bus-connection-string) + * [Adding an Azure Service Bus sink task](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#adding-an-azure-service-bus-sink-task) + * [Configuration Options](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#configuration-options) + * [Syntax](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#syntax) + * [Methods](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#methods) + * [Classes](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#classes) + + + + + + + +### Connecting an Azure Service Bus namespace + +A RavenDB sink task is a **consumer** in the Azure Service Bus architecture, reading +the messages that producer applications place on the broker. + +The task connects to an Azure Service Bus namespace using a connection string that you +register with RavenDB. The connection string identifies the namespace and holds the +credentials that the task authenticates with. + +Read [below](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#adding-an-azure-service-bus-connection-string) +about adding a connection string via the client API. + + + + + +### Selecting message sources + +Each sink script lists one or more **sources**. + +A source is either: + +* a **queue**, identified by its name, or +* a **topic subscription**, identified by its topic and subscription names. + +One script can read from several sources, and a sink task can run several scripts, so a +single task can consume from any mix of queues and subscriptions in the same namespace. + +Learn [below](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#adding-an-azure-service-bus-sink-task) +how to specify each source via the client API. + + + + + +### Retrieving enqueued messages + + + +A running sink task prevents RavenDB from unloading its host database due to inactivity, +so consumption from Azure Service Bus continues uninterrupted even when the database +would otherwise go idle. + + + +A producer places a message on a queue or publishes it to a topic, where it waits to be +consumed. +The sink task receives the messages available on each of its sources and hands +each message to a script. + + + + + +### Running user-defined scripts + +A sink script is a JavaScript segment that turns each consumed message into one or +more RavenDB documents. + +The simplest script stores the message as it arrives, using the `put` command: + +```javascript +// Set the @collection metadata to store the document in a collection; +// without it, the document is stored in the @empty collection. +this['@metadata']['@collection'] = 'Orders'; + +// Store the message as-is, reusing its Id property as the document ID. +put(this.Id.toString(), this); +``` +
+ +A script can also reshape a message into a different document. +e.g., for a message like this one: + +```json +{ + "Id": 13, + "FirstName": "John", + "LastName": "Doe" +} +``` +
+ +the following script adds a `FullName` field and stores the document in the `Users` +collection: + +```javascript +var item = { + Id: this.Id, + FirstName: this.FirstName, + LastName: this.LastName, + FullName: this.FirstName + ' ' + this.LastName, + "@metadata": { + "@collection": "Users" + } +}; + +// Pass the Id as a string, even when Azure Service Bus delivers it as a number. +put(this.Id.toString(), item); +``` +
+ +Beyond `put`, a script can call `load` to read an existing RavenDB document +(e.g., to enrich a message with related data), `del` to delete a document, and +[various other commands](../../../server/kb/javascript-engine.mdx#predefined-javascript-functions). + +
+ + + +### Storing documents in RavenDB collections + +The sink task processes a batch of messages and stores the resulting documents in a single transaction, +either the whole batch or none of it. + + + +Some script processing errors are allowed. When such an error occurs, RavenDB skips the +affected message, logs the event, and raises an alert, but continues processing the rest +of the batch. + + + +Only after the batch is stored does the task acknowledge its messages to Azure Service +Bus, which then removes them from the queue or subscription. +Because the task acknowledges a message only after its document is persisted, every message +is processed **at least once**, even if the sink fails partway through a batch. + +The number of messages in a batch is +[configurable](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#configuration-options). + + + +At-least-once processing means the same message can be delivered more than once. This +happens when a producer enqueues a message more than once, or when a batch takes longer +than the message's **lock duration** (the time Azure Service Bus reserves a message for +the sink): once the lock expires, Azure Service Bus redelivers the message. + +If processing each message only once matters to the consumer, it is the consumer's +responsibility to handle the duplicates. Often this needs no extra work: as long as a +message keeps the same `Id`, the script's `put(id, ...)` command overwrites the earlier +document, so only one copy remains. + + + + + +
+ + + +### Adding an Azure Service Bus connection string + +Before defining a sink task, add an Azure Service Bus connection string for the task to use. +Create a `QueueConnectionString` object configured for Azure Service Bus, and pass it to +`PutConnectionStringOperation`. + +An Azure Service Bus connection authenticates in one of three modes: a SAS connection string, +Entra ID application credentials, or passwordless authentication. +Set exactly one of them in the connection string's `AzureServiceBusConnectionSettings` object: + + + + +```csharp +var connectionString = new QueueConnectionString +{ + Name = "AzureServiceBusConStr", + BrokerType = QueueBrokerType.AzureServiceBus, + AzureServiceBusConnectionSettings = new AzureServiceBusConnectionSettings + { + // A Service Bus SAS connection string, copied from the Azure portal + ConnectionString = "Endpoint=sb://.servicebus.windows.net/;" + + "SharedAccessKeyName=;SharedAccessKey=" + } +}; + +store.Maintenance.Send( + new PutConnectionStringOperation(connectionString)); +``` + + + + +```csharp +var connectionString = new QueueConnectionString +{ + Name = "AzureServiceBusConStr", + BrokerType = QueueBrokerType.AzureServiceBus, + AzureServiceBusConnectionSettings = new AzureServiceBusConnectionSettings + { + // Authenticate using an Entra ID application registration (tenant, client ID, and secret) + EntraId = new AzureServiceBusEntraId + { + Namespace = ".servicebus.windows.net", + TenantId = "", + ClientId = "", + ClientSecret = "" + } + } +}; + +store.Maintenance.Send( + new PutConnectionStringOperation(connectionString)); +``` + + + + +```csharp +var connectionString = new QueueConnectionString +{ + Name = "AzureServiceBusConStr", + BrokerType = QueueBrokerType.AzureServiceBus, + AzureServiceBusConnectionSettings = new AzureServiceBusConnectionSettings + { + // Authenticate using the host's managed identity; no secret is stored + Passwordless = new AzureServiceBusPasswordless + { + Namespace = ".servicebus.windows.net" + } + } +}; + +store.Maintenance.Send( + new PutConnectionStringOperation(connectionString)); +``` + + + + +For the full property reference, see the +[Connection string](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#connection-string) +and +[Authentication credentials](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#authentication-credentials) +classes in the Syntax section. + +--- + +### Adding an Azure Service Bus sink task + +To define the sink task, prepare a `QueueSinkConfiguration` object and pass it to +`AddQueueSinkOperation`. +The configuration needs to name the connection string to use, set the broker type to Azure +Service Bus, and hold one or more `QueueSinkScript` objects. + +* Each script lists its sources in the `Queues` property, where each entry is a single string: + * A **queue** source is just the queue name. + e.g., `orders` + * A **topic-subscription** source is the topic name and the subscription name joined by a semicolon. + e.g., `orders-topic;ravendb-sub` + The semicolon is an unambiguous separator: Service Bus queue, topic, and subscription names can + never contain a semicolon, so it cannot be mistaken for part of either name. +* `AzureServiceBusSinkSource` is a helper class that builds and validates these entries, so you don't + have to build the `topicName;subscriptionName` form yourself and can't pass an invalid name: + * Use `AzureServiceBusSinkSource.Queue(queueName)` for a queue source. + * Use `AzureServiceBusSinkSource.Subscription(topicName, subscriptionName)` for a topic-subscription source. + + + +#### Example: Reading from a queue + +```csharp +// Read from a single Service Bus queue +var script = new QueueSinkScript +{ + Name = "orders", + Queues = new List { AzureServiceBusSinkSource.Queue("orders") }, + // Store each message as an Orders document + Script = @"this['@metadata']['@collection'] = 'Orders'; + put(this.Id.toString(), this)" +}; + +var config = new QueueSinkConfiguration +{ + Name = "AzureServiceBusSink", + // The connection string added above + ConnectionStringName = "AzureServiceBusConStr", + BrokerType = QueueBrokerType.AzureServiceBus, + Scripts = { script } +}; + +store.Maintenance.Send(new AddQueueSinkOperation(config)); +``` + + + + + +#### Example: Reading from multiple sources + +A script can list any mix of queues and subscriptions in its `Queues` property: + +```csharp +Queues = new List +{ + AzureServiceBusSinkSource.Queue("orders"), + AzureServiceBusSinkSource.Subscription("orders-topic", "ravendb-sub") +} +``` + + + +For the `QueueSinkConfiguration` and `QueueSinkScript` property reference, see the +[Sink task](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#sink-task) +classes in the Syntax section. + + + + + +Use these configuration options for finer control over the sink task: + +* [QueueSink.MaxBatchSize](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxbatchsize) + The maximum number of messages consumed in a single batch. +* [QueueSink.MaxFallbackTimeInSec](../../../server/configuration/queue-sink-configuration.mdx#queuesinkmaxfallbacktimeinsec) + The maximum time, in seconds, that the sink stays in fallback mode (suspended) after a connection failure. + + + + + +## Methods + +The `AzureServiceBusSinkSource` methods build and validate the entries stored in a script's `Queues` list. + + + + +Validates a queue name and returns it as a sink source entry. + +```csharp +public static string Queue(string queueName) +``` +
+ +**Usage:** + +```csharp +AzureServiceBusSinkSource.Queue("orders") +``` +
+ +**Parameters:** + +| Parameter | Type | Description | +|---|---|---| +| **queueName** | `string` | The Service Bus queue to consume from | + +**Return value:** + +| Type | Description | +|---|---| +| `string` | The source entry to add to `QueueSinkScript.Queues` | + +
+ + +Joins a topic name and a subscription name into a sink source entry, `topicName;subscriptionName`. + +```csharp +public static string Subscription(string topicName, string subscriptionName) +``` +
+ +**Usage:** + +```csharp +AzureServiceBusSinkSource.Subscription("orders-topic", "ravendb-sub") +``` +
+ +**Parameters:** + +| Parameter | Type | Description | +|---|---|---| +| **topicName** | `string` | The Service Bus topic | +| **subscriptionName** | `string` | The subscription on that topic to consume from | + +**Return value:** + +| Type | Description | +|---|---| +| `string` | The `topicName;subscriptionName` source entry to add to `QueueSinkScript.Queues` | + +
+
+ +## Classes + +### Connection string + + + + +**The connection string a sink task uses to reach an Azure Service Bus namespace.** + +```csharp +class QueueConnectionString +{ + string Name + QueueBrokerType BrokerType + AzureServiceBusConnectionSettings AzureServiceBusConnectionSettings +} +``` +
+ +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | The connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.AzureServiceBus` for an Azure Service Bus connection string | +| **AzureServiceBusConnectionSettings** | `AzureServiceBusConnectionSettings` | The namespace and authentication details | + +`QueueConnectionString` is shared by all queue brokers, so it also defines settings for the other +broker types (`KafkaConnectionSettings`, `RabbitMqConnectionSettings`, +`AzureQueueStorageConnectionSettings`, `AmazonSqsConnectionSettings`). +Set only the one matching `BrokerType`. + +
+ + +**The authentication details for an Azure Service Bus connection. Set exactly one of its properties.** + +```csharp +class AzureServiceBusConnectionSettings +{ + string ConnectionString + AzureServiceBusEntraId EntraId + AzureServiceBusPasswordless Passwordless +} +``` +
+ +| Property | Type | Description | +|---|---|---| +| **ConnectionString** | `string` | A Service Bus SAS connection string | +| **EntraId** | `AzureServiceBusEntraId` | Entra ID application credentials | +| **Passwordless** | `AzureServiceBusPasswordless` | A managed-identity connection that stores no secret | + +
+
+ +### Authentication credentials + + + + +**Entra ID application credentials for connecting to a namespace.** + +```csharp +class AzureServiceBusEntraId +{ + string Namespace + string TenantId + string ClientId + string ClientSecret +} +``` +
+ +| Property | Type | Description | +|---|---|---| +| **Namespace** | `string` | The fully qualified namespace, e.g. `mynamespace.servicebus.windows.net` | +| **TenantId** | `string` | The Entra ID tenant ID | +| **ClientId** | `string` | The application (client) ID | +| **ClientSecret** | `string` | The application's client secret | + +
+ + +**A managed-identity connection to a namespace; stores no secret.** + +```csharp +class AzureServiceBusPasswordless +{ + string Namespace +} +``` +
+ +| Property | Type | Description | +|---|---|---| +| **Namespace** | `string` | The fully qualified namespace, e.g. `mynamespace.servicebus.windows.net` | + +
+
+ +### Sink task + + + + +**The configuration of an Azure Service Bus sink task.** + +```csharp +class QueueSinkConfiguration +{ + string Name + string ConnectionStringName + QueueBrokerType BrokerType + List Scripts + bool Disabled + string MentorNode + bool PinToMentorNode + long TaskId +} +``` +
+ +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | The sink task name | +| **ConnectionStringName** | `string` | The name of the Azure Service Bus connection string the task uses | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.AzureServiceBus` (must match the connection string's broker type) | +| **Scripts** | `List` | The scripts the task runs | +| **Disabled** | `bool` | Whether the task is created in a disabled state | +| **MentorNode** | `string` | The preferred responsible node for the task, if any | +| **PinToMentorNode** | `bool` | Whether to pin the task to its mentor node | +| **TaskId** | `long` | The task's identifier, assigned by the server | + +
+ + +**A script that turns the messages from its sources into RavenDB documents.** + +```csharp +class QueueSinkScript +{ + string Name + List Queues + string Script + bool Disabled +} +``` +
+ +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | The script name | +| **Queues** | `List` | The sources to consume from, each built with `AzureServiceBusSinkSource` | +| **Script** | `string` | The JavaScript that processes each message | +| **Disabled** | `bool` | Whether the script is disabled | + +
+
+ +### Enums + + + + +**Identifies the message broker that a connection string and sink task use.** + +```csharp +enum QueueBrokerType +{ + None, + Kafka, + RabbitMq, + AzureQueueStorage, + AmazonSqs, + AzureServiceBus +} +``` +
+ +| Value | Description | +|---|---| +| **AzureServiceBus** | Selects Azure Service Bus. Use this value for an Azure Service Bus connection string and sink task. | +| **None, Kafka, RabbitMq, AzureQueueStorage, AmazonSqs** | The other broker types, used by the remaining Queue Sink and Queue ETL brokers. | + +
+
+ +
diff --git a/docs/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx b/docs/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx index 11b6df492b..01f611aeda 100644 --- a/docs/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx +++ b/docs/server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx @@ -6,6 +6,8 @@ sidebar_position: 1 --- import Admonition from '@theme/Admonition'; +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; import CodeBlock from '@theme/CodeBlock'; import Panel from "@site/src/components/Panel"; import ContentFrame from "@site/src/components/ContentFrame"; @@ -26,16 +28,18 @@ import ContentFrame from "@site/src/components/ContentFrame"; of enqueued JSON formatted messages from Kafka topics, construct documents using user-defined scripts, and store the documents in RavenDB collections. -* In this page: +* In this article: * [The Queue Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#the-queue-sink-task) * [Connecting a Kafka broker](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#connecting-a-kafka-broker) * [Retrieving enqueued messages from selected Kafka topics](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#retrieving-enqueued-messages-from-selected-kafka-topics) * [Running user-defined scripts](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#running-user-defined-scripts) * [Storing documents in RavenDB collections](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#storing-documents-in-ravendb-collections) * [Client API](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#client-api) - * [Add a Kafka Connection String](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string) - * [Add a Kafka Sink Task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-sink-task) + * [Adding a Kafka connection string](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#adding-a-kafka-connection-string) + * [Adding a Kafka sink task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#adding-a-kafka-sink-task) * [Configuration Options](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#configuration-options) + * [Syntax](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#syntax) + * [Classes](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#classes) @@ -43,7 +47,7 @@ import ContentFrame from "@site/src/components/ContentFrame"; -## Connecting a Kafka broker +### Connecting a Kafka broker Users of RavenDB 6.0 and on can create an ongoing Sink task that connects a Kafka broker, retrieves enqueued messages from selected Kafka topics, @@ -54,7 +58,7 @@ In the message broker architecture, RavenDB sinks take the role of data consumer A sink would connect a Kafka broker using a connection string, and retrieve messages from the broker's Topics. -Read [below](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#add-a-kafka-connection-string) +Read [below](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#adding-a-kafka-connection-string) about adding a connection string via API. Read [here](../../../studio/database/tasks/ongoing-tasks/kafka-queue-sink.mdx#define-a-kafka-sink-task) about adding a connection string using Studio. @@ -74,11 +78,9 @@ e.g. from node `A` to node `B` as a result of node `A` down time: ---- - -## Retrieving enqueued messages from selected Kafka topics +### Retrieving enqueued messages from selected Kafka topics While a queue sink task is running, it prevents the host database from being unloaded due to idle operations. @@ -91,11 +93,9 @@ up the queue until it reaches its head and can be consumed by RavenDB's sink. ---- - -## Running user-defined scripts +### Running user-defined scripts A sink task's script is a JavaScript segment. Its basic role is to retrieve selected Kafka messages or message properties, and construct documents that @@ -158,11 +158,9 @@ and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript ---- - -## Storing documents in RavenDB collections +### Storing documents in RavenDB collections The sink task consumes batches of queued messages and stores them in RavenDB in a transactional manner, processing either the entire batch or none of it. @@ -198,22 +196,14 @@ one copy of it will remain. - - -## Add a Kafka Connection String +### Adding a Kafka connection string -Prior to defining a Kafka sink task, add a **Kafka connection string** -that the task will use to connect the message broker's bootstrap servers. - -To create the connection string: - -* Create a `QueueConnectionString` instance with the connection string configuration. - Pass it to the `PutConnectionStringOperation` store operation to add the connection string. - -`QueueConnectionString`: +Before defining a sink task, add a Kafka connection string for the task to use. +Create a `QueueConnectionString` object configured for Kafka, and pass it to +`PutConnectionStringOperation`. ```csharp -// Add Kafka connection string +// Add a Kafka connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString @@ -227,77 +217,27 @@ var res = store.Maintenance.Send(
-`QueueBrokerType`: - -```csharp -public enum QueueBrokerType -{ - None, - Kafka, - RabbitMq -} -``` - -
- -| Property | Type | Description | -|---|---|---| -| **Name** | `string` | Connection string name | -| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` for a Kafka connection string | -| **KafkaConnectionSettings** | `KafkaConnectionSettings[]` | A list of comma-separated host:port URLs to Kafka brokers | - -
+For the full property reference, see the +[Connection string](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#connection-string) +class in the Syntax section. --- - - -## Add a Kafka Sink Task - -To create the Sink task: - -* Create `QueueSinkScript` instances to define scripts with which the - task can process retrieved messages, apply JavaScript commands, construct - documents and store them in RavenDB. +### Adding a Kafka sink task -```csharp -// Define a Sink script -QueueSinkScript queueSinkScript = new QueueSinkScript -{ - // Script name - Name = "orders", - // A list of Kafka topics to connect - Queues = new List() { "orders" }, - // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; - put(this.Id.toString(), this)" -}; -``` +To define the sink task, prepare a `QueueSinkConfiguration` object and pass it to +`AddQueueSinkOperation`. +The configuration needs to name the connection string to use, set the broker type to Kafka, +and hold one or more `QueueSinkScript` objects. -
- -* Prepare a `QueueSinkConfiguration` object with the sink task configuration. +* Each script lists the Kafka topics to consume from in its `Queues` property, where each + entry is a topic name. +* Each script's `Script` property holds the JavaScript that turns the consumed messages into + RavenDB documents. -`QueueSinkConfiguration` properties: - -| Property | Type | Description | -|---|---|---| -| **Name** | `string` | The sink task name | -| **ConnectionStringName** | `string` | The registered connection string name | -| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` to define a Kafka sink task | -| **Scripts** | `List` | A list of scripts | - -* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. - -`QueueSinkScript` properties: - -| Property | Type | Description | -|---|---|---| -| **Name** | `string` | Script name | -| **Queues** | `List` | A list of Kafka topics to consume messages from | -| **Script** | `string` | The script contents | + -#### Example: +#### Example: Adding a Kafka sink task ```csharp // Add Kafka connection string @@ -316,7 +256,7 @@ QueueSinkScript queueSinkScript = new QueueSinkScript { // Script name Name = "orders", - // A list of Kafka topics to connect + // A list of Kafka topics to consume from Queues = new List() { "orders" }, // Apply this script Script = @"this['@metadata']['@collection'] = 'Orders'; @@ -342,6 +282,10 @@ AddQueueSinkOperationResult addQueueSinkOperationResult = +For the `QueueSinkConfiguration` and `QueueSinkScript` property reference, see the +[Sink task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx#sink-task) +classes in the Syntax section. +
@@ -355,3 +299,148 @@ Use these configuration options to gain more control over queue sink tasks. mode (i.e. suspending the process) after a connection failure. + + + +## Classes + +### Connection string + + + + +**The connection string a sink task uses to reach a Kafka broker.** + +```csharp +class QueueConnectionString +{ + string Name + QueueBrokerType BrokerType + KafkaConnectionSettings KafkaConnectionSettings +} +``` +
+ +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | The connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` for a Kafka connection string | +| **KafkaConnectionSettings** | `KafkaConnectionSettings` | The broker's bootstrap servers and connection options | + +`QueueConnectionString` is shared by all queue brokers, so it also defines settings for the other +broker types (`RabbitMqConnectionSettings`, `AzureQueueStorageConnectionSettings`, +`AmazonSqsConnectionSettings`, `AzureServiceBusConnectionSettings`). +Set only the one matching `BrokerType`. + +
+ + +**The connection details for a Kafka broker.** + +```csharp +class KafkaConnectionSettings +{ + string BootstrapServers + Dictionary ConnectionOptions + bool UseRavenCertificate +} +``` +
+ +| Property | Type | Description | +|---|---|---| +| **BootstrapServers** | `string` | A comma-separated list of the broker's bootstrap servers, in `host:port` form | +| **ConnectionOptions** | `Dictionary` | Additional Kafka client configuration options, as key-value pairs | +| **UseRavenCertificate** | `bool` | Whether to authenticate the connection with RavenDB's own certificate | + +
+
+ +### Sink task + + + + +**The configuration of a Kafka sink task.** + +```csharp +class QueueSinkConfiguration +{ + string Name + string ConnectionStringName + QueueBrokerType BrokerType + List Scripts + bool Disabled + string MentorNode + bool PinToMentorNode + long TaskId +} +``` +
+ +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | The sink task name | +| **ConnectionStringName** | `string` | The name of the Kafka connection string the task uses | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.Kafka` (must match the connection string's broker type) | +| **Scripts** | `List` | The scripts the task runs | +| **Disabled** | `bool` | Whether the task is created in a disabled state | +| **MentorNode** | `string` | The preferred responsible node for the task, if any | +| **PinToMentorNode** | `bool` | Whether to pin the task to its mentor node | +| **TaskId** | `long` | The task's identifier, assigned by the server | + +
+ + +**A script that turns the messages from its topics into RavenDB documents.** + +```csharp +class QueueSinkScript +{ + string Name + List Queues + string Script + bool Disabled +} +``` +
+ +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | The script name | +| **Queues** | `List` | The Kafka topics to consume from | +| **Script** | `string` | The JavaScript that processes each message | +| **Disabled** | `bool` | Whether the script is disabled | + +
+
+ +### Enums + + + + +**Identifies the message broker that a connection string and sink task use.** + +```csharp +enum QueueBrokerType +{ + None, + Kafka, + RabbitMq, + AzureQueueStorage, + AmazonSqs, + AzureServiceBus +} +``` +
+ +| Value | Description | +|---|---| +| **Kafka** | Selects Apache Kafka. Use this value for a Kafka connection string and sink task. | +| **None, RabbitMq, AzureQueueStorage, AmazonSqs, AzureServiceBus** | The other broker types, used by the remaining Queue Sink and Queue ETL brokers. | + +
+
+ +
diff --git a/docs/server/ongoing-tasks/queue-sink/overview.mdx b/docs/server/ongoing-tasks/queue-sink/overview.mdx index 4902f04999..1530862e36 100644 --- a/docs/server/ongoing-tasks/queue-sink/overview.mdx +++ b/docs/server/ongoing-tasks/queue-sink/overview.mdx @@ -1,7 +1,7 @@ --- title: "Ongoing Tasks: Queue Sink Overview" sidebar_label: Overview -description: "Ingest messages from Kafka and RabbitMQ queues into RavenDB documents using queue sink tasks with configurable processing scripts." +description: "Ingest messages from Kafka, RabbitMQ, and Azure Service Bus into RavenDB documents using queue sink tasks with configurable processing scripts." sidebar_position: 0 --- @@ -31,11 +31,11 @@ import LanguageContent from "@site/src/components/LanguageContent"; from broker queues, apply a user-defined script that can, among other things, construct documents from the retrieved messages, and potentially store manufactured documents in RavenDB's database. -* Supported broker queues currently include **Apache Kafka** and **RabbitMQ**. +* Supported message brokers currently include **Apache Kafka**, **RabbitMQ**, and **Azure Service Bus**. Using RavenDB as a message broker sink can benefit users who want to combine -Kafka or RabbitMQ's immense capability to collect and stream data with RavenDB's +Kafka, RabbitMQ, or Azure Service Bus's immense capability to collect and stream data with RavenDB's ability to process this data, reveal and exploit its value. @@ -47,26 +47,24 @@ ability to process this data, reveal and exploit its value. ## Supported Message Brokers -Queue brokers currently supported by RavenDB include **Apache Kafka** and **RabbitMQ**. +Message brokers currently supported by RavenDB include **Apache Kafka**, **RabbitMQ**, and **Azure Service Bus**. ![Ongoing Tasks](./assets/overview_ongoing-tasks.png) -1. **Ongoing Tasks** - Click to open the ongoing tasks view. -2. **Add a Database Task** - Click to create a new ongoing task. -3. **Info Hub** - Click for usage and licensing assistance. - - ![Info Hub](./assets/info-hub.png) -![Define Queue Sink Task](./assets/overview_task-selection.png) - -1. **Kafka Sink** - Click to define a Kafka Queue Sink task. -2. **RabbitMQ Sink** - Click to define a RabbitMQ Queue Sink task. - +1. Open the **tasks menu**. +2. Open the **ongoing tasks** view. +3. Create a new **database task**. + ![Define Queue Sink Task](./assets/overview_task-selection.png) + * **A**. **Kafka Sink** + Click to define a [Kafka Queue Sink task](../../../server/ongoing-tasks/queue-sink/kafka-queue-sink.mdx). + * **B**. **RabbitMQ Sink** + Click to define a [RabbitMQ Queue Sink task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx). + * **C**. **Azure Service Bus Sink** + Click to define an [Azure Service Bus Queue Sink task](../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx). +4. Open the **info hub** for information about ongoing tasks and this view. + ![Info Hub](./assets/info-hub.png) +--- ## Task Statistics diff --git a/docs/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx b/docs/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx index ac6e88140f..fab177a75f 100644 --- a/docs/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx +++ b/docs/server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx @@ -6,6 +6,8 @@ sidebar_position: 2 --- import Admonition from '@theme/Admonition'; +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; import CodeBlock from '@theme/CodeBlock'; import Panel from "@site/src/components/Panel"; import ContentFrame from "@site/src/components/ContentFrame"; @@ -26,16 +28,18 @@ import ContentFrame from "@site/src/components/ContentFrame"; batches of JSON formatted messages from RabbitMQ queues, construct documents using user-defined scripts, and store the documents in RavenDB collections. -* In this page: +* In this article: * [The RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#the-rabbitmq-sink-task) * [Connecting a RabbitMQ broker](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#connecting-a-rabbitmq-broker) * [Retrieving messages from RabbitMQ queues](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#retrieving-messages-from-rabbitmq-queues) * [Running user-defined scripts](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#running-user-defined-scripts) * [Storing documents in RavenDB collections](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#storing-documents-in-ravendb-collections) * [Client API](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#client-api) - * [Add a RabbitMQ Connection String](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) - * [Add a RabbitMQ Sink Task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-sink-task) + * [Adding a RabbitMQ connection string](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#adding-a-rabbitmq-connection-string) + * [Adding a RabbitMQ sink task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#adding-a-rabbitmq-sink-task) * [Configuration Options](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#configuration-options) + * [Syntax](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#syntax) + * [Classes](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#classes) @@ -43,7 +47,7 @@ import ContentFrame from "@site/src/components/ContentFrame"; -## Connecting a RabbitMQ broker +### Connecting a RabbitMQ broker Users of RavenDB 6.0 and on can create an ongoing Sink task that connects a RabbitMQ broker, retrieves messages from selected queues, runs a user-defined @@ -54,18 +58,16 @@ In the message broker architecture, RavenDB sinks take the role of data consumer A sink would connect a RabbitMQ broker using a connection string, and retrieve messages from the broker's queues. -Read [below](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#add-a-rabbitmq-connection-string) +Read [below](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#adding-a-rabbitmq-connection-string) about adding a connection string via API. Read [here](../../../studio/database/tasks/ongoing-tasks/rabbitmq-queue-sink.mdx#define-a-rabbitmq-sink-task) about adding a connection string using Studio. ---- - -## Retrieving messages from RabbitMQ queues +### Retrieving messages from RabbitMQ queues While a queue sink task is running, it prevents the host database from being unloaded due to idle operations. @@ -78,11 +80,9 @@ up the queue until it reaches its head and can be consumed by RavenDB's sink. ---- - -## Running user-defined scripts +### Running user-defined scripts A sink task's script is a JavaScript segment. Its basic role is to retrieve selected RabbitMQ messages or message properties, and construct documents that @@ -145,11 +145,9 @@ and [many others](../../../server/kb/javascript-engine.mdx#predefined-javascript ---- - -## Storing documents in RavenDB collections +### Storing documents in RavenDB collections The sink task consumes batches of queued messages and stores them in RavenDB in a transactional manner, processing either the entire batch or none of it. @@ -185,22 +183,14 @@ it will remain. - - -## Add a RabbitMQ Connection String +### Adding a RabbitMQ connection string -Prior to defining a RabbitMQ sink task, add a **RabbitMQ connection string** -that the task will use to connect the message brokers. - -To create the connection string: - -* Create a `QueueConnectionString` instance with the connection string configuration. - Pass it to the `PutConnectionStringOperation` store operation to add the connection string. - -`QueueConnectionString`: +Before defining a sink task, add a RabbitMQ connection string for the task to use. +Create a `QueueConnectionString` object configured for RabbitMQ, and pass it to +`PutConnectionStringOperation`. ```csharp -// Add RabbitMQ connection string +// Add a RabbitMQ connection string var res = store.Maintenance.Send( new PutConnectionStringOperation( new QueueConnectionString @@ -214,77 +204,27 @@ var res = store.Maintenance.Send(
-`QueueBrokerType`: - -```csharp -public enum QueueBrokerType -{ - None, - Kafka, - RabbitMq -} -``` - -
- -| Property | Type | Description | -|---|---|---| -| **Name** | `string` | Connection string name | -| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` for a RabbitMQ connection string | -| **RabbitMqConnectionSettings** | `RabbitMqConnectionSettings[]` | A list of strings indicating RabbitMQ brokers connection details | - -
+For the full property reference, see the +[Connection string](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#connection-string) +class in the Syntax section. --- - - -## Add a RabbitMQ Sink Task - -To create the Sink task: - -* Create `QueueSinkScript` instances to define scripts with which the - task can process retrieved messages, apply JavaScript commands, construct - documents and store them in RavenDB. +### Adding a RabbitMQ sink task -```csharp -// Define a Sink script -QueueSinkScript queueSinkScript = new QueueSinkScript -{ - // Script name - Name = "orders", - // A list of RabbitMQ queues to connect - Queues = new List() { "orders" }, - // Apply this script - Script = @"this['@metadata']['@collection'] = 'Orders'; - put(this.Id.toString(), this)" -}; -``` +To define the sink task, prepare a `QueueSinkConfiguration` object and pass it to +`AddQueueSinkOperation`. +The configuration needs to name the connection string to use, set the broker type to RabbitMQ, +and hold one or more `QueueSinkScript` objects. -
- -* Prepare a `QueueSinkConfiguration` object with the sink task configuration. +* Each script lists the RabbitMQ queues to consume from in its `Queues` property, where each + entry is a queue name. +* Each script's `Script` property holds the JavaScript that turns the consumed messages into + RavenDB documents. -`QueueSinkConfiguration` properties: - -| Property | Type | Description | -|---|---|---| -| **Name** | `string` | The sink task name | -| **ConnectionStringName** | `string` | The registered connection string name | -| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` to define a RabbitMQ sink task | -| **Scripts** | `List` | A list of scripts | - -* Pass this object to the `AddQueueSinkOperation` store operation to add the Sink task. - -`QueueSinkScript` properties: - -| Property | Type | Description | -|---|---|---| -| **Name** | `string` | Script name | -| **Queues** | `List` | A list of RabbitMQ queues to consume messages from | -| **Script** | `string` | The script contents | + -#### Example: +#### Example: Adding a RabbitMQ sink task ```csharp // Add RabbitMQ connection string @@ -303,7 +243,7 @@ QueueSinkScript queueSinkScript = new QueueSinkScript { // Script name Name = "orders", - // A list of RabbitMQ queues to connect + // A list of RabbitMQ queues to consume from Queues = new List() { "orders" }, // Apply this script Script = @"this['@metadata']['@collection'] = 'Orders'; @@ -329,6 +269,10 @@ AddQueueSinkOperationResult addQueueSinkOperationResult = +For the `QueueSinkConfiguration` and `QueueSinkScript` property reference, see the +[Sink task](../../../server/ongoing-tasks/queue-sink/rabbit-mq-queue-sink.mdx#sink-task) +classes in the Syntax section. +
@@ -342,3 +286,144 @@ Use these configuration options to gain more control over queue sink tasks. mode (i.e. suspending the process) after a connection failure. + + + +## Classes + +### Connection string + + + + +**The connection string a sink task uses to reach a RabbitMQ broker.** + +```csharp +class QueueConnectionString +{ + string Name + QueueBrokerType BrokerType + RabbitMqConnectionSettings RabbitMqConnectionSettings +} +``` +
+ +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | The connection string name | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` for a RabbitMQ connection string | +| **RabbitMqConnectionSettings** | `RabbitMqConnectionSettings` | The broker's connection details | + +`QueueConnectionString` is shared by all queue brokers, so it also defines settings for the other +broker types (`KafkaConnectionSettings`, `AzureQueueStorageConnectionSettings`, +`AmazonSqsConnectionSettings`, `AzureServiceBusConnectionSettings`). +Set only the one matching `BrokerType`. + +
+ + +**The connection details for a RabbitMQ broker.** + +```csharp +class RabbitMqConnectionSettings +{ + string ConnectionString +} +``` +
+ +| Property | Type | Description | +|---|---|---| +| **ConnectionString** | `string` | An AMQP connection string for the RabbitMQ broker, e.g. `amqp://guest:guest@localhost:5672/` | + +
+
+ +### Sink task + + + + +**The configuration of a RabbitMQ sink task.** + +```csharp +class QueueSinkConfiguration +{ + string Name + string ConnectionStringName + QueueBrokerType BrokerType + List Scripts + bool Disabled + string MentorNode + bool PinToMentorNode + long TaskId +} +``` +
+ +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | The sink task name | +| **ConnectionStringName** | `string` | The name of the RabbitMQ connection string the task uses | +| **BrokerType** | `QueueBrokerType` | Set to `QueueBrokerType.RabbitMq` (must match the connection string's broker type) | +| **Scripts** | `List` | The scripts the task runs | +| **Disabled** | `bool` | Whether the task is created in a disabled state | +| **MentorNode** | `string` | The preferred responsible node for the task, if any | +| **PinToMentorNode** | `bool` | Whether to pin the task to its mentor node | +| **TaskId** | `long` | The task's identifier, assigned by the server | + +
+ + +**A script that turns the messages from its queues into RavenDB documents.** + +```csharp +class QueueSinkScript +{ + string Name + List Queues + string Script + bool Disabled +} +``` +
+ +| Property | Type | Description | +|---|---|---| +| **Name** | `string` | The script name | +| **Queues** | `List` | The RabbitMQ queues to consume from | +| **Script** | `string` | The JavaScript that processes each message | +| **Disabled** | `bool` | Whether the script is disabled | + +
+
+ +### Enums + + + + +**Identifies the message broker that a connection string and sink task use.** + +```csharp +enum QueueBrokerType +{ + None, + Kafka, + RabbitMq, + AzureQueueStorage, + AmazonSqs, + AzureServiceBus +} +``` +
+ +| Value | Description | +|---|---| +| **RabbitMq** | Selects RabbitMQ. Use this value for a RabbitMQ connection string and sink task. | +| **None, Kafka, AzureQueueStorage, AmazonSqs, AzureServiceBus** | The other broker types, used by the remaining Queue Sink and Queue ETL brokers. | + +
+
+ +
diff --git a/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_connection-string_constr.png b/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_connection-string_constr.png new file mode 100644 index 0000000000..b0c357f2bb Binary files /dev/null and b/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_connection-string_constr.png differ diff --git a/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_connection-string_entra-ID.png b/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_connection-string_entra-ID.png new file mode 100644 index 0000000000..daace7f9e2 Binary files /dev/null and b/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_connection-string_entra-ID.png differ diff --git a/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_connection-string_passwordless.png b/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_connection-string_passwordless.png new file mode 100644 index 0000000000..5105b5f56f Binary files /dev/null and b/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_connection-string_passwordless.png differ diff --git a/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_define-task.png b/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_define-task.png new file mode 100644 index 0000000000..e5214321f8 Binary files /dev/null and b/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_define-task.png differ diff --git a/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_ongoing-tasks-view.png b/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_ongoing-tasks-view.png new file mode 100644 index 0000000000..5aeb69412e Binary files /dev/null and b/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_ongoing-tasks-view.png differ diff --git a/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_task-selection.png b/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_task-selection.png new file mode 100644 index 0000000000..d6635fadf5 Binary files /dev/null and b/docs/studio/database/tasks/ongoing-tasks/assets/azure-bus-sink_task-selection.png differ diff --git a/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_connection-string_constr.snagx b/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_connection-string_constr.snagx new file mode 100644 index 0000000000..5b0579b5ac Binary files /dev/null and b/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_connection-string_constr.snagx differ diff --git a/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_connection-string_entra-ID.snagx b/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_connection-string_entra-ID.snagx new file mode 100644 index 0000000000..bd33220c83 Binary files /dev/null and b/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_connection-string_entra-ID.snagx differ diff --git a/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_connection-string_passwordless.snagx b/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_connection-string_passwordless.snagx new file mode 100644 index 0000000000..51d62a42f6 Binary files /dev/null and b/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_connection-string_passwordless.snagx differ diff --git a/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_define-task.snagx b/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_define-task.snagx new file mode 100644 index 0000000000..ac65381ba0 Binary files /dev/null and b/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_define-task.snagx differ diff --git a/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_ongoing-tasks-view.snagx b/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_ongoing-tasks-view.snagx new file mode 100644 index 0000000000..06503d6f45 Binary files /dev/null and b/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_ongoing-tasks-view.snagx differ diff --git a/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_task-selection.snagx b/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_task-selection.snagx new file mode 100644 index 0000000000..5ba1ffdf42 Binary files /dev/null and b/docs/studio/database/tasks/ongoing-tasks/assets/snagit/azure-bus-sink_task-selection.snagx differ diff --git a/docs/studio/database/tasks/ongoing-tasks/azure-service-bus-queue-sink.mdx b/docs/studio/database/tasks/ongoing-tasks/azure-service-bus-queue-sink.mdx new file mode 100644 index 0000000000..893bc569d0 --- /dev/null +++ b/docs/studio/database/tasks/ongoing-tasks/azure-service-bus-queue-sink.mdx @@ -0,0 +1,124 @@ +--- +title: "Azure Service Bus Queue Sink Task" +sidebar_label: Azure Service Bus Queue Sink +description: "Configure an Azure Service Bus queue sink task in RavenDB Studio to consume messages from Azure Service Bus queues and topic subscriptions and store them as RavenDB documents." +sidebar_position: 14 +--- + +import Admonition from '@theme/Admonition'; + +# Azure Service Bus Queue Sink Task + + + +* **Azure Service Bus** is Microsoft Azure's fully managed message broker. + The broker carries messages between applications (like microservices, or a web app + and its background workers) through queues or topics with subscriptions. + +* RavenDB can consume messages from Azure Service Bus by running an ongoing **sink task**. + The task reads JSON-formatted messages from a queue or a topic subscription, runs a + user-defined script over each message, and stores the documents the script produces + in RavenDB collections. + +* This page explains how to create an Azure Service Bus sink task using Studio. + Learn more about RavenDB queue sinks [here](../../../../server/ongoing-tasks/queue-sink/overview.mdx). + Learn how to define an Azure Service Bus sink task using the client API [here](../../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx). + +* In this article: + * [Add a Database Task](../../../../studio/database/tasks/ongoing-tasks/azure-service-bus-queue-sink.mdx#add-a-database-task) + * [Define an Azure Service Bus Sink Task](../../../../studio/database/tasks/ongoing-tasks/azure-service-bus-queue-sink.mdx#define-an-azure-service-bus-sink-task) + * [Authentication methods](../../../../studio/database/tasks/ongoing-tasks/azure-service-bus-queue-sink.mdx#authentication-methods) + + + +## Add a Database Task + +To open the ongoing tasks view: + +![Ongoing Tasks](./assets/azure-bus-sink_ongoing-tasks-view.png) + +1. **Tasks** + Click to open the Tasks menu. +2. **Ongoing Tasks** + Click to open the ongoing tasks view. +3. **Add a Database Task** + Click to create a new ongoing task. + +![Task Selection](./assets/azure-bus-sink_task-selection.png) + +* Click **Azure Service Bus Sink** to create the task. + +## Define an Azure Service Bus Sink Task + +![New Azure Service Bus Sink](./assets/azure-bus-sink_define-task.png) + +1. **Save** to store the configuration and exit. If the task is enabled it will start running. + **Cancel** to revoke the creation of a new task or the changes made to an existing task. + +2. **Task Name** (Optional) + * Enter a name for your task, e.g., *Orders sink*. + * If no name is provided, RavenDB will create a name based on the connection string, + e.g., *Queue Sink to AzureServiceBusConStr*. + +3. **Task State** + Select the task state: + Enabled - The task runs in the background, reading messages, running the scripts, and storing documents as defined in this view. + Disabled - No messages are read or stored, and the task's scripts are inactive. + +4. **Set responsible node** (Optional) + * Select a node from the [Database Group](../../../../studio/database/settings/manage-database-group.mdx) to be responsible for this task, e.g., node `A`. + * If no node is selected, the cluster will assign a responsible node (see [Members Duties](../../../../studio/database/settings/manage-database-group.mdx#database-group-topology---members-duties)). + +5. **Connection String** + The connection string holds the details RavenDB needs to reach your Azure Service Bus namespace. + Select an existing connection string from the list, or click **Create new connection string** to define a new one. + A new connection string requires a **Name**, e.g., *AzureServiceBusConStr*, and an **Authentication** method. + Learn about the three authentication methods [below](../../../../studio/database/tasks/ongoing-tasks/azure-service-bus-queue-sink.mdx#authentication-methods). + +6. **Test Connection** + After defining the connection string, click to test the connection to the Azure Service Bus namespace. + +7. **Add Script** + Click to add a script to the task and open the script editor. + Edit or delete an existing script from the list. + +8. **Script editor** + Define a script that turns the consumed messages into RavenDB documents, and the sources it reads from. + * **Name** - Name the script, or leave it for the task to generate a name, e.g., *Script_1*. + * **Syntax** - Click for scripting assistance and sample scripts. + * **Script** - Write the script that processes each message, e.g., `put(this.Id.toString(), this)`. + See [Running user-defined scripts](../../../../server/ongoing-tasks/queue-sink/azure-service-bus-queue-sink.mdx#running-user-defined-scripts) on the API page. + * **Enter Queue Name** / **Add Queue** - Enter a source and click **Add Queue** to add it to the list. + A source is either a queue name, e.g., `orders`, or a topic subscription written as `topic;subscription`, e.g., `orders-topic;ravendb-sub`. + * **Add** / **Cancel** - Add the script to the task, or update an existing one. + **Cancel** discards your changes. + * **Test script** - Test the script against a sample message, without reading from Azure Service Bus or storing documents. + +## Authentication methods + +An Azure Service Bus connection authenticates in one of three ways. +Select the method from the **Authentication** dropdown in the connection string dialog. + +* **Connection String** + A SAS (Shared Access Signature) connection string for the namespace, copied from the Azure portal. + It includes the namespace endpoint, a shared access key name, and the key itself. + e.g., `Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=` + + ![Connection string method](./assets/azure-bus-sink_connection-string_constr.png) + +* **Entra ID** + Authenticate with a Microsoft Entra ID application registration instead of a shared access key. + Provide the **Service Bus Namespace**, **Tenant ID**, **Client ID**, and **Client Secret**. + This keeps a shared key out of the configuration and allows granular control through [Role-Based Access Control](https://learn.microsoft.com/en-us/azure/role-based-access-control/). + e.g., the **Service Bus Namespace** `mynamespace.servicebus.windows.net` + + ![Entra ID method](./assets/azure-bus-sink_connection-string_entra-ID.png) + +* **Passwordless** + Authenticate with the managed identity of the machine hosting RavenDB, so no secret is stored in the configuration. + Provide only the **Service Bus Namespace**. + In Azure, grant this identity the **Azure Service Bus Data Receiver** role so it can read messages. + Assign the role on the whole namespace, or on the specific queue or topic the sink consumes from. + e.g., the **Service Bus Namespace** `mynamespace.servicebus.windows.net` + + ![Passwordless method](./assets/azure-bus-sink_connection-string_passwordless.png)