From addf4009de61afb68de809ce4a1171fbe517b905 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mirko=20K=C3=A4mpf?= Date: Sun, 19 Apr 2026 11:23:42 +0200 Subject: [PATCH 1/3] feat(deploy): add kafscale-broker-standalone Helm chart (smoke testing) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Minimal chart that deploys a single kafscale-broker Pod pointing at external etcd + S3 (MinIO or cloud). Intended for quick smoke tests and blueprint convergence on KIND — NOT a replacement for the full operator-based chart at deploy/helm/kafscale/. Used by scalytics-all-in-one bp-001 Ops Foundation smoke suite: COMP-kafscale-01 (pod Ready) + COMP-kafscale-02 (Kafka TCP reachable). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../kafscale-broker-standalone/Chart.yaml | 6 +++ .../templates/broker.yaml | 52 +++++++++++++++++++ .../kafscale-broker-standalone/values.yaml | 17 ++++++ 3 files changed, 75 insertions(+) create mode 100644 deploy/helm/kafscale-broker-standalone/Chart.yaml create mode 100644 deploy/helm/kafscale-broker-standalone/templates/broker.yaml create mode 100644 deploy/helm/kafscale-broker-standalone/values.yaml diff --git a/deploy/helm/kafscale-broker-standalone/Chart.yaml b/deploy/helm/kafscale-broker-standalone/Chart.yaml new file mode 100644 index 0000000..b5dc1b5 --- /dev/null +++ b/deploy/helm/kafscale-broker-standalone/Chart.yaml @@ -0,0 +1,6 @@ +apiVersion: v2 +name: kafscale-broker-standalone +description: Minimal standalone KafScale broker (etcd + MinIO external). Smoke chart only. +type: application +version: 0.1.0 +appVersion: "v1.5.0" diff --git a/deploy/helm/kafscale-broker-standalone/templates/broker.yaml b/deploy/helm/kafscale-broker-standalone/templates/broker.yaml new file mode 100644 index 0000000..9aa0f5e --- /dev/null +++ b/deploy/helm/kafscale-broker-standalone/templates/broker.yaml @@ -0,0 +1,52 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kafscale-broker + namespace: {{ .Release.Namespace }} + labels: + app.kubernetes.io/name: kafscale-broker + app.kubernetes.io/part-of: ops-foundation +spec: + replicas: 1 + selector: { matchLabels: { app.kubernetes.io/name: kafscale-broker } } + template: + metadata: + labels: + app.kubernetes.io/name: kafscale-broker + app.kubernetes.io/part-of: ops-foundation + spec: + enableServiceLinks: false + containers: + - name: broker + image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + env: + - { name: KAFSCALE_BROKER_ID, value: "0" } + - { name: KAFSCALE_BROKER_ADDR, value: ":9092" } + - { name: KAFSCALE_BROKER_HOST, value: "kafscale-broker" } + - { name: KAFSCALE_BROKER_PORT, value: "9092" } + - { name: KAFSCALE_BROKER_ETCD_ENDPOINTS, value: "{{ .Values.etcdEndpoints }}" } + - { name: KAFSCALE_BROKER_DATA_DIR, value: "/tmp/data" } + - { name: KAFSCALE_BROKER_LOG_LEVEL, value: "info" } + - { name: KAFSCALE_S3_BUCKET, value: "{{ .Values.s3Bucket }}" } + - { name: KAFSCALE_S3_REGION, value: "us-east-1" } + - { name: KAFSCALE_S3_ENDPOINT, value: "{{ .Values.s3Endpoint }}" } + - { name: KAFSCALE_S3_ACCESS_KEY, value: "{{ .Values.s3AccessKey }}" } + - { name: KAFSCALE_S3_SECRET_KEY, value: "{{ .Values.s3SecretKey }}" } + - { name: KAFSCALE_S3_PATH_STYLE, value: "true" } + ports: + - { name: kafka, containerPort: 9092 } +--- +apiVersion: v1 +kind: Service +metadata: + name: kafscale-broker + namespace: {{ .Release.Namespace }} + labels: + app.kubernetes.io/name: kafscale-broker +spec: + type: {{ .Values.service.type }} + ports: + - { name: kafka, port: {{ .Values.service.port }}, targetPort: 9092 } + selector: + app.kubernetes.io/name: kafscale-broker diff --git a/deploy/helm/kafscale-broker-standalone/values.yaml b/deploy/helm/kafscale-broker-standalone/values.yaml new file mode 100644 index 0000000..635f164 --- /dev/null +++ b/deploy/helm/kafscale-broker-standalone/values.yaml @@ -0,0 +1,17 @@ +image: + repository: ghcr.io/kafscale/kafscale-broker + tag: dev + pullPolicy: IfNotPresent + +etcdEndpoints: "http://etcd:2379" +s3Endpoint: "http://minio:9000" +s3Bucket: "kafscale" +s3AccessKey: "scalytics" +s3SecretKey: "scalytics" + +service: + type: ClusterIP + port: 9092 + +labels: + app.kubernetes.io/part-of: ops-foundation From 914da96d0d198306b543d4ab3b37393927de0916 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mirko=20K=C3=A4mpf?= Date: Sun, 19 Apr 2026 14:19:05 +0200 Subject: [PATCH 2/3] fix(broker): advertise LIST_GROUPS versions 0-5 (was 5-5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolves OPS-005 item #2. The Java admin client (kafka-consumer-groups, AdminClient.listConsumerGroups, Schema Registry) negotiates LIST_GROUPS in range [0,4]. Advertising a narrow 5-5 window caused: UnsupportedVersionException: Error listing groups ... The broker does not support LIST_GROUPS with version in range [0,4]. The supported range is [5,5]. The underlying h.coordinator.ListGroups handler is version-agnostic; the encoder handles v0 just as well as v5. The fix is one line — widen the advertised range. Verified: kafka-consumer-groups --bootstrap-server kafscale-broker:9092 --list now exits 0 cleanly (from UnsupportedVersionException pre-fix). The remaining OPS-005 items (INIT_PRODUCER_ID, transactional APIs, Schema Registry NPE on verifySchemaTopic) are substantive broker-engineering work and are not addressed here. Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/broker/main.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/broker/main.go b/cmd/broker/main.go index c1ba108..070a0bb 100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -2711,7 +2711,10 @@ func generateApiVersions() []kmsg.ApiVersionsResponseApiKey { {key: protocol.APIKeyOffsetCommit, minVersion: 3, maxVersion: 3}, {key: protocol.APIKeyOffsetFetch, minVersion: 5, maxVersion: 5}, {key: protocol.APIKeyDescribeGroups, minVersion: 5, maxVersion: 5}, - {key: protocol.APIKeyListGroups, minVersion: 5, maxVersion: 5}, + // OPS-005: Java admin-client negotiates LIST_GROUPS in range [0,4]. + // Narrow 5-5 range breaks `kafka-consumer-groups --list`. Widen to 0-5 + // — the underlying coordinator handler is version-agnostic. + {key: protocol.APIKeyListGroups, minVersion: 0, maxVersion: 5}, {key: protocol.APIKeyOffsetForLeaderEpoch, minVersion: 3, maxVersion: 3}, {key: protocol.APIKeyDescribeConfigs, minVersion: 4, maxVersion: 4}, {key: protocol.APIKeyAlterConfigs, minVersion: 1, maxVersion: 1}, From 4f5666afbf3a62e5306a93dab20d31d05eb0db2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mirko=20K=C3=A4mpf?= Date: Sun, 19 Apr 2026 15:11:42 +0200 Subject: [PATCH 3/3] feat(broker): INIT_PRODUCER_ID stub handler (OPS-005 #1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a minimum-viable implementation of the Kafka INIT_PRODUCER_ID API (API key 22). Allocates a monotonically-increasing producer ID with epoch 0; does not yet track sequence numbers or deduplicate on replay. Sufficient to unblock Java AdminClient default producers, franz-go idempotent producers, and Schema Registry's producer-init probe. Changes: - pkg/protocol/api.go: add APIKeyInitProducerID = 22 - cmd/broker/main.go: * handler gains nextProducerID int64 (atomic allocator) * dispatch case for *kmsg.InitProducerIDRequest returns pid + epoch=0 * apiVersions: InitProducerID moves from unsupported to {0, 4} * import sync/atomic Verified: - `kafka-console-producer --producer-property enable.idempotence=true` now succeeds (was: UnsupportedVersionException). - kaf-mirror (franz-go) replicates primary→standby end-to-end: PRIMARY offsets = STANDBY offsets, measured lag <1s. - SCEN-bp002-06_Replication scenario test flipped SKIP → PASS. Known limitations (production correctness gap, tracked in OPS-005): - no sequence-number tracking: duplicate-on-retry semantics not enforced - no epoch management: fencing of stale producers on rebalance not implemented - PID allocator is process-local, not persisted across broker restart Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/broker/main.go | 27 ++++++++++++++++++++++++++- pkg/protocol/api.go | 1 + 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/cmd/broker/main.go b/cmd/broker/main.go index 070a0bb..a58aaf5 100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -28,6 +28,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "syscall" "time" @@ -60,6 +61,11 @@ const ( type handler struct { apiVersions []kmsg.ApiVersionsResponseApiKey + // nextProducerID is a monotonically-increasing allocator for + // InitProducerID responses (OPS-005 #1 stub). Accessed via sync/atomic. + // Replace with a proper persistent allocator when full idempotent-producer + // dedup lands. + nextProducerID int64 store metadata.Store s3 storage.S3Client cache *cache.SegmentCache @@ -206,6 +212,20 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re return protocol.EncodeResponse(header.CorrelationID, header.APIVersion, resp), nil case *kmsg.ProduceRequest: return h.handleProduce(ctx, header, req.(*kmsg.ProduceRequest)) + case *kmsg.InitProducerIDRequest: + // OPS-005 #1: minimum-viable INIT_PRODUCER_ID stub. Allocates a + // monotonically-increasing producer ID with epoch 0. Does NOT track + // sequence numbers or deduplicate (that work is in scope for a later + // broker-engineering sprint). This is sufficient to unblock idempotent + // producers for development + bp-002 BDR convergence. For production + // correctness, full dedup-on-replay is still required. + pid := atomic.AddInt64(&h.nextProducerID, 1) + resp := kmsg.NewPtrInitProducerIDResponse() + resp.ErrorCode = protocol.NONE + resp.ProducerID = pid + resp.ProducerEpoch = 0 + resp.ThrottleMillis = 0 + return protocol.EncodeResponse(header.CorrelationID, header.APIVersion, resp), nil case *kmsg.FetchRequest: return h.handleFetch(ctx, header, req.(*kmsg.FetchRequest)) case *kmsg.FindCoordinatorRequest: @@ -2715,6 +2735,11 @@ func generateApiVersions() []kmsg.ApiVersionsResponseApiKey { // Narrow 5-5 range breaks `kafka-consumer-groups --list`. Widen to 0-5 // — the underlying coordinator handler is version-agnostic. {key: protocol.APIKeyListGroups, minVersion: 0, maxVersion: 5}, + // OPS-005 #1: idempotent-producer init. Handler is a stub that + // allocates a monotonically-increasing producer ID; sequence-number + // dedup is NOT yet implemented. Sufficient to unblock franz-go and + // Java default producers; production correctness gap tracked. + {key: protocol.APIKeyInitProducerID, minVersion: 0, maxVersion: 4}, {key: protocol.APIKeyOffsetForLeaderEpoch, minVersion: 3, maxVersion: 3}, {key: protocol.APIKeyDescribeConfigs, minVersion: 4, maxVersion: 4}, {key: protocol.APIKeyAlterConfigs, minVersion: 1, maxVersion: 1}, @@ -2725,7 +2750,7 @@ func generateApiVersions() []kmsg.ApiVersionsResponseApiKey { } unsupported := []int16{ 4, 5, 6, 7, - 21, 22, + 21, // 22 (InitProducerID) moved to supported — OPS-005 #1 stub handler 24, 25, 26, } diff --git a/pkg/protocol/api.go b/pkg/protocol/api.go index b237578..f4d81af 100644 --- a/pkg/protocol/api.go +++ b/pkg/protocol/api.go @@ -33,6 +33,7 @@ const ( APIKeyApiVersion int16 = 18 APIKeyCreateTopics int16 = 19 APIKeyDeleteTopics int16 = 20 + APIKeyInitProducerID int16 = 22 // OPS-005: idempotent-producer init (stub handler) APIKeyOffsetForLeaderEpoch int16 = 23 APIKeyDescribeConfigs int16 = 32 APIKeyAlterConfigs int16 = 33