From 7849cfdd1318901972f63c8436ce405d7627bc47 Mon Sep 17 00:00:00 2001 From: Alberto Tessarotto Date: Tue, 10 Mar 2026 08:22:31 +0100 Subject: [PATCH 1/8] docs(stream-processor): document custom partitioner feature - Add partitionerSettings field documentation in Producer Configuration section of 20_Configuration.mdx, including selection modes table (all, one, lowerHalf, upperHalf), explicit partition list, CRC32 algorithm callout, and JSON examples - Add dedicated '## Custom Partitioner' section in 30_Usage.md with partition selection algorithm explanation, available modes table, and split-replica usage example --- .../stream_processor/20_Configuration.mdx | 49 ++++++++++++++++++- .../fast_data_v2/stream_processor/30_Usage.md | 33 +++++++++++++ 2 files changed, 80 insertions(+), 2 deletions(-) diff --git a/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx b/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx index f162ee5a72..3c42414cba 100644 --- a/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx +++ b/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx @@ -133,8 +133,36 @@ The `producer` field configures the output stream destination. Currently support - `connectionName`: reference to a connection defined in the `connections` field - `config`: additional [`librdkafka`](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html) producer configuration properties (client ID, compression settings, etc.) + - `partitionerSettings` _(optional)_: controls which partitions the producer may select when sending messages. + When not specified, all available partitions are used (equivalent to `"all"`). -**Example:** + This field accepts either a **selection mode** (string) or an **explicit list of partition indices** (array of integers): + + **Selection modes:** + + | Value | Description | + |-------|-------------| + | `"all"` _(default)_ | All available partitions. 
Reverts to the topic default settings and, if configured, allows the use of a custom `partitioner` librdkafka option. | + | `"one"` | Always targets partition `0`. | + | `"lowerHalf"` | Lower half of available partitions, count rounded up (at least 1). Equivalent to `consistent_random` using CRC32 hasher on the lower half. | + | `"upperHalf"` | Upper half of available partitions, count rounded down (at least 1). Equivalent to `consistent_random` using CRC32 hasher on the upper half. | + + **Explicit partition list:** + + An array of one or more partition indices (e.g. `[0, 1, 2]`) that the producer may target. + Partitions not present in the topic are silently ignored at runtime. + This is equivalent to applying `consistent_random` using CRC32 hasher on the specified subset of partitions. + + :::info Partition selection algorithm + + When `partitionerSettings` is set to any value other than `"all"`, the producer uses a **CRC32-based** algorithm to determine the target partition: + + - **Non-empty key**: selects the partition using `CRC32(key) % len(usable_partitions)`, matching the behavior of `consistent_random` in librdkafka. + - **Empty or null key**: selects a partition at random among the usable ones. + + ::: + +**Example — using a selection mode:** ```json { "producer": { @@ -144,7 +172,24 @@ The `producer` field configures the output stream destination. 
Currently support "config": { "client.id": "", "compression.type": "snappy" - } + }, + "partitionerSettings": "lowerHalf" + } +} +``` + +**Example — using an explicit partition list:** +```json +{ + "producer": { + "type": "kafka", + "topic": "", + "connectionName": "kafka", + "config": { + "client.id": "", + "compression.type": "snappy" + }, + "partitionerSettings": [0, 1, 2] } } ``` diff --git a/docs/products/fast_data_v2/stream_processor/30_Usage.md b/docs/products/fast_data_v2/stream_processor/30_Usage.md index e59914edab..41101928f8 100644 --- a/docs/products/fast_data_v2/stream_processor/30_Usage.md +++ b/docs/products/fast_data_v2/stream_processor/30_Usage.md @@ -871,3 +871,36 @@ When implementing the event routing pattern, consider the following recommendati group ID so that all instances receive every message from the source topic; - **Efficient Filtering**: Place filtering logic at the beginning of your processing function to minimize unnecessary computation on irrelevant events + +## Custom Partitioner + +The _Stream Processor_ producer supports a **custom partition selection strategy** via the `partitionerSettings` field in the producer configuration. +This allows fine-grained control over which partitions of the output topic receive messages, which is particularly useful in scenarios where multiple service replicas need to target non-overlapping sets of partitions to avoid duplicated processing or to enforce ordering guarantees. + +For the full list of accepted values and the corresponding JSON schema, refer to the [`partitionerSettings` field description](/products/fast_data_v2/stream_processor/20_Configuration.mdx#producer-configuration) in the Configuration page. 
+ +### Partition Selection Algorithm + +When `partitionerSettings` is set to any value other than `"all"`, the producer replaces the default librdkafka partitioner with a **CRC32-based** algorithm that operates exclusively on the _usable_ partitions determined by the selected mode: + +- **Non-empty key** → `CRC32(key) % number_of_usable_partitions`: the same key is always routed to the same partition, preserving ordering by key within the selected partition set; +- **Empty or null key** → a partition is chosen at random among the usable ones. + +This matches the behavior of the `consistent_random` strategy in librdkafka, restricted to the configured subset of partitions. + +:::warning + +When using `"lowerHalf"`, `"upperHalf"`, or an explicit partition list, ensure that all referenced partitions actually exist in the target topic. +Partition indices that fall outside the topic's partition range are **silently ignored** at runtime. If the resulting usable set is empty, the message will not be delivered. + +::: + +### Available Modes + +| Value | Usable partitions | Notes | +|-------|-------------------|-------| +| `"all"` _(default)_ | All topic partitions | No custom partitioner is applied; uses the librdkafka default (or the `partitioner` config option if set). | +| `"one"` | Partition `0` only | Always targets the first partition regardless of key. | +| `"lowerHalf"` | Partitions `0 … ⌈N/2⌉ − 1` | Lower half, rounded up. At least one partition is always selected. | +| `"upperHalf"` | Partitions `⌊N/2⌋ … N − 1` | Upper half, rounded down. At least one partition is always selected. | +| `[p0, p1, …]` | Explicit list of indices | Only the listed partitions are targeted. Must contain at least one entry. 
| From 451ea09c6e89abbc5e85be0c0c30b5a1e0c2fc76 Mon Sep 17 00:00:00 2001 From: Alberto Tessarotto Date: Tue, 10 Mar 2026 08:43:46 +0100 Subject: [PATCH 2/8] add modifications --- .../stream_processor/20_Configuration.mdx | 59 +++---------------- .../fast_data_v2/stream_processor/30_Usage.md | 36 +++++++++++ 2 files changed, 43 insertions(+), 52 deletions(-) diff --git a/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx b/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx index 3c42414cba..e25f143de3 100644 --- a/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx +++ b/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx @@ -84,7 +84,7 @@ The configuration requires these main fields: - **`producer`**: defines how the service produces messages to the output destination - **`processor`**: defines the processing engine and its settings -### Connections (Optional) +### Connections Configuration The `connections` field is a map where each key is a connection name and its value is a `ConnectionConfig`. @@ -93,7 +93,7 @@ schema. The connection name has to be referenced in both `consumer` and `producer` configurations using the `connectionName` field. -#### Consumer Configuration +### Consumer Configuration The `consumer` field configures the input stream source. Currently supports: @@ -123,7 +123,7 @@ The `consumer` field configures the input stream source. Currently supports: } ``` -#### Producer Configuration +### Producer Configuration The `producer` field configures the output stream destination. Currently supports: @@ -153,48 +153,9 @@ The `producer` field configures the output stream destination. Currently support Partitions not present in the topic are silently ignored at runtime. This is equivalent to applying `consistent_random` using CRC32 hasher on the specified subset of partitions. 
- :::info Partition selection algorithm + For usage examples and details on the partition selection algorithm, refer to the [Custom Partitioner](/products/fast_data_v2/stream_processor/30_Usage.md#custom-partitioner) section in the Usage page. - When `partitionerSettings` is set to any value other than `"all"`, the producer uses a **CRC32-based** algorithm to determine the target partition: - - - **Non-empty key**: selects the partition using `CRC32(key) % len(usable_partitions)`, matching the behavior of `consistent_random` in librdkafka. - - **Empty or null key**: selects a partition at random among the usable ones. - - ::: - -**Example — using a selection mode:** -```json -{ - "producer": { - "type": "kafka", - "topic": "", - "connectionName": "kafka", - "config": { - "client.id": "", - "compression.type": "snappy" - }, - "partitionerSettings": "lowerHalf" - } -} -``` - -**Example — using an explicit partition list:** -```json -{ - "producer": { - "type": "kafka", - "topic": "", - "connectionName": "kafka", - "config": { - "client.id": "", - "compression.type": "snappy" - }, - "partitionerSettings": [0, 1, 2] - } -} -``` - -#### Processor Configuration +### Processor Configuration The `processor` field configures the processing engine. Currently supports: @@ -248,7 +209,7 @@ Each cache can be: For more details on how to interact with the caches, please read the [dedicated section](/products/fast_data_v2/stream_processor/30_Usage.md#cache-access-) in the Usage page. 
-#### Control Plane Configuration (Optional) +### Control Plane Configuration (Optional) The `controlPlane` field is optional and configures integration with the Control Plane Operator: @@ -256,7 +217,7 @@ The `controlPlane` field is optional and configures integration with the Control - `feedbackInterval`: interval between feedback events in milliseconds (default: `3000ms`) - `resumeAfterMs`: delay before processing when control plane connection fails (optional) -#### Secret Management +### Secret Management The schema supports flexible secret management through the `Secret` type, which can be: @@ -282,12 +243,6 @@ in the `index.js` file. For more details on how the user-defined processor function can be written and it how works, please read the [Usage](/products/fast_data_v2/stream_processor/30_Usage.md) page. -## Control Plane Support - -The service implements the interface for connecting towards a Control-Plane Operator. -However, complete [Runtime Management](/products/fast_data/runtime_management/overview.mdx) support, that is with Control Plane UI -and a central Control Plane instance will be added in the future. - ## Recommended Kafka Configuration When configuring Kafka consumer, it is advised to set appropriate values for diff --git a/docs/products/fast_data_v2/stream_processor/30_Usage.md b/docs/products/fast_data_v2/stream_processor/30_Usage.md index 41101928f8..a5d828e960 100644 --- a/docs/products/fast_data_v2/stream_processor/30_Usage.md +++ b/docs/products/fast_data_v2/stream_processor/30_Usage.md @@ -904,3 +904,39 @@ Partition indices that fall outside the topic's partition range are **silently i | `"lowerHalf"` | Partitions `0 … ⌈N/2⌉ − 1` | Lower half, rounded up. At least one partition is always selected. | | `"upperHalf"` | Partitions `⌊N/2⌋ … N − 1` | Upper half, rounded down. At least one partition is always selected. | | `[p0, p1, …]` | Explicit list of indices | Only the listed partitions are targeted. Must contain at least one entry. 
| + +### Configuration Examples + +**Using a selection mode:** + +```json +{ + "producer": { + "type": "kafka", + "topic": "", + "connectionName": "kafka", + "config": { + "client.id": "", + "compression.type": "snappy" + }, + "partitionerSettings": "lowerHalf" + } +} +``` + +**Using an explicit partition list:** + +```json +{ + "producer": { + "type": "kafka", + "topic": "", + "connectionName": "kafka", + "config": { + "client.id": "", + "compression.type": "snappy" + }, + "partitionerSettings": [0, 1, 2] + } +} +``` From ce32c754352743eee82dc38bcc0667db96f79bae Mon Sep 17 00:00:00 2001 From: Alberto Tessarotto Date: Tue, 10 Mar 2026 08:58:39 +0100 Subject: [PATCH 3/8] update schemas --- .../stream_processor/20_Configuration.mdx | 20 +- .../stream-processor.0.7.0.example-basic.json | 28 + .../stream-processor.0.7.0.schema.json | 782 ++++++++++++++++++ 3 files changed, 819 insertions(+), 11 deletions(-) create mode 100644 static/schemas/fast_data/examples/stream-processor.0.7.0.example-basic.json create mode 100644 static/schemas/fast_data/stream-processor.0.7.0.schema.json diff --git a/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx b/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx index e25f143de3..14b32876df 100644 --- a/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx +++ b/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx @@ -4,8 +4,8 @@ title: Configuration sidebar_label: Configuration --- -import SPSchema from "@site/static/schemas/fast_data/stream-processor.0.6.0.schema.json" -import SPConfigSchema from "@site/static/schemas/fast_data/examples/stream-processor.0.6.0.example-basic.json" +import SPSchema from "@site/static/schemas/fast_data/stream-processor.0.7.0.schema.json" +import SPConfigSchema from "@site/static/schemas/fast_data/examples/stream-processor.0.7.0.example-basic.json" import SchemaViewer from "@site/src/components/SchemaViewer" In order to configure the service, a set of 
environment variables are adopted for @@ -54,8 +54,6 @@ Here it is shown the layout the configuration folder should have: └── index.js # --> user-defined processing function ``` -### Service Settings file - The main configuration file describes the connection details with the stream platform, which customizations should be applied to the consumer and producer, whether one or more cache connections should be instantiated and how the sandbox @@ -68,7 +66,7 @@ exampled in details. :::note -The raw JSON schema can also be found here. +The raw JSON schema can also be found here. In addition, Kafka configurations and cache persistence properties support [**secret resolution**](/products/fast_data_v2/secrets_resolution.md). @@ -84,7 +82,7 @@ The configuration requires these main fields: - **`producer`**: defines how the service produces messages to the output destination - **`processor`**: defines the processing engine and its settings -### Connections Configuration +### Connections The `connections` field is a map where each key is a connection name and its value is a `ConnectionConfig`. @@ -93,7 +91,7 @@ schema. The connection name has to be referenced in both `consumer` and `producer` configurations using the `connectionName` field. -### Consumer Configuration +### Consumer The `consumer` field configures the input stream source. Currently supports: @@ -123,7 +121,7 @@ The `consumer` field configures the input stream source. Currently supports: } ``` -### Producer Configuration +### Producer The `producer` field configures the output stream destination. Currently supports: @@ -155,7 +153,7 @@ The `producer` field configures the output stream destination. Currently support For usage examples and details on the partition selection algorithm, refer to the [Custom Partitioner](/products/fast_data_v2/stream_processor/30_Usage.md#custom-partitioner) section in the Usage page. -### Processor Configuration +### Processor The `processor` field configures the processing engine. 
Currently supports: @@ -176,7 +174,7 @@ The `processor` field configures the processing engine. Currently supports: in the for of a [kafka producer](#producer-configuration) configuration object, which defines how to connect to the DLQ topic. -#### Caches Configuration (Optional) +#### Caches (Optional) The `caches` field is an optional object that defines named caches for use within processing functions. In case no cache is defined, the _Stateful Stream Processing_ @@ -209,7 +207,7 @@ Each cache can be: For more details on how to interact with the caches, please read the [dedicated section](/products/fast_data_v2/stream_processor/30_Usage.md#cache-access-) in the Usage page. -### Control Plane Configuration (Optional) +### Control Plane Support (Optional) The `controlPlane` field is optional and configures integration with the Control Plane Operator: diff --git a/static/schemas/fast_data/examples/stream-processor.0.7.0.example-basic.json b/static/schemas/fast_data/examples/stream-processor.0.7.0.example-basic.json new file mode 100644 index 0000000000..50332b9868 --- /dev/null +++ b/static/schemas/fast_data/examples/stream-processor.0.7.0.example-basic.json @@ -0,0 +1,28 @@ +{ + "connections": { + "kafka-main": { + "type": "kafka", + "config": { + "bootstrap.servers": "localhost:9092" + } + } + }, + "consumer": { + "type": "kafka", + "connectionName": "kafka-main", + "topic": "input-topic", + "config": { + "group.id": "stream-processor-consumer-group", + "auto.offset.reset": "earliest" + } + }, + "producer": { + "type": "kafka", + "connectionName": "kafka-main", + "topic": "output-topic", + "partitionerSettings": "all" + }, + "processor": { + "type": "javascript" + } +} diff --git a/static/schemas/fast_data/stream-processor.0.7.0.schema.json b/static/schemas/fast_data/stream-processor.0.7.0.schema.json new file mode 100644 index 0000000000..b28623f3f3 --- /dev/null +++ b/static/schemas/fast_data/stream-processor.0.7.0.schema.json @@ -0,0 +1,782 @@ +{ + "$schema": 
"http://json-schema.org/draft-07/schema#", + "title": "Configuration", + "type": "object", + "properties": { + "caches": { + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/Cache" + }, + "default": {} + }, + "connections": { + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/ConnectionConfig" + } + }, + "consumer": { + "$ref": "#/definitions/ConsumerConfig" + }, + "controlPlane": { + "anyOf": [ + { + "$ref": "#/definitions/ControlPlaneConfig" + }, + { + "type": "null" + } + ] + }, + "processor": { + "$ref": "#/definitions/ProcessorModeConfig" + }, + "producer": { + "$ref": "#/definitions/ProducerConfig" + } + }, + "required": [ + "connections", + "consumer", + "producer", + "processor" + ], + "definitions": { + "Cache": { + "oneOf": [ + { + "type": "object", + "properties": { + "type": { + "type": "string", + "const": "in-memory" + } + }, + "required": [ + "type" + ] + }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "const": "mongodb" + } + }, + "allOf": [ + { + "$ref": "#/definitions/MongodbCacheConfig" + } + ], + "required": [ + "type" + ] + }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "const": "farm-data" + } + }, + "allOf": [ + { + "$ref": "#/definitions/FarmDataConfig" + } + ], + "required": [ + "type" + ] + }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "const": "http-rest" + } + }, + "allOf": [ + { + "$ref": "#/definitions/HttpRestConfig" + } + ], + "required": [ + "type" + ] + } + ] + }, + "ConnectionConfig": { + "oneOf": [ + { + "type": "object", + "properties": { + "config": { + "$ref": "#/definitions/KafkaConnectionConfig" + }, + "type": { + "type": "string", + "const": "kafka" + } + }, + "required": [ + "type", + "config" + ] + }, + { + "type": "object", + "properties": { + "config": { + "$ref": "#/definitions/MongodbConnectionConfig" + }, + "type": { + "type": "string", + "const": "mongodb" + } + }, + "required": 
[ + "type", + "config" + ] + } + ] + }, + "ConsumerConfig": { + "oneOf": [ + { + "type": "object", + "properties": { + "type": { + "type": "string", + "const": "kafka" + } + }, + "allOf": [ + { + "$ref": "#/definitions/KafkaConsumerConfig" + } + ], + "required": [ + "type" + ] + } + ] + }, + "ControlPlaneConfig": { + "type": "object", + "properties": { + "feedbackInterval": { + "description": "Interval in milliseconds that must elapse between two feedback events sent to Control Plane Operator.\nIt defaults to `3000` ms when not provided during deserialization.", + "type": "integer", + "format": "uint64", + "default": 3000, + "minimum": 0 + }, + "grpcAddress": { + "description": "Address to the gRPC server that should receive service feedback events", + "type": "string", + "examples": [ + "http://control-plane-operator:50052" + ] + }, + "onCreate": { + "description": "Desired initial state of the Service, to be included in the registration request with the Control Plane.\nIf not provided, the service will start in `Pause` state.", + "anyOf": [ + { + "$ref": "#/definitions/InitialState" + }, + { + "type": "null" + } + ], + "default": null + }, + "resumeAfterMs": { + "description": "The number of milliseconds to wait before running the processing logic\nwhen connection with control plane operator failed\nand no desired fast data state was ever received.", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "default": null, + "minimum": 0 + } + }, + "required": [ + "grpcAddress" + ] + }, + "FarmDataConfig": { + "type": "object", + "properties": { + "head": { + "description": "The name of the aggregation head node", + "type": "string" + }, + "http2": { + "description": "Whether to use HTTP/2 prior knowledge", + "type": "boolean", + "default": false + }, + "url": { + "description": "The base URL of the FarmData service.", + "type": "string", + "format": "uri" + } + }, + "required": [ + "url", + "head" + ] + }, + "HttpRestConfig": { + "type": "object", + 
"properties": { + "get": { + "description": "Cache GET endpoint configuration", + "anyOf": [ + { + "$ref": "#/definitions/RestEndpointConfig" + }, + { + "type": "null" + } + ] + }, + "http2": { + "description": "Whether to use HTTP/2 prior knowledge", + "type": "boolean", + "default": false + }, + "url": { + "description": "The common base URL for the REST endpoints.", + "type": "string", + "format": "uri" + } + }, + "required": [ + "url" + ] + }, + "InitialState": { + "description": "Enum to map the initial state of the service when registering to Control Plane,\naccording to the value defined in the configuration file.\n\nThis state should be used to be mapped to the Piper's ExecState.", + "type": "string", + "enum": [ + "pause", + "resume" + ] + }, + "KafkaConnectionConfig": { + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/Secret" + } + }, + "KafkaConsumerConfig": { + "type": "object", + "properties": { + "commitIntervalMs": { + "description": "number of milliseconds between one commit and another", + "type": "integer", + "format": "uint64", + "default": 500, + "minimum": 0 + }, + "config": { + "description": "librdkafka Kafka consumer configuration properties\nlibrdkafka Kafka consumer configuration properties. 
Do not include here Kafka configurations\nabout the Kafka instance and/or connection, as it will be overrided by the selected connection", + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/Secret" + }, + "default": {} + }, + "connectionName": { + "description": "connection name to use from the connections map (must be type kafka)\n\nThis property is required from version 0.6.0 onwards.", + "type": "string" + }, + "topic": { + "description": "name of the Kafka topic from which the consumer will read messages", + "type": "string" + } + }, + "required": [ + "connectionName", + "topic" + ] + }, + "KafkaProducerConfig": { + "type": "object", + "properties": { + "config": { + "description": "librdkafka Kafka consumer configuration properties. Do not include here Kafka configurations\nabout the Kafka instance and/or connection, as it will be overrided by the selected connection", + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/Secret" + }, + "default": {} + }, + "connectionName": { + "description": "Connection name to use from the connections map (must be type kafka)\n\nThis property is required from version 0.6.0 onwards.", + "type": "string" + }, + "partitionerSettings": { + "description": "Includes the settings for the custom partitioner logic to determine the partition to which send the message.\nIf not specified, the default logic will be to use all the available partitions.", + "allOf": [ + { + "$ref": "#/definitions/PartitionerSettings" + } + ], + "default": "all" + }, + "topic": { + "description": "name of the Kafka topic to which the producer will send messages", + "type": "string" + } + }, + "required": [ + "connectionName", + "topic" + ] + }, + "MongodbCacheConfig": { + "type": "object", + "properties": { + "appName": { + "type": [ + "string", + "null" + ] + }, + "collection": { + "type": "string" + }, + "connectionName": { + "description": "Connection name to use from the connections map (must be type 
mongo/mongodb)\n\nThis property is required from version 0.6.0 onwards.", + "type": "string" + }, + "database": { + "type": [ + "string", + "null" + ] + } + }, + "required": [ + "collection", + "connectionName" + ] + }, + "MongodbConnectionConfig": { + "type": "object", + "properties": { + "appName": { + "type": [ + "string", + "null" + ] + }, + "database": { + "type": [ + "string", + "null" + ] + }, + "url": { + "$ref": "#/definitions/Secret" + } + }, + "required": [ + "url" + ] + }, + "MultiValue": { + "type": "object", + "properties": { + "key": { + "type": "string" + }, + "value": { + "$ref": "#/definitions/Secret" + } + }, + "required": [ + "key", + "value" + ] + }, + "MultiValue2": { + "type": "object", + "properties": { + "key": { + "type": "string" + }, + "value": { + "type": "string" + } + }, + "required": [ + "key", + "value" + ] + }, + "PartitionerSelectionMode": { + "oneOf": [ + { + "description": "Use only one partition", + "type": "string", + "const": "one" + }, + { + "description": "Use lower half of the available partitions, rounded up, with at least one partition.\n\nThis is equivalent to `consistent_random` using CRC32 hasher on the\nlower half of the partitions.", + "type": "string", + "const": "lowerHalf" + }, + { + "description": "Use upper half of the available partitions, rounded down, with at least one partition.\n\nThis is equivalent to `consistent_random` using CRC32 hasher on the\nupper half of the partitions.", + "type": "string", + "const": "upperHalf" + }, + { + "description": "Use all the available partitions.\nThis allows to revert to topic default settings\nand, if applies, the use of custom partitioner other then `consistent_random`.\n\nPartitioners can be configured using the [`partitioner` flag](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html)", + "type": "string", + "const": "all" + } + ] + }, + "PartitionerSettings": { + "description": "Specify which logic of custom partitioning to use 
in the producer to determine the partition to which send the message.\n\nSelection of the partition to use will be done at runtime, using [`crc32fast` lib](https://docs.rs/crc32fast/latest/crc32fast/) hash\namong the message key and the list of available partitions determined by the selected logic.\nIf no logic is specified, all the available partitions will be used.", + "anyOf": [ + { + "description": "Use one of the predefined selection modes.", + "allOf": [ + { + "$ref": "#/definitions/PartitionerSelectionMode" + } + ] + }, + { + "description": "List which partitions are going to be selectable when sending messages. The list must contain at least one partition.\nIf the list includes one or more partitions that are not available, the partitioner will ignore them\n\nThis is equivalent to `consistent_random` using CRC32 hasher on the\nselected subset of the partitions.", + "type": "array", + "items": { + "type": "integer", + "format": "int32" + }, + "minItems": 1 + } + ] + }, + "ProcessorConfig": { + "oneOf": [ + { + "type": "object", + "properties": { + "type": { + "type": "string", + "const": "javascript" + } + }, + "allOf": [ + { + "$ref": "#/definitions/SandboxConfig" + } + ], + "required": [ + "type" + ] + } + ] + }, + "ProcessorModeConfig": { + "anyOf": [ + { + "allOf": [ + { + "type": "object", + "properties": { + "dlq": { + "$ref": "#/definitions/ProducerConfig" + }, + "onError": { + "enum": [ + "dlq" + ] + } + }, + "required": [ + "onError", + "dlq" + ] + }, + { + "$ref": "#/definitions/ProcessorConfig" + } + ] + }, + { + "allOf": [ + { + "type": "object", + "properties": { + "onError": { + "enum": [ + "fastFast", + "FailFast" + ] + } + } + }, + { + "$ref": "#/definitions/ProcessorConfig" + } + ] + } + ] + }, + "ProducerConfig": { + "oneOf": [ + { + "type": "object", + "properties": { + "type": { + "type": "string", + "const": "kafka" + } + }, + "allOf": [ + { + "$ref": "#/definitions/KafkaProducerConfig" + } + ], + "required": [ + "type" + ] + } + ] + }, 
+    "RestEndpointConfig": {
+      "type": "object",
+      "properties": {
+        "headers": {
+          "description": "Additional headers to include in the request.",
+          "type": "array",
+          "items": {
+            "$ref": "#/definitions/MultiValue"
+          }
+        },
+        "method": {
+          "description": "A valid HTTP method",
+          "type": "string",
+          "enum": [
+            "GET",
+            "HEAD",
+            "POST",
+            "PUT",
+            "DELETE",
+            "CONNECT",
+            "OPTIONS",
+            "TRACE",
+            "PATCH",
+            "get",
+            "head",
+            "post",
+            "put",
+            "delete",
+            "connect",
+            "options",
+            "trace",
+            "patch",
+            "Get",
+            "Head",
+            "Post",
+            "Put",
+            "Delete",
+            "Connect",
+            "Options",
+            "Trace",
+            "Patch"
+          ]
+        },
+        "path": {
+          "description": "The base URL of the FarmData service.",
+          "allOf": [
+            {
+              "$ref": "#/definitions/Segments"
+            }
+          ]
+        },
+        "query": {
+          "description": "Additional query parameters to include in the request.",
+          "type": "array",
+          "items": {
+            "$ref": "#/definitions/MultiValue2"
+          }
+        }
+      },
+      "required": [
+        "method",
+        "path"
+      ]
+    },
+    "SandboxConfig": {
+      "type": "object",
+      "properties": {
+        "consoleBuffer": {
+          "description": "Size in bytes available to the console object in the sandbox",
+          "type": "integer",
+          "format": "uint",
+          "default": 1024,
+          "minimum": 0
+        },
+        "interruptMs": {
+          "description": "Max time in milliseconds a single function can be running within the sandbox",
+          "type": "integer",
+          "format": "uint64",
+          "default": 5000,
+          "minimum": 0
+        },
+        "maxHeapSize": {
+          "description": "Max heap size in bytes. When not set quickjs's default will be used",
+          "type": [
+            "integer",
+            "null"
+          ],
+          "format": "uint",
+          "default": null,
+          "minimum": 0
+        },
+        "maxStackSize": {
+          "description": "Max stack size in bytes. When not set quickjs's default will be used",
+          "type": [
+            "integer",
+            "null"
+          ],
+          "format": "uint",
+          "default": null,
+          "minimum": 0
+        },
+        "payloadSerdeStrategy": {
+          "description": "Option to configure the deserialization\nfor incoming payload, a.k.a. the method to call\non payload before to inject it in the sandbox message argument",
+          "allOf": [
+            {
+              "$ref": "#/definitions/SerdeSettings"
+            }
+          ],
+          "default": {
+            "deserialize": "json"
+          }
+        }
+      }
+    },
+    "Secret": {
+      "anyOf": [
+        {
+          "type": "string"
+        },
+        {
+          "type": "object",
+          "properties": {
+            "encoding": {
+              "description": "Define which type of encoding the library supports when it needs to read the actual secret value.",
+              "type": "string",
+              "enum": [
+                "base64"
+              ]
+            },
+            "key": {
+              "type": "string"
+            },
+            "type": {
+              "const": "env"
+            }
+          },
+          "required": [
+            "type",
+            "key"
+          ]
+        },
+        {
+          "type": "object",
+          "properties": {
+            "encoding": {
+              "description": "Define which type of encoding the library supports when it needs to read the actual secret value.",
+              "type": "string",
+              "enum": [
+                "base64"
+              ]
+            },
+            "key": {
+              "type": "string"
+            },
+            "path": {
+              "type": "string"
+            },
+            "type": {
+              "const": "file"
+            }
+          },
+          "required": [
+            "type",
+            "path"
+          ]
+        }
+      ],
+      "examples": [
+        "my-secret",
+        {
+          "key": "CUSTOM_ENV_VAR",
+          "type": "env"
+        },
+        {
+          "encoding": "base64",
+          "key": "CUSTOM_ENV_VAR",
+          "type": "env"
+        },
+        {
+          "path": "/path/to/file",
+          "type": "file"
+        }
+      ]
+    },
+    "Segments": {
+      "type": "string"
+    },
+    "SerdeMode": {
+      "description": "Describe which serialization or deserialization strategy\nshould be applied to the key of a Kafka message",
+      "oneOf": [
+        {
+          "description": "serialize/deserialize the content as a JSON Object",
+          "type": "string",
+          "const": "json"
+        },
+        {
+          "description": "serialize/deserialize the content as a JSON Object\nwith compatibility for schema+payload when\nKafka uses a schema registry. The payload is in a\nsubkey payload",
+          "type": "string",
+          "const": "jsonWithSchema"
+        },
+        {
+          "description": "serialize/deserialize the key as a string: useful\nwhen payload bytes have to be processed raw inside the\nsandbox",
+          "type": "string",
+          "const": "string"
+        }
+      ]
+    },
+    "SerdeSettings": {
+      "type": "object",
+      "properties": {
+        "deserialize": {
+          "allOf": [
+            {
+              "$ref": "#/definitions/SerdeMode"
+            }
+          ],
+          "default": "json"
+        }
+      }
+    }
+  }
+}

From b9478e0c6a2d80f44e154c5b828bad2c0fcff430 Mon Sep 17 00:00:00 2001
From: Alberto Tessarotto
Date: Tue, 10 Mar 2026 09:05:45 +0100
Subject: [PATCH 4/8] improve control plane paragraph

---
 docs/products/fast_data_v2/farm_data/20_Configuration.mdx | 8 +++-----
 docs/products/fast_data_v2/kango/20_Configuration.mdx     | 7 +++----
 .../fast_data_v2/stream_processor/20_Configuration.mdx    | 7 ++-----
 3 files changed, 8 insertions(+), 14 deletions(-)

diff --git a/docs/products/fast_data_v2/farm_data/20_Configuration.mdx b/docs/products/fast_data_v2/farm_data/20_Configuration.mdx
index 16ee20164e..444f7fced5 100644
--- a/docs/products/fast_data_v2/farm_data/20_Configuration.mdx
+++ b/docs/products/fast_data_v2/farm_data/20_Configuration.mdx
@@ -214,12 +214,10 @@ Properties that support `Secret` interface are:
 
 For more details, please refer to [secrets resolution](/products/fast_data_v2/secrets_resolution.md) in config maps documentation page and [`secret_rs`](https://docs.rs/secret_rs/latest/secret_rs/index.html) library documentation.
 
-### Control Plane Support
+### Control Plane Support (Optional)
 
-The service implements the interface for connecting towards a Control-Plane Operator.
-However, complete [Runtime Management](/products/fast_data/runtime_management/overview.mdx)
-support, that is with Control Plane UI and a central Control Plane instance
-will be added in the future.
+The `controlPlane` field is optional and configures integration with the Control Plane.
+For more details on how to configure it, please refer to the [Workloads Configuration](/products/fast_data_v2/runtime_management/application_configuration.md#workloads-configuration) section of the Runtime Management documentation.
 
 ### Recommended Kafka Configuration
 
diff --git a/docs/products/fast_data_v2/kango/20_Configuration.mdx b/docs/products/fast_data_v2/kango/20_Configuration.mdx
index cb5543385a..55c275986b 100644
--- a/docs/products/fast_data_v2/kango/20_Configuration.mdx
+++ b/docs/products/fast_data_v2/kango/20_Configuration.mdx
@@ -59,11 +59,10 @@ The raw JSON schema can also be found
Date: Tue, 10 Mar 2026 11:17:35 +0100
Subject: [PATCH 7/8] refinement

---
 .../stream_processor/20_Configuration.mdx       |  4 ++--
 .../fast_data_v2/stream_processor/30_Usage.md   | 16 ++++++++++++----
 2 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx b/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx
index 3e83d034be..69d37cbd37 100644
--- a/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx
+++ b/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx
@@ -183,7 +183,7 @@ use case cannot be supported.
 Each cache can be:
 
 - **MongoDB Cache** (`type: "mongodb"`): Persistent cache backed by MongoDB
-  - `url`: MongoDB connection string (supports secrets)
+  - `connectionName`: reference to a MongoDB connection defined in the `connections` field
   - `collection`: collection name for cache storage
   - `database`: database name (optional)
   - `appName`: application name for MongoDB connection (optional - useful to track
@@ -191,7 +191,7 @@ Each cache can be:
 - **In-Memory Cache** (`type: "in-memory"`): Simple in-memory key-value storage.
   Use only when sharing the stream state across different service instances is not
   necessary. However, when it is possible use a sharable stream state.
-- **Farm Data Cache** (`type: "farmdata"`): Cache backed by Farm Data Service.
+- **Farm Data Cache** (`type: "farm-data"`): Cache backed by Farm Data Service.
   - `url`: Farm Data Service endpoint URL
   - `head`: Head collection for aggregation
   - `http2`: set http2_prior_knowledge to true when connecting to an http2 enabled endpoint (default: false)
diff --git a/docs/products/fast_data_v2/stream_processor/30_Usage.md b/docs/products/fast_data_v2/stream_processor/30_Usage.md
index a5d828e960..512ae1032a 100644
--- a/docs/products/fast_data_v2/stream_processor/30_Usage.md
+++ b/docs/products/fast_data_v2/stream_processor/30_Usage.md
@@ -465,7 +465,7 @@ them quickly.
 
 | Cache Type | Get | Set | Update | Delete | Purpose |
 |------------|:---:|:---:|:------:|:------:|---------|
 | `mongodb`  | ✔️ | ✔️ | ✔️ | ✔️ | Full stateful stream processing with versioning |
-| `farmdata` | ✔️ | ❌ | ❌ | ❌ | Read-only access to Farm Data service |
+| `farm-data` | ✔️ | ❌ | ❌ | ❌ | Read-only access to Farm Data service |
 | `http-rest`| ✔️ | ❌ | ❌ | ❌ | Read-only access to external REST APIs |
 
 All cache methods are **asynchronous operations**. When a cache type doesn't support
@@ -481,10 +481,18 @@ stream processing requirements.
 
 ```json
 {
+  "connections": {
+    "mongo": {
+      "type": "mongodb",
+      "config": {
+        "url": "mongodb://localhost:27017/fast-data"
+      }
+    }
+  },
   "caches": {
     "customer-cache": {
       "type": "mongodb",
-      "url": "mongodb://localhost:27017/fast-data",
+      "connectionName": "mongo",
       "appName": "eu.miaplatform.fastdata.stream-processor",
       "database": "fast-data",
       "collection": "stream-processor-state"
@@ -581,7 +589,7 @@ use the Kafka key with the FarmData cache to retrieve the full content.
 {
   "caches": {
     "farmdata-cache": {
-      "type": "farmdata",
+      "type": "farm-data",
       "url": "http://farm-data-service:3000",
       "head": "aggregations",
       "http2": false
@@ -833,7 +841,7 @@
 based on their content or type, the recommended pattern is to deploy **multiple
 Stream Processor instances**, each subscribing to the same source topic with distinct consumer groups.
 Each Stream Processor filters and processes only the messages relevant to its specific Single View,
-discarding all others using the filtering capabilities described in the [Filtering section](#filtering-).
+discarding all others using the filtering capabilities described in the [Filtering section](#filtering).
 
 ```mermaid
 graph LR

From 431fe82ee2cee6c96daf6672f555d2d4c4c95813 Mon Sep 17 00:00:00 2001
From: Alberto Tessarotto
Date: Wed, 18 Mar 2026 18:49:08 +0100
Subject: [PATCH 8/8] add new updates

---
 .../fast_data_v2/stream_processor/20_Configuration.mdx      | 6 +++---
 docs/products/fast_data_v2/stream_processor/CHANGELOG.md    | 2 +-
 ...basic.json => stream-processor.0.6.3.example-basic.json} | 0
 ...0.7.0.schema.json => stream-processor.0.6.3.schema.json} | 0
 4 files changed, 4 insertions(+), 4 deletions(-)
 rename static/schemas/fast_data/examples/{stream-processor.0.7.0.example-basic.json => stream-processor.0.6.3.example-basic.json} (100%)
 rename static/schemas/fast_data/{stream-processor.0.7.0.schema.json => stream-processor.0.6.3.schema.json} (100%)

diff --git a/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx b/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx
index 69d37cbd37..40c43de7d4 100644
--- a/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx
+++ b/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx
@@ -4,8 +4,8 @@ title: Configuration
 sidebar_label: Configuration
 ---
 
-import SPSchema from "@site/static/schemas/fast_data/stream-processor.0.7.0.schema.json"
-import SPConfigSchema from "@site/static/schemas/fast_data/examples/stream-processor.0.7.0.example-basic.json"
+import SPSchema from "@site/static/schemas/fast_data/stream-processor.0.6.3.schema.json"
+import SPConfigSchema from "@site/static/schemas/fast_data/examples/stream-processor.0.6.3.example-basic.json"
 import SchemaViewer from "@site/src/components/SchemaViewer"
 
 In order to configure the service, a set of environment variables are adopted for
@@ -66,7 +66,7 @@ exampled in details.
 
 :::note
 
-The raw JSON schema can also be found here.
+The raw JSON schema can also be found here.
 
 In addition, Kafka configurations and cache persistence properties support
 [**secret resolution**](/products/fast_data_v2/secrets_resolution.md).
diff --git a/docs/products/fast_data_v2/stream_processor/CHANGELOG.md b/docs/products/fast_data_v2/stream_processor/CHANGELOG.md
index 4e7732f311..525b9a5cec 100644
--- a/docs/products/fast_data_v2/stream_processor/CHANGELOG.md
+++ b/docs/products/fast_data_v2/stream_processor/CHANGELOG.md
@@ -10,7 +10,7 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
-## [0.7.0] - 2026-03-10
+## [0.7.3] - 2026-03-10
 
 ### Added
 
diff --git a/static/schemas/fast_data/examples/stream-processor.0.7.0.example-basic.json b/static/schemas/fast_data/examples/stream-processor.0.6.3.example-basic.json
similarity index 100%
rename from static/schemas/fast_data/examples/stream-processor.0.7.0.example-basic.json
rename to static/schemas/fast_data/examples/stream-processor.0.6.3.example-basic.json
diff --git a/static/schemas/fast_data/stream-processor.0.7.0.schema.json b/static/schemas/fast_data/stream-processor.0.6.3.schema.json
similarity index 100%
rename from static/schemas/fast_data/stream-processor.0.7.0.schema.json
rename to static/schemas/fast_data/stream-processor.0.6.3.schema.json