jira: add input component for streaming issues, comments, and changelog#4484
jira: add input component for streaming issues, comments, and changelog#4484squiidz wants to merge 7 commits into
Conversation
Adds a `jira` input that streams Jira REST API events into Connect
pipelines via JQL with cursor-based incremental polling. The cursor
(max issue.updated timestamp) is persisted to a cache resource so
progress survives restarts.
Available in both the Cloud and Self-Hosted distributions. Authenticates
via API token (email + token). Supports resource={issues,comments,changelog}.
The existing `jira` processor (enrichment lookups) is unchanged.
Shared HTTP/auth setup is extracted into a new internal/impl/jira/jiraauth
sub-package consumed by both the processor and the input.
Limitations (v1): no OAuth, no worklogs resource, per-issue comment and
changelog pagination is limited to the first response page (truncation
logged and counted via the jira_input_child_truncated_total metric).
| if r.page.pageHasNack.Load() { | ||
| // don't advance cursor; reset for the refetch on next Read | ||
| r.page.reset() | ||
| return | ||
| } |
There was a problem hiding this comment.
Nacked messages can be permanently lost during multi-page pagination.
On nack, onPageDrained calls r.page.reset() and returns without advancing the cursor. The comment states the intent is to "reset for the refetch on next Read", but r.nextToken is not reset here. In fetchNextPage, r.nextToken was already set to the current page's NextPageToken (pointing at the next page). So the subsequent Read → fetchNextPage fetches the next page rather than re-fetching the nacked page.
Because issues are ordered updated ASC, later pages have higher updated values. Once a subsequent page is acked, the cursor advances past the nacked page's range, and on the next poll cycle the updated >= cursor predicate excludes those records entirely — the nacked messages are never re-read.
Consider resetting r.nextToken (and restarting pagination from the cursor) on the nack path, or otherwise ensuring the nacked page is actually re-fetched. See onPageDrained.
| q.Set("jql", r.buildJQL()) | ||
| q.Set("fields", strings.Join(r.cfg.fields, ",")) | ||
| expand := r.cfg.expand | ||
| if r.cfg.resource == resourceChangelog { | ||
| expand = appendUnique(expand, "changelog") | ||
| } | ||
| if len(expand) > 0 { | ||
| q.Set("expand", strings.Join(expand, ",")) | ||
| } | ||
| q.Set("maxResults", strconv.Itoa(r.cfg.pageSize)) | ||
| if r.nextToken != "" { | ||
| q.Set("nextPageToken", r.nextToken) | ||
| } |
There was a problem hiding this comment.
The JQL filter changes between paginated requests while nextPageToken is reused.
onPageDrained runs after every page's acks settle, so the cursor advances mid-pagination: page 1 is fetched with the old cursor, then after it is acked the cursor moves to page 1's maxUpdated, and fetchNextPage for page 2 rebuilds the query here with a new updated >= cursor predicate (via buildJQL) while still passing the previous page's nextPageToken.
Jira's /rest/api/3/search/jql cursor pagination expects the JQL to remain stable across the page sequence — the opaque nextPageToken encodes a position in the original result set. Changing the JQL while reusing the token is unsound and can fail or return inconsistent results for any query exceeding page_size. The existing test passes only because the mock server doesn't validate JQL/token consistency.
Consider advancing the cursor only after the full pagination run completes (when nextPageToken is empty), rather than after each individual page. See buildSearchURL.
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| //go:build integration |
There was a problem hiding this comment.
Integration tests in this project should not use build tags — the test patterns state: "Do not use build tags. Use integration.CheckSkip(t) at the start of every integration test function." This file already calls integration.CheckSkip(t), so the //go:build integration tag is both redundant and against convention. Consider removing it so the test is gated solely by CheckSkip.
|
Commits Review
|
onPageDrained was advancing the cursor after each acked page and not resetting r.nextToken on nack. This caused two correctness bugs: - Nacked records on a multi-page response were never re-read: the next fetch reused the next-page token, skipping past the nacked page, and a subsequent ack advanced the cursor past the nacked range entirely. - The JQL predicate changed between paginated requests while the opaque nextPageToken was reused, violating Jira's cursor-pagination contract (JQL must remain stable across the token sequence). Accumulate max issue.updated across the run and only advance the persisted cursor when nextPageToken is empty. On nack, reset r.nextToken and the run accumulator so the next fetch restarts the pagination run from the current cursor. Also drop the `//go:build integration` tag from the integration test per project convention (gating via integration.CheckSkip is enough).
| func newReader(conf *service.ParsedConfig, mgr *service.Resources) (*reader, error) { | ||
| cfg, err := parseInputConfig(conf) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return &reader{ | ||
| cfg: cfg, | ||
| mgr: mgr, | ||
| log: mgr.Logger(), | ||
| page: &pageState{}, | ||
| cacheSetErrors: mgr.Metrics().NewCounter("jira_input_cache_set_errors_total"), | ||
| childTruncated: mgr.Metrics().NewCounter("jira_input_child_truncated_total", "resource"), | ||
| }, nil | ||
| } |
There was a problem hiding this comment.
The new jira input constructor does not enforce an enterprise license, but its sibling jira processor in this same package gates on one as its very first step:
func newJiraProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*jiraProcessor, error) {
if err := license.CheckRunningEnterprise(mgr); err != nil {
return nil, err
}
...(see processor_jira.go#L109-L112)
Both components are classified identically in info.csv (certified, cloud=y) and live in the same package, yet newReader has no license.CheckRunningEnterprise(mgr) call. In the full self-hosted redpanda-connect binary this means the jira input runs without an enterprise license while the jira processor refuses to. If the Jira connector family is meant to be enterprise-gated, the input is missing this check; please confirm the intended licensing and add the check here if it should mirror the processor.
|
Commits Review
|
|
Commits LGTM Review Reviewed the new LGTM |
The `updated >=` cursor predicate re-matches boundary issues on every poll, so an idle project re-emitted them once per poll interval forever. The cursor now carries a pruned seen-set of emitted issue versions and suppresses duplicates while still emitting genuinely newer updates. Progress is persisted after every fully-acked page (the in-flight run's JQL is frozen so Jira's token/JQL pairing stays stable), making large backfills resumable mid-run. The jira processor is marked deprecated in favour of the input, and the input's version stamp is fixed to 4.96.0.
|
Commits Message format and quality are good across all commits (
(The iterative fix commits Review New LGTM — no high-signal issues found.
|
|
Commits
The remaining Review Reviewed the new Checked in particular: the ack/ LGTM |
Jeffail
left a comment
There was a problem hiding this comment.
Quite impressed with my claude, looks like it caught a certified doozy, theres also two minor nits that should probably be looked at
🤖 ─────── my bot's two cents ───────
Thank you for this — it is a genuinely impressive piece of work, and the concurrency model and test coverage in particular are excellent. One correctness concern that I believe warrants a change before merge, plus two minor points:
1. (Blocking) The incremental cursor predicate is rendered in UTC, but JQL interprets date literals in the requesting account's timezone.
In buildJQL, the threshold is formatted as a UTC wall-clock string:
threshold := cur.Updated.Add(-r.cfg.cursorOverlap)
parts = append(parts, fmt.Sprintf(`updated >= "%s"`, threshold.UTC().Format("2006-01-02 15:04")))Jira evaluates JQL date/time literals relative to the profile timezone of the account making the request, not UTC, and JQL has no syntax to attach an explicit offset to a date literal. The effective threshold instant is therefore shifted by the account's timezone offset:
- For an account behind UTC (e.g. any of the Americas), the effective threshold lands later than intended. Issues whose
updatedfalls within that gap drop below the predicate on the subsequent poll and are never re-matched — a silent data-loss window that the default 60soverlapcannot absorb. - For an account ahead of UTC, the threshold lands earlier, producing a wider window that the seen-set dedup absorbs harmlessly (at the cost of some redundant fetching).
Conveniently, the fix is already within reach: Connect calls /rest/api/3/myself for auth validation and currently discards the body. That response includes a timeZone field, so the threshold could be formatted in the account's location (threshold.In(accountLoc).Format(...)) to match Jira's interpretation exactly. If reading the account timezone is undesirable for any reason, the minimum bar would be to document prominently that the token's account must be configured to UTC — but deriving it from the /myself response already in hand seems both more robust and only marginally more code.
References for the JQL timezone behaviour:
- https://community.atlassian.com/forums/App-Central-articles/Timezones-in-Jira-reporting-why-dates-look-off/ba-p/3205133
- https://community.developer.atlassian.com/t/jql-on-search-with-updated-date-time-does-not-work-correctly/58754
- https://jira.atlassian.com/browse/JRASERVER-68522 (confirms JQL has no per-literal offset syntax)
2. (Minor) base_url with a trailing slash produces a double slash.
Every request URL is constructed as BaseURL + "/rest/api/3/...". A base_url configured with a trailing / yields //rest/.... Jira most likely tolerates this, but trimming a trailing slash at parse time would be a tidy, defensive guard.
3. (Minor) The empty-page-with-continuation-token path has no backoff.
In Read, an empty page that still carries a nextPageToken falls through to continue with no pause. It is network-bound rather than a CPU spin (each iteration issues an HTTP request), so it is not dangerous, but a server that persistently returns empty pages alongside a continuation token would be polled in a tight request loop. A small guard there would be prudent.
Everything else looks great — happy to re-review once the timezone handling is addressed.
Jira evaluates JQL date literals in the requesting account's timezone, not UTC, and there is no syntax to attach an offset to a literal. Rendering the `updated >=` threshold in UTC therefore shifted the effective instant by the account's offset; for accounts behind UTC this opened a silent data-loss window the default overlap could not absorb. Connect already calls /myself for auth validation, so read its `timeZone` field and render the threshold in that location (falling back to UTC with a warning when absent/unknown). Also addresses two minor review nits: - trim a trailing slash from base_url so request URLs don't get a double slash. - back off briefly when the server returns an empty page that still carries a continuation token, so a misbehaving server cannot drive a tight request loop; fully-deduped issue-bearing pages still chain immediately.
|
Commits
Review Reviewed the new LGTM — no high-signal correctness, security, or pattern issues found. The cursor handling (freeze JQL per pagination run, advance only on fully-acked pages, reset token on nack, prune the seen-set with a minute-of-slack to match JQL truncation) is carefully reasoned and well covered by unit + integration tests. License headers (Apache 2.0) and string-literal field names are consistent with the existing |
Adds a
jirainput that streams Jira REST API events into Connect pipelines via JQL with cursor-based incremental polling. The cursor (max issue.updated timestamp) is persisted to a cache resource so progress survives restarts.Available in both the Cloud and Self-Hosted distributions. Authenticates via API token (email + token). Supports resource={issues,comments,changelog}. The existing
jiraprocessor (enrichment lookups) is unchanged.Shared HTTP/auth setup is extracted into a new internal/impl/jira/jiraauth sub-package consumed by both the processor and the input.
Limitations (v1): no OAuth, no worklogs resource, per-issue comment and changelog pagination is limited to the first response page (truncation logged and counted via the jira_input_child_truncated_total metric).