Skip to content

Commit aef937a

Browse files
committed
feat: add ForceReindex flag to VersionSyncWorkflow
Allows re-triggering asset indexing for all existing versions, not just newly discovered ones. Useful for backfilling data after schema changes like the new checksum columns. Also trims the Versions slice on continue-as-new in VersionBatchIndexWorkflow to avoid re-serializing already-processed entries on every cycle. Signed-off-by: Gabriel Harris-Rouquette <gabizou@me.com>
1 parent fb2b57b commit aef937a

3 files changed

Lines changed: 21 additions & 11 deletions

File tree

docs/WORKFLOWS.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ HTTP API (RegisterArtifact → triggers VersionSyncWorkflow)
1212
VersionSyncWorkflow ─────────────────────────────────────────────────┐
1313
├── FetchVersions (Sonatype maven-metadata.xml) |
1414
├── StoreNewVersions (DB: compare + insert new versions) |
15-
├── VersionBatchIndexWorkflow (child, if new versions found) |
15+
├── VersionBatchIndexWorkflow (child, if new versions or ForceReindex)|
1616
│ └── VersionIndexWorkflow (per version, sliding window of 5) |
1717
│ ├── FetchVersionAssets (Sonatype REST API) |
1818
│ ├── StoreVersionAssets (DB) |
@@ -57,13 +57,15 @@ The top-level orchestrator, triggered when an artifact is registered via the HTT
5757

5858
**Trigger:** `POST /groups/{groupID}/artifacts` (fire-and-forget)
5959

60+
**Input:** `VersionSyncInput{GroupID, ArtifactID, ForceReindex}`
61+
6062
**Steps:**
6163
1. Fetch all version strings from Sonatype Nexus (`maven-metadata.xml`)
6264
2. Compare against DB and store only new versions
63-
3. Launch batch indexing for new versions (assets + commits)
65+
3. Launch batch indexing for new versions (assets + commits) — or **all** versions when `ForceReindex` is true
6466
4. Compute version ordering and extract schema-driven tags for **all** versions
6567

66-
The ordering step runs for all versions (not just new ones) because `sort_order` is a relative ranking that must be recomputed when new versions are inserted.
68+
The ordering step runs for all versions (not just new ones) because `sort_order` is a relative ranking that must be recomputed when new versions are inserted. The `ForceReindex` flag is useful for backfilling data after schema changes (e.g. new asset columns).
6769

6870
### VersionBatchIndexWorkflow
6971

internal/workflow/version_batch_index.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,9 @@ func (s *versionBatchState) continueOrComplete(ctx workflow.Context) (int, error
118118
s.drainCompletionSignals(ctx)
119119

120120
return 0, workflow.NewContinueAsNewError(ctx, VersionBatchIndexWorkflow, VersionBatchIndexInput{
121-
Versions: s.input.Versions,
121+
Versions: s.input.Versions[s.offset:],
122122
WindowSize: s.input.WindowSize,
123-
Offset: s.offset,
123+
Offset: 0,
124124
Progress: s.progress,
125125
CurrentRecords: s.currentRecords,
126126
})

internal/workflow/version_sync.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ const (
1717

1818
// VersionSyncInput is the input payload for the VersionSyncWorkflow.
1919
type VersionSyncInput struct {
20-
GroupID string
21-
ArtifactID string
20+
GroupID string
21+
ArtifactID string
22+
ForceReindex bool
2223
}
2324

2425
// VersionSyncOutput is the result of the VersionSyncWorkflow.
@@ -95,15 +96,19 @@ func VersionSyncWorkflow(ctx workflow.Context, input VersionSyncInput) (*Version
9596
// Ordering needs version rows but not assets or commits.
9697
// Both can run concurrently.
9798

98-
// Start indexing (only for new versions)
99+
// Start indexing — for new versions, or all versions when force-reindexing.
100+
versionsToIndex := storeResult.NewVersions
101+
if input.ForceReindex {
102+
versionsToIndex = fetchResult.Versions
103+
}
99104
var indexFuture workflow.ChildWorkflowFuture
100-
if len(storeResult.NewVersions) > 0 {
105+
if len(versionsToIndex) > 0 {
101106
indexOpts := workflow.ChildWorkflowOptions{
102107
WorkflowID: fmt.Sprintf("version-batch-index-%s-%s", input.GroupID, input.ArtifactID),
103108
}
104109
indexCtx := workflow.WithChildOptions(ctx, indexOpts)
105110
indexFuture = workflow.ExecuteChildWorkflow(indexCtx, VersionBatchIndexWorkflow, VersionBatchIndexInput{
106-
Versions: storeResult.NewVersions,
111+
Versions: versionsToIndex,
107112
WindowSize: versionBatchDefaultWindowSize,
108113
})
109114
}
@@ -138,7 +143,10 @@ func VersionSyncWorkflow(ctx workflow.Context, input VersionSyncInput) (*Version
138143
enrichCtx := workflow.WithChildOptions(ctx, enrichOpts)
139144

140145
var enrichResult CommitEnrichmentOutput
141-
err = workflow.ExecuteChildWorkflow(enrichCtx, CommitEnrichmentWorkflow, CommitEnrichmentInput(input)).Get(ctx, &enrichResult)
146+
err = workflow.ExecuteChildWorkflow(enrichCtx, CommitEnrichmentWorkflow, CommitEnrichmentInput{
147+
GroupID: input.GroupID,
148+
ArtifactID: input.ArtifactID,
149+
}).Get(ctx, &enrichResult)
142150
if err != nil {
143151
return nil, fmt.Errorf("enriching commits: %w", err)
144152
}

0 commit comments

Comments
 (0)