2 changes: 2 additions & 0 deletions .cspell.json
@@ -195,6 +195,8 @@
"PRELIOS",
"principale",
"println",
"partitioner",
"Partitioner",
"Proto",
"qiankun",
"quarkus",
8 changes: 3 additions & 5 deletions docs/products/fast_data_v2/farm_data/20_Configuration.mdx
@@ -214,12 +214,10 @@
Properties that support the `Secret` interface are:
For more details, please refer to [secrets resolution](/products/fast_data_v2/secrets_resolution.md)
in config maps documentation page and [`secret_rs`](https://docs.rs/secret_rs/latest/secret_rs/index.html) library documentation.

### Control Plane Support
### Control Plane Support (Optional)

The service implements the interface for connecting to a Control Plane Operator.
However, complete [Runtime Management](/products/fast_data/runtime_management/overview.mdx)
support (that is, with a Control Plane UI and a central Control Plane instance)
will be added in the future.
The `controlPlane` field is optional and configures integration with the Control Plane.
For more details on how to configure it, please refer to the [Workloads Configuration](/products/fast_data_v2/runtime_management/application_configuration.md#workloads-configuration) section of the Runtime Management documentation.

### Recommended Kafka Configuration

7 changes: 3 additions & 4 deletions docs/products/fast_data_v2/kango/20_Configuration.mdx
@@ -59,11 +59,10 @@
The raw JSON schema can also be found <a target="_blank" href="/schemas/fast_dat

:::

## Control Plane Support
### Control Plane Support (Optional)

The service implements the interface for connecting to a Control Plane Operator.
However, complete [Runtime Management](/products/fast_data/runtime_management/overview.mdx) support (that is, with a Control Plane UI
and a central Control Plane instance) will be added in the future.
The `controlPlane` field is optional and configures integration with the Control Plane.
For more details on how to configure it, please refer to the [Workloads Configuration](/products/fast_data_v2/runtime_management/application_configuration.md#workloads-configuration) section of the Runtime Management documentation.

## Recommended Kafka Configuration

73 changes: 34 additions & 39 deletions docs/products/fast_data_v2/stream_processor/20_Configuration.mdx
@@ -4,8 +4,8 @@
title: Configuration
sidebar_label: Configuration
---

import SPSchema from "@site/static/schemas/fast_data/stream-processor.0.6.0.schema.json"
import SPConfigSchema from "@site/static/schemas/fast_data/examples/stream-processor.0.6.0.example-basic.json"
import SPSchema from "@site/static/schemas/fast_data/stream-processor.0.6.3.schema.json"
import SPConfigSchema from "@site/static/schemas/fast_data/examples/stream-processor.0.6.3.example-basic.json"
import SchemaViewer from "@site/src/components/SchemaViewer"

In order to configure the service, a set of environment variables is adopted for
@@ -54,8 +54,6 @@
The configuration folder should have the following layout:
└── index.js # --> user-defined processing function
```

### Service Settings file

The main configuration file describes the connection details for the stream
platform, which customizations should be applied to the consumer and producer,
whether one or more cache connections should be instantiated and how the sandbox
@@ -68,7 +66,7 @@
explained in detail.

:::note

The raw JSON schema can also be found <a target="_blank" href="/schemas/fast_data/stream-processor.0.5.2.schema.json">here</a>.
The raw JSON schema can also be found <a target="_blank" href="/schemas/fast_data/stream-processor.0.6.3.schema.json">here</a>.

In addition, Kafka configurations and cache persistence properties
support [**secret resolution**](/products/fast_data_v2/secrets_resolution.md).
@@ -84,7 +82,7 @@
The configuration requires these main fields:
- **`producer`**: defines how the service produces messages to the output destination
- **`processor`**: defines the processing engine and its settings

### Connections (Optional)
### Connections

The `connections` field is a map where each key is a connection name and its value is
a `ConnectionConfig`.
@@ -93,7 +91,7 @@
schema.
The connection name has to be referenced in both `consumer` and `producer` configurations
using the `connectionName` field.

#### Consumer Configuration
### Consumer

The `consumer` field configures the input stream source. Currently supports:

@@ -123,7 +121,7 @@
}
```

#### Producer Configuration
### Producer

The `producer` field configures the output stream destination. Currently supports:

@@ -133,23 +131,29 @@
- `connectionName`: reference to a connection defined in the `connections` field
- `config`: additional [`librdkafka`](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html)
producer configuration properties (client ID, compression settings, etc.)
- `partitionerSettings` _(optional)_: controls which partitions the producer may select when sending messages.
When not specified, all available partitions are used (equivalent to `"all"`).

**Example:**
```json
{
"producer": {
"type": "kafka",
"topic": "<OUTPUT_TOPIC>",
"connectionName": "kafka",
"config": {
"client.id": "<CLIENT_ID>",
"compression.type": "snappy"
}
}
}
```
This field accepts either a **selection mode** (string) or an **explicit list of partition indices** (array of integers):

**Selection modes:**

| Value | Description |
|-------|-------------|
| `"all"` _(default)_ | All available partitions. Reverts to the topic default settings and, if configured, allows the use of a custom `partitioner` librdkafka option. |
| `"one"` | Always targets partition `0`. |
| `"lowerHalf"` | Lower half of available partitions, count rounded up (at least 1). Equivalent to `consistent_random` using CRC32 hasher on the lower half. |
| `"upperHalf"` | Upper half of available partitions, count rounded down (at least 1). Equivalent to `consistent_random` using CRC32 hasher on the upper half. |

**Explicit partition list:**

#### Processor Configuration
An array of one or more partition indices (e.g. `[0, 1, 2]`) that the producer may target.
Partitions not present in the topic are silently ignored at runtime.
This is equivalent to applying `consistent_random` using CRC32 hasher on the specified subset of partitions.

For usage examples and details on the partition selection algorithm, refer to the [Custom Partitioner](/products/fast_data_v2/stream_processor/30_Usage.md#custom-partitioner) section in the Usage page.

### Processor

The `processor` field configures the processing engine. Currently supports:

@@ -170,7 +174,7 @@
in the form of a [Kafka producer](#producer-configuration) configuration object, which
defines how to connect to the DLQ topic.

#### Caches Configuration (Optional)
#### Caches (Optional)

The `caches` field is an optional object that defines named caches for use within
processing functions. In case no cache is defined, the _Stateful Stream Processing_
@@ -179,15 +183,15 @@
use case cannot be supported.
Each cache can be:

- **MongoDB Cache** (`type: "mongodb"`): Persistent cache backed by MongoDB
- `url`: MongoDB connection string (supports secrets)
- `connectionName`: reference to a MongoDB connection defined in the `connections` field
- `collection`: collection name for cache storage
- `database`: database name (optional)
- `appName`: application name for MongoDB connection (optional - useful to track
which program is carrying out queries on the database)
- **In-Memory Cache** (`type: "in-memory"`): Simple in-memory key-value storage.
Use it only when sharing the stream state across different service
instances is not necessary; whenever possible, prefer a shareable stream state.
- **Farm Data Cache** (`type: "farmdata"`): Cache backed by Farm Data Service.
- **Farm Data Cache** (`type: "farm-data"`): Cache backed by Farm Data Service.
- `url`: Farm Data Service endpoint URL
- `head`: Head collection for aggregation
- `http2`: set http2_prior_knowledge to true when connecting to an http2 enabled endpoint (default: false)
@@ -203,15 +207,12 @@
For more details on how to interact with the caches, please read the
[dedicated section](/products/fast_data_v2/stream_processor/30_Usage.md#cache-access-) in the Usage page.

#### Control Plane Configuration (Optional)

The `controlPlane` field is optional and configures integration with the Control Plane Operator:
### Control Plane Support (Optional)

- `grpcAddress`: gRPC server address for service feedback events
- `feedbackInterval`: interval between feedback events in milliseconds (default: `3000ms`)
- `resumeAfterMs`: delay before processing when control plane connection fails (optional)
The `controlPlane` field is optional and configures integration with the Control Plane.
For more details on how to configure it, please refer to the [Workloads Configuration](/products/fast_data_v2/runtime_management/application_configuration.md#workloads-configuration) section of the Runtime Management documentation.

#### Secret Management
### Secret Management

The schema supports flexible secret management through the `Secret` type, which can be:

@@ -237,12 +238,6 @@
in the `index.js` file.
For more details on how the user-defined processor function can be written
and how it works, please read the [Usage](/products/fast_data_v2/stream_processor/30_Usage.md) page.

## Control Plane Support

The service implements the interface for connecting to a Control Plane Operator.
However, complete [Runtime Management](/products/fast_data/runtime_management/overview.mdx) support (that is, with a Control Plane UI
and a central Control Plane instance) will be added in the future.

## Recommended Kafka Configuration

When configuring the Kafka consumer, it is advised to set appropriate values for
85 changes: 81 additions & 4 deletions docs/products/fast_data_v2/stream_processor/30_Usage.md
@@ -465,7 +465,7 @@
them quickly.
| Cache Type | Get | Set | Update | Delete | Purpose |
|------------|:---:|:---:|:------:|:------:|---------|
| `mongodb` | ✔️ | ✔️ | ✔️ | ✔️ | Full stateful stream processing with versioning |
| `farmdata` | ✔️ | ❌ | ❌ | ❌ | Read-only access to Farm Data service |
| `farm-data` | ✔️ | ❌ | ❌ | ❌ | Read-only access to Farm Data service |
| `http-rest`| ✔️ | ❌ | ❌ | ❌ | Read-only access to external REST APIs |
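
As a purely illustrative sketch of the capability matrix above, the snippet below pairs a toy in-memory cache (exposing `get`/`set`/`delete` as in the table) with a processing function that enriches each message with the previously seen value for its key. The cache handle shape, the method names, and the processing-function signature are assumptions made for this sketch, not the service's actual contract.

```javascript
// Illustrative only: a toy cache exposing part of the get/set/update/delete
// surface from the capability matrix. The real Stream Processor injects its
// own cache handles, configured in the `caches` field.
class ToyCache {
  constructor() {
    this.store = new Map();
  }
  async get(key) {
    return this.store.has(key) ? this.store.get(key) : null;
  }
  async set(key, value) {
    this.store.set(key, value);
  }
  async delete(key) {
    this.store.delete(key);
  }
}

// Hypothetical processing step: read the previous state for the message key,
// store the new one, and emit both (a common stateful-enrichment pattern).
async function enrich(message, cache) {
  const previous = await cache.get(message.key); // cache calls must be awaited
  await cache.set(message.key, message.value);
  return { key: message.key, value: { current: message.value, previous } };
}
```
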

All cache methods are **asynchronous operations**. When a cache type doesn't support
@@ -481,10 +481,18 @@
stream processing requirements.

```json
{
"connections": {
"mongo": {
"type": "mongodb",
"config": {
"url": "mongodb://localhost:27017/fast-data"
}
}
},
"caches": {
"customer-cache": {
"type": "mongodb",
"url": "mongodb://localhost:27017/fast-data",
"connectionName": "mongo",
"appName": "eu.miaplatform.fastdata.stream-processor",
"database": "fast-data",
"collection": "stream-processor-state"
@@ -581,7 +589,7 @@
use the Kafka key with the FarmData cache to retrieve the full content.
{
"caches": {
"farmdata-cache": {
"type": "farmdata",
"type": "farm-data",
"url": "http://farm-data-service:3000",
"head": "aggregations",
"http2": false
@@ -833,7 +841,7 @@
based on their content or type, the recommended pattern is to deploy **multiple
instances**, each subscribing to the same source topic with distinct consumer groups.

Each Stream Processor filters and processes only the messages relevant to its specific Single View,
discarding all others using the filtering capabilities described in the [Filtering section](#filtering-).
discarding all others using the filtering capabilities described in the [Filtering section](#filtering).

```mermaid
graph LR
@@ -871,3 +879,72 @@
When implementing the event routing pattern, consider the following recommendations:
group ID so that all instances receive every message from the source topic;
- **Efficient Filtering**: Place filtering logic at the beginning of your processing
function to minimize unnecessary computation on irrelevant events.
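
As an example of the efficient-filtering recommendation, the hypothetical sketch below places an early-return guard at the very top of the processing function. The function signature, the message shape, and the `null`-means-discard convention are assumptions made for illustration, not the documented contract.

```javascript
// Hypothetical event-routing filter for one Stream Processor instance.
// Only the event types relevant to this instance's Single View pass through.
const RELEVANT_TYPES = new Set(["customer-created", "customer-updated"]);

function process(message) {
  // Efficient filtering: discard irrelevant events before any expensive
  // work (parsing, cache lookups, enrichment) happens.
  if (!RELEVANT_TYPES.has(message.type)) {
    return null; // discard (convention assumed for this sketch)
  }
  return {
    key: message.key,
    value: { ...message.value, processedBy: "customer-single-view" },
  };
}
```
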

## Custom Partitioner

The _Stream Processor_ producer supports a **custom partition selection strategy** via the `partitionerSettings` field in the producer configuration.
This allows fine-grained control over which partitions of the output topic receive messages, which is particularly useful in scenarios where multiple service replicas need to target non-overlapping sets of partitions to avoid duplicated processing or to enforce ordering guarantees.

For the full list of accepted values and the corresponding JSON schema, refer to the [`partitionerSettings` field description](/products/fast_data_v2/stream_processor/20_Configuration.mdx#producer-configuration) in the Configuration page.

### Partition Selection Algorithm

When `partitionerSettings` is set to any value other than `"all"`, the producer replaces the default librdkafka partitioner with a **CRC32-based** algorithm that operates exclusively on the _usable_ partitions determined by the selected mode:

- **Non-empty key** → `CRC32(key) % number_of_usable_partitions`: the same key is always routed to the same partition, preserving ordering by key within the selected partition set;
- **Empty or null key** → a partition is chosen at random among the usable ones.

This matches the behavior of the `consistent_random` strategy in librdkafka, restricted to the configured subset of partitions.
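
The two rules above can be sketched as follows. This is a minimal reimplementation for illustration (a standard table-driven CRC-32, the hash family used by librdkafka's consistent partitioners), not the service's actual code:

```javascript
// Table-driven CRC-32 (IEEE polynomial, reflected), as used by librdkafka's
// consistent/consistent_random partitioners.
const CRC_TABLE = (() => {
  const table = new Uint32Array(256);
  for (let n = 0; n < 256; n++) {
    let c = n;
    for (let k = 0; k < 8; k++) {
      c = c & 1 ? 0xedb88320 ^ (c >>> 1) : c >>> 1;
    }
    table[n] = c >>> 0;
  }
  return table;
})();

function crc32(buf) {
  let c = 0xffffffff;
  for (const byte of buf) c = CRC_TABLE[(c ^ byte) & 0xff] ^ (c >>> 8);
  return (c ^ 0xffffffff) >>> 0;
}

// Route a message to one of the usable partitions (as resolved from
// `partitionerSettings`): keyed messages hash consistently, unkeyed
// messages are spread at random.
function selectPartition(key, usablePartitions) {
  if (usablePartitions.length === 0) throw new Error("no usable partitions");
  if (key == null || key.length === 0) {
    return usablePartitions[Math.floor(Math.random() * usablePartitions.length)];
  }
  return usablePartitions[crc32(Buffer.from(key)) % usablePartitions.length];
}
```

Note that a given key keeps landing on the same partition only as long as the usable partition set stays the same.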

:::warning

When using `"lowerHalf"`, `"upperHalf"`, or an explicit partition list, ensure that all referenced partitions actually exist in the target topic.
Partition indices that fall outside the topic's partition range are **silently ignored** at runtime. If the resulting usable set is empty, the message will not be delivered.

:::

### Available Modes

| Value | Usable partitions | Notes |
|-------|-------------------|-------|
| `"all"` _(default)_ | All topic partitions | No custom partitioner is applied; uses the librdkafka default (or the `partitioner` config option if set). |
| `"one"` | Partition `0` only | Always targets the first partition regardless of key. |
| `"lowerHalf"` | Partitions `0 … ⌈N/2⌉ − 1` | Lower half, rounded up. At least one partition is always selected. |
| `"upperHalf"` | Partitions `⌈N/2⌉ … N − 1` | Upper half, rounded down. At least one partition is always selected. |
| `[p0, p1, …]` | Explicit list of indices | Only the listed partitions are targeted. Must contain at least one entry. |
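
The mode-to-partition mapping can be sketched like this. It is an illustrative reimplementation, assuming the lower half keeps a count rounded up and the upper half a count rounded down (at least one partition each), not the service's actual code:

```javascript
// Resolve `partitionerSettings` into the set of usable partition indices
// for a topic with `partitionCount` partitions.
function resolveUsablePartitions(partitionerSettings, partitionCount) {
  const all = Array.from({ length: partitionCount }, (_, i) => i);
  if (Array.isArray(partitionerSettings)) {
    // Explicit list: out-of-range indices are silently ignored.
    return partitionerSettings.filter((p) => p >= 0 && p < partitionCount);
  }
  switch (partitionerSettings) {
    case "one":
      return [0];
    case "lowerHalf": // count rounded up, at least 1
      return all.slice(0, Math.max(1, Math.ceil(partitionCount / 2)));
    case "upperHalf": // count rounded down, at least 1
      return all.slice(Math.min(partitionCount - 1, Math.ceil(partitionCount / 2)));
    case "all":
    default:
      return all;
  }
}
```

For example, with a 5-partition topic, `"lowerHalf"` resolves to partitions `[0, 1, 2]` and `"upperHalf"` to `[3, 4]`.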

### Configuration Examples

**Using a selection mode:**

```json
{
"producer": {
"type": "kafka",
"topic": "<OUTPUT_TOPIC>",
"connectionName": "kafka",
"config": {
"client.id": "<CLIENT_ID>",
"compression.type": "snappy"
},
"partitionerSettings": "lowerHalf"
}
}
```

**Using an explicit partition list:**

```json
{
"producer": {
"type": "kafka",
"topic": "<OUTPUT_TOPIC>",
"connectionName": "kafka",
"config": {
"client.id": "<CLIENT_ID>",
"compression.type": "snappy"
},
"partitionerSettings": [0, 1, 2]
}
}
```
12 changes: 12 additions & 0 deletions docs/products/fast_data_v2/stream_processor/CHANGELOG.md
@@ -10,6 +10,18 @@
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.6.3] - 2026-03-10

### Added

- Added custom partitioner support for Kafka producer

## [0.6.2] - 2026-02-27

### Fixed

- Handle the case of a promise that never resolves inside sandbox

## [0.6.1] - 2026-02-19

### Fixed
@@ -0,0 +1,28 @@
{
"connections": {
"kafka-main": {
"type": "kafka",
"config": {
"bootstrap.servers": "localhost:9092"
}
}
},
"consumer": {
"type": "kafka",
"connectionName": "kafka-main",
"topic": "input-topic",
"config": {
"group.id": "stream-processor-consumer-group",
"auto.offset.reset": "earliest"
}
},
"producer": {
"type": "kafka",
"connectionName": "kafka-main",
"topic": "output-topic",
"partitionerSettings": "all"
},
"processor": {
"type": "javascript"
}
}