From 1e7d978bc6dd9cdc4de7f4be3cda9c7b13471549 Mon Sep 17 00:00:00 2001 From: Jonathan Chaput Date: Wed, 3 Jun 2026 15:58:54 -0400 Subject: [PATCH 1/6] jira: add input component for streaming issues, comments, and changelog Adds a `jira` input that streams Jira REST API events into Connect pipelines via JQL with cursor-based incremental polling. The cursor (max issue.updated timestamp) is persisted to a cache resource so progress survives restarts. Available in both the Cloud and Self-Hosted distributions. Authenticates via API token (email + token). Supports resource={issues,comments,changelog}. The existing `jira` processor (enrichment lookups) is unchanged. Shared HTTP/auth setup is extracted into a new internal/impl/jira/jiraauth sub-package consumed by both the processor and the input. Limitations (v1): no OAuth, no worklogs resource, per-issue comment and changelog pagination is limited to the first response page (truncation logged and counted via the jira_input_child_truncated_total metric). --- CHANGELOG.md | 6 + config/examples/jira_input.yaml | 26 + .../modules/components/pages/inputs/jira.adoc | 840 +++++++++++++++++ internal/impl/jira/input_jira.go | 821 +++++++++++++++++ .../impl/jira/input_jira_integration_test.go | 96 ++ internal/impl/jira/input_jira_test.go | 860 ++++++++++++++++++ internal/impl/jira/jiraauth/client.go | 77 ++ internal/impl/jira/jiraauth/client_test.go | 63 ++ internal/impl/jira/jirahttp/client.go | 7 + internal/impl/jira/processor_jira.go | 32 +- internal/plugins/info.csv | 1 + 11 files changed, 2800 insertions(+), 29 deletions(-) create mode 100644 config/examples/jira_input.yaml create mode 100644 docs/modules/components/pages/inputs/jira.adoc create mode 100644 internal/impl/jira/input_jira.go create mode 100644 internal/impl/jira/input_jira_integration_test.go create mode 100644 internal/impl/jira/input_jira_test.go create mode 100644 internal/impl/jira/jiraauth/client.go create mode 100644 internal/impl/jira/jiraauth/client_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index d7129bd8dc..c3a39c817c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,12 @@ Changelog All notable changes to this project will be documented in this file. +## Unreleased + +### Added + +- jira: New `jira` input component for streaming Jira issues, comments, or changelog entries via JQL with cursor-based incremental polling. + ## 4.94.1 - 2026-05-29 ### Fixed diff --git a/config/examples/jira_input.yaml b/config/examples/jira_input.yaml new file mode 100644 index 0000000000..4cd1d8519f --- /dev/null +++ b/config/examples/jira_input.yaml @@ -0,0 +1,26 @@ +# Example pipeline: stream Jira issues into stdout. +# +# Set environment variables: +# JIRA_BASE_URL=https://your-domain.atlassian.net +# JIRA_EMAIL=you@example.com +# JIRA_API_TOKEN= + +input: + jira: + base_url: ${JIRA_BASE_URL:https://your-domain.atlassian.net} + auth: + email: ${JIRA_EMAIL:you@example.com} + api_token: ${JIRA_API_TOKEN:xxx} + resource: issues + jql: 'project = ENG' + poll_interval: 60s + cursor: + cache: jira_state + +cache_resources: + - label: jira_state + file: + directory: /var/lib/redpanda-connect/jira + +output: + stdout: {} diff --git a/docs/modules/components/pages/inputs/jira.adoc b/docs/modules/components/pages/inputs/jira.adoc new file mode 100644 index 0000000000..aafa2fee29 --- /dev/null +++ b/docs/modules/components/pages/inputs/jira.adoc @@ -0,0 +1,840 @@ += jira +:type: input +:status: stable +:categories: ["Services"] + + + +//// + THIS FILE IS AUTOGENERATED! + + To make changes, edit the corresponding source file under: + + https://github.com/redpanda-data/connect/tree/main/internal/impl/. + + And: + + https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl +//// + +// © 2026 Redpanda Data Inc. + + +component_type_dropdown::[] + + +Streams Jira issues, comments, or changelog entries via JQL with incremental polling. + +Introduced in version 4.95.0. + + +[tabs] +====== +Common:: ++ +-- + +```yml +# Common config fields, showing default values +input: + label: "" + jira: + auth: + email: "" # No default (required) + api_token: "" # No default (required) + resource: issues + jql: "" + fields: + - '*all' + expand: [] + page_size: 50 + poll_interval: 60s + cursor: + cache: "" # No default (required) + key: "" + overlap: 60s + auto_replay_nacks: true + base_url: "" # No default (required) + timeout: 5s +``` + +-- +Advanced:: ++ +-- + +```yml +# All config fields, showing default values +input: + label: "" + jira: + auth: + email: "" # No default (required) + api_token: "" # No default (required) + resource: issues + jql: "" + fields: + - '*all' + expand: [] + page_size: 50 + poll_interval: 60s + cursor: + cache: "" # No default (required) + key: "" + overlap: 60s + auto_replay_nacks: true + base_url: "" # No default (required) + timeout: 5s + tls: + enabled: false + skip_cert_verify: false + enable_renegotiation: false + root_cas: "" + root_cas_file: "" + client_certs: [] + proxy_url: "" + disable_http2: false + tps_limit: 0 + tps_burst: 1 + backoff: + initial_interval: 1s + max_interval: 30s + max_retries: 3 + tcp: + connect_timeout: 0s + keep_alive: + idle: 15s + interval: 15s + count: 9 + tcp_user_timeout: 0s + http: + max_idle_conns: 100 + max_idle_conns_per_host: 0 + max_conns_per_host: 64 + idle_conn_timeout: 1m30s + tls_handshake_timeout: 10s + expect_continue_timeout: 1s + response_header_timeout: 0s + disable_keep_alives: false + disable_compression: false + max_response_header_bytes: 1048576 + max_response_body_bytes: 10485760 + write_buffer_size: 4096 + read_buffer_size: 4096 + h2: + strict_max_concurrent_requests: false + max_decoder_header_table_size: 4096 + max_encoder_header_table_size: 4096 + max_read_frame_size: 16384 + max_receive_buffer_per_connection: 1048576 + max_receive_buffer_per_stream: 1048576 + send_ping_timeout: 0s + ping_timeout: 15s + write_byte_timeout: 0s + access_log_level: "" + access_log_body_limit: 0 +``` + +-- +====== + +Periodically queries Jira's REST API using a JQL filter and emits one message per resource. The cursor (max issue `updated` timestamp) is persisted via the configured cache resource so progress survives restarts. + +Authentication uses API token (email + token) basic auth. + +Each message body is the raw JSON of the resource. Metadata fields: + +- `jira_id` - issue key (issues) / comment ID / changelog history ID +- `jira_issue_key` - parent issue key (omitted for resource=issues) +- `jira_project` - project key +- `jira_updated` - RFC 3339 timestamp of the resource +- `jira_event_type` - "issue" / "comment" / "changelog" +- `jira_self` - Jira API URL of the resource + +Limitations (v1): OAuth and the worklogs resource are not yet supported. For resource=comments and resource=changelog, only the first page of child resources (up to ~50 comments or ~100 changelog entries per issue update) is emitted; a WARN is logged when truncation is detected. Use a downstream Jira processor to fetch the full child set if your issues exceed this limit. + +== Fields + +=== `auth` + +API token authentication. + + +*Type*: `object` + + +=== `auth.email` + +Email or username of the Jira account. + + +*Type*: `string` + + +=== `auth.api_token` + +Jira API token. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + + +=== `resource` + +Which Jira resource to emit. + + +*Type*: `string` + +*Default*: `"issues"` + +Options: +`issues` +, `comments` +, `changelog` +. + +=== `jql` + +Jira JQL filter. The input appends an `updated >= cursor` predicate and `ORDER BY updated ASC, key ASC`. Empty means all issues visible to the principal. + + +*Type*: `string` + +*Default*: `""` + +=== `fields` + +Jira `fields` query parameter - narrow this for throughput. + + +*Type*: `array` + +*Default*: `["*all"]` + +=== `expand` + +Jira `expand` query parameter. The input automatically adds `changelog` when resource=changelog. + + +*Type*: `array` + +*Default*: `[]` + +=== `page_size` + +Issues per Jira page (Jira max 100). + + +*Type*: `int` + +*Default*: `50` + +=== `poll_interval` + +Time to wait between polls once the input has caught up. Minimum 10s. + + +*Type*: `string` + +*Default*: `"60s"` + +=== `cursor` + +Cursor checkpoint storage. + + +*Type*: `object` + + +=== `cursor.cache` + +Name of a cache resource used to persist the cursor. + + +*Type*: `string` + + +=== `cursor.key` + +Cache key. Defaults to `redpanda_connect_jira_input_`. + + +*Type*: `string` + +*Default*: `""` + +=== `cursor.overlap` + +Widens `updated >= cursor - overlap` to absorb minute-boundary effects. Jira JQL's `updated` operator has minute precision, so this should be set to at least 1m to have an effect. + + +*Type*: `string` + +*Default*: `"60s"` + +=== `auto_replay_nacks` + +Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation. + + +*Type*: `bool` + +*Default*: `true` + +=== `base_url` + +Base URL of the target service (e.g., https://api.example.com). TLS is enabled automatically for https URLs. + + +*Type*: `string` + + +=== `timeout` + +HTTP request timeout. + + +*Type*: `string` + +*Default*: `"5s"` + +=== `tls` + +Custom TLS settings can be used to override system defaults. + + +*Type*: `object` + + +=== `tls.enabled` + +Whether custom TLS settings are enabled. + + +*Type*: `bool` + +*Default*: `false` + +=== `tls.skip_cert_verify` + +Whether to skip server side certificate verification. + + +*Type*: `bool` + +*Default*: `false` + +=== `tls.enable_renegotiation` + +Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you're seeing the error message `local error: tls: no renegotiation`. + + +*Type*: `bool` + +*Default*: `false` +Requires version 3.45.0 or newer + +=== `tls.root_cas` + +An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + +*Default*: `""` + +```yml +# Examples + +root_cas: |- + -----BEGIN CERTIFICATE----- + ... + -----END CERTIFICATE----- +``` + +=== `tls.root_cas_file` + +An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. + + +*Type*: `string` + +*Default*: `""` + +```yml +# Examples + +root_cas_file: ./root_cas.pem +``` + +=== `tls.client_certs` + +A list of client certificates to use. For each certificate either the fields `cert` and `key`, or `cert_file` and `key_file` should be specified, but not both. + + +*Type*: `array` + +*Default*: `[]` + +```yml +# Examples + +client_certs: + - cert: foo + key: bar + +client_certs: + - cert_file: ./example.pem + key_file: ./example.key +``` + +=== `tls.client_certs[].cert` + +A plain text certificate to use. + + +*Type*: `string` + +*Default*: `""` + +=== `tls.client_certs[].key` + +A plain text certificate key to use. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + +*Default*: `""` + +=== `tls.client_certs[].cert_file` + +The path of a certificate to use. + + +*Type*: `string` + +*Default*: `""` + +=== `tls.client_certs[].key_file` + +The path of a certificate key to use. + + +*Type*: `string` + +*Default*: `""` + +=== `tls.client_certs[].password` + +A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete `pbeWithMD5AndDES-CBC` algorithm is not supported for the PKCS#8 format. + +Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + +*Default*: `""` + +```yml +# Examples + +password: foo + +password: ${KEY_PASSWORD} +``` + +=== `proxy_url` + +HTTP proxy URL. Empty string disables proxying. + + +*Type*: `string` + +*Default*: `""` + +=== `disable_http2` + +Disable HTTP/2 and force HTTP/1.1. + + +*Type*: `bool` + +*Default*: `false` + +=== `tps_limit` + +Rate limit in requests per second. 0 disables rate limiting. + + +*Type*: `float` + +*Default*: `0` + +=== `tps_burst` + +Maximum burst size for rate limiting. + + +*Type*: `int` + +*Default*: `1` + +=== `backoff` + +Adaptive backoff configuration for 429 (Too Many Requests) responses. Always active. + + +*Type*: `object` + + +=== `backoff.initial_interval` + +Initial interval between retries on 429 responses. + + +*Type*: `string` + +*Default*: `"1s"` + +=== `backoff.max_interval` + +Maximum interval between retries on 429 responses. + + +*Type*: `string` + +*Default*: `"30s"` + +=== `backoff.max_retries` + +Maximum number of retries on 429 responses. + + +*Type*: `int` + +*Default*: `3` + +=== `tcp` + +TCP socket configuration. + + +*Type*: `object` + + +=== `tcp.connect_timeout` + +Maximum amount of time a dial will wait for a connect to complete. Zero disables. + + +*Type*: `string` + +*Default*: `"0s"` + +=== `tcp.keep_alive` + +TCP keep-alive probe configuration. + + +*Type*: `object` + + +=== `tcp.keep_alive.idle` + +Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes. + + +*Type*: `string` + +*Default*: `"15s"` + +=== `tcp.keep_alive.interval` + +Duration between keep-alive probes. Zero defaults to 15s. + + +*Type*: `string` + +*Default*: `"15s"` + +=== `tcp.keep_alive.count` + +Maximum unanswered keep-alive probes before dropping the connection. Zero defaults to 9. + + +*Type*: `int` + +*Default*: `9` + +=== `tcp.tcp_user_timeout` + +Maximum time to wait for acknowledgment of transmitted data before killing the connection. Linux-only (kernel 2.6.37+), ignored on other platforms. When enabled, keep_alive.idle must be greater than this value per RFC 5482. Zero disables. + + +*Type*: `string` + +*Default*: `"0s"` + +=== `http` + +HTTP transport settings controlling connection pooling, timeouts, and HTTP/2. + + +*Type*: `object` + + +=== `http.max_idle_conns` + +Maximum total number of idle (keep-alive) connections across all hosts. 0 means unlimited. + + +*Type*: `int` + +*Default*: `100` + +=== `http.max_idle_conns_per_host` + +Maximum idle connections to keep per host. 0 (the default) uses GOMAXPROCS+1. + + +*Type*: `int` + +*Default*: `0` + +=== `http.max_conns_per_host` + +Maximum total connections (active + idle) per host. 0 means unlimited. + + +*Type*: `int` + +*Default*: `64` + +=== `http.idle_conn_timeout` + +How long an idle connection remains in the pool before being closed. 0 disables the timeout. + + +*Type*: `string` + +*Default*: `"1m30s"` + +=== `http.tls_handshake_timeout` + +Maximum time to wait for a TLS handshake to complete. 0 disables the timeout. + + +*Type*: `string` + +*Default*: `"10s"` + +=== `http.expect_continue_timeout` + +Maximum time to wait for a server's 100-continue response before sending the body. 0 means the body is sent immediately. + + +*Type*: `string` + +*Default*: `"1s"` + +=== `http.response_header_timeout` + +Maximum time to wait for response headers after writing the full request. 0 disables the timeout. + + +*Type*: `string` + +*Default*: `"0s"` + +=== `http.disable_keep_alives` + +Disable HTTP keep-alive connections; each request uses a new connection. + + +*Type*: `bool` + +*Default*: `false` + +=== `http.disable_compression` + +Disable automatic decompression of gzip responses. + + +*Type*: `bool` + +*Default*: `false` + +=== `http.max_response_header_bytes` + +Maximum bytes of response headers to allow. + + +*Type*: `int` + +*Default*: `1048576` + +=== `http.max_response_body_bytes` + +Maximum bytes of response body the client will read. The response body is wrapped with a limit reader; reads beyond this cap return EOF. 0 disables the limit. + + +*Type*: `int` + +*Default*: `10485760` + +=== `http.write_buffer_size` + +Size in bytes of the per-connection write buffer. + + +*Type*: `int` + +*Default*: `4096` + +=== `http.read_buffer_size` + +Size in bytes of the per-connection read buffer. + + +*Type*: `int` + +*Default*: `4096` + +=== `http.h2` + +HTTP/2-specific transport settings. Only applied when HTTP/2 is enabled. + + +*Type*: `object` + + +=== `http.h2.strict_max_concurrent_requests` + +When true, new requests block when a connection's concurrency limit is reached instead of opening a new connection. + + +*Type*: `bool` + +*Default*: `false` + +=== `http.h2.max_decoder_header_table_size` + +Upper limit in bytes for the HPACK header table used to decode headers from the peer. Must be less than 4 MiB. + + +*Type*: `int` + +*Default*: `4096` + +=== `http.h2.max_encoder_header_table_size` + +Upper limit in bytes for the HPACK header table used to encode headers sent to the peer. Must be less than 4 MiB. + + +*Type*: `int` + +*Default*: `4096` + +=== `http.h2.max_read_frame_size` + +Largest HTTP/2 frame this endpoint will read. Valid range: 16 KiB to 16 MiB. + + +*Type*: `int` + +*Default*: `16384` + +=== `http.h2.max_receive_buffer_per_connection` + +Maximum flow-control window size in bytes for data received on a connection. Must be at least 64 KiB and less than 4 MiB. + + +*Type*: `int` + +*Default*: `1048576` + +=== `http.h2.max_receive_buffer_per_stream` + +Maximum flow-control window size in bytes for data received on a single stream. Must be less than 4 MiB. + + +*Type*: `int` + +*Default*: `1048576` + +=== `http.h2.send_ping_timeout` + +Idle timeout after which a PING frame is sent to verify connection health. 0 disables health checks. + + +*Type*: `string` + +*Default*: `"0s"` + +=== `http.h2.ping_timeout` + +Timeout waiting for a PING response before closing the connection. + + +*Type*: `string` + +*Default*: `"15s"` + +=== `http.h2.write_byte_timeout` + +Timeout for writing data to a connection. The timer resets whenever bytes are written. 0 disables the timeout. + + +*Type*: `string` + +*Default*: `"0s"` + +=== `access_log_level` + +Log level for HTTP request/response logging. Empty disables logging. + + +*Type*: `string` + +*Default*: `""` + +Options: +`` +, `TRACE` +, `DEBUG` +, `INFO` +, `WARN` +, `ERROR` +. + +=== `access_log_body_limit` + +Maximum bytes of request/response body to include in logs. 0 to skip body logging. + + +*Type*: `int` + +*Default*: `0` + + diff --git a/internal/impl/jira/input_jira.go b/internal/impl/jira/input_jira.go new file mode 100644 index 0000000000..c484a5a116 --- /dev/null +++ b/internal/impl/jira/input_jira.go @@ -0,0 +1,821 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jira + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/url" + "slices" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/redpanda-data/benthos/v4/public/service" + + "github.com/redpanda-data/connect/v4/internal/httpclient" + "github.com/redpanda-data/connect/v4/internal/impl/jira/jiraauth" + "github.com/redpanda-data/connect/v4/internal/impl/jira/jirahttp" +) + +// cursorSchemaVersion is the on-disk format version stamped into the cursor +// JSON; it is consumed by writeCursor when the input advances the cursor. +const cursorSchemaVersion = 1 + +const ( + resourceIssues = "issues" + resourceComments = "comments" + resourceChangelog = "changelog" +) + +var validResources = []string{resourceIssues, resourceComments, resourceChangelog} + +// cursor is the persisted incremental-fetch checkpoint for the jira input. +// It stores the max issue.updated timestamp seen in the last fully-acked page. +// Unknown JSON fields are ignored on decode for forward compatibility. +type cursor struct { + Updated time.Time `json:"updated"` + Version int `json:"v"` +} + +func newJiraInputConfigSpec() *service.ConfigSpec { + spec := service.NewConfigSpec(). + Categories("Services"). + Version("4.95.0"). + Summary("Streams Jira issues, comments, or changelog entries via JQL with incremental polling."). + Description(`Periodically queries Jira's REST API using a JQL filter and emits one message per resource. The cursor (max issue ` + "`updated`" + ` timestamp) is persisted via the configured cache resource so progress survives restarts. + +Authentication uses API token (email + token) basic auth. + +Each message body is the raw JSON of the resource. Metadata fields: + +- ` + "`jira_id`" + ` - issue key (issues) / comment ID / changelog history ID +- ` + "`jira_issue_key`" + ` - parent issue key (omitted for resource=issues) +- ` + "`jira_project`" + ` - project key +- ` + "`jira_updated`" + ` - RFC 3339 timestamp of the resource +- ` + "`jira_event_type`" + ` - "issue" / "comment" / "changelog" +- ` + "`jira_self`" + ` - Jira API URL of the resource + +Limitations (v1): OAuth and the worklogs resource are not yet supported. For resource=comments and resource=changelog, only the first page of child resources (up to ~50 comments or ~100 changelog entries per issue update) is emitted; a WARN is logged when truncation is detected. Use a downstream Jira processor to fetch the full child set if your issues exceed this limit.`). + Field(service.NewObjectField("auth", + service.NewStringField("email"). + Description("Email or username of the Jira account."), + service.NewStringField("api_token"). + Description("Jira API token."). + Secret(), + ).Description("API token authentication.")). + Field(service.NewStringEnumField("resource", validResources...). + Description("Which Jira resource to emit."). + Default(resourceIssues)). + Field(service.NewStringField("jql"). + Description("Jira JQL filter. The input appends an `updated >= cursor` predicate and `ORDER BY updated ASC, key ASC`. Empty means all issues visible to the principal."). + Default("")). + Field(service.NewStringListField("fields"). + Description("Jira `fields` query parameter - narrow this for throughput."). + Default([]any{"*all"})). + Field(service.NewStringListField("expand"). + Description("Jira `expand` query parameter. The input automatically adds `changelog` when resource=changelog."). + Default([]any{})). + Field(service.NewIntField("page_size"). + Description("Issues per Jira page (Jira max 100)."). + Default(50)). + Field(service.NewDurationField("poll_interval"). + Description("Time to wait between polls once the input has caught up. Minimum 10s."). + Default("60s")). + Field(service.NewObjectField("cursor", + service.NewStringField("cache"). + Description("Name of a cache resource used to persist the cursor."), + service.NewStringField("key"). + Description("Cache key. Defaults to `redpanda_connect_jira_input_`."). + Default(""), + service.NewDurationField("overlap"). + Description("Widens `updated >= cursor - overlap` to absorb minute-boundary effects. Jira JQL's `updated` operator has minute precision, so this should be set to at least 1m to have an effect."). + Default("60s"), + ).Description("Cursor checkpoint storage.")). + Field(service.NewAutoRetryNacksToggleField()) + + spec.Fields(httpclient.FieldsWithBaseURL("")...) + return spec +} + +// inputCfg holds the parsed configuration for the jira input. +type inputCfg struct { + httpCfg *httpclient.Config + authEmail string + authAPIToken string + resource string + jql string + fields []string + expand []string + pageSize int + pollInterval time.Duration + + cacheName string + cacheKey string + cursorOverlap time.Duration +} + +func parseInputConfig(conf *service.ParsedConfig) (*inputCfg, error) { + httpCfg, err := httpclient.NewConfigFromParsed(conf) + if err != nil { + return nil, err + } + + email, err := conf.FieldString("auth", "email") + if err != nil { + return nil, err + } + if email == "" { + return nil, errors.New("auth.email must not be empty") + } + apiToken, err := conf.FieldString("auth", "api_token") + if err != nil { + return nil, err + } + if apiToken == "" { + return nil, errors.New("auth.api_token must not be empty") + } + + resource, err := conf.FieldString("resource") + if err != nil { + return nil, err + } + if !isValidResource(resource) { + if resource == "worklogs" { + return nil, fmt.Errorf("resource %q is not supported in v1: the worklogs resource is not yet implemented", resource) + } + return nil, fmt.Errorf("resource %q is not valid; expected one of %v", resource, validResources) + } + + jql, err := conf.FieldString("jql") + if err != nil { + return nil, err + } + fields, err := conf.FieldStringList("fields") + if err != nil { + return nil, err + } + expand, err := conf.FieldStringList("expand") + if err != nil { + return nil, err + } + pageSize, err := conf.FieldInt("page_size") + if err != nil { + return nil, err + } + if pageSize <= 0 || pageSize > 100 { + return nil, errors.New("page_size must be between 1 and 100") + } + pollInterval, err := conf.FieldDuration("poll_interval") + if err != nil { + return nil, err + } + if pollInterval < 10*time.Second { + return nil, errors.New("poll_interval must be at least 10s") + } + + cacheName, err := conf.FieldString("cursor", "cache") + if err != nil { + return nil, err + } + if cacheName == "" { + return nil, errors.New("cursor.cache must not be empty") + } + cacheKey, err := conf.FieldString("cursor", "key") + if err != nil { + return nil, err + } + if cacheKey == "" { + cacheKey = "redpanda_connect_jira_input_" + resource + } + overlap, err := conf.FieldDuration("cursor", "overlap") + if err != nil { + return nil, err + } + + return &inputCfg{ + httpCfg: &httpCfg, + authEmail: email, + authAPIToken: apiToken, + resource: resource, + jql: jql, + fields: fields, + expand: expand, + pageSize: pageSize, + pollInterval: pollInterval, + cacheName: cacheName, + cacheKey: cacheKey, + cursorOverlap: overlap, + }, nil +} + +func isValidResource(r string) bool { + return slices.Contains(validResources, r) +} + +// reader is the jira input implementation. +type reader struct { + cfg *inputCfg + mgr *service.Resources + log *service.Logger + client *jirahttp.Client + + // metrics. + cacheSetErrors *service.MetricCounter + // childTruncated counts how many per-issue child fetches returned fewer + // rows than the server-reported total. Labelled by resource ("comments" + // or "changelog") so operators can alert on either pipeline independently. + childTruncated *service.MetricCounter + + // runtime state populated by Connect. + curMu sync.RWMutex + cur cursor + connected atomic.Bool + page *pageState + nextToken string +} + +// currentCursor returns a copy of the current cursor under read lock. +func (r *reader) currentCursor() cursor { + r.curMu.RLock() + defer r.curMu.RUnlock() + return r.cur +} + +// setCursor replaces the current cursor under write lock. +func (r *reader) setCursor(c cursor) { + r.curMu.Lock() + defer r.curMu.Unlock() + r.cur = c +} + +// issuesPage is the subset of /rest/api/3/search/jql response we use. +type issuesPage struct { + Issues []json.RawMessage `json:"issues"` + NextPageToken string `json:"nextPageToken,omitempty"` +} + +// rawIssue holds the fields we need to derive metadata. +type rawIssue struct { + ID string `json:"id"` + Key string `json:"key"` + Self string `json:"self"` + Fields struct { + Project struct { + Key string `json:"key"` + } `json:"project"` + Updated jiraTime `json:"updated"` + } `json:"fields"` +} + +// jiraTime parses Jira's `2026-06-01T10:00:00.000+0000` format. +type jiraTime struct{ time.Time } + +func (j *jiraTime) UnmarshalJSON(b []byte) error { + s := strings.Trim(string(b), `"`) + if s == "" || s == "null" { + return nil + } + // Jira returns offset without colon ("+0000"); RFC3339 wants "+00:00". + for _, layout := range []string{"2006-01-02T15:04:05.999Z0700", "2006-01-02T15:04:05.999-07:00", time.RFC3339Nano} { + if t, err := time.Parse(layout, s); err == nil { + j.Time = t + return nil + } + } + return fmt.Errorf("unrecognised jira time format: %q", s) +} + +// pageState tracks ack progress for the in-flight page. Buffer/index/maxUpdated +// are mu-guarded. outstandingAcks/pageHasNack are atomic to keep the ack hot +// path lock-free. done is closed by the ack callback when the last ack settles; +// the Read goroutine then advances the cursor before fetching the next page, +// ensuring the cursor write happens-before the next JQL is built. +type pageState struct { + mu sync.Mutex + buffer []*service.Message + bufferIdx int + outstandingAcks atomic.Int32 + pageHasNack atomic.Bool + pageMaxUpdated time.Time + done chan struct{} +} + +// nextBufferedMessage returns the next pending message from the page buffer. +// The outstanding-ack counter is pre-loaded in load(), so dispatching does not +// touch it - that keeps the close-once invariant on pageState.done simple. +// ok is false when the buffer is drained. +func (p *pageState) nextBufferedMessage() (msg *service.Message, ok bool) { + p.mu.Lock() + defer p.mu.Unlock() + if p.bufferIdx >= len(p.buffer) { + return nil, false + } + msg = p.buffer[p.bufferIdx] + p.bufferIdx++ + return msg, true +} + +// isEmpty reports whether the buffer has no remaining messages. +func (p *pageState) isEmpty() bool { + p.mu.Lock() + defer p.mu.Unlock() + return p.bufferIdx >= len(p.buffer) +} + +// load atomically replaces the buffer contents with msgs, records the max +// updated timestamp observed for the page, and arms a fresh done channel. +// outstandingAcks is pre-loaded to len(msgs) so the ack callback only needs +// to decrement. An empty page has its done channel closed immediately so the +// Read loop never blocks waiting for acks that will never fire. +func (p *pageState) load(msgs []*service.Message, maxUpdated time.Time) { + p.mu.Lock() + defer p.mu.Unlock() + p.buffer = msgs + p.bufferIdx = 0 + p.pageMaxUpdated = maxUpdated + p.done = make(chan struct{}) + p.outstandingAcks.Store(int32(len(msgs))) + if len(msgs) == 0 { + close(p.done) + } +} + +// reset clears all per-page state in preparation for the next page fetch. +// done is cleared so a subsequent waitForPageAcks call before the next load +// is a no-op. +func (p *pageState) reset() { + p.mu.Lock() + defer p.mu.Unlock() + p.buffer = nil + p.bufferIdx = 0 + p.pageHasNack.Store(false) + p.pageMaxUpdated = time.Time{} + p.done = nil +} + +// bufferLen returns the number of messages in the current page buffer. +func (p *pageState) bufferLen() int { + p.mu.Lock() + defer p.mu.Unlock() + return len(p.buffer) +} + +// maxUpdated returns the highest issue.updated timestamp seen in the buffer. +func (p *pageState) maxUpdated() time.Time { + p.mu.Lock() + defer p.mu.Unlock() + return p.pageMaxUpdated +} + +// currentDone returns the done channel for the current page under the page +// mutex so callers don't race load() / reset() reassigning the field. +func (p *pageState) currentDone() chan struct{} { + p.mu.Lock() + defer p.mu.Unlock() + return p.done +} + +// allDispatched reports whether every message in the current page has been +// handed out via nextBufferedMessage. Close uses this to decide whether +// waiting on done can ever succeed — if Read returned with un-dispatched +// messages still in the buffer, their acks will never fire and waiting would +// just stall until ctx expires. +func (p *pageState) allDispatched() bool { + p.mu.Lock() + defer p.mu.Unlock() + return p.bufferIdx >= len(p.buffer) +} + +func newReader(conf *service.ParsedConfig, mgr *service.Resources) (*reader, error) { + cfg, err := parseInputConfig(conf) + if err != nil { + return nil, err + } + return &reader{ + cfg: cfg, + mgr: mgr, + log: mgr.Logger(), + page: &pageState{}, + cacheSetErrors: mgr.Metrics().NewCounter("jira_input_cache_set_errors_total"), + childTruncated: mgr.Metrics().NewCounter("jira_input_child_truncated_total", "resource"), + }, nil +} + +func (r *reader) Connect(ctx context.Context) error { + if r.connected.Load() { + return nil + } + client, err := jiraauth.BuildClient(r.mgr, r.cfg.httpCfg, r.cfg.authEmail, r.cfg.authAPIToken, r.cfg.pageSize) + if err != nil { + return err + } + r.client = client + + // Validate auth via /myself. + myselfURL, err := url.Parse(r.cfg.httpCfg.BaseURL + "/rest/api/3/myself") + if err != nil { + return fmt.Errorf("invalid base_url: %w", err) + } + if _, err := r.callAPI(ctx, myselfURL); err != nil { + return fmt.Errorf("authenticating with jira: %w", err) + } + + // Load cursor from cache. + c, err := r.readCursor(ctx) + if err != nil { + return fmt.Errorf("reading cursor: %w", err) + } + r.setCursor(c) + r.connected.Store(true) + r.log.Infof("connected to %s as %s", r.cfg.httpCfg.BaseURL, r.cfg.authEmail) + return nil +} + +// callAPI is a thin wrapper around jirahttp.Client.CallAPI. +func (r *reader) callAPI(ctx context.Context, u *url.URL) ([]byte, error) { + return r.client.CallAPI(ctx, u) +} + +func (r *reader) readCursor(ctx context.Context) (cursor, error) { + var c cursor + var inner error + if err := r.mgr.AccessCache(ctx, r.cfg.cacheName, func(cache service.Cache) { + raw, gerr := cache.Get(ctx, r.cfg.cacheKey) + if gerr != nil { + if errors.Is(gerr, service.ErrKeyNotFound) { + return + } + inner = gerr + return + } + if uerr := json.Unmarshal(raw, &c); uerr != nil { + inner = fmt.Errorf("decoding cursor JSON: %w", uerr) + } + }); err != nil { + return cursor{}, err + } + if inner != nil { + return cursor{}, inner + } + return c, nil +} + +func (r *reader) writeCursor(ctx context.Context, c cursor) error { + raw, err := json.Marshal(c) + if err != nil { + return err + } + return r.mgr.AccessCache(ctx, r.cfg.cacheName, func(cache service.Cache) { + if serr := cache.Set(ctx, r.cfg.cacheKey, raw, nil); serr != nil { + r.log.Warnf("failed to write cursor to cache: %v", serr) + r.cacheSetErrors.Incr(1) + } + }) +} + +// Read fetches a new page when the buffer is empty and returns one buffered +// message per call. The ack callback only signals when the last ack settles; +// cursor advancement runs on the Read goroutine inside waitForPageAcks so the +// cursor write strictly happens-before the next fetchNextPage. +func (r *reader) Read(ctx context.Context) (*service.Message, service.AckFunc, error) { + for { + if msg, ok := r.page.nextBufferedMessage(); ok { + // Capture the page's done channel at dispatch time so the ack + // closure doesn't race with load() reassigning r.page.done on the + // next fetch (load only happens after the previous done is closed + // and drained, but capturing here keeps the invariant local). + done := r.page.currentDone() + ack := func(_ context.Context, ackErr error) error { + if ackErr != nil { + r.page.pageHasNack.Store(true) + } + if r.page.outstandingAcks.Add(-1) == 0 { + // Just signal; cursor write happens on the Read goroutine + // so it's serialised with the next fetch. + close(done) + } + return nil + } + return msg, ack, nil + } + + // Buffer is drained, but acks for the previous page may still be in + // flight. Wait for them and drain the cursor on this goroutine so the + // cursor write completes before we build the next request - otherwise + // the next JQL would be built against a stale cursor. + if err := r.waitForPageAcks(ctx); err != nil { + return nil, nil, err + } + + if err := r.fetchNextPage(ctx); err != nil { + return nil, nil, err + } + + if r.page.isEmpty() { + // Caught up; sleep before polling again. Do not return + // ErrNotConnected — that would trigger a reconnect cycle. + select { + case <-time.After(r.cfg.pollInterval): + case <-ctx.Done(): + return nil, nil, ctx.Err() + } + // loop and try again + } + } +} + +// waitForPageAcks blocks until the in-flight page's done channel is closed by +// the last-firing ack callback, then runs onPageDrained on the caller's +// goroutine. If no page is loaded (first iteration, or already drained), it +// returns immediately. Context cancellation aborts the wait so a consumer that +// never acks cannot deadlock the input. +func (r *reader) waitForPageAcks(ctx context.Context) error { + done := r.page.currentDone() + if done == nil { + return nil + } + select { + case <-done: + r.onPageDrained(ctx) + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (r *reader) onPageDrained(ctx context.Context) { + if r.page.pageHasNack.Load() { + // don't advance cursor; reset for the refetch on next Read + r.page.reset() + return + } + maxUpdated := r.page.maxUpdated() + bufferLen := r.page.bufferLen() + if !maxUpdated.IsZero() && maxUpdated.After(r.currentCursor().Updated) { + newCur := cursor{Updated: maxUpdated, Version: cursorSchemaVersion} + r.setCursor(newCur) + if err := r.writeCursor(ctx, newCur); err != nil { + r.log.Warnf("writing cursor: %v", err) + } + r.log.Infof("advanced cursor to %s after %d issues", newCur.Updated.Format(time.RFC3339), bufferLen) + } + r.page.reset() +} + +func (r *reader) fetchNextPage(ctx context.Context) error { + u, err := r.buildSearchURL() + if err != nil { + return err + } + body, err := r.callAPI(ctx, u) + if err != nil { + return fmt.Errorf("fetching jira page: %w", err) + } + var page issuesPage + if err := json.Unmarshal(body, &page); err != nil { + return fmt.Errorf("decoding jira page: %w", err) + } + + msgs := make([]*service.Message, 0, len(page.Issues)) + var maxUpdated time.Time + for _, raw := range page.Issues { + var meta rawIssue + if err := json.Unmarshal(raw, &meta); err != nil { + return fmt.Errorf("decoding issue: %w", err) + } + if meta.Fields.Updated.After(maxUpdated) { + maxUpdated = meta.Fields.Updated.Time + } + + switch r.cfg.resource { + case resourceIssues: + msgs = append(msgs, r.messageFromIssue(raw, meta)) + case resourceComments: + children, err := r.fetchComments(ctx, meta) + if err != nil { + return err + } + msgs = append(msgs, children...) + case resourceChangelog: + children, err := r.messagesFromChangelog(raw, meta) + if err != nil { + return err + } + msgs = append(msgs, children...) + } + } + + r.page.load(msgs, maxUpdated) + r.nextToken = page.NextPageToken + return nil +} + +// messageFromIssue converts a single raw issue payload into an emitted message +// with the canonical issue metadata fields. +func (*reader) messageFromIssue(raw json.RawMessage, meta rawIssue) *service.Message { + m := service.NewMessage(raw) + m.MetaSetMut("jira_id", meta.Key) + m.MetaSetMut("jira_project", meta.Fields.Project.Key) + m.MetaSetMut("jira_updated", meta.Fields.Updated.UTC().Format(time.RFC3339)) + m.MetaSetMut("jira_event_type", "issue") + m.MetaSetMut("jira_self", meta.Self) + return m +} + +// fetchComments performs a per-issue child fetch against the comments endpoint +// and returns one message per comment. The issue key is URL-path-escaped so +// keys containing slashes or unicode (which Jira technically allows for some +// projects) round-trip safely. +func (r *reader) fetchComments(ctx context.Context, issue rawIssue) ([]*service.Message, error) { + u, err := url.Parse(r.cfg.httpCfg.BaseURL + "/rest/api/3/issue/" + url.PathEscape(issue.Key) + "/comment") + if err != nil { + return nil, err + } + body, err := r.callAPI(ctx, u) + if err != nil { + return nil, fmt.Errorf("fetching comments for %s: %w", issue.Key, err) + } + var envelope struct { + Comments []json.RawMessage `json:"comments"` + Total int `json:"total"` + MaxResults int `json:"maxResults"` + StartAt int `json:"startAt"` + } + if err := json.Unmarshal(body, &envelope); err != nil { + return nil, fmt.Errorf("decoding comments: %w", err) + } + if envelope.Total > len(envelope.Comments) { + r.log.Warnf("comments for issue %s truncated: page returned %d of %d (v1 reads only the first page)", issue.Key, len(envelope.Comments), envelope.Total) + r.childTruncated.Incr(1, "comments") + } + msgs := make([]*service.Message, 0, len(envelope.Comments)) + for _, raw := range envelope.Comments { + var c struct { + ID string `json:"id"` + Self string `json:"self"` + Updated jiraTime `json:"updated"` + } + if err := json.Unmarshal(raw, &c); err != nil { + return nil, fmt.Errorf("decoding comment: %w", err) + } + m := service.NewMessage(raw) + m.MetaSetMut("jira_id", c.ID) + m.MetaSetMut("jira_issue_key", issue.Key) + m.MetaSetMut("jira_project", issue.Fields.Project.Key) + m.MetaSetMut("jira_updated", c.Updated.UTC().Format(time.RFC3339)) + m.MetaSetMut("jira_event_type", "comment") + m.MetaSetMut("jira_self", c.Self) + msgs = append(msgs, m) + } + return msgs, nil +} + +// messagesFromChangelog decodes the changelog.histories[] array embedded in the +// issue (the search URL auto-augments expand=changelog when resource=changelog) +// and emits one message per history entry. Each message's body is the raw +// history-entry JSON so consumers can inspect items[]; metadata references the +// parent issue and uses history.created for jira_updated. +func (r *reader) messagesFromChangelog(raw json.RawMessage, meta rawIssue) ([]*service.Message, error) { + var envelope struct { + Changelog struct { + Histories []json.RawMessage `json:"histories"` + Total int `json:"total"` + MaxResults int `json:"maxResults"` + StartAt int `json:"startAt"` + } `json:"changelog"` + } + if err := json.Unmarshal(raw, &envelope); err != nil { + return nil, fmt.Errorf("decoding changelog: %w", err) + } + if envelope.Changelog.Total > len(envelope.Changelog.Histories) { + r.log.Warnf("changelog for issue %s truncated: page returned %d of %d (v1 reads only the first page)", meta.Key, len(envelope.Changelog.Histories), envelope.Changelog.Total) + r.childTruncated.Incr(1, "changelog") + } + msgs := make([]*service.Message, 0, len(envelope.Changelog.Histories)) + for _, hraw := range envelope.Changelog.Histories { + var h struct { + ID string `json:"id"` + Created jiraTime `json:"created"` + } + if err := json.Unmarshal(hraw, &h); err != nil { + return nil, fmt.Errorf("decoding history entry: %w", err) + } + m := service.NewMessage(hraw) + m.MetaSetMut("jira_id", h.ID) + m.MetaSetMut("jira_issue_key", meta.Key) + m.MetaSetMut("jira_project", meta.Fields.Project.Key) + m.MetaSetMut("jira_updated", h.Created.UTC().Format(time.RFC3339)) + m.MetaSetMut("jira_event_type", "changelog") + // History entries have no `self` URL of their own; fall back to the + // parent issue's self so the metadata contract holds for every message. + m.MetaSetMut("jira_self", meta.Self) + msgs = append(msgs, m) + } + return msgs, nil +} + +func (r *reader) buildSearchURL() (*url.URL, error) { + u, err := url.Parse(r.cfg.httpCfg.BaseURL + "/rest/api/3/search/jql") + if err != nil { + return nil, err + } + q := u.Query() + q.Set("jql", r.buildJQL()) + q.Set("fields", strings.Join(r.cfg.fields, ",")) + expand := r.cfg.expand + if r.cfg.resource == resourceChangelog { + expand = appendUnique(expand, "changelog") + } + if len(expand) > 0 { + q.Set("expand", strings.Join(expand, ",")) + } + q.Set("maxResults", strconv.Itoa(r.cfg.pageSize)) + if r.nextToken != "" { + q.Set("nextPageToken", r.nextToken) + } + u.RawQuery = q.Encode() + return u, nil +} + +func (r *reader) buildJQL() string { + parts := []string{} + if r.cfg.jql != "" { + parts = append(parts, "("+r.cfg.jql+")") + } + cur := r.currentCursor() + if !cur.Updated.IsZero() { + threshold := cur.Updated.Add(-r.cfg.cursorOverlap) + parts = append(parts, fmt.Sprintf(`updated >= "%s"`, threshold.UTC().Format("2006-01-02 15:04"))) + } + jql := strings.Join(parts, " AND ") + if jql != "" { + jql += " " + } + return jql + "ORDER BY updated ASC, key ASC" +} + +func appendUnique(s []string, v string) []string { + if slices.Contains(s, v) { + return s + } + return append(s, v) +} + +// closeAckDrainTimeout caps how long Close will wait for in-flight page acks +// to settle before returning. The benthos shutdown path abandons un-acked +// messages once the stream finishes tearing down, so blocking on the full +// shutdown ctx would just stall — the overlap window covers the replay. +const closeAckDrainTimeout = 500 * time.Millisecond + +// Close drains any in-flight page acks so the cursor can advance for the last +// page before tearing down, then resets the connected flag so a future Connect +// re-reads the cursor from the cache. The wait is bounded by both the passed +// ctx and a short internal timeout — if neither completes first, the cursor +// stays where it is and the next run will replay (within the overlap window). +// If the Read goroutine exited with un-dispatched messages still in the +// buffer, their acks will never fire, so we skip the wait — those messages +// were never seen by consumers and must not advance the cursor. +func (r *reader) Close(ctx context.Context) error { + if done := r.page.currentDone(); done != nil && r.page.allDispatched() { + drainCtx, cancel := context.WithTimeout(ctx, closeAckDrainTimeout) + defer cancel() + select { + case <-done: + r.onPageDrained(ctx) + case <-drainCtx.Done(): + // Bail out — cursor stays where it is; next Connect will resume. + } + } + r.connected.Store(false) + return nil +} + +func init() { + service.MustRegisterInput( + "jira", newJiraInputConfigSpec(), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { + r, err := newReader(conf, mgr) + if err != nil { + return nil, err + } + return service.AutoRetryNacksToggled(conf, r) + }, + ) +} diff --git a/internal/impl/jira/input_jira_integration_test.go b/internal/impl/jira/input_jira_integration_test.go new file mode 100644 index 0000000000..7ae270dcd1 --- /dev/null +++ b/internal/impl/jira/input_jira_integration_test.go @@ -0,0 +1,96 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build integration + +package jira + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/redpanda-data/benthos/v4/public/service" + "github.com/redpanda-data/benthos/v4/public/service/integration" +) + +// TestIntegration_JiraInput_FirstRunHasNoCursorPredicate exercises the jira +// input end-to-end through a service.StreamBuilder against a mock Jira API. +// +// It asserts that a fresh start (with an empty cache) issues a JQL query that +// does NOT include the `updated >=` cursor predicate. Asserting "restart +// resumes from cursor" against the in-memory cache resource is genuinely +// difficult to express through StreamBuilder (cache state is per-builder), so +// the more rigorous cursor advancement and predicate format coverage lives in +// the unit tests in input_jira_test.go. +func TestIntegration_JiraInput_FirstRunHasNoCursorPredicate(t *testing.T) { + integration.CheckSkip(t) + + var lastJQL atomic.Value + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/rest/api/3/myself": + _, _ = w.Write([]byte(`{}`)) + case "/rest/api/3/search/jql": + lastJQL.Store(r.URL.Query().Get("jql")) + _, _ = w.Write([]byte(`{"issues":[{"id":"1","key":"PROJ-1","fields":{"project":{"key":"PROJ"},"updated":"2026-06-01T10:00:00.000+0000"}}]}`)) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + out := make(chan struct{}, 4) + b := service.NewStreamBuilder() + require.NoError(t, b.SetLoggerYAML(`level: OFF`)) + require.NoError(t, b.AddCacheYAML(` +label: jira_state +memory: {} +`)) + require.NoError(t, b.AddInputYAML(fmt.Sprintf(` +jira: + base_url: %q + auth: {email: u@x, api_token: tok} + resource: issues + poll_interval: 10s + cursor: {cache: jira_state, overlap: 0s} +`, server.URL))) + require.NoError(t, b.AddConsumerFunc(func(_ context.Context, _ *service.Message) error { + out <- struct{}{} + return nil + })) + s, err := b.Build() + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + go func() { _ = s.Run(ctx) }() + + select { + case <-out: + case <-ctx.Done(): + t.Fatal("timed out waiting for first message") + } + require.NoError(t, s.StopWithin(time.Second)) + + jql, _ := lastJQL.Load().(string) + assert.NotContains(t, jql, "updated >=", "first run should not include cursor predicate") +} diff --git a/internal/impl/jira/input_jira_test.go b/internal/impl/jira/input_jira_test.go new file mode 100644 index 0000000000..9f17dfebf1 --- /dev/null +++ b/internal/impl/jira/input_jira_test.go @@ -0,0 +1,860 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jira + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/redpanda-data/benthos/v4/public/service" + + _ "github.com/redpanda-data/benthos/v4/public/components/io" + _ "github.com/redpanda-data/benthos/v4/public/components/pure" +) + +// mockJiraServer is a configurable httptest.Server for jira API responses. +type mockJiraServer struct { + *httptest.Server + myselfCalls atomic.Int32 + // handler is the route dispatcher; replace per test. + handler http.HandlerFunc +} + +func newMockJiraServer(t *testing.T) *mockJiraServer { + m := &mockJiraServer{} + m.handler = func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/rest/api/3/myself": + m.myselfCalls.Add(1) + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"accountId":"abc","emailAddress":"u@x"}`)) + default: + http.Error(w, "no route in test fixture: "+r.URL.Path, http.StatusInternalServerError) + } + } + m.Server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + m.handler(w, r) + })) + t.Cleanup(m.Close) + return m +} + +// buildStream wires the jira input into a stream builder with an in-memory cache +// and a transient consumer; returns the assembled stream and a channel of +// received messages (body + metadata) for assertions. +func buildStream(t *testing.T, yaml string) (*service.Stream, <-chan map[string]any) { + t.Helper() + out := make(chan map[string]any, 64) + builder := service.NewStreamBuilder() + require.NoError(t, builder.SetLoggerYAML(`level: OFF`)) + require.NoError(t, builder.AddCacheYAML(` +label: jira_state +memory: {} +`)) + require.NoError(t, builder.AddInputYAML(yaml)) + require.NoError(t, builder.AddConsumerFunc(func(_ context.Context, msg *service.Message) error { + b, err := msg.AsBytes() + if err != nil { + return err + } + md := map[string]any{} + err = msg.MetaWalkMut(func(k string, v any) error { + md[k] = v + return nil + }) + if err != nil { + return err + } + out <- map[string]any{"body": string(b), "meta": md} + return nil + })) + s, err := builder.Build() + require.NoError(t, err) + return s, out +} + +func TestInputConfig_Parses(t *testing.T) { + yaml := ` +base_url: https://example.atlassian.net +auth: + email: user@example.com + api_token: secret +resource: issues +jql: "project = ENG" +poll_interval: 30s +cursor: + cache: jira_state +` + confSpec := newJiraInputConfigSpec() + parsed, err := confSpec.ParseYAML(yaml, nil) + require.NoError(t, err) + + cfg, err := parseInputConfig(parsed) + require.NoError(t, err) + assert.Equal(t, "https://example.atlassian.net", cfg.httpCfg.BaseURL) + assert.Equal(t, "issues", cfg.resource) + assert.Equal(t, "user@example.com", cfg.authEmail) + assert.Equal(t, "secret", cfg.authAPIToken) + assert.Equal(t, "project = ENG", cfg.jql) + assert.Equal(t, 30*time.Second, cfg.pollInterval) + assert.Equal(t, "jira_state", cfg.cacheName) + assert.Equal(t, "redpanda_connect_jira_input_issues", cfg.cacheKey) +} + +func TestInputConfig_RejectsWorklogsResource(t *testing.T) { + yaml := ` +base_url: https://example.atlassian.net +auth: + email: user@example.com + api_token: secret +resource: worklogs +cursor: + cache: jira_state +` + confSpec := newJiraInputConfigSpec() + parsed, err := confSpec.ParseYAML(yaml, nil) + require.NoError(t, err) + + _, err = parseInputConfig(parsed) + require.Error(t, err) + assert.Contains(t, err.Error(), "worklogs") +} + +func TestCursor_JSONRoundtrip(t *testing.T) { + original := cursor{Updated: time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC), Version: 1} + b, err := json.Marshal(original) + require.NoError(t, err) + assert.JSONEq(t, `{"updated":"2026-01-02T03:04:05Z","v":1}`, string(b)) + + var decoded cursor + require.NoError(t, json.Unmarshal(b, &decoded)) + assert.True(t, original.Updated.Equal(decoded.Updated)) + assert.Equal(t, original.Version, decoded.Version) +} + +func TestCursor_DecodeIgnoresUnknownFields(t *testing.T) { + var c cursor + require.NoError(t, json.Unmarshal([]byte(`{"updated":"2026-01-02T03:04:05Z","v":1,"future_field":"ignored"}`), &c)) + assert.Equal(t, 1, c.Version) +} + +func TestCursor_ZeroValueRepresentsFirstRun(t *testing.T) { + var c cursor + assert.True(t, c.Updated.IsZero()) + assert.Equal(t, 0, c.Version) +} + +func TestRead_EmitsOneMessagePerIssueWithMetadata(t *testing.T) { + mock := newMockJiraServer(t) + mock.handler = func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/rest/api/3/myself": + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{}`)) + case "/rest/api/3/search/jql": + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{ + "issues":[ + {"id":"10001","key":"PROJ-1","self":"https://x/rest/api/3/issue/10001","fields":{"project":{"key":"PROJ"},"updated":"2026-06-01T10:00:00.000+0000"}}, + {"id":"10002","key":"PROJ-2","self":"https://x/rest/api/3/issue/10002","fields":{"project":{"key":"PROJ"},"updated":"2026-06-01T10:01:00.000+0000"}} + ] + }`)) + default: + http.NotFound(w, r) + } + } + + yaml := fmt.Sprintf(` +jira: + base_url: %q + auth: {email: u@x, api_token: tok} + resource: issues + poll_interval: 10s + cursor: {cache: jira_state} +`, mock.URL) + + s, out := buildStream(t, yaml) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + go func() { _ = s.Run(ctx) }() + + got := []map[string]any{} + for len(got) < 2 { + select { + case m := <-out: + got = append(got, m) + case <-ctx.Done(): + t.Fatalf("timed out; got %d messages", len(got)) + } + } + require.NoError(t, s.StopWithin(time.Second)) + + assert.Contains(t, got[0]["body"], `"PROJ-1"`) + meta0 := got[0]["meta"].(map[string]any) + assert.Equal(t, "PROJ-1", meta0["jira_id"]) + assert.Equal(t, "PROJ", meta0["jira_project"]) + assert.Equal(t, "2026-06-01T10:00:00Z", meta0["jira_updated"]) + assert.Equal(t, "issue", meta0["jira_event_type"]) +} + +func TestRead_PaginatesAcrossMultiplePages(t *testing.T) { + mock := newMockJiraServer(t) + var calls atomic.Int32 + mock.handler = func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/rest/api/3/myself": + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{}`)) + case "/rest/api/3/search/jql": + n := calls.Add(1) + token := r.URL.Query().Get("nextPageToken") + switch n { + case 1: + assert.Empty(t, token) + _, _ = w.Write([]byte(`{"issues":[{"id":"1","key":"PROJ-1","fields":{"project":{"key":"PROJ"},"updated":"2026-06-01T10:00:00.000+0000"}}],"nextPageToken":"page2"}`)) + case 2: + assert.Equal(t, "page2", token) + _, _ = w.Write([]byte(`{"issues":[{"id":"2","key":"PROJ-2","fields":{"project":{"key":"PROJ"},"updated":"2026-06-01T10:05:00.000+0000"}}]}`)) + default: + _, _ = w.Write([]byte(`{"issues":[]}`)) + } + default: + http.NotFound(w, r) + } + } + + yaml := fmt.Sprintf(` +jira: + base_url: %q + auth: {email: u@x, api_token: tok} + resource: issues + poll_interval: 10s + cursor: {cache: jira_state} +`, mock.URL) + + s, out := buildStream(t, yaml) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + go func() { _ = s.Run(ctx) }() + + keys := []string{} + for len(keys) < 2 { + select { + case m := <-out: + md := m["meta"].(map[string]any) + keys = append(keys, md["jira_id"].(string)) + case <-ctx.Done(): + t.Fatalf("only got %v", keys) + } + } + require.NoError(t, s.StopWithin(time.Second)) + assert.Equal(t, []string{"PROJ-1", "PROJ-2"}, keys) +} + +func TestCursor_AdvancesAfterPageAcked(t *testing.T) { + mock := newMockJiraServer(t) + var calls atomic.Int32 + mock.handler = func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/rest/api/3/myself": + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{}`)) + case "/rest/api/3/search/jql": + n := calls.Add(1) + jql := r.URL.Query().Get("jql") + if n == 1 { + assert.NotContains(t, jql, "updated >=") + _, _ = w.Write([]byte(`{"issues":[{"id":"1","key":"PROJ-1","fields":{"project":{"key":"PROJ"},"updated":"2026-06-01T10:00:00.000+0000"}}]}`)) + } else { + assert.Contains(t, jql, "updated >=") + _, _ = w.Write([]byte(`{"issues":[]}`)) + } + } + } + + yaml := fmt.Sprintf(` +jira: + base_url: %q + auth: {email: u@x, api_token: tok} + resource: issues + poll_interval: 60s + cursor: {cache: jira_state, overlap: 0s} +`, mock.URL) + + s, out := buildStream(t, yaml) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + go func() { _ = s.Run(ctx) }() + + select { + case <-out: + case <-ctx.Done(): + t.Fatal("no message") + } + require.Eventually(t, func() bool { return calls.Load() >= 2 }, 2*time.Second, 25*time.Millisecond) + require.NoError(t, s.StopWithin(time.Second)) +} + +func TestNack_PreventsCursorAdvance(t *testing.T) { + mock := newMockJiraServer(t) + var calls atomic.Int32 + var jqlValues atomic.Value + jqlValues.Store([]string{}) + var jqlMu sync.Mutex + recordJQL := func(jql string) { + jqlMu.Lock() + defer jqlMu.Unlock() + jqlValues.Store(append(jqlValues.Load().([]string), jql)) + } + mock.handler = func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/rest/api/3/myself": + _, _ = w.Write([]byte(`{}`)) + case "/rest/api/3/search/jql": + n := calls.Add(1) + recordJQL(r.URL.Query().Get("jql")) + if n == 1 { + _, _ = w.Write([]byte(`{"issues":[{"id":"1","key":"PROJ-1","fields":{"project":{"key":"PROJ"},"updated":"2026-06-01T10:00:00.000+0000"}}]}`)) + } else { + // Subsequent polls return an empty page so the input goes back + // to its polling sleep instead of spinning. The assertion is + // purely that a second request fires AND its JQL has no + // `updated >=` predicate (cursor did not advance). + _, _ = w.Write([]byte(`{"issues":[]}`)) + } + default: + http.NotFound(w, r) + } + } + + emitted := make(chan struct{}, 4) + builder := service.NewStreamBuilder() + require.NoError(t, builder.SetLoggerYAML(`level: OFF`)) + require.NoError(t, builder.AddCacheYAML(` +label: jira_state +memory: {} +`)) + require.NoError(t, builder.AddInputYAML(fmt.Sprintf(` +jira: + base_url: %q + auth: {email: u@x, api_token: tok} + resource: issues + poll_interval: 10s + cursor: {cache: jira_state, overlap: 0s} + auto_replay_nacks: false +`, mock.URL))) + var nackOnce atomic.Bool + require.NoError(t, builder.AddConsumerFunc(func(_ context.Context, _ *service.Message) error { + emitted <- struct{}{} + if nackOnce.CompareAndSwap(false, true) { + return errors.New("forced nack") + } + return nil + })) + s, err := builder.Build() + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go func() { _ = s.Run(ctx) }() + + // First emission triggers a nack. + select { + case <-emitted: + case <-ctx.Done(): + t.Fatal("no message emitted") + } + // The cursor must NOT advance after the nack: the next JQL must therefore + // still match the same (zero-cursor) page, so a second request fires with + // no `updated >=` predicate. + require.Eventually(t, func() bool { return calls.Load() >= 2 }, 3*time.Second, 25*time.Millisecond) + require.NoError(t, s.StopWithin(2*time.Second)) + + jqls := jqlValues.Load().([]string) + require.GreaterOrEqual(t, len(jqls), 2, "expected at least 2 JQL requests, got %v", jqls) + assert.NotContains(t, jqls[0], "updated >=", "first request should have no cursor predicate") + assert.NotContains(t, jqls[1], "updated >=", "nacked page must not advance the cursor") +} + +func TestConnect_CallsMyselfAndStartsFromZeroCursor(t *testing.T) { + mock := newMockJiraServer(t) + + yaml := fmt.Sprintf(` +jira: + base_url: %q + auth: {email: u@x, api_token: tok} + resource: issues + jql: "" + poll_interval: 10s + cursor: {cache: jira_state} +`, mock.URL) + + s, _ := buildStream(t, yaml) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + // run a brief stream lifecycle: assert no error connecting; we don't yet emit messages. + go func() { _ = s.Run(ctx) }() + require.Eventually(t, func() bool { return mock.myselfCalls.Load() >= 1 }, 1500*time.Millisecond, 25*time.Millisecond) + require.NoError(t, s.StopWithin(time.Second)) +} + +// TestResource_Comments verifies that resource=comments performs a per-issue +// child fetch against /rest/api/3/issue//comment and emits one message +// per comment with metadata populated from the comment payload while still +// referencing the parent issue's key and project. +func TestResource_Comments(t *testing.T) { + mock := newMockJiraServer(t) + mock.handler = func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/rest/api/3/myself": + _, _ = w.Write([]byte(`{}`)) + case "/rest/api/3/search/jql": + _, _ = w.Write([]byte(`{"issues":[{"id":"10001","key":"PROJ-1","fields":{"project":{"key":"PROJ"},"updated":"2026-06-01T10:00:00.000+0000"}}]}`)) + case "/rest/api/3/issue/PROJ-1/comment": + _, _ = w.Write([]byte(`{"comments":[ + {"id":"100","self":"https://x/rest/api/3/issue/10001/comment/100","updated":"2026-06-01T10:00:01.000+0000","body":"hi"}, + {"id":"101","self":"https://x/rest/api/3/issue/10001/comment/101","updated":"2026-06-01T10:00:02.000+0000","body":"there"} + ]}`)) + default: + http.Error(w, r.URL.Path, http.StatusNotFound) + } + } + + yaml := fmt.Sprintf(` +jira: + base_url: %q + auth: {email: u@x, api_token: tok} + resource: comments + poll_interval: 10s + cursor: {cache: jira_state} +`, mock.URL) + + s, out := buildStream(t, yaml) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + go func() { _ = s.Run(ctx) }() + + got := []map[string]any{} + for len(got) < 2 { + select { + case m := <-out: + got = append(got, m) + case <-ctx.Done(): + t.Fatalf("got only %d", len(got)) + } + } + require.NoError(t, s.StopWithin(time.Second)) + + meta0 := got[0]["meta"].(map[string]any) + assert.Equal(t, "100", meta0["jira_id"]) + assert.Equal(t, "PROJ-1", meta0["jira_issue_key"]) + assert.Equal(t, "PROJ", meta0["jira_project"]) + assert.Equal(t, "2026-06-01T10:00:01Z", meta0["jira_updated"]) + assert.Equal(t, "comment", meta0["jira_event_type"]) + assert.Equal(t, "https://x/rest/api/3/issue/10001/comment/100", meta0["jira_self"]) + + meta1 := got[1]["meta"].(map[string]any) + assert.Equal(t, "101", meta1["jira_id"]) + assert.Equal(t, "PROJ-1", meta1["jira_issue_key"]) +} + +// TestResource_Changelog verifies that when resource=changelog the input +// auto-augments the search URL with expand=changelog and emits one message per +// entry in each issue's changelog.histories[] array, with metadata referencing +// the parent issue. +func TestResource_Changelog(t *testing.T) { + mock := newMockJiraServer(t) + mock.handler = func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/rest/api/3/myself" { + _, _ = w.Write([]byte(`{}`)) + return + } + assert.Contains(t, r.URL.Query().Get("expand"), "changelog") + _, _ = w.Write([]byte(`{"issues":[{ + "id":"1","key":"PROJ-1","self":"https://x/rest/api/3/issue/1", + "fields":{"project":{"key":"PROJ"},"updated":"2026-06-01T10:00:00.000+0000"}, + "changelog":{"histories":[ + {"id":"500","author":{"emailAddress":"a@x"},"created":"2026-06-01T09:00:00.000+0000","items":[{"field":"status","fromString":"Open","toString":"Done"}]}, + {"id":"501","author":{"emailAddress":"a@x"},"created":"2026-06-01T10:00:00.000+0000","items":[{"field":"assignee"}]} + ]} + }]}`)) + } + + yaml := fmt.Sprintf(` +jira: + base_url: %q + auth: {email: u@x, api_token: tok} + resource: changelog + poll_interval: 10s + cursor: {cache: jira_state} +`, mock.URL) + + s, out := buildStream(t, yaml) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + go func() { _ = s.Run(ctx) }() + + got := []map[string]any{} + for len(got) < 2 { + select { + case m := <-out: + got = append(got, m) + case <-ctx.Done(): + t.Fatalf("only got %d messages", len(got)) + } + } + require.NoError(t, s.StopWithin(time.Second)) + + ids := []string{} + for _, m := range got { + md := m["meta"].(map[string]any) + ids = append(ids, md["jira_id"].(string)) + assert.Equal(t, "PROJ-1", md["jira_issue_key"]) + assert.Equal(t, "PROJ", md["jira_project"]) + assert.Equal(t, "changelog", md["jira_event_type"]) + // History entries have no `self` of their own; jira_self falls back to + // the parent issue's self URL so every message satisfies the metadata + // contract advertised in the input's Description. + assert.Equal(t, "https://x/rest/api/3/issue/1", md["jira_self"]) + } + assert.ElementsMatch(t, []string{"500", "501"}, ids) + + // The body of each message should be the raw history entry JSON, including + // the items[] array so consumers can inspect field-level changes. + for _, m := range got { + body := m["body"].(string) + assert.Contains(t, body, `"items"`) + } + + // Per-message jira_updated should reflect history.created, not issue.updated. + for _, m := range got { + md := m["meta"].(map[string]any) + switch md["jira_id"].(string) { + case "500": + assert.Equal(t, "2026-06-01T09:00:00Z", md["jira_updated"]) + case "501": + assert.Equal(t, "2026-06-01T10:00:00Z", md["jira_updated"]) + } + } +} + +// capturingLogger is a service.PrintLogger that records every log line so +// tests can assert against the runtime's reaction to a Connect failure. +type capturingLogger struct { + mu sync.Mutex + lines []string +} + +func (c *capturingLogger) Printf(format string, v ...any) { + c.mu.Lock() + defer c.mu.Unlock() + c.lines = append(c.lines, fmt.Sprintf(format, v...)) +} + +func (c *capturingLogger) Println(v ...any) { + c.mu.Lock() + defer c.mu.Unlock() + c.lines = append(c.lines, fmt.Sprint(v...)) +} + +func (c *capturingLogger) snapshot() []string { + c.mu.Lock() + defer c.mu.Unlock() + out := make([]string, len(c.lines)) + copy(out, c.lines) + return out +} + +// TestConnect_Returns401AsHardError verifies that a 401 from /myself causes +// Connect to fail and the runtime to surface the auth error - the input must +// not emit any messages and the failure must be observable in the logs as a +// 401-flavored error (jirahttp returns it via DropStatuses without retry, +// configured by jiraauth.BuildClient). +func TestConnect_Returns401AsHardError(t *testing.T) { + mock := newMockJiraServer(t) + mock.handler = func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusUnauthorized) + } + + yaml := fmt.Sprintf(` +jira: + base_url: %q + auth: {email: u@x, api_token: bad} + resource: issues + poll_interval: 10s + cursor: {cache: jira_state} +`, mock.URL) + + logs := &capturingLogger{} + out := make(chan map[string]any, 4) + builder := service.NewStreamBuilder() + builder.SetPrintLogger(logs) + require.NoError(t, builder.AddCacheYAML(` +label: jira_state +memory: {} +`)) + require.NoError(t, builder.AddInputYAML(yaml)) + require.NoError(t, builder.AddConsumerFunc(func(_ context.Context, msg *service.Message) error { + b, _ := msg.AsBytes() + out <- map[string]any{"body": string(b)} + return nil + })) + s, err := builder.Build() + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + runErrCh := make(chan error, 1) + go func() { runErrCh <- s.Run(ctx) }() + + // The 401 must be observable in the logs: jirahttp's HTTPError formats as + // "http error: status=401 ...", wrapped by Connect as "authenticating with + // jira: ...". DropStatuses means no retry on the HTTP layer; the runtime + // will re-call Connect with backoff but every attempt must log the 401. + require.Eventually(t, func() bool { + for _, line := range logs.snapshot() { + if strings.Contains(line, "401") { + return true + } + } + return false + }, 1500*time.Millisecond, 25*time.Millisecond, "expected a 401-flavored error to appear in the logs") + + // No messages can possibly have been emitted - Connect never succeeded. + select { + case msg := <-out: + t.Fatalf("input emitted a message despite 401 on /myself: %v", msg) + default: + } + + // Stop the stream and assert that, if Run returned non-nil during the wait, + // the error is 401-flavored. (Connect retries with backoff so Run typically + // returns only after StopWithin; either way no 401 should be silently + // swallowed.) + require.NoError(t, s.StopWithin(2*time.Second)) + cancel() + + select { + case runErr := <-runErrCh: + if runErr != nil && !errors.Is(runErr, context.Canceled) && !errors.Is(runErr, context.DeadlineExceeded) { + assert.Contains(t, runErr.Error(), "401", + "Run error must reference the 401 status if it propagates") + } + case <-time.After(2 * time.Second): + t.Fatal("Run did not exit after StopWithin") + } +} + +// failingCache implements service.Cache: Get behaves like an empty memory cache, +// Set always returns an error. Used by TestCacheSet_SoftFails to verify the +// input keeps polling when the cursor cache write fails. +type failingCache struct{} + +func (failingCache) Get(_ context.Context, _ string) ([]byte, error) { + return nil, service.ErrKeyNotFound +} + +func (failingCache) Set(_ context.Context, _ string, _ []byte, _ *time.Duration) error { + return errors.New("simulated cache failure") +} + +func (failingCache) Add(ctx context.Context, key string, value []byte, ttl *time.Duration) error { + return failingCache{}.Set(ctx, key, value, ttl) +} + +func (failingCache) Delete(_ context.Context, _ string) error { return nil } +func (failingCache) Close(_ context.Context) error { return nil } + +func init() { + _ = service.RegisterCache("failing_cache_for_test", service.NewConfigSpec(), + func(_ *service.ParsedConfig, _ *service.Resources) (service.Cache, error) { + return failingCache{}, nil + }) +} + +// TestCacheSet_SoftFails asserts that when the cursor cache returns an error +// from Set, the input does not error out the stream and keeps polling. +// The metric increment is exercised via writeCursor but not directly observed +// here (no easy hook on *service.MetricCounter); the assertion is on the +// observable behavior: multiple HTTP polls fire without the stream dying. +func TestCacheSet_SoftFails(t *testing.T) { + mock := newMockJiraServer(t) + var pollCount atomic.Int32 + mock.handler = func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/rest/api/3/myself": + _, _ = w.Write([]byte(`{}`)) + case "/rest/api/3/search/jql": + pollCount.Add(1) + _, _ = w.Write([]byte(`{"issues":[{"id":"1","key":"PROJ-1","fields":{"project":{"key":"PROJ"},"updated":"2026-06-01T10:00:00.000+0000"}}]}`)) + default: + http.Error(w, "no route in test fixture: "+r.URL.Path, http.StatusInternalServerError) + } + } + + yaml := fmt.Sprintf(` +jira: + base_url: %q + auth: {email: u@x, api_token: tok} + resource: issues + poll_interval: 10s + cursor: {cache: failing_jira_state, overlap: 0s} +`, mock.URL) + + builder := service.NewStreamBuilder() + require.NoError(t, builder.SetLoggerYAML(`level: OFF`)) + require.NoError(t, builder.AddCacheYAML(` +label: failing_jira_state +failing_cache_for_test: {} +`)) + require.NoError(t, builder.AddInputYAML(yaml)) + require.NoError(t, builder.AddConsumerFunc(func(_ context.Context, _ *service.Message) error { return nil })) + s, err := builder.Build() + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + runErrCh := make(chan error, 1) + go func() { runErrCh <- s.Run(ctx) }() + + // poll_interval is 10s (enforced minimum), but the first two polls fire + // back-to-back: poll #1 happens immediately on first Read, the page acks + // quickly (consumer is a no-op), and the subsequent fetchNextPage runs + // without hitting the inter-poll sleep because the buffer was non-empty. + require.Eventually(t, func() bool { return pollCount.Load() >= 2 }, 4*time.Second, 25*time.Millisecond, + "expected multiple polls — Set should soft-fail and not block the input") + + require.NoError(t, s.StopWithin(2*time.Second)) + cancel() + + select { + case runErr := <-runErrCh: + if runErr != nil && !errors.Is(runErr, context.Canceled) && !errors.Is(runErr, context.DeadlineExceeded) { + t.Fatalf("stream Run returned unexpected error: %v", runErr) + } + case <-time.After(2 * time.Second): + t.Fatal("Run did not exit after StopWithin") + } +} + +// TestRestart_ResumesFromCachedCursor exercises the full restart cycle: the +// first run writes a cursor to a file-backed cache, the second run builds a +// fresh stream pointed at the same cache directory and the first JQL of the +// second run must contain the `updated >=` predicate loaded from the cache. +// This is the only test that validates the spec acceptance criterion "Cursor +// checkpoint survives restarts via cache resource." +func TestRestart_ResumesFromCachedCursor(t *testing.T) { + mock := newMockJiraServer(t) + var calls atomic.Int32 + var jqls []string + var jqlsMu sync.Mutex + mock.handler = func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/rest/api/3/myself": + _, _ = w.Write([]byte(`{}`)) + case "/rest/api/3/search/jql": + n := calls.Add(1) + jql := r.URL.Query().Get("jql") + jqlsMu.Lock() + jqls = append(jqls, jql) + jqlsMu.Unlock() + // Return one issue on call 1; empty thereafter so the stream idles + // and the second run also sees an empty page after its first JQL. + if n == 1 { + _, _ = w.Write([]byte(`{"issues":[{"id":"1","key":"PROJ-1","fields":{"project":{"key":"PROJ"},"updated":"2026-06-01T10:00:00.000+0000"}}]}`)) + } else { + _, _ = w.Write([]byte(`{"issues":[]}`)) + } + default: + http.Error(w, "no route in test fixture: "+r.URL.Path, http.StatusInternalServerError) + } + } + + cacheDir := t.TempDir() + inputYAML := fmt.Sprintf(` +jira: + base_url: %q + auth: {email: u@x, api_token: tok} + resource: issues + poll_interval: 10s + cursor: {cache: jira_state, overlap: 0s} +`, mock.URL) + cacheYAML := fmt.Sprintf(` +label: jira_state +file: + directory: %s +`, cacheDir) + + // run builds a fresh stream wired to the file-backed cache, waits until at + // least one additional poll fires after the initial fetch (so the cursor + // has been written to disk via the ack-drain path), then tears down. + run := func(t *testing.T, minCalls int32) { + t.Helper() + out := make(chan struct{}, 1) + builder := service.NewStreamBuilder() + require.NoError(t, builder.SetLoggerYAML(`level: OFF`)) + require.NoError(t, builder.AddCacheYAML(cacheYAML)) + require.NoError(t, builder.AddInputYAML(inputYAML)) + require.NoError(t, builder.AddConsumerFunc(func(_ context.Context, _ *service.Message) error { + select { + case out <- struct{}{}: + default: + } + return nil + })) + s, err := builder.Build() + require.NoError(t, err) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + go func() { _ = s.Run(ctx) }() + // Wait until at least minCalls JQL calls have fired - this guarantees + // the in-flight page has been acked and the cursor flushed to the cache. + require.Eventually(t, func() bool { return calls.Load() >= minCalls }, 2500*time.Millisecond, 25*time.Millisecond) + require.NoError(t, s.StopWithin(2*time.Second)) + } + + // Run 1: cache starts empty, first JQL must NOT contain "updated >=". + run(t, 2) + callsAfterFirst := calls.Load() + jqlsMu.Lock() + require.NotEmpty(t, jqls, "expected at least one JQL recorded after run 1") + firstJQLOfFirstRun := jqls[0] + jqlsMu.Unlock() + assert.NotContains(t, firstJQLOfFirstRun, "updated >=", + "first JQL of first run must not have cursor predicate (cache is empty)") + + // Run 2 (restart): same cache dir; the cursor written during run 1 must be + // loaded by Connect and applied to the first JQL of this run. + run(t, callsAfterFirst+1) + jqlsMu.Lock() + require.Greater(t, len(jqls), int(callsAfterFirst), "run 2 did not record a new JQL") + firstJQLOfSecondRun := jqls[callsAfterFirst] + jqlsMu.Unlock() + assert.Contains(t, firstJQLOfSecondRun, "updated >=", + "first JQL of second run must include cursor predicate loaded from cache") +} diff --git a/internal/impl/jira/jiraauth/client.go b/internal/impl/jira/jiraauth/client.go new file mode 100644 index 0000000000..a5400b7918 --- /dev/null +++ b/internal/impl/jira/jiraauth/client.go @@ -0,0 +1,77 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package jiraauth provides shared construction of a configured *jirahttp.Client. +// Both the Jira processor and Jira input parse their own config shapes and pass +// resolved values into BuildClient, which wires basic auth and the standard +// Jira retry policy through the httpclient layer. +package jiraauth + +import ( + "errors" + "fmt" + + "github.com/redpanda-data/benthos/v4/public/service" + + "github.com/redpanda-data/connect/v4/internal/httpclient" + "github.com/redpanda-data/connect/v4/internal/impl/jira/jirahttp" +) + +// BuildClient constructs a *jirahttp.Client using basic auth (username or +// email + API token) and the standard Jira retry policy: retry on 429 and +// 5xx, hard-fail on 401/403. +// +// httpCfg is mutated in place: AuthSigner, Retry, and MetricPrefix are set. +// Callers should pass a config returned from httpclient.NewConfigFromParsed. +func BuildClient(mgr *service.Resources, httpCfg *httpclient.Config, username, apiToken string, maxResults int) (*jirahttp.Client, error) { + if httpCfg == nil { + return nil, errors.New("http config must not be nil") + } + if httpCfg.BaseURL == "" { + return nil, errors.New("base_url must not be empty") + } + if username == "" { + return nil, errors.New("username must not be empty") + } + if apiToken == "" { + return nil, errors.New("api_token must not be empty") + } + if maxResults <= 0 || maxResults > 5000 { + return nil, errors.New("max_results must be between 1 and 5000") + } + + httpCfg.AuthSigner = httpclient.BasicAuthSigner(username, apiToken) + httpCfg.Retry = &httpclient.RetryConfig{ + MaxRetries: 3, + RetryStatuses: []int{429, 502, 503, 504}, + DropStatuses: []int{401, 403}, + } + if httpCfg.MetricPrefix == "" { + httpCfg.MetricPrefix = "jira_http" + } + + httpClient, err := httpclient.NewClient(*httpCfg, mgr) + if err != nil { + return nil, fmt.Errorf("building http client: %w", err) + } + + headerPolicy := &jirahttp.AuthHeaderPolicy{ + HeaderName: "X-Seraph-LoginReason", + IsProblem: func(reason string) bool { + return reason != "" && reason != "OK" && reason != "AUTHENTICATED_TRUE" + }, + } + + return jirahttp.NewClient(mgr.Logger(), httpCfg.BaseURL, maxResults, httpClient, headerPolicy), nil +} diff --git a/internal/impl/jira/jiraauth/client_test.go b/internal/impl/jira/jiraauth/client_test.go new file mode 100644 index 0000000000..010389da4a --- /dev/null +++ b/internal/impl/jira/jiraauth/client_test.go @@ -0,0 +1,63 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jiraauth_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/redpanda-data/benthos/v4/public/service" + + "github.com/redpanda-data/connect/v4/internal/httpclient" + "github.com/redpanda-data/connect/v4/internal/impl/jira/jiraauth" +) + +func TestBuildClient_ValidatesRequiredFields(t *testing.T) { + mgr := service.MockResources() + httpCfg := &httpclient.Config{BaseURL: "https://example.atlassian.net"} + + _, err := jiraauth.BuildClient(mgr, httpCfg, "", "tok", 50) + require.Error(t, err) + assert.Contains(t, err.Error(), "username") + + _, err = jiraauth.BuildClient(mgr, httpCfg, "u@x", "", 50) + require.Error(t, err) + assert.Contains(t, err.Error(), "api_token") + + _, err = jiraauth.BuildClient(mgr, httpCfg, "u@x", "tok", 0) + require.Error(t, err) + assert.Contains(t, err.Error(), "max_results") + + _, err = jiraauth.BuildClient(mgr, httpCfg, "u@x", "tok", 6000) + require.Error(t, err) + assert.Contains(t, err.Error(), "max_results") +} + +func TestBuildClient_ReturnsClientWithRetryConfigured(t *testing.T) { + mgr := service.MockResources() + httpCfg := &httpclient.Config{BaseURL: "https://example.atlassian.net"} + + c, err := jiraauth.BuildClient(mgr, httpCfg, "user@example.com", "secret-token", 50) + require.NoError(t, err) + require.NotNil(t, c) + + require.NotNil(t, httpCfg.Retry) + assert.Equal(t, 3, httpCfg.Retry.MaxRetries) + assert.ElementsMatch(t, []int{429, 502, 503, 504}, httpCfg.Retry.RetryStatuses) + assert.ElementsMatch(t, []int{401, 403}, httpCfg.Retry.DropStatuses) + assert.Equal(t, "jira_http", httpCfg.MetricPrefix) +} diff --git a/internal/impl/jira/jirahttp/client.go b/internal/impl/jira/jirahttp/client.go index f20139ea30..73ff45ef34 100644 --- a/internal/impl/jira/jirahttp/client.go +++ b/internal/impl/jira/jirahttp/client.go @@ -41,6 +41,13 @@ type httpDoer interface { Do(req *http.Request) (*http.Response, error) } +// CallAPI is the exported entry point that input/processor callers use to +// dispatch arbitrary Jira REST requests. It is a thin wrapper around the +// internal callJiraApi helper. +func (j *Client) CallAPI(ctx context.Context, u *url.URL) ([]byte, error) { + return j.callJiraApi(ctx, u) +} + // callJiraApi calls the Jira API at the given URL. Auth, retry, metrics, and // rate limiting are handled by the underlying httpDoer (*http.Client assembled // by httpclient.NewClient in production). This method sets Jira-specific headers and performs the diff --git a/internal/impl/jira/processor_jira.go b/internal/impl/jira/processor_jira.go index 2f5e67fe79..0869c71e38 100644 --- a/internal/impl/jira/processor_jira.go +++ b/internal/impl/jira/processor_jira.go @@ -31,6 +31,7 @@ import ( "errors" "github.com/redpanda-data/connect/v4/internal/httpclient" + "github.com/redpanda-data/connect/v4/internal/impl/jira/jiraauth" "github.com/redpanda-data/connect/v4/internal/impl/jira/jirahttp" "github.com/redpanda-data/connect/v4/internal/license" @@ -119,17 +120,11 @@ func newJiraProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*jira if err != nil { return nil, err } - if username == "" { - return nil, errors.New("username must not be empty") - } apiToken, err := conf.FieldString("api_token") if err != nil { return nil, err } - if apiToken == "" { - return nil, errors.New("api_token must not be empty") - } maxResults, err := conf.FieldInt("max_results_per_page") if err != nil { @@ -139,34 +134,13 @@ func newJiraProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*jira return nil, errors.New("max_results_per_page must be between 1 and 5000") } - // Wire Jira basic auth into the httpclient auth signer. - httpCfg.AuthSigner = httpclient.BasicAuthSigner(username, apiToken) - - // Configure retry: retry on 429/5xx, drop on 401/403. - httpCfg.Retry = &httpclient.RetryConfig{ - MaxRetries: 3, - RetryStatuses: []int{429, 502, 503, 504}, - DropStatuses: []int{401, 403}, - } - - httpCfg.MetricPrefix = "jira_http" - - httpClient, err := httpclient.NewClient(httpCfg, mgr) + client, err := jiraauth.BuildClient(mgr, &httpCfg, username, apiToken, maxResults) if err != nil { return nil, err } - headerPolicy := &jirahttp.AuthHeaderPolicy{ - HeaderName: "X-Seraph-LoginReason", - IsProblem: func(reason string) bool { - return reason != "" && reason != "OK" && reason != "AUTHENTICATED_TRUE" - }, - } - - jiraHttp := jirahttp.NewClient(mgr.Logger(), httpCfg.BaseURL, maxResults, httpClient, headerPolicy) - return &jiraProcessor{ - client: jiraHttp, + client: client, log: mgr.Logger(), }, nil } diff --git a/internal/plugins/info.csv b/internal/plugins/info.csv index 85605d9bd1..ec9ba22ac0 100644 --- a/internal/plugins/info.csv +++ b/internal/plugins/info.csv @@ -120,6 +120,7 @@ inproc ,output ,inproc ,certified ,n insert_part ,processor ,insert_part ,certified ,n ,y ,y , jaeger ,tracer ,jaeger ,community ,n ,n ,n ,cloud uses a managed tracing integration javascript ,processor ,javascript ,certified ,n ,n ,n ,security: arbitrary code execution +jira ,input ,jira ,certified ,n ,y ,n , jira ,processor ,jira ,certified ,n ,y ,n , jmespath ,processor ,JMESPath ,certified ,n ,y ,y , jq ,processor ,jq ,certified ,n ,y ,y , From 29ef5e6a9a32388a60bdd9443e91589def304fc5 Mon Sep 17 00:00:00 2001 From: Jonathan Chaput Date: Thu, 4 Jun 2026 07:45:07 -0400 Subject: [PATCH 2/6] jira: fix cursor advancement during multi-page pagination onPageDrained was advancing the cursor after each acked page and not resetting r.nextToken on nack. This caused two correctness bugs: - Nacked records on a multi-page response were never re-read: the next fetch reused the next-page token, skipping past the nacked page, and a subsequent ack advanced the cursor past the nacked range entirely. - The JQL predicate changed between paginated requests while the opaque nextPageToken was reused, violating Jira's cursor-pagination contract (JQL must remain stable across the token sequence). Accumulate max issue.updated across the run and only advance the persisted cursor when nextPageToken is empty. On nack, reset r.nextToken and the run accumulator so the next fetch restarts the pagination run from the current cursor. Also drop the `//go:build integration` tag from the integration test per project convention (gating via integration.CheckSkip is enough). --- internal/impl/jira/input_jira.go | 71 +++++++++++---- .../impl/jira/input_jira_integration_test.go | 2 - internal/impl/jira/input_jira_test.go | 90 +++++++++++++++++++ 3 files changed, 143 insertions(+), 20 deletions(-) diff --git a/internal/impl/jira/input_jira.go b/internal/impl/jira/input_jira.go index c484a5a116..81c9c7159e 100644 --- a/internal/impl/jira/input_jira.go +++ b/internal/impl/jira/input_jira.go @@ -248,7 +248,23 @@ type reader struct { cur cursor connected atomic.Bool page *pageState + + // runMu serialises onPageDrained and protects nextToken / runMaxUpdated. + // Both the Read goroutine (via waitForPageAcks) and the Close goroutine + // can invoke onPageDrained when the last ack fires, so the run-level + // state needs explicit synchronisation. Held only inside onPageDrained + // and the small write paths in fetchNextPage / buildSearchURL. + runMu sync.Mutex + // nextToken is the opaque pagination cursor returned by Jira. Empty + // when not in a multi-page run. nextToken string + // runMaxUpdated accumulates max issue.updated across the current + // pagination run. The cursor is only persisted at the end of the run + // (when nextPageToken is empty) because Jira's cursor pagination + // requires JQL to remain stable across the token sequence — advancing + // the cursor mid-run would mutate the JQL while still passing the + // previous page's opaque token. + runMaxUpdated time.Time } // currentCursor returns a copy of the current cursor under read lock. @@ -370,13 +386,6 @@ func (p *pageState) reset() { p.done = nil } -// bufferLen returns the number of messages in the current page buffer. -func (p *pageState) bufferLen() int { - p.mu.Lock() - defer p.mu.Unlock() - return len(p.buffer) -} - // maxUpdated returns the highest issue.updated timestamp seen in the buffer. func (p *pageState) maxUpdated() time.Time { p.mu.Lock() @@ -561,20 +570,41 @@ func (r *reader) waitForPageAcks(ctx context.Context) error { } func (r *reader) onPageDrained(ctx context.Context) { + // Serialise with the symmetric Close-side call: if Close races with the + // Read goroutine's waitForPageAcks, both will reach onPageDrained when + // the same done channel closes, so the mutations here must be atomic. + r.runMu.Lock() + defer r.runMu.Unlock() if r.page.pageHasNack.Load() { - // don't advance cursor; reset for the refetch on next Read + // Nack: discard the in-flight page and restart the pagination + // run from the current (unadvanced) cursor on the next Read. + // Resetting r.nextToken is critical — without it the next fetch + // would reuse the previous-page token and skip past the nacked + // records, which would then fall below the cursor predicate + // forever once a subsequent ack advanced it. r.page.reset() + r.nextToken = "" + r.runMaxUpdated = time.Time{} return } - maxUpdated := r.page.maxUpdated() - bufferLen := r.page.bufferLen() - if !maxUpdated.IsZero() && maxUpdated.After(r.currentCursor().Updated) { - newCur := cursor{Updated: maxUpdated, Version: cursorSchemaVersion} - r.setCursor(newCur) - if err := r.writeCursor(ctx, newCur); err != nil { - r.log.Warnf("writing cursor: %v", err) + pageMax := r.page.maxUpdated() + if pageMax.After(r.runMaxUpdated) { + r.runMaxUpdated = pageMax + } + // Only advance the persisted cursor at the end of a pagination run. + // Mutating the JQL mid-run while reusing nextPageToken is unsound - + // Jira's cursor pagination expects the JQL to remain stable across + // the token sequence. + if r.nextToken == "" { + if !r.runMaxUpdated.IsZero() && r.runMaxUpdated.After(r.currentCursor().Updated) { + newCur := cursor{Updated: r.runMaxUpdated, Version: cursorSchemaVersion} + r.setCursor(newCur) + if err := r.writeCursor(ctx, newCur); err != nil { + r.log.Warnf("writing cursor: %v", err) + } + r.log.Infof("advanced cursor to %s after pagination run", newCur.Updated.Format(time.RFC3339)) } - r.log.Infof("advanced cursor to %s after %d issues", newCur.Updated.Format(time.RFC3339), bufferLen) + r.runMaxUpdated = time.Time{} } r.page.reset() } @@ -623,7 +653,9 @@ func (r *reader) fetchNextPage(ctx context.Context) error { } r.page.load(msgs, maxUpdated) + r.runMu.Lock() r.nextToken = page.NextPageToken + r.runMu.Unlock() return nil } @@ -747,8 +779,11 @@ func (r *reader) buildSearchURL() (*url.URL, error) { q.Set("expand", strings.Join(expand, ",")) } q.Set("maxResults", strconv.Itoa(r.cfg.pageSize)) - if r.nextToken != "" { - q.Set("nextPageToken", r.nextToken) + r.runMu.Lock() + tok := r.nextToken + r.runMu.Unlock() + if tok != "" { + q.Set("nextPageToken", tok) } u.RawQuery = q.Encode() return u, nil diff --git a/internal/impl/jira/input_jira_integration_test.go b/internal/impl/jira/input_jira_integration_test.go index 7ae270dcd1..4ace3216c3 100644 --- a/internal/impl/jira/input_jira_integration_test.go +++ b/internal/impl/jira/input_jira_integration_test.go @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build integration - package jira import ( diff --git a/internal/impl/jira/input_jira_test.go b/internal/impl/jira/input_jira_test.go index 9f17dfebf1..d4c757679e 100644 --- a/internal/impl/jira/input_jira_test.go +++ b/internal/impl/jira/input_jira_test.go @@ -400,6 +400,96 @@ jira: assert.NotContains(t, jqls[1], "updated >=", "nacked page must not advance the cursor") } +// TestNack_RestartsPaginationFromCurrentCursor verifies that a nack on a +// multi-page response restarts the pagination run from the current +// (unadvanced) cursor. The mock returns page 1 with nextPageToken=page2; the +// consumer nacks the first message. The next /search/jql request after the +// nack must NOT carry nextPageToken=page2 in its query string — otherwise the +// nacked records would be skipped past forever (the next page-2 ack would +// advance the cursor beyond them). +func TestNack_RestartsPaginationFromCurrentCursor(t *testing.T) { + mock := newMockJiraServer(t) + var calls atomic.Int32 + var tokensMu sync.Mutex + var tokens []string + recordToken := func(tok string) { + tokensMu.Lock() + defer tokensMu.Unlock() + tokens = append(tokens, tok) + } + mock.handler = func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/rest/api/3/myself": + _, _ = w.Write([]byte(`{}`)) + case "/rest/api/3/search/jql": + n := calls.Add(1) + tok := r.URL.Query().Get("nextPageToken") + recordToken(tok) + switch n { + case 1: + // First request of the (nacked) run: returns page 1 with a + // nextPageToken so the input would normally fetch page 2. + _, _ = w.Write([]byte(`{"issues":[{"id":"1","key":"PROJ-1","fields":{"project":{"key":"PROJ"},"updated":"2026-06-01T10:00:00.000+0000"}}],"nextPageToken":"page2"}`)) + default: + // After the nack we expect a fresh request with NO + // nextPageToken; serve an empty page so the input idles. + _, _ = w.Write([]byte(`{"issues":[]}`)) + } + default: + http.NotFound(w, r) + } + } + + emitted := make(chan struct{}, 4) + builder := service.NewStreamBuilder() + require.NoError(t, builder.SetLoggerYAML(`level: OFF`)) + require.NoError(t, builder.AddCacheYAML(` +label: jira_state +memory: {} +`)) + require.NoError(t, builder.AddInputYAML(fmt.Sprintf(` +jira: + base_url: %q + auth: {email: u@x, api_token: tok} + resource: issues + poll_interval: 10s + cursor: {cache: jira_state, overlap: 0s} + auto_replay_nacks: false +`, mock.URL))) + var nackOnce atomic.Bool + require.NoError(t, builder.AddConsumerFunc(func(_ context.Context, _ *service.Message) error { + emitted <- struct{}{} + if nackOnce.CompareAndSwap(false, true) { + return errors.New("forced nack") + } + return nil + })) + s, err := builder.Build() + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go func() { _ = s.Run(ctx) }() + + // First emission triggers a nack on the only message of page 1. + select { + case <-emitted: + case <-ctx.Done(): + t.Fatal("no message emitted") + } + // Wait until at least a second JQL request fires after the nack. The + // invariant under test is that this second request carries NO + // nextPageToken — the pagination run restarts from the cursor. + require.Eventually(t, func() bool { return calls.Load() >= 2 }, 3*time.Second, 25*time.Millisecond) + require.NoError(t, s.StopWithin(2*time.Second)) + + tokensMu.Lock() + defer tokensMu.Unlock() + require.GreaterOrEqual(t, len(tokens), 2, "expected at least 2 search/jql requests, got %v", tokens) + assert.Empty(t, tokens[0], "first request should have no nextPageToken") + assert.Empty(t, tokens[1], "after nack the next request must NOT reuse nextPageToken=page2") +} + func TestConnect_CallsMyselfAndStartsFromZeroCursor(t *testing.T) { mock := newMockJiraServer(t) From 91c249681fdcb005318da0f16d455f2e145fe501 Mon Sep 17 00:00:00 2001 From: Jonathan Chaput Date: Thu, 4 Jun 2026 09:35:13 -0400 Subject: [PATCH 3/6] jira: gate input on enterprise license to match sibling processor --- internal/impl/jira/input_jira.go | 4 ++++ internal/impl/jira/input_jira_integration_test.go | 3 +++ internal/impl/jira/input_jira_test.go | 8 ++++++++ 3 files changed, 15 insertions(+) diff --git a/internal/impl/jira/input_jira.go b/internal/impl/jira/input_jira.go index 81c9c7159e..f37735e6fd 100644 --- a/internal/impl/jira/input_jira.go +++ b/internal/impl/jira/input_jira.go @@ -32,6 +32,7 @@ import ( "github.com/redpanda-data/connect/v4/internal/httpclient" "github.com/redpanda-data/connect/v4/internal/impl/jira/jiraauth" "github.com/redpanda-data/connect/v4/internal/impl/jira/jirahttp" + "github.com/redpanda-data/connect/v4/internal/license" ) // cursorSchemaVersion is the on-disk format version stamped into the cursor @@ -413,6 +414,9 @@ func (p *pageState) allDispatched() bool { } func newReader(conf *service.ParsedConfig, mgr *service.Resources) (*reader, error) { + if err := license.CheckRunningEnterprise(mgr); err != nil { + return nil, err + } cfg, err := parseInputConfig(conf) if err != nil { return nil, err diff --git a/internal/impl/jira/input_jira_integration_test.go b/internal/impl/jira/input_jira_integration_test.go index 4ace3216c3..8b41f53389 100644 --- a/internal/impl/jira/input_jira_integration_test.go +++ b/internal/impl/jira/input_jira_integration_test.go @@ -28,6 +28,8 @@ import ( "github.com/redpanda-data/benthos/v4/public/service" "github.com/redpanda-data/benthos/v4/public/service/integration" + + "github.com/redpanda-data/connect/v4/internal/license" ) // TestIntegration_JiraInput_FirstRunHasNoCursorPredicate exercises the jira @@ -77,6 +79,7 @@ jira: })) s, err := b.Build() require.NoError(t, err) + license.InjectTestService(s.Resources()) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() diff --git a/internal/impl/jira/input_jira_test.go b/internal/impl/jira/input_jira_test.go index d4c757679e..19b566e6ae 100644 --- a/internal/impl/jira/input_jira_test.go +++ b/internal/impl/jira/input_jira_test.go @@ -34,6 +34,8 @@ import ( _ "github.com/redpanda-data/benthos/v4/public/components/io" _ "github.com/redpanda-data/benthos/v4/public/components/pure" + + "github.com/redpanda-data/connect/v4/internal/license" ) // mockJiraServer is a configurable httptest.Server for jira API responses. @@ -94,6 +96,7 @@ memory: {} })) s, err := builder.Build() require.NoError(t, err) + license.InjectTestService(s.Resources()) return s, out } @@ -377,6 +380,7 @@ jira: })) s, err := builder.Build() require.NoError(t, err) + license.InjectTestService(s.Resources()) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -466,6 +470,7 @@ jira: })) s, err := builder.Build() require.NoError(t, err) + license.InjectTestService(s.Resources()) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -716,6 +721,7 @@ memory: {} })) s, err := builder.Build() require.NoError(t, err) + license.InjectTestService(s.Resources()) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -826,6 +832,7 @@ failing_cache_for_test: {} require.NoError(t, builder.AddConsumerFunc(func(_ context.Context, _ *service.Message) error { return nil })) s, err := builder.Build() require.NoError(t, err) + license.InjectTestService(s.Resources()) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -919,6 +926,7 @@ file: })) s, err := builder.Build() require.NoError(t, err) + license.InjectTestService(s.Resources()) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() go func() { _ = s.Run(ctx) }() From a8dda1805647bda64d10f1ffa0a1e2f676519c8d Mon Sep 17 00:00:00 2001 From: Jonathan Chaput Date: Fri, 12 Jun 2026 08:08:37 -0400 Subject: [PATCH 4/6] jira: dedup boundary issues, checkpoint per page, deprecate processor The `updated >=` cursor predicate re-matches boundary issues on every poll, so an idle project re-emitted them once per poll interval forever. The cursor now carries a pruned seen-set of emitted issue versions and suppresses duplicates while still emitting genuinely newer updates. Progress is persisted after every fully-acked page (the in-flight run's JQL is frozen so Jira's token/JQL pairing stays stable), making large backfills resumable mid-run. The jira processor is marked deprecated in favour of the input, and the input's version stamp is fixed to 4.96.0. --- CHANGELOG.md | 4 + .../modules/components/pages/inputs/jira.adoc | 6 +- .../components/pages/processors/jira.adoc | 8 +- internal/impl/jira/input_jira.go | 162 ++++++++++--- internal/impl/jira/input_jira_test.go | 226 ++++++++++++++++++ internal/impl/jira/processor_jira.go | 3 + internal/plugins/info.csv | 2 +- 7 files changed, 373 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c3a39c817c..63a50fb0b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ All notable changes to this project will be documented in this file. - jira: New `jira` input component for streaming Jira issues, comments, or changelog entries via JQL with cursor-based incremental polling. +### Changed + +- jira: The `jira` processor is deprecated in favour of the new `jira` input; it remains available for enrichment and lookup style operations. + ## 4.94.1 - 2026-05-29 ### Fixed diff --git a/docs/modules/components/pages/inputs/jira.adoc b/docs/modules/components/pages/inputs/jira.adoc index aafa2fee29..977ea1feeb 100644 --- a/docs/modules/components/pages/inputs/jira.adoc +++ b/docs/modules/components/pages/inputs/jira.adoc @@ -25,7 +25,7 @@ component_type_dropdown::[] Streams Jira issues, comments, or changelog entries via JQL with incremental polling. -Introduced in version 4.95.0. +Introduced in version 4.96.0. [tabs] @@ -138,9 +138,9 @@ input: -- ====== -Periodically queries Jira's REST API using a JQL filter and emits one message per resource. The cursor (max issue `updated` timestamp) is persisted via the configured cache resource so progress survives restarts. +Periodically queries Jira's REST API using a JQL filter and emits one message per resource. The cursor (max issue `updated` timestamp, plus the set of issue versions already emitted at the boundary) is persisted via the configured cache resource after every fully-acknowledged page, so progress survives restarts — including mid-backfill — and boundary issues are not re-emitted on every poll. -Authentication uses API token (email + token) basic auth. +Authentication uses API token (email + token) basic auth. The `backoff` settings govern the adaptive backoff applied to 429 responses; retries of 502/503/504 responses use a fixed three-attempt policy. Each message body is the raw JSON of the resource. Metadata fields: diff --git a/docs/modules/components/pages/processors/jira.adoc b/docs/modules/components/pages/processors/jira.adoc index c50a6dd9ca..d2f8200535 100644 --- a/docs/modules/components/pages/processors/jira.adoc +++ b/docs/modules/components/pages/processors/jira.adoc @@ -1,6 +1,6 @@ = jira :type: processor -:status: stable +:status: deprecated :categories: ["Services"] @@ -23,6 +23,12 @@ component_type_dropdown::[] +[WARNING] +.Deprecated +==== +This component is deprecated in favour of the xref:components:inputs/jira.adoc[`jira` input], which streams issues, comments, and changelog entries with cursor-based incremental polling. The processor remains available for enrichment and lookup style operations. +==== + Queries Jira resources and returns structured data Introduced in version 4.68.0. diff --git a/internal/impl/jira/input_jira.go b/internal/impl/jira/input_jira.go index f37735e6fd..2dc770d8e0 100644 --- a/internal/impl/jira/input_jira.go +++ b/internal/impl/jira/input_jira.go @@ -19,6 +19,7 @@ import ( "encoding/json" "errors" "fmt" + "maps" "net/url" "slices" "strconv" @@ -37,7 +38,8 @@ import ( // cursorSchemaVersion is the on-disk format version stamped into the cursor // JSON; it is consumed by writeCursor when the input advances the cursor. -const cursorSchemaVersion = 1 +// Version 2 added the Seen map; version 1 cursors decode cleanly (Seen nil). +const cursorSchemaVersion = 2 const ( resourceIssues = "issues" @@ -52,17 +54,39 @@ var validResources = []string{resourceIssues, resourceComments, resourceChangelo // Unknown JSON fields are ignored on decode for forward compatibility. type cursor struct { Updated time.Time `json:"updated"` - Version int `json:"v"` + // Seen maps issue keys to the updated timestamp at which they were last + // emitted, for issues inside the window the next JQL query re-matches + // (Updated - overlap, minus JQL's minute truncation). Because the cursor + // predicate is `updated >=`, boundary issues match again on every poll; + // this set suppresses re-emission of issue versions that were already + // delivered and acked. + Seen map[string]time.Time `json:"seen,omitempty"` + Version int `json:"v"` +} + +// pruneSeen drops seen entries that the next JQL query can no longer +// re-match: anything older than cur - overlap, with one extra minute of slack +// because the JQL threshold is truncated to minute precision. +func pruneSeen(seen map[string]time.Time, cur time.Time, overlap time.Duration) { + if cur.IsZero() { + return + } + threshold := cur.Add(-overlap).Add(-time.Minute) + for k, v := range seen { + if v.Before(threshold) { + delete(seen, k) + } + } } func newJiraInputConfigSpec() *service.ConfigSpec { spec := service.NewConfigSpec(). Categories("Services"). - Version("4.95.0"). + Version("4.96.0"). Summary("Streams Jira issues, comments, or changelog entries via JQL with incremental polling."). - Description(`Periodically queries Jira's REST API using a JQL filter and emits one message per resource. The cursor (max issue ` + "`updated`" + ` timestamp) is persisted via the configured cache resource so progress survives restarts. + Description(`Periodically queries Jira's REST API using a JQL filter and emits one message per resource. The cursor (max issue ` + "`updated`" + ` timestamp, plus the set of issue versions already emitted at the boundary) is persisted via the configured cache resource after every fully-acknowledged page, so progress survives restarts — including mid-backfill — and boundary issues are not re-emitted on every poll. -Authentication uses API token (email + token) basic auth. +Authentication uses API token (email + token) basic auth. The ` + "`backoff`" + ` settings govern the adaptive backoff applied to 429 responses; retries of 502/503/504 responses use a fixed three-attempt policy. Each message body is the raw JSON of the resource. Metadata fields: @@ -259,12 +283,15 @@ type reader struct { // nextToken is the opaque pagination cursor returned by Jira. Empty // when not in a multi-page run. nextToken string + // runJQL is the JQL string for the current pagination run, frozen when + // the run starts (nextToken empty). Jira's cursor pagination requires + // the JQL to remain stable across the token sequence, so mid-run cursor + // persistence must not leak into the query until the next run starts. + runJQL string // runMaxUpdated accumulates max issue.updated across the current - // pagination run. The cursor is only persisted at the end of the run - // (when nextPageToken is empty) because Jira's cursor pagination - // requires JQL to remain stable across the token sequence — advancing - // the cursor mid-run would mutate the JQL while still passing the - // previous page's opaque token. + // pagination run. Progress (cursor + seen set) is persisted after every + // fully-acked page so a restart mid-backfill resumes from the last acked + // page; runJQL keeps the in-flight query stable regardless. runMaxUpdated time.Time } @@ -331,7 +358,10 @@ type pageState struct { outstandingAcks atomic.Int32 pageHasNack atomic.Bool pageMaxUpdated time.Time - done chan struct{} + // pageSeen records (issue key -> updated) for every issue emitted in + // this page; merged into cursor.Seen once the page is fully acked. + pageSeen map[string]time.Time + done chan struct{} } // nextBufferedMessage returns the next pending message from the page buffer. @@ -361,12 +391,13 @@ func (p *pageState) isEmpty() bool { // outstandingAcks is pre-loaded to len(msgs) so the ack callback only needs // to decrement. An empty page has its done channel closed immediately so the // Read loop never blocks waiting for acks that will never fire. -func (p *pageState) load(msgs []*service.Message, maxUpdated time.Time) { +func (p *pageState) load(msgs []*service.Message, maxUpdated time.Time, seen map[string]time.Time) { p.mu.Lock() defer p.mu.Unlock() p.buffer = msgs p.bufferIdx = 0 p.pageMaxUpdated = maxUpdated + p.pageSeen = seen p.done = make(chan struct{}) p.outstandingAcks.Store(int32(len(msgs))) if len(msgs) == 0 { @@ -384,6 +415,7 @@ func (p *pageState) reset() { p.bufferIdx = 0 p.pageHasNack.Store(false) p.pageMaxUpdated = time.Time{} + p.pageSeen = nil p.done = nil } @@ -394,6 +426,13 @@ func (p *pageState) maxUpdated() time.Time { return p.pageMaxUpdated } +// seen returns the (issue key -> updated) map recorded for the current page. +func (p *pageState) seen() map[string]time.Time { + p.mu.Lock() + defer p.mu.Unlock() + return p.pageSeen +} + // currentDone returns the done channel for the current page under the page // mutex so callers don't race load() / reset() reassigning the field. func (p *pageState) currentDone() chan struct{} { @@ -487,6 +526,11 @@ func (r *reader) readCursor(ctx context.Context) (cursor, error) { if inner != nil { return cursor{}, inner } + if c.Version > cursorSchemaVersion { + // Written by a newer binary; the fields we understand still decode, + // so resume from them best-effort rather than re-backfilling. + r.log.Warnf("cursor schema version %d is newer than supported %d; resuming best-effort from its updated timestamp", c.Version, cursorSchemaVersion) + } return c, nil } @@ -542,6 +586,12 @@ func (r *reader) Read(ctx context.Context) (*service.Message, service.AckFunc, e } if r.page.isEmpty() { + if r.hasNextToken() { + // Empty page mid-run (every issue deduped, or Jira returned + // an empty page with a continuation token): keep chaining + // the token without sleeping the full poll interval. + continue + } // Caught up; sleep before polling again. Do not return // ErrNotConnected — that would trigger a reconnect cycle. select { @@ -595,19 +645,29 @@ func (r *reader) onPageDrained(ctx context.Context) { if pageMax.After(r.runMaxUpdated) { r.runMaxUpdated = pageMax } - // Only advance the persisted cursor at the end of a pagination run. - // Mutating the JQL mid-run while reusing nextPageToken is unsound - - // Jira's cursor pagination expects the JQL to remain stable across - // the token sequence. - if r.nextToken == "" { - if !r.runMaxUpdated.IsZero() && r.runMaxUpdated.After(r.currentCursor().Updated) { - newCur := cursor{Updated: r.runMaxUpdated, Version: cursorSchemaVersion} - r.setCursor(newCur) - if err := r.writeCursor(ctx, newCur); err != nil { - r.log.Warnf("writing cursor: %v", err) - } - r.log.Infof("advanced cursor to %s after pagination run", newCur.Updated.Format(time.RFC3339)) + // Persist progress after every fully-acked page, not just at the end of + // the pagination run, so a restart mid-backfill resumes from the last + // acked page. Jira's cursor pagination requires the JQL to remain stable + // across the token sequence; runJQL freezes the in-flight query at run + // start, so advancing the cursor here cannot mutate it mid-run. + cur := r.currentCursor() + newUpdated := cur.Updated + if r.runMaxUpdated.After(newUpdated) { + newUpdated = r.runMaxUpdated + } + if pageSeen := r.page.seen(); len(pageSeen) > 0 || newUpdated.After(cur.Updated) { + seen := make(map[string]time.Time, len(cur.Seen)+len(pageSeen)) + maps.Copy(seen, cur.Seen) + maps.Copy(seen, pageSeen) + pruneSeen(seen, newUpdated, r.cfg.cursorOverlap) + newCur := cursor{Updated: newUpdated, Seen: seen, Version: cursorSchemaVersion} + r.setCursor(newCur) + if err := r.writeCursor(ctx, newCur); err != nil { + r.log.Warnf("writing cursor: %v", err) } + r.log.Debugf("checkpointed cursor at %s (%d boundary entries) after acked page", newCur.Updated.Format(time.RFC3339), len(seen)) + } + if r.nextToken == "" { r.runMaxUpdated = time.Time{} } r.page.reset() @@ -627,15 +687,27 @@ func (r *reader) fetchNextPage(ctx context.Context) error { return fmt.Errorf("decoding jira page: %w", err) } + cur := r.currentCursor() msgs := make([]*service.Message, 0, len(page.Issues)) + pageSeen := make(map[string]time.Time, len(page.Issues)) var maxUpdated time.Time for _, raw := range page.Issues { var meta rawIssue if err := json.Unmarshal(raw, &meta); err != nil { return fmt.Errorf("decoding issue: %w", err) } - if meta.Fields.Updated.After(maxUpdated) { - maxUpdated = meta.Fields.Updated.Time + upd := meta.Fields.Updated.Time + if !upd.IsZero() { + // The `updated >=` predicate re-matches boundary issues on every + // poll; skip versions that were already emitted and acked. A zero + // updated (field excluded by config) disables dedup for the issue + // rather than risking suppression of a genuinely new version. + if prev, ok := cur.Seen[meta.Key]; ok && !upd.After(prev) { + continue + } + } + if upd.After(maxUpdated) { + maxUpdated = upd } switch r.cfg.resource { @@ -654,9 +726,12 @@ func (r *reader) fetchNextPage(ctx context.Context) error { } msgs = append(msgs, children...) } + if !upd.IsZero() { + pageSeen[meta.Key] = upd + } } - r.page.load(msgs, maxUpdated) + r.page.load(msgs, maxUpdated, pageSeen) r.runMu.Lock() r.nextToken = page.NextPageToken r.runMu.Unlock() @@ -773,9 +848,26 @@ func (r *reader) buildSearchURL() (*url.URL, error) { return nil, err } q := u.Query() - q.Set("jql", r.buildJQL()) - q.Set("fields", strings.Join(r.cfg.fields, ",")) - expand := r.cfg.expand + // Freeze the JQL at the start of a pagination run: Jira's nextPageToken + // is only valid for the exact JQL it was issued against, and the cursor + // may now advance between pages of the same run. + r.runMu.Lock() + tok := r.nextToken + if tok == "" { + r.runJQL = r.buildJQL() + } + jql := r.runJQL + r.runMu.Unlock() + q.Set("jql", jql) + fields := slices.Clone(r.cfg.fields) + if !slices.Contains(fields, "*all") { + // Cursor advancement and metadata depend on these two fields; keep + // them present even when the user narrows the field list. + fields = appendUnique(fields, "updated") + fields = appendUnique(fields, "project") + } + q.Set("fields", strings.Join(fields, ",")) + expand := slices.Clone(r.cfg.expand) if r.cfg.resource == resourceChangelog { expand = appendUnique(expand, "changelog") } @@ -783,9 +875,6 @@ func (r *reader) buildSearchURL() (*url.URL, error) { q.Set("expand", strings.Join(expand, ",")) } q.Set("maxResults", strconv.Itoa(r.cfg.pageSize)) - r.runMu.Lock() - tok := r.nextToken - r.runMu.Unlock() if tok != "" { q.Set("nextPageToken", tok) } @@ -793,6 +882,13 @@ func (r *reader) buildSearchURL() (*url.URL, error) { return u, nil } +// hasNextToken reports whether a pagination run is in flight. +func (r *reader) hasNextToken() bool { + r.runMu.Lock() + defer r.runMu.Unlock() + return r.nextToken != "" +} + func (r *reader) buildJQL() string { parts := []string{} if r.cfg.jql != "" { diff --git a/internal/impl/jira/input_jira_test.go b/internal/impl/jira/input_jira_test.go index 19b566e6ae..7540def687 100644 --- a/internal/impl/jira/input_jira_test.go +++ b/internal/impl/jira/input_jira_test.go @@ -21,6 +21,8 @@ import ( "fmt" "net/http" "net/http/httptest" + "os" + "path/filepath" "strings" "sync" "sync/atomic" @@ -956,3 +958,227 @@ file: assert.Contains(t, firstJQLOfSecondRun, "updated >=", "first JQL of second run must include cursor predicate loaded from cache") } + +// TestIdle_BoundaryIssueNotReemittedEveryPoll pins the seen-set dedup: an idle +// Jira instance keeps returning the same boundary issue on every poll because +// the cursor predicate is `updated >=`, so without dedup the input would emit +// a duplicate of every boundary issue once per poll interval, forever. +func TestIdle_BoundaryIssueNotReemittedEveryPoll(t *testing.T) { + mock := newMockJiraServer(t) + var calls atomic.Int32 + var secondJQL atomic.Value + mock.handler = func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/rest/api/3/myself": + _, _ = w.Write([]byte(`{}`)) + case "/rest/api/3/search/jql": + if calls.Add(1) == 2 { + secondJQL.Store(r.URL.Query().Get("jql")) + } + // Same issue on every poll, exactly as real Jira behaves while + // no new updates arrive. + _, _ = w.Write([]byte(`{"issues":[{"id":"1","key":"PROJ-1","fields":{"project":{"key":"PROJ"},"updated":"2026-06-01T10:00:00.000+0000"}}]}`)) + default: + http.NotFound(w, r) + } + } + + yaml := fmt.Sprintf(` +jira: + base_url: %q + auth: {email: u@x, api_token: tok} + resource: issues + poll_interval: 10s + cursor: {cache: jira_state, overlap: 0s} +`, mock.URL) + + s, out := buildStream(t, yaml) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go func() { _ = s.Run(ctx) }() + + select { + case <-out: + case <-ctx.Done(): + t.Fatal("no first message") + } + // The second poll fires immediately (page 1 was non-empty); its response + // contains the same issue version, which must be suppressed. + require.Eventually(t, func() bool { return calls.Load() >= 2 }, 3*time.Second, 25*time.Millisecond) + select { + case m := <-out: + t.Fatalf("boundary issue re-emitted on idle poll: %v", m) + case <-time.After(300 * time.Millisecond): + } + require.NoError(t, s.StopWithin(2*time.Second)) + + jql, _ := secondJQL.Load().(string) + assert.Contains(t, jql, "updated >=", "second poll must carry the cursor predicate") +} + +// TestUpdatedIssue_IsReemitted is the counterpart to the idle dedup test: when +// a boundary issue is genuinely updated again (newer `updated` timestamp), the +// new version must be emitted despite the issue key being in the seen set. +func TestUpdatedIssue_IsReemitted(t *testing.T) { + mock := newMockJiraServer(t) + var calls atomic.Int32 + mock.handler = func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/rest/api/3/myself": + _, _ = w.Write([]byte(`{}`)) + case "/rest/api/3/search/jql": + if calls.Add(1) == 1 { + _, _ = w.Write([]byte(`{"issues":[{"id":"1","key":"PROJ-1","fields":{"project":{"key":"PROJ"},"updated":"2026-06-01T10:00:00.000+0000"}}]}`)) + } else { + _, _ = w.Write([]byte(`{"issues":[{"id":"1","key":"PROJ-1","fields":{"project":{"key":"PROJ"},"updated":"2026-06-01T10:05:00.000+0000"}}]}`)) + } + default: + http.NotFound(w, r) + } + } + + yaml := fmt.Sprintf(` +jira: + base_url: %q + auth: {email: u@x, api_token: tok} + resource: issues + poll_interval: 10s + cursor: {cache: jira_state, overlap: 0s} +`, mock.URL) + + s, out := buildStream(t, yaml) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go func() { _ = s.Run(ctx) }() + + updated := []string{} + for len(updated) < 2 { + select { + case m := <-out: + md := m["meta"].(map[string]any) + updated = append(updated, md["jira_updated"].(string)) + case <-ctx.Done(): + t.Fatalf("only got %d messages: %v", len(updated), updated) + } + } + require.NoError(t, s.StopWithin(2*time.Second)) + assert.Equal(t, []string{"2026-06-01T10:00:00Z", "2026-06-01T10:05:00Z"}, updated) +} + +// TestCursor_PersistedAfterEachAckedPage pins mid-backfill restartability: the +// cursor checkpoint must be written to the cache after page 1 is acked and +// BEFORE the page-2 request is issued, so a restart mid-run resumes from the +// last acked page instead of the beginning of the backfill. The write and the +// next fetch happen on the same goroutine, so observing the cache from the +// page-2 handler is deterministic. +func TestCursor_PersistedAfterEachAckedPage(t *testing.T) { + mock := newMockJiraServer(t) + cacheDir := t.TempDir() + var midRunCursor atomic.Value + mock.handler = func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/rest/api/3/myself": + _, _ = w.Write([]byte(`{}`)) + case "/rest/api/3/search/jql": + if r.URL.Query().Get("nextPageToken") == "page2" { + var content strings.Builder + entries, _ := os.ReadDir(cacheDir) + for _, e := range entries { + b, _ := os.ReadFile(filepath.Join(cacheDir, e.Name())) + content.Write(b) + } + midRunCursor.Store(content.String()) + _, _ = w.Write([]byte(`{"issues":[{"id":"2","key":"PROJ-2","fields":{"project":{"key":"PROJ"},"updated":"2026-06-01T10:05:00.000+0000"}}]}`)) + return + } + _, _ = w.Write([]byte(`{"issues":[{"id":"1","key":"PROJ-1","fields":{"project":{"key":"PROJ"},"updated":"2026-06-01T10:00:00.000+0000"}}],"nextPageToken":"page2"}`)) + default: + http.NotFound(w, r) + } + } + + builder := service.NewStreamBuilder() + require.NoError(t, builder.SetLoggerYAML(`level: OFF`)) + require.NoError(t, builder.AddCacheYAML(fmt.Sprintf(` +label: jira_state +file: + directory: %s +`, cacheDir))) + require.NoError(t, builder.AddInputYAML(fmt.Sprintf(` +jira: + base_url: %q + auth: {email: u@x, api_token: tok} + resource: issues + poll_interval: 10s + cursor: {cache: jira_state, overlap: 0s} +`, mock.URL))) + got := make(chan string, 8) + require.NoError(t, builder.AddConsumerFunc(func(_ context.Context, msg *service.Message) error { + id, _ := msg.MetaGet("jira_id") + got <- id + return nil + })) + s, err := builder.Build() + require.NoError(t, err) + license.InjectTestService(s.Resources()) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go func() { _ = s.Run(ctx) }() + + keys := []string{} + for len(keys) < 2 { + select { + case k := <-got: + keys = append(keys, k) + case <-ctx.Done(): + t.Fatalf("only got %v", keys) + } + } + require.NoError(t, s.StopWithin(2*time.Second)) + + assert.Equal(t, []string{"PROJ-1", "PROJ-2"}, keys) + cur, _ := midRunCursor.Load().(string) + require.NotEmpty(t, cur, "cursor must be on disk before the page-2 request is issued") + assert.Contains(t, cur, "2026-06-01T10:00:00Z", "mid-run checkpoint must carry page 1's max updated") + assert.Contains(t, cur, "PROJ-1", "mid-run checkpoint must carry page 1's seen entries") +} + +func TestPruneSeen(t *testing.T) { + now := time.Date(2026, 6, 1, 10, 10, 0, 0, time.UTC) + seen := map[string]time.Time{ + "OLD-1": now.Add(-10 * time.Minute), + "EDGE-1": now.Add(-90 * time.Second), // inside cursor - overlap(1m) - 1m slack + "NEW-1": now, + } + pruneSeen(seen, now, time.Minute) + assert.NotContains(t, seen, "OLD-1") + assert.Contains(t, seen, "EDGE-1") + assert.Contains(t, seen, "NEW-1") + + // A zero cursor (first run) must not prune anything. + seen2 := map[string]time.Time{"K-1": now} + pruneSeen(seen2, time.Time{}, time.Minute) + assert.Contains(t, seen2, "K-1") +} + +func TestCursor_SeenRoundtripAndV1Compat(t *testing.T) { + orig := cursor{ + Updated: time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC), + Seen: map[string]time.Time{"PROJ-1": time.Date(2026, 1, 2, 3, 4, 0, 0, time.UTC)}, + Version: cursorSchemaVersion, + } + b, err := json.Marshal(orig) + require.NoError(t, err) + var decoded cursor + require.NoError(t, json.Unmarshal(b, &decoded)) + assert.True(t, orig.Updated.Equal(decoded.Updated)) + require.Contains(t, decoded.Seen, "PROJ-1") + assert.True(t, orig.Seen["PROJ-1"].Equal(decoded.Seen["PROJ-1"])) + + // A v1 cursor (no seen field) must decode cleanly with a nil seen map. + var v1 cursor + require.NoError(t, json.Unmarshal([]byte(`{"updated":"2026-01-02T03:04:05Z","v":1}`), &v1)) + assert.Nil(t, v1.Seen) + assert.Equal(t, 1, v1.Version) +} diff --git a/internal/impl/jira/processor_jira.go b/internal/impl/jira/processor_jira.go index 0869c71e38..a8d3768f73 100644 --- a/internal/impl/jira/processor_jira.go +++ b/internal/impl/jira/processor_jira.go @@ -50,9 +50,12 @@ func newJiraProcessorConfigSpec() *service.ConfigSpec { spec := service.NewConfigSpec(). Categories("Services"). Version("4.68.0"). + Deprecated(). Summary("Queries Jira resources and returns structured data"). Description(`Executes Jira API queries based on input messages and returns structured results. The processor handles pagination, retries, and field expansion automatically. +This processor is deprecated in favour of the `+"`jira`"+` input, which streams issues, comments, and changelog entries with cursor-based incremental polling. The processor remains available for enrichment and lookup style operations. + Supports querying the following Jira resources: - Issues (JQL queries) - Issue transitions diff --git a/internal/plugins/info.csv b/internal/plugins/info.csv index ec9ba22ac0..53d7ea1b68 100644 --- a/internal/plugins/info.csv +++ b/internal/plugins/info.csv @@ -121,7 +121,7 @@ insert_part ,processor ,insert_part ,certified ,n jaeger ,tracer ,jaeger ,community ,n ,n ,n ,cloud uses a managed tracing integration javascript ,processor ,javascript ,certified ,n ,n ,n ,security: arbitrary code execution jira ,input ,jira ,certified ,n ,y ,n , -jira ,processor ,jira ,certified ,n ,y ,n , +jira ,processor ,jira ,certified ,y ,y ,n , jmespath ,processor ,JMESPath ,certified ,n ,y ,y , jq ,processor ,jq ,certified ,n ,y ,y , json_api ,metric ,json_api ,certified ,n ,n ,n ,cloud uses a managed metrics integration From d71f97f49a59d5efdad01cb0efffa1bb3154c688 Mon Sep 17 00:00:00 2001 From: Jonathan Chaput Date: Tue, 16 Jun 2026 10:34:54 -0400 Subject: [PATCH 5/6] test fix --- cmd/tools/docs_gen/schema_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/tools/docs_gen/schema_test.go b/cmd/tools/docs_gen/schema_test.go index 22b310e56f..34df58b1a7 100644 --- a/cmd/tools/docs_gen/schema_test.go +++ b/cmd/tools/docs_gen/schema_test.go @@ -45,7 +45,10 @@ func TestComponentExamples(t *testing.T) { require.NoError(t, err) for _, l := range lints { // TODO: Remove this once kafka is out of the benthos repo examples - if !strings.Contains(l.What, "component kafka is deprecated") { + // The jira processor is deprecated in favour of the jira input but + // retains examples for its still-supported enrichment use case. + if !strings.Contains(l.What, "component kafka is deprecated") && + !strings.Contains(l.What, "component jira is deprecated") { t.Error(l.Error()) } } From 6392dee260305ac96887e3ce0a44d60a0e6006b6 Mon Sep 17 00:00:00 2001 From: Jonathan Chaput Date: Fri, 19 Jun 2026 11:08:23 -0400 Subject: [PATCH 6/6] jira: render JQL cursor predicate in the account timezone Jira evaluates JQL date literals in the requesting account's timezone, not UTC, and there is no syntax to attach an offset to a literal. Rendering the `updated >=` threshold in UTC therefore shifted the effective instant by the account's offset; for accounts behind UTC this opened a silent data-loss window the default overlap could not absorb. Connect already calls /myself for auth validation, so read its `timeZone` field and render the threshold in that location (falling back to UTC with a warning when absent/unknown). Also addresses two minor review nits: - trim a trailing slash from base_url so request URLs don't get a double slash. - back off briefly when the server returns an empty page that still carries a continuation token, so a misbehaving server cannot drive a tight request loop; fully-deduped issue-bearing pages still chain immediately. --- internal/impl/jira/input_jira.go | 71 ++++++++++++++++++++++++--- internal/impl/jira/input_jira_test.go | 63 ++++++++++++++++++++++++ 2 files changed, 128 insertions(+), 6 deletions(-) diff --git a/internal/impl/jira/input_jira.go b/internal/impl/jira/input_jira.go index 2dc770d8e0..80af1d99e1 100644 --- a/internal/impl/jira/input_jira.go +++ b/internal/impl/jira/input_jira.go @@ -161,6 +161,9 @@ func parseInputConfig(conf *service.ParsedConfig) (*inputCfg, error) { if err != nil { return nil, err } + // Every request URL is built as BaseURL + "/rest/api/3/...", so a trailing + // slash on base_url would yield a double slash. Trim it defensively. + httpCfg.BaseURL = strings.TrimRight(httpCfg.BaseURL, "/") email, err := conf.FieldString("auth", "email") if err != nil { @@ -273,6 +276,17 @@ type reader struct { cur cursor connected atomic.Bool page *pageState + // accountLoc is the timezone of the authenticated Jira account, read from + // /myself in Connect. JQL evaluates date literals in this timezone, so the + // cursor predicate is rendered in it (not UTC). Defaults to UTC if the + // account timezone is absent or unknown to the runtime. Set once in Connect + // before any Read, so it needs no synchronisation. + accountLoc *time.Location + // lastRawIssueCount is the number of issues in the most recent raw page + // response (before dedup). Distinguishes a genuinely empty server page from + // a full page whose issues were all deduped, so Read only backs off on the + // former. Read-goroutine only. + lastRawIssueCount int // runMu serialises onPageDrained and protects nextToken / runMaxUpdated. // Both the Read goroutine (via waitForPageAcks) and the Close goroutine @@ -480,14 +494,18 @@ func (r *reader) Connect(ctx context.Context) error { } r.client = client - // Validate auth via /myself. + // Validate auth via /myself, and capture the account timezone: JQL date + // literals are evaluated in the requesting account's timezone, so the + // cursor predicate must be rendered in it rather than UTC. myselfURL, err := url.Parse(r.cfg.httpCfg.BaseURL + "/rest/api/3/myself") if err != nil { return fmt.Errorf("invalid base_url: %w", err) } - if _, err := r.callAPI(ctx, myselfURL); err != nil { + myselfBody, err := r.callAPI(ctx, myselfURL) + if err != nil { return fmt.Errorf("authenticating with jira: %w", err) } + r.accountLoc = accountLocationFromMyself(myselfBody, r.log) // Load cursor from cache. c, err := r.readCursor(ctx) @@ -587,9 +605,18 @@ func (r *reader) Read(ctx context.Context) (*service.Message, service.AckFunc, e if r.page.isEmpty() { if r.hasNextToken() { - // Empty page mid-run (every issue deduped, or Jira returned - // an empty page with a continuation token): keep chaining - // the token without sleeping the full poll interval. + // Empty page mid-run with a continuation token. A full page + // whose issues were all deduped is genuine progress, so chain + // the token immediately. But a server returning an empty raw + // page alongside a token would otherwise be polled in a tight + // request loop, so back off briefly in that case. + if r.lastRawIssueCount == 0 { + select { + case <-time.After(emptyPagePollBackoff): + case <-ctx.Done(): + return nil, nil, ctx.Err() + } + } continue } // Caught up; sleep before polling again. Do not return @@ -731,6 +758,7 @@ func (r *reader) fetchNextPage(ctx context.Context) error { } } + r.lastRawIssueCount = len(page.Issues) r.page.load(msgs, maxUpdated, pageSeen) r.runMu.Lock() r.nextToken = page.NextPageToken @@ -889,6 +917,28 @@ func (r *reader) hasNextToken() bool { return r.nextToken != "" } +// accountLocationFromMyself extracts the account timezone from a /myself +// response so the JQL cursor predicate can be rendered in it. Jira evaluates +// JQL date literals in the requesting account's timezone and has no syntax to +// attach an offset to a literal, so rendering in UTC would shift the effective +// threshold by the account's offset. Falls back to UTC (with a warning) when +// the field is absent or the zone is unknown to the runtime. +func accountLocationFromMyself(body []byte, log *service.Logger) *time.Location { + var myself struct { + TimeZone string `json:"timeZone"` + } + if err := json.Unmarshal(body, &myself); err != nil || myself.TimeZone == "" { + log.Warn("could not determine Jira account timezone from /myself; rendering JQL cursor in UTC, which may cause a data-loss window if the account is not configured to UTC") + return time.UTC + } + loc, err := time.LoadLocation(myself.TimeZone) + if err != nil { + log.Warnf("Jira account timezone %q is not known to this runtime; rendering JQL cursor in UTC, which may cause a data-loss window: %v", myself.TimeZone, err) + return time.UTC + } + return loc +} + func (r *reader) buildJQL() string { parts := []string{} if r.cfg.jql != "" { @@ -896,8 +946,12 @@ func (r *reader) buildJQL() string { } cur := r.currentCursor() if !cur.Updated.IsZero() { + loc := r.accountLoc + if loc == nil { + loc = time.UTC + } threshold := cur.Updated.Add(-r.cfg.cursorOverlap) - parts = append(parts, fmt.Sprintf(`updated >= "%s"`, threshold.UTC().Format("2006-01-02 15:04"))) + parts = append(parts, fmt.Sprintf(`updated >= "%s"`, threshold.In(loc).Format("2006-01-02 15:04"))) } jql := strings.Join(parts, " AND ") if jql != "" { @@ -919,6 +973,11 @@ func appendUnique(s []string, v string) []string { // shutdown ctx would just stall — the overlap window covers the replay. const closeAckDrainTimeout = 500 * time.Millisecond +// emptyPagePollBackoff bounds how fast Read re-requests when the server returns +// an empty page that still carries a continuation token, preventing a tight +// request loop. Issue-bearing pages (even fully deduped) are not subject to it. +const emptyPagePollBackoff = time.Second + // Close drains any in-flight page acks so the cursor can advance for the last // page before tearing down, then resets the connected flag so a future Connect // re-reads the cursor from the cache. The wait is bounded by both the passed diff --git a/internal/impl/jira/input_jira_test.go b/internal/impl/jira/input_jira_test.go index 7540def687..708bac3509 100644 --- a/internal/impl/jira/input_jira_test.go +++ b/internal/impl/jira/input_jira_test.go @@ -149,6 +149,69 @@ cursor: assert.Contains(t, err.Error(), "worklogs") } +func TestInputConfig_TrimsTrailingSlashFromBaseURL(t *testing.T) { + yaml := ` +base_url: https://example.atlassian.net/ +auth: + email: user@example.com + api_token: secret +cursor: + cache: jira_state +` + confSpec := newJiraInputConfigSpec() + parsed, err := confSpec.ParseYAML(yaml, nil) + require.NoError(t, err) + + cfg, err := parseInputConfig(parsed) + require.NoError(t, err) + assert.Equal(t, "https://example.atlassian.net", cfg.httpCfg.BaseURL) +} + +func TestAccountLocationFromMyself(t *testing.T) { + log := service.MockResources().Logger() + ny, err := time.LoadLocation("America/New_York") + require.NoError(t, err) + + cases := []struct { + name string + body string + want *time.Location + }{ + {"valid timezone", `{"timeZone":"America/New_York"}`, ny}, + {"missing field falls back to UTC", `{"accountId":"abc"}`, time.UTC}, + {"empty timezone falls back to UTC", `{"timeZone":""}`, time.UTC}, + {"unknown zone falls back to UTC", `{"timeZone":"Mars/Phobos"}`, time.UTC}, + {"invalid json falls back to UTC", `not json`, time.UTC}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := accountLocationFromMyself([]byte(tc.body), log) + assert.Equal(t, tc.want.String(), got.String()) + }) + } +} + +func TestBuildJQL_RendersThresholdInAccountTimezone(t *testing.T) { + ny, err := time.LoadLocation("America/New_York") + require.NoError(t, err) + + r := &reader{cfg: &inputCfg{cursorOverlap: 0}, accountLoc: ny} + // 2026-01-15T12:00:00Z is 07:00 on the same day in New York (UTC-5 in winter). + r.setCursor(cursor{Updated: time.Date(2026, 1, 15, 12, 0, 0, 0, time.UTC), Version: cursorSchemaVersion}) + + jql := r.buildJQL() + assert.Contains(t, jql, `updated >= "2026-01-15 07:00"`) + assert.NotContains(t, jql, "12:00", "threshold must be rendered in the account timezone, not UTC") +} + +func TestBuildJQL_DefaultsToUTCWhenNoAccountLocation(t *testing.T) { + r := &reader{cfg: &inputCfg{cursorOverlap: 0}} // accountLoc nil + r.setCursor(cursor{Updated: time.Date(2026, 1, 15, 12, 0, 0, 0, time.UTC), Version: cursorSchemaVersion}) + + jql := r.buildJQL() + assert.Contains(t, jql, `updated >= "2026-01-15 12:00"`) +} + func TestCursor_JSONRoundtrip(t *testing.T) { original := cursor{Updated: time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC), Version: 1} b, err := json.Marshal(original)