Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Two binaries share one PostgreSQL+PostGIS database and the domain types in `pkg/

- **`cmd/api`** — REST API on port 8080. OpenAPI-driven via oapi-codegen strict server. Owns place reads, place creation, accessibility profile writes, API-key issuance/revocation. Every write runs through structural validation (`internal/validation`) then the a11y rule engine (`internal/a11y`), which computes audit flags from component properties and rejects hard self-contradictions (e.g. `OverallStatus=accessible` with `step + no ramp`).
- **`cmd/ingestion`** — one-shot CLI. Two pipelines selected by `sources.SourceKind`:
- **Canonical** (OSM today): streams `models.Place` → batcher (size 1000) → `place.UpsertBatch` on `(osm_id, osm_type)` → batcher accumulates `RETURNING id` values → `identity.Sweeper` drains `unmatched_external` rows near the touched places.
- **Canonical** (OSM today): streams `(models.Place, *models.AccessibilityProfile)` → batcher (size 1000) → `place.UpsertBatch` on `(osm_id, osm_type)` → batcher accumulates `RETURNING id` values → `place.UpsertProfileIngestion` per non-nil profile (skips `user_verified=true` rows) → `identity.Sweeper` drains `unmatched_external` rows near the touched places.
- **External** (Wheelmap and similar): streams `identity.Record` → `identity.Resolver` → Confident/LowConfidence attaches `ExternalRef` via `jsonb_set`; NoMatch enqueues to `unmatched_external` with full matchable signal (name/category/street/housenumber/lat/lng).

## Key packages
Expand All @@ -17,7 +17,7 @@ Two binaries share one PostgreSQL+PostGIS database and the domain types in `pkg/
| `internal/a11y` | Audit-flag computation, hard-conflict detection, `ComputeEffectiveProfile` for parent inheritance. |
| `internal/identity` | Match algorithm (50 m radius, category allowlist, weighted score 0.5/0.4/0.1 across distance/name/address, thresholds 0.80 Confident / 0.55 LowConfidence). `Resolver` is the at-write driver; `Sweeper` is the post-canonical-ingest retry driver. Package itself is I/O-free. |
| `internal/sources` + `internal/sources/osm` | Capability interfaces and OSM concrete source (tag allowlist, transformer, PBF streamer). |
| `internal/place` | `places` table repo. `UpsertBatch`, `AttachExternalRef`, `FindCandidates`. Satisfies `identity.CandidateRepo`/`AttachRepo`. |
| `internal/place` | `places` and `accessibility_profiles` repo. `UpsertBatch`, `AttachExternalRef`, `FindCandidates`, `UpsertProfile` (API write path), `UpsertProfileIngestion` (ingestion write path, skips `user_verified`). Satisfies `identity.CandidateRepo`/`AttachRepo`. |
| `internal/unmatched` | `unmatched_external` queue repo. `Enqueue`, `FindCandidatesNearTouched` (set-based `ST_DWithin` join), `BumpAttempts`, `Delete`. Satisfies `identity.EnqueueRepo`/`SweepRepo`. |
| `internal/validation` | Pure structural validation over `pkg/models` types. Entrypoints: `Place`, `PlacesQuery`, `Email`. |
| `internal/db` | GORM + numbered SQL migrations + PostGIS index setup. |
Expand Down
2 changes: 1 addition & 1 deletion cmd/api/cross_binary_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func runOSMIngestForCrossBinary(t *testing.T, pbfPath string) {
}
buf = buf[:0]
}
sink := func(_ context.Context, p models.Place) error {
sink := func(_ context.Context, p models.Place, _ *models.AccessibilityProfile) error {
buf = append(buf, p)
if len(buf) >= batchSize {
flush()
Expand Down
49 changes: 38 additions & 11 deletions cmd/ingestion/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,30 @@ package main

import (
"context"
"fmt"

"github.com/InWheelOrg/inwheel-api/pkg/models"
)

// batcher buffers places and flushes in fixed-size batches via flush.
// It is single-goroutine; cmd/ingestion runs ingestion serially.
// batcher buffers (place, profile) pairs, writes places first via flush, then
// writes profiles using the UUIDs returned by flush. Single-goroutine.
type batcher struct {
size int
flush func(context.Context, []models.Place) error
buffer []models.Place
written int
touchedIDs []string
size int
flush func(context.Context, []models.Place) error
writeProfile func(context.Context, string, *models.AccessibilityProfile) (bool, error)
downgradeProfile func(*models.AccessibilityProfile) int

buffer []models.Place
pendingProfiles []*models.AccessibilityProfile
written int
touchedIDs []string
profilesWritten int
profilesDowngraded int
}

func (b *batcher) sink(ctx context.Context, p models.Place) error {
func (b *batcher) sink(ctx context.Context, p models.Place, profile *models.AccessibilityProfile) error {
b.buffer = append(b.buffer, p)
b.pendingProfiles = append(b.pendingProfiles, profile)
if len(b.buffer) >= b.size {
return b.flushNow(ctx)
}
Expand All @@ -33,15 +41,34 @@ func (b *batcher) flushNow(ctx context.Context) error {
if len(b.buffer) == 0 {
return nil
}
if b.writeProfile != nil && b.downgradeProfile == nil {
return fmt.Errorf("batcher: writeProfile set without downgradeProfile")
}
if err := b.flush(ctx, b.buffer); err != nil {
return err
}
b.written += len(b.buffer)
for _, p := range b.buffer {
if p.ID != "" {
b.touchedIDs = append(b.touchedIDs, p.ID)
for i, p := range b.buffer {
if p.ID == "" {
continue
}
b.touchedIDs = append(b.touchedIDs, p.ID)
profile := b.pendingProfiles[i]
if profile == nil || b.writeProfile == nil {
continue
}
if b.downgradeProfile != nil {
b.profilesDowngraded += b.downgradeProfile(profile)
}
ok, err := b.writeProfile(ctx, p.ID, profile)
if err != nil {
return err
}
if ok {
b.profilesWritten++
}
}
b.buffer = b.buffer[:0]
b.pendingProfiles = b.pendingProfiles[:0]
return nil
}
137 changes: 132 additions & 5 deletions cmd/ingestion/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestBatcher_FlushesWhenFull(t *testing.T) {

ctx := context.Background()
for i := 0; i < 3; i++ {
if err := b.sink(ctx, models.Place{}); err != nil {
if err := b.sink(ctx, models.Place{}, nil); err != nil {
t.Fatalf("sink %d: %v", i, err)
}
}
Expand Down Expand Up @@ -58,7 +58,7 @@ func TestBatcher_FlushNow_DrainsPartialBuffer(t *testing.T) {

ctx := context.Background()
for i := 0; i < 3; i++ {
if err := b.sink(ctx, models.Place{}); err != nil {
if err := b.sink(ctx, models.Place{}, nil); err != nil {
t.Fatalf("sink: %v", err)
}
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestBatcher_PropagatesFlushError(t *testing.T) {
return sentinel
},
}
err := b.sink(context.Background(), models.Place{})
err := b.sink(context.Background(), models.Place{}, nil)
if !errors.Is(err, sentinel) {
t.Fatalf("expected sentinel, got %v", err)
}
Expand All @@ -113,7 +113,6 @@ func TestBatcher_TouchedIDsAccumulateAcrossFlushes(t *testing.T) {
b := &batcher{
size: 2,
flush: func(_ context.Context, ps []models.Place) error {
// Simulate UpsertBatch back-populating IDs.
for i := range ps {
ps[i].ID = fmt.Sprintf("id-%d-%d", len(flushed), i)
}
Expand All @@ -125,7 +124,7 @@ func TestBatcher_TouchedIDsAccumulateAcrossFlushes(t *testing.T) {
}
ctx := context.Background()
for i := 0; i < 5; i++ {
if err := b.sink(ctx, models.Place{Name: fmt.Sprintf("p%d", i)}); err != nil {
if err := b.sink(ctx, models.Place{Name: fmt.Sprintf("p%d", i)}, nil); err != nil {
t.Fatalf("sink: %v", err)
}
}
Expand All @@ -146,3 +145,131 @@ func TestBatcher_TouchedIDsAccumulateAcrossFlushes(t *testing.T) {
}
}
}

// TestBatcher_WritesProfileWhenAttached sinks one place without a profile and
// one place with a profile, flushes, and checks that writeProfile was invoked
// exactly once, for the second place, using the ID that flush assigned to it.
func TestBatcher_WritesProfileWhenAttached(t *testing.T) {
	var writes []capturedProfile

	// flush stub: stamps each place with an ID derived from its batch position.
	assignIDs := func(_ context.Context, places []models.Place) error {
		for idx := range places {
			places[idx].ID = fmt.Sprintf("place-%d", idx)
		}
		return nil
	}
	// writeProfile stub: records every call and reports the row as written.
	record := func(_ context.Context, id string, prof *models.AccessibilityProfile) (bool, error) {
		writes = append(writes, capturedProfile{placeID: id, profile: prof})
		return true, nil
	}
	noDowngrade := func(*models.AccessibilityProfile) int { return 0 }

	b := &batcher{
		size:             10,
		flush:            assignIDs,
		writeProfile:     record,
		downgradeProfile: noDowngrade,
	}
	ctx := context.Background()

	if err := b.sink(ctx, models.Place{Name: "no-a11y"}, nil); err != nil {
		t.Fatalf("sink p1: %v", err)
	}
	attached := &models.AccessibilityProfile{OverallStatus: models.StatusAccessible}
	if err := b.sink(ctx, models.Place{Name: "with-a11y"}, attached); err != nil {
		t.Fatalf("sink p2: %v", err)
	}
	if err := b.flushNow(ctx); err != nil {
		t.Fatalf("flushNow: %v", err)
	}

	if got := len(writes); got != 1 {
		t.Fatalf("captured = %d, want 1 profile write", got)
	}
	if got := writes[0].placeID; got != "place-1" {
		t.Errorf("placeID = %q, want place-1", got)
	}
	if got := b.profilesWritten; got != 1 {
		t.Errorf("profilesWritten = %d, want 1", got)
	}
}

func TestBatcher_DowngradeCounter(t *testing.T) {
ctx := context.Background()
b := &batcher{
size: 10,
flush: func(_ context.Context, ps []models.Place) error {
for i := range ps {
ps[i].ID = fmt.Sprintf("place-%d", i)
}
return nil
},
writeProfile: func(_ context.Context, _ string, _ *models.AccessibilityProfile) (bool, error) {
return true, nil
},
downgradeProfile: func(_ *models.AccessibilityProfile) int { return 2 },
}
profile := &models.AccessibilityProfile{OverallStatus: models.StatusAccessible}
if err := b.sink(ctx, models.Place{Name: "p"}, profile); err != nil {
t.Fatalf("sink: %v", err)
}
if err := b.flushNow(ctx); err != nil {
t.Fatalf("flushNow: %v", err)
}
if b.profilesDowngraded != 2 {
t.Errorf("profilesDowngraded = %d, want 2", b.profilesDowngraded)
}
}

// TestBatcher_PlaceHasNoAccessibilityFieldInFlush verifies that the profile
// passed to sink travels separately from the place: the batch handed to flush
// must contain the place with a nil Accessibility field.
func TestBatcher_PlaceHasNoAccessibilityFieldInFlush(t *testing.T) {
	var snapshot []models.Place

	// flush stub: copies the incoming batch before assigning IDs, so the
	// assertions below see the places exactly as the batcher delivered them.
	captureThenAssign := func(_ context.Context, places []models.Place) error {
		snapshot = make([]models.Place, len(places))
		copy(snapshot, places)
		for idx := range places {
			places[idx].ID = fmt.Sprintf("place-%d", idx)
		}
		return nil
	}
	alwaysWritten := func(context.Context, string, *models.AccessibilityProfile) (bool, error) {
		return true, nil
	}
	noDowngrade := func(*models.AccessibilityProfile) int { return 0 }

	b := &batcher{
		size:             10,
		flush:            captureThenAssign,
		writeProfile:     alwaysWritten,
		downgradeProfile: noDowngrade,
	}
	ctx := context.Background()

	prof := &models.AccessibilityProfile{OverallStatus: models.StatusAccessible}
	if err := b.sink(ctx, models.Place{Name: "with-a11y"}, prof); err != nil {
		t.Fatalf("sink: %v", err)
	}
	if err := b.flushNow(ctx); err != nil {
		t.Fatalf("flushNow: %v", err)
	}

	if got := len(snapshot); got != 1 {
		t.Fatalf("expected 1 place in batch, got %d", got)
	}
	if a := snapshot[0].Accessibility; a != nil {
		t.Errorf("place.Accessibility must be nil in flush batch, got %+v", a)
	}
}

func TestBatcher_FlushNow_ErrorWhenWriteProfileWithoutDowngrade(t *testing.T) {
ctx := context.Background()
b := &batcher{
size: 10,
flush: func(_ context.Context, ps []models.Place) error {
for i := range ps {
ps[i].ID = fmt.Sprintf("place-%d", i)
}
return nil
},
writeProfile: func(_ context.Context, _ string, _ *models.AccessibilityProfile) (bool, error) {
return true, nil
},
}
profile := &models.AccessibilityProfile{OverallStatus: models.StatusAccessible}
if err := b.sink(ctx, models.Place{Name: "p"}, profile); err != nil {
t.Fatalf("sink: %v", err)
}
err := b.flushNow(ctx)
if err == nil {
t.Fatal("expected error, got nil")
}
}

// capturedProfile records the arguments of a single writeProfile call made by
// the batcher under test.
type capturedProfile struct {
// placeID is the place ID passed to writeProfile.
placeID string
// profile is the profile pointer passed to writeProfile.
profile *models.AccessibilityProfile
}
44 changes: 43 additions & 1 deletion cmd/ingestion/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (

"gorm.io/gorm"

"github.com/InWheelOrg/inwheel-api/internal/a11y"
"github.com/InWheelOrg/inwheel-api/internal/db"
"github.com/InWheelOrg/inwheel-api/internal/identity"
"github.com/InWheelOrg/inwheel-api/internal/place"
"github.com/InWheelOrg/inwheel-api/internal/sources"
"github.com/InWheelOrg/inwheel-api/internal/unmatched"
"github.com/InWheelOrg/inwheel-api/pkg/models"
)

const batchSize = 1000
Expand Down Expand Up @@ -107,7 +109,18 @@ func runPipeline(ctx context.Context, src sources.Source, command string, gormDB
func runCanonical(ctx context.Context, src sources.Source, command string, gormDB *gorm.DB) error {
placesRepo := place.NewRepository(gormDB)
unmatchedRepo := unmatched.NewRepository(gormDB)
b := &batcher{size: batchSize, flush: placesRepo.UpsertBatch}
engine := &a11y.Engine{}

b := &batcher{
size: batchSize,
flush: placesRepo.UpsertBatch,
writeProfile: func(ctx context.Context, placeID string, p *models.AccessibilityProfile) (bool, error) {
return placesRepo.UpsertProfileIngestion(ctx, placeID, p)
},
downgradeProfile: func(p *models.AccessibilityProfile) int {
return resolveIngestionConflicts(engine, p)
},
}
if err := dispatchCanonical(ctx, src, command, b.sink); err != nil {
return fmt.Errorf("source %q: %w", src.Name(), err)
}
Expand All @@ -134,6 +147,8 @@ func runCanonical(ctx context.Context, src sources.Source, command string, gormD
"source", src.Name(),
"command", command,
"written", b.written,
"profiles_written", b.profilesWritten,
"profiles_downgraded", b.profilesDowngraded,
"sweep_failed", sweepErr != nil,
}
if sweepErr == nil {
Expand Down Expand Up @@ -214,6 +229,33 @@ func dispatchExternal(ctx context.Context, src sources.Source, command string, s
}
}

// resolveIngestionConflicts runs the a11y engine over an ingested profile and
// softens every component the engine reports as conflicting.
//
// It calls engine.WithAuditFlags(profile) and then
// engine.DetectConflicts(profile). Each component whose Type is named by a
// reported conflict and whose OverallStatus is StatusAccessible is rewritten in
// place to StatusLimited and logged. The result is the number of components
// rewritten; a nil profile or a profile without conflicts yields 0.
func resolveIngestionConflicts(engine *a11y.Engine, profile *models.AccessibilityProfile) int {
	if profile == nil {
		return 0
	}

	engine.WithAuditFlags(profile)
	found := engine.DetectConflicts(profile)
	if len(found) == 0 {
		return 0
	}

	// Set of component types named by at least one conflict.
	flagged := make(map[models.A11yComponentType]struct{}, len(found))
	for _, conflict := range found {
		flagged[conflict.Component] = struct{}{}
	}

	count := 0
	for idx := range profile.Components {
		comp := &profile.Components[idx]
		// Only components currently marked accessible are eligible for demotion.
		if comp.OverallStatus != models.StatusAccessible {
			continue
		}
		if _, hit := flagged[comp.Type]; !hit {
			continue
		}
		comp.OverallStatus = models.StatusLimited
		count++
		slog.Info("ingestion downgraded component", "component", comp.Type, "flags", comp.AuditFlags)
	}
	return count
}

// resolveCounters tallies external-source outcomes for the run summary.
type resolveCounters struct {
confident int
Expand Down
Loading
Loading