diff --git a/CLAUDE.md b/CLAUDE.md index 921f923..7179d76 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -6,7 +6,7 @@ Two binaries share one PostgreSQL+PostGIS database and the domain types in `pkg/ - **`cmd/api`** — REST API on port 8080. OpenAPI-driven via oapi-codegen strict server. Owns place reads, place creation, accessibility profile writes, API-key issuance/revocation. Every write runs through structural validation (`internal/validation`) then the a11y rule engine (`internal/a11y`), which computes audit flags from component properties and rejects hard self-contradictions (e.g. `OverallStatus=accessible` with `step + no ramp`). - **`cmd/ingestion`** — one-shot CLI. Two pipelines selected by `sources.SourceKind`: - - **Canonical** (OSM today): streams `models.Place` → batcher (size 1000) → `place.UpsertBatch` on `(osm_id, osm_type)` → batcher accumulates `RETURNING id` values → `identity.Sweeper` drains `unmatched_external` rows near the touched places. + - **Canonical** (OSM today): streams `(models.Place, *models.AccessibilityProfile)` → batcher (size 1000) → `place.UpsertBatch` on `(osm_id, osm_type)` → batcher accumulates `RETURNING id` values → `place.UpsertProfileIngestion` per non-nil profile (skips `user_verified=true` rows) → `identity.Sweeper` drains `unmatched_external` rows near the touched places. - **External** (Wheelmap and similar): streams `identity.Record` → `identity.Resolver` → Confident/LowConfidence attaches `ExternalRef` via `jsonb_set`; NoMatch enqueues to `unmatched_external` with full matchable signal (name/category/street/housenumber/lat/lng). ## Key packages @@ -17,7 +17,7 @@ Two binaries share one PostgreSQL+PostGIS database and the domain types in `pkg/ | `internal/a11y` | Audit-flag computation, hard-conflict detection, `ComputeEffectiveProfile` for parent inheritance. | | `internal/identity` | Match algorithm (50 m radius, category allowlist, weighted score 0.5/0.4/0.1 across distance/name/address, thresholds 0.80 Confident / 0.55 LowConfidence). `Resolver` is the at-write driver; `Sweeper` is the post-canonical-ingest retry driver. Package itself is I/O-free. | | `internal/sources` + `internal/sources/osm` | Capability interfaces and OSM concrete source (tag allowlist, transformer, PBF streamer). | -| `internal/place` | `places` table repo. `UpsertBatch`, `AttachExternalRef`, `FindCandidates`. Satisfies `identity.CandidateRepo`/`AttachRepo`. | +| `internal/place` | `places` and `accessibility_profiles` repo. `UpsertBatch`, `AttachExternalRef`, `FindCandidates`, `UpsertProfile` (API write path), `UpsertProfileIngestion` (ingestion write path, skips `user_verified`). Satisfies `identity.CandidateRepo`/`AttachRepo`. | | `internal/unmatched` | `unmatched_external` queue repo. `Enqueue`, `FindCandidatesNearTouched` (set-based `ST_DWithin` join), `BumpAttempts`, `Delete`. Satisfies `identity.EnqueueRepo`/`SweepRepo`. | | `internal/validation` | Pure structural validation over `pkg/models` types. Entrypoints: `Place`, `PlacesQuery`, `Email`. | | `internal/db` | GORM + numbered SQL migrations + PostGIS index setup. | diff --git a/cmd/api/cross_binary_integration_test.go b/cmd/api/cross_binary_integration_test.go index 463566e..1251daf 100644 --- a/cmd/api/cross_binary_integration_test.go +++ b/cmd/api/cross_binary_integration_test.go @@ -43,7 +43,7 @@ func runOSMIngestForCrossBinary(t *testing.T, pbfPath string) { } buf = buf[:0] } - sink := func(_ context.Context, p models.Place) error { + sink := func(_ context.Context, p models.Place, _ *models.AccessibilityProfile) error { buf = append(buf, p) if len(buf) >= batchSize { flush() diff --git a/cmd/ingestion/batcher.go b/cmd/ingestion/batcher.go index c437365..c1de556 100644 --- a/cmd/ingestion/batcher.go +++ b/cmd/ingestion/batcher.go @@ -7,22 +7,30 @@ package main import ( "context" + "fmt" "github.com/InWheelOrg/inwheel-api/pkg/models" ) -// batcher buffers places and flushes in fixed-size batches via flush. -// It is single-goroutine; cmd/ingestion runs ingestion serially. +// batcher buffers (place, profile) pairs, writes places first via flush, then +// writes profiles using the UUIDs returned by flush. Single-goroutine. type batcher struct { - size int - flush func(context.Context, []models.Place) error - buffer []models.Place - written int - touchedIDs []string + size int + flush func(context.Context, []models.Place) error + writeProfile func(context.Context, string, *models.AccessibilityProfile) (bool, error) + downgradeProfile func(*models.AccessibilityProfile) int + + buffer []models.Place + pendingProfiles []*models.AccessibilityProfile + written int + touchedIDs []string + profilesWritten int + profilesDowngraded int } -func (b *batcher) sink(ctx context.Context, p models.Place) error { +func (b *batcher) sink(ctx context.Context, p models.Place, profile *models.AccessibilityProfile) error { b.buffer = append(b.buffer, p) + b.pendingProfiles = append(b.pendingProfiles, profile) if len(b.buffer) >= b.size { return b.flushNow(ctx) } @@ -33,15 +41,34 @@ func (b *batcher) flushNow(ctx context.Context) error { if len(b.buffer) == 0 { return nil } + if b.writeProfile != nil && b.downgradeProfile == nil { + return fmt.Errorf("batcher: writeProfile set without downgradeProfile") + } if err := b.flush(ctx, b.buffer); err != nil { return err } b.written += len(b.buffer) - for _, p := range b.buffer { - if p.ID != "" { - b.touchedIDs = append(b.touchedIDs, p.ID) + for i, p := range b.buffer { + if p.ID == "" { + continue + } + b.touchedIDs = append(b.touchedIDs, p.ID) + profile := b.pendingProfiles[i] + if profile == nil || b.writeProfile == nil { + continue + } + if b.downgradeProfile != nil { + b.profilesDowngraded += b.downgradeProfile(profile) + } + ok, err := b.writeProfile(ctx, p.ID, profile) + if err != nil { + return err + } + if ok { + b.profilesWritten++ } } b.buffer = b.buffer[:0] + b.pendingProfiles = b.pendingProfiles[:0] return nil } diff --git a/cmd/ingestion/batcher_test.go b/cmd/ingestion/batcher_test.go index da284c7..03d73b1 100644 --- a/cmd/ingestion/batcher_test.go +++ b/cmd/ingestion/batcher_test.go @@ -28,7 +28,7 @@ func TestBatcher_FlushesWhenFull(t *testing.T) { ctx := context.Background() for i := 0; i < 3; i++ { - if err := b.sink(ctx, models.Place{}); err != nil { + if err := b.sink(ctx, models.Place{}, nil); err != nil { t.Fatalf("sink %d: %v", i, err) } } @@ -58,7 +58,7 @@ func TestBatcher_FlushNow_DrainsPartialBuffer(t *testing.T) { ctx := context.Background() for i := 0; i < 3; i++ { - if err := b.sink(ctx, models.Place{}); err != nil { + if err := b.sink(ctx, models.Place{}, nil); err != nil { t.Fatalf("sink: %v", err) } } @@ -102,7 +102,7 @@ func TestBatcher_PropagatesFlushError(t *testing.T) { return sentinel }, } - err := b.sink(context.Background(), models.Place{}) + err := b.sink(context.Background(), models.Place{}, nil) if !errors.Is(err, sentinel) { t.Fatalf("expected sentinel, got %v", err) } @@ -113,7 +113,6 @@ func TestBatcher_TouchedIDsAccumulateAcrossFlushes(t *testing.T) { b := &batcher{ size: 2, flush: func(_ context.Context, ps []models.Place) error { - // Simulate UpsertBatch back-populating IDs. for i := range ps { ps[i].ID = fmt.Sprintf("id-%d-%d", len(flushed), i) } @@ -125,7 +124,7 @@ func TestBatcher_TouchedIDsAccumulateAcrossFlushes(t *testing.T) { } ctx := context.Background() for i := 0; i < 5; i++ { - if err := b.sink(ctx, models.Place{Name: fmt.Sprintf("p%d", i)}); err != nil { + if err := b.sink(ctx, models.Place{Name: fmt.Sprintf("p%d", i)}, nil); err != nil { t.Fatalf("sink: %v", err) } } @@ -146,3 +145,131 @@ func TestBatcher_TouchedIDsAccumulateAcrossFlushes(t *testing.T) { } } } + +func TestBatcher_WritesProfileWhenAttached(t *testing.T) { + ctx := context.Background() + var captured []capturedProfile + b := &batcher{ + size: 10, + flush: func(_ context.Context, ps []models.Place) error { + for i := range ps { + ps[i].ID = fmt.Sprintf("place-%d", i) + } + return nil + }, + writeProfile: func(_ context.Context, placeID string, p *models.AccessibilityProfile) (bool, error) { + captured = append(captured, capturedProfile{placeID: placeID, profile: p}) + return true, nil + }, + downgradeProfile: func(_ *models.AccessibilityProfile) int { return 0 }, + } + + if err := b.sink(ctx, models.Place{Name: "no-a11y"}, nil); err != nil { + t.Fatalf("sink p1: %v", err) + } + profile := &models.AccessibilityProfile{OverallStatus: models.StatusAccessible} + if err := b.sink(ctx, models.Place{Name: "with-a11y"}, profile); err != nil { + t.Fatalf("sink p2: %v", err) + } + if err := b.flushNow(ctx); err != nil { + t.Fatalf("flushNow: %v", err) + } + if len(captured) != 1 { + t.Fatalf("captured = %d, want 1 profile write", len(captured)) + } + if captured[0].placeID != "place-1" { + t.Errorf("placeID = %q, want place-1", captured[0].placeID) + } + if b.profilesWritten != 1 { + t.Errorf("profilesWritten = %d, want 1", b.profilesWritten) + } +} + +func TestBatcher_DowngradeCounter(t *testing.T) { + ctx := context.Background() + b := &batcher{ + size: 10, + flush: func(_ context.Context, ps []models.Place) error { + for i := range ps { + ps[i].ID = fmt.Sprintf("place-%d", i) + } + return nil + }, + writeProfile: func(_ context.Context, _ string, _ *models.AccessibilityProfile) (bool, error) { + return true, nil + }, + downgradeProfile: func(_ *models.AccessibilityProfile) int { return 2 }, + } + profile := &models.AccessibilityProfile{OverallStatus: models.StatusAccessible} + if err := b.sink(ctx, models.Place{Name: "p"}, profile); err != nil { + t.Fatalf("sink: %v", err) + } + if err := b.flushNow(ctx); err != nil { + t.Fatalf("flushNow: %v", err) + } + if b.profilesDowngraded != 2 { + t.Errorf("profilesDowngraded = %d, want 2", b.profilesDowngraded) + } +} + +func TestBatcher_PlaceHasNoAccessibilityFieldInFlush(t *testing.T) { + ctx := context.Background() + var batchSeen []models.Place + b := &batcher{ + size: 10, + flush: func(_ context.Context, ps []models.Place) error { + batchSeen = make([]models.Place, len(ps)) + copy(batchSeen, ps) + for i := range ps { + ps[i].ID = fmt.Sprintf("place-%d", i) + } + return nil + }, + writeProfile: func(_ context.Context, _ string, _ *models.AccessibilityProfile) (bool, error) { + return true, nil + }, + downgradeProfile: func(_ *models.AccessibilityProfile) int { return 0 }, + } + profile := &models.AccessibilityProfile{OverallStatus: models.StatusAccessible} + if err := b.sink(ctx, models.Place{Name: "with-a11y"}, profile); err != nil { + t.Fatalf("sink: %v", err) + } + if err := b.flushNow(ctx); err != nil { + t.Fatalf("flushNow: %v", err) + } + if len(batchSeen) != 1 { + t.Fatalf("expected 1 place in batch, got %d", len(batchSeen)) + } + if batchSeen[0].Accessibility != nil { + t.Errorf("place.Accessibility must be nil in flush batch, got %+v", batchSeen[0].Accessibility) + } +} + +func TestBatcher_FlushNow_ErrorWhenWriteProfileWithoutDowngrade(t *testing.T) { + ctx := context.Background() + b := &batcher{ + size: 10, + flush: func(_ context.Context, ps []models.Place) error { + for i := range ps { + ps[i].ID = fmt.Sprintf("place-%d", i) + } + return nil + }, + writeProfile: func(_ context.Context, _ string, _ *models.AccessibilityProfile) (bool, error) { + return true, nil + }, + } + profile := &models.AccessibilityProfile{OverallStatus: models.StatusAccessible} + if err := b.sink(ctx, models.Place{Name: "p"}, profile); err != nil { + t.Fatalf("sink: %v", err) + } + err := b.flushNow(ctx) + if err == nil { + t.Fatal("expected error, got nil") + } +} + +type capturedProfile struct { + placeID string + profile *models.AccessibilityProfile +} diff --git a/cmd/ingestion/main.go b/cmd/ingestion/main.go index da0f3b2..1815d31 100644 --- a/cmd/ingestion/main.go +++ b/cmd/ingestion/main.go @@ -14,11 +14,13 @@ import ( "gorm.io/gorm" + "github.com/InWheelOrg/inwheel-api/internal/a11y" "github.com/InWheelOrg/inwheel-api/internal/db" "github.com/InWheelOrg/inwheel-api/internal/identity" "github.com/InWheelOrg/inwheel-api/internal/place" "github.com/InWheelOrg/inwheel-api/internal/sources" "github.com/InWheelOrg/inwheel-api/internal/unmatched" + "github.com/InWheelOrg/inwheel-api/pkg/models" ) const batchSize = 1000 @@ -107,7 +109,18 @@ func runPipeline(ctx context.Context, src sources.Source, command string, gormDB func runCanonical(ctx context.Context, src sources.Source, command string, gormDB *gorm.DB) error { placesRepo := place.NewRepository(gormDB) unmatchedRepo := unmatched.NewRepository(gormDB) - b := &batcher{size: batchSize, flush: placesRepo.UpsertBatch} + engine := &a11y.Engine{} + + b := &batcher{ + size: batchSize, + flush: placesRepo.UpsertBatch, + writeProfile: func(ctx context.Context, placeID string, p *models.AccessibilityProfile) (bool, error) { + return placesRepo.UpsertProfileIngestion(ctx, placeID, p) + }, + downgradeProfile: func(p *models.AccessibilityProfile) int { + return resolveIngestionConflicts(engine, p) + }, + } if err := dispatchCanonical(ctx, src, command, b.sink); err != nil { return fmt.Errorf("source %q: %w", src.Name(), err) } @@ -134,6 +147,8 @@ func runCanonical(ctx context.Context, src sources.Source, command string, gormD "source", src.Name(), "command", command, "written", b.written, + "profiles_written", b.profilesWritten, + "profiles_downgraded", b.profilesDowngraded, "sweep_failed", sweepErr != nil, } if sweepErr == nil { @@ -214,6 +229,33 @@ func dispatchExternal(ctx context.Context, src sources.Source, command string, s } } +// resolveIngestionConflicts applies audit flags and demotes conflicting components +// from accessible to limited. +func resolveIngestionConflicts(engine *a11y.Engine, profile *models.AccessibilityProfile) int { + if profile == nil { + return 0 + } + engine.WithAuditFlags(profile) + conflicts := engine.DetectConflicts(profile) + if len(conflicts) == 0 { + return 0 + } + conflicted := make(map[models.A11yComponentType]bool, len(conflicts)) + for _, c := range conflicts { + conflicted[c.Component] = true + } + downgraded := 0 + for i := range profile.Components { + c := &profile.Components[i] + if conflicted[c.Type] && c.OverallStatus == models.StatusAccessible { + c.OverallStatus = models.StatusLimited + downgraded++ + slog.Info("ingestion downgraded component", "component", c.Type, "flags", c.AuditFlags) + } + } + return downgraded +} + // resolveCounters tallies external-source outcomes for the run summary. type resolveCounters struct { confident int diff --git a/cmd/ingestion/main_integration_test.go b/cmd/ingestion/main_integration_test.go index 4e1935b..ae64369 100644 --- a/cmd/ingestion/main_integration_test.go +++ b/cmd/ingestion/main_integration_test.go @@ -13,6 +13,8 @@ import ( "math" "testing" + "github.com/InWheelOrg/inwheel-api/internal/place" + "github.com/InWheelOrg/inwheel-api/internal/sources" "github.com/InWheelOrg/inwheel-api/internal/testhelpers" "github.com/InWheelOrg/inwheel-api/pkg/models" ) @@ -173,3 +175,150 @@ func TestFullImport_AndorraFixture(t *testing.T) { } }) } + +func TestRunCanonical_WritesAccessibilityProfiles(t *testing.T) { + ctx := context.Background() + db, cleanup, err := testhelpers.StartPostgres(ctx) + if err != nil { + t.Fatalf("start postgres: %v", err) + } + defer cleanup() + + hasStep := true + hasRamp := false + src := &fakeCanonicalSource{ + emit: []fakeEmit{ + { + place: models.Place{ + OSMID: 1001, OSMType: models.OSMNode, Name: "Plain", + Lat: 46.4620, Lng: 6.8400, Category: models.CategoryCafe, + Status: models.PlaceStatusActive, + ExternalIDs: models.ExternalIDs{"osm": {ID: "node/1001", Confidence: 1.0}}, + }, + }, + { + place: models.Place{ + OSMID: 1002, OSMType: models.OSMNode, Name: "Accessible", + Lat: 46.4621, Lng: 6.8401, Category: models.CategoryCafe, + Status: models.PlaceStatusActive, + ExternalIDs: models.ExternalIDs{"osm": {ID: "node/1002", Confidence: 1.0}}, + }, + profile: &models.AccessibilityProfile{OverallStatus: models.StatusAccessible}, + }, + { + place: models.Place{ + OSMID: 1003, OSMType: models.OSMNode, Name: "Hard Conflict", + Lat: 46.4622, Lng: 6.8402, Category: models.CategoryCafe, + Status: models.PlaceStatusActive, + ExternalIDs: models.ExternalIDs{"osm": {ID: "node/1003", Confidence: 1.0}}, + }, + profile: &models.AccessibilityProfile{ + OverallStatus: models.StatusAccessible, + Components: models.A11yComponents{{ + Type: models.ComponentEntrance, + OverallStatus: models.StatusAccessible, + Entrance: &models.EntranceProperties{HasStep: &hasStep, HasRamp: &hasRamp}, + }}, + }, + }, + }, + } + + if err := runCanonical(ctx, src, "full-import", db); err != nil { + t.Fatalf("runCanonical: %v", err) + } + + var profiles []models.AccessibilityProfile + if err := db.Find(&profiles).Error; err != nil { + t.Fatalf("read profiles: %v", err) + } + if len(profiles) != 2 { + t.Fatalf("expected 2 profiles (Plain has none), got %d", len(profiles)) + } + + var conflictProfile models.AccessibilityProfile + if err := db.Joins("JOIN places ON places.id = accessibility_profiles.place_id"). + Where("places.osm_id = ?", 1003). + First(&conflictProfile).Error; err != nil { + t.Fatalf("read conflict profile: %v", err) + } + if len(conflictProfile.Components) != 1 { + t.Fatalf("conflict profile components = %d, want 1", len(conflictProfile.Components)) + } + if conflictProfile.Components[0].OverallStatus != models.StatusLimited { + t.Errorf("component status = %q, want limited (downgraded from accessible)", conflictProfile.Components[0].OverallStatus) + } +} + +func TestRunCanonical_DoesNotOverwriteUserVerified(t *testing.T) { + ctx := context.Background() + db, cleanup, err := testhelpers.StartPostgres(ctx) + if err != nil { + t.Fatalf("start postgres: %v", err) + } + defer cleanup() + + repo := place.NewRepository(db) + seed := models.Place{ + OSMID: 2001, OSMType: models.OSMNode, Name: "Verified", + Lat: 46.5000, Lng: 6.9000, Category: models.CategoryCafe, + Status: models.PlaceStatusActive, + ExternalIDs: models.ExternalIDs{"osm": {ID: "node/2001", Confidence: 1.0}}, + } + if err := db.Create(&seed).Error; err != nil { + t.Fatalf("seed place: %v", err) + } + _, err = repo.UpsertProfile(ctx, seed.ID, &models.AccessibilityProfile{ + OverallStatus: models.StatusInaccessible, + UserVerified: true, + }) + if err != nil { + t.Fatalf("seed profile: %v", err) + } + + src := &fakeCanonicalSource{ + emit: []fakeEmit{{ + place: models.Place{ + OSMID: 2001, OSMType: models.OSMNode, Name: "Verified", + Lat: 46.5000, Lng: 6.9000, Category: models.CategoryCafe, + Status: models.PlaceStatusActive, + ExternalIDs: models.ExternalIDs{"osm": {ID: "node/2001", Confidence: 1.0}}, + }, + profile: &models.AccessibilityProfile{OverallStatus: models.StatusAccessible}, + }}, + } + if err := runCanonical(ctx, src, "full-import", db); err != nil { + t.Fatalf("runCanonical: %v", err) + } + + var stored models.AccessibilityProfile + if err := db.Where("place_id = ?", seed.ID).First(&stored).Error; err != nil { + t.Fatalf("read: %v", err) + } + if stored.OverallStatus != models.StatusInaccessible { + t.Errorf("user-verified profile got overwritten: status = %q, want inaccessible", stored.OverallStatus) + } + if !stored.UserVerified { + t.Errorf("user_verified flag was cleared") + } +} + +type fakeEmit struct { + place models.Place + profile *models.AccessibilityProfile +} + +type fakeCanonicalSource struct { + emit []fakeEmit +} + +func (f *fakeCanonicalSource) Name() string { return "fake" } +func (f *fakeCanonicalSource) Kind() sources.SourceKind { return sources.SourceKindCanonical } +func (f *fakeCanonicalSource) FullImport(ctx context.Context, sink sources.Sink) error { + for _, e := range f.emit { + if err := sink(ctx, e.place, e.profile); err != nil { + return err + } + } + return nil +} diff --git a/internal/place/README.md b/internal/place/README.md index 56aa999..b4560b7 100644 --- a/internal/place/README.md +++ b/internal/place/README.md @@ -1,6 +1,6 @@ # internal/place -Data-access layer for the `places` table. The repository is a small, focused type that exposes the read and write operations the rest of the codebase needs against canonical places — no business logic, no validation, no decisions about accessibility. It is the only package that touches `places` directly. +Data-access layer for the `places` table and the related `accessibility_profiles` table. The repository exposes the read and write operations the rest of the codebase needs against canonical places and their profiles — no business logic, no validation, no decisions about accessibility. ## What it provides @@ -31,6 +31,8 @@ flowchart LR | `UpsertBatch(ctx, places)` | the ingestion batcher | Bulk insert/update on `(osm_id, osm_type)` conflict. Uses `RETURNING id` so GORM back-populates the `ID` field on every place in the slice — the batcher harvests these as `touchedIDs` for the retry sweep. | | `AttachExternalRef(ctx, placeID, source, ref)` | `identity.Resolver`, `identity.Sweeper` | Adds an `ExternalRef` to the place's `external_ids` JSONB map under the given source key, via Postgres `jsonb_set`. Concurrent attaches to different sources on the same place don't clobber each other. | | `FindCandidates(ctx, lat, lng, radiusM, categories)` | `identity.Match` | Active places within `radiusM` of the point whose category is in `categories`. Uses `ST_DWithin` over a `geography(ST_Point(lng, lat))` expression. Backed by a PostGIS GIST index. | +| `UpsertProfile(ctx, placeID, profile)` | `cmd/api` (`PatchPlaceAccessibility`) | Create-or-update the accessibility profile for a place. Always overwrites — user-driven write path. Returns `created=true` when a new row was inserted. | +| `UpsertProfileIngestion(ctx, placeID, profile)` | ingestion batcher | Same as `UpsertProfile` but skips the write when `user_verified=true`, preserving human corrections across automated re-ingests. | ## Compile-time contracts diff --git a/internal/place/repository.go b/internal/place/repository.go index 85605c6..28c117c 100644 --- a/internal/place/repository.go +++ b/internal/place/repository.go @@ -188,10 +188,11 @@ func (r *Repository) UpsertProfileIngestion(ctx context.Context, placeID string, "submitted_by": nil, "submitted_at": nil, } - if err := tx.Model(&existing).Updates(updates).Error; err != nil { - return err + result := tx.Model(&existing).Updates(updates) + if result.Error != nil { + return result.Error } - written = true + written = result.RowsAffected > 0 return nil }) return written, err diff --git a/internal/sources/README.md b/internal/sources/README.md index 87ec34c..0a0f8e4 100644 --- a/internal/sources/README.md +++ b/internal/sources/README.md @@ -8,7 +8,7 @@ Every `Source` declares whether it is canonical or external. The dispatcher uses | Kind | Meaning | Today | |---|---|---| -| `SourceKindCanonical` | The source owns place rows. Emits `models.Place`. | OSM | +| `SourceKindCanonical` | The source owns place rows. Emits `models.Place` + optional `*models.AccessibilityProfile`. | OSM | | `SourceKindExternal` | The source contributes accessibility data and external IDs that attach to existing canonical places via `identity.Match`. Emits `identity.Record`. | (none yet — Wheelmap is the planned first) | ## Sinks @@ -17,7 +17,7 @@ The pipeline gives each source a sink to write into rather than letting the sour | Sink | Receives | Used by | |---|---|---| -| `Sink` | `models.Place` | canonical sources | +| `Sink` | `models.Place` + `*models.AccessibilityProfile` | canonical sources | | `RecordSink` | `identity.Record` | external sources | ## Capability interfaces diff --git a/internal/sources/osm/README.md b/internal/sources/osm/README.md index ebc5b8c..4f1155d 100644 --- a/internal/sources/osm/README.md +++ b/internal/sources/osm/README.md @@ -11,7 +11,9 @@ flowchart LR C -- excluded --> D[Skip] C -- category, true --> E[TransformNode] E --> F[DeriveRank] - F --> G[Sink: models.Place] + E --> G[mapTagsToProfile] + F --> H[Sink: Place + Profile] + G --> H ``` `StreamNodes` decodes the PBF and emits one OSM node at a time. Only matched POIs reach the sink. @@ -35,6 +37,25 @@ Anything else is dropped. The natural key for upserts is `(osm_id, osm_type)`, where `osm_type` is `node`, `way`, or `relation`. Today only nodes are streamed. +## Accessibility tag mapping (v1) + +`mapTagsToProfile` reads accessibility tags from the POI node and produces a `*models.AccessibilityProfile` or `nil`. POI-node only — no traversal to nearby `entrance=*` nodes or parent ways. + +| Tag | Maps to | +|---|---| +| `wheelchair=yes\|designated` | profile `OverallStatus=accessible` | +| `wheelchair=limited` | profile `OverallStatus=limited` | +| `wheelchair=no` | profile `OverallStatus=inaccessible` | +| (no `wheelchair=*`, components present) | profile `OverallStatus=unknown` | +| `toilets:wheelchair=yes\|no` | restroom component | +| `capacity:disabled=N` | parking component, count preserved | +| `automatic_door` ≠ `no` | entrance component, `is_automatic=true` | +| `step_count` or `entrance:step_count` ≥ 1 | entrance component, `has_step=true` | +| `ramp:wheelchair=yes\|no` | entrance component, `has_ramp` (takes precedence over generic `ramp`) | +| `elevator=yes` | elevator component, status `accessible` | + +When a profile has a hard conflict (e.g. `step_count=1` + `ramp:wheelchair=no` with `overall_status=accessible`), the ingestion pipeline demotes the conflicting component to `limited`. The API write path rejects the same input with HTTP 422. + ## Rank derivation `DeriveRank` assigns one of three priorities based on category and tag context: diff --git a/internal/sources/osm/source.go b/internal/sources/osm/source.go index 9668ced..d932aeb 100644 --- a/internal/sources/osm/source.go +++ b/internal/sources/osm/source.go @@ -48,7 +48,7 @@ func (s *Source) FullImport(ctx context.Context, sink sources.Sink) error { return nil } - p, _, err := TransformNode(node.ID, node.Lat, node.Lng, node.Tags, category) + p, profile, err := TransformNode(node.ID, node.Lat, node.Lng, node.Tags, category) if err != nil { skipped++ slog.Warn("skipping node", @@ -60,7 +60,7 @@ func (s *Source) FullImport(ctx context.Context, sink sources.Sink) error { } emitted++ - return sink(ctx, *p) + return sink(ctx, *p, profile) }) if err != nil { return fmt.Errorf("stream: %w", err) diff --git a/internal/sources/osm/source_test.go b/internal/sources/osm/source_test.go index 188251e..e871506 100644 --- a/internal/sources/osm/source_test.go +++ b/internal/sources/osm/source_test.go @@ -37,7 +37,7 @@ func TestSource_ImplementsFullImporter(t *testing.T) { func TestSource_FullImport_OpenError(t *testing.T) { s := &Source{PBFPath: "/no/such/file.pbf"} - err := s.FullImport(context.Background(), func(context.Context, models.Place) error { return nil }) + err := s.FullImport(context.Background(), func(context.Context, models.Place, *models.AccessibilityProfile) error { return nil }) if err == nil { t.Fatal("expected error opening missing file, got nil") } @@ -48,7 +48,7 @@ func TestSource_FullImport_EmitsFromFixture(t *testing.T) { s := &Source{PBFPath: fixturePBF} var emitted int - sink := func(_ context.Context, _ models.Place) error { + sink := func(_ context.Context, _ models.Place, _ *models.AccessibilityProfile) error { emitted++ return nil } @@ -67,7 +67,7 @@ func TestSource_FullImport_PropagatesSinkError(t *testing.T) { s := &Source{PBFPath: fixturePBF} sentinel := errors.New("sink stop") - sink := func(_ context.Context, _ models.Place) error { + sink := func(_ context.Context, _ models.Place, _ *models.AccessibilityProfile) error { return sentinel } diff --git a/internal/sources/source.go b/internal/sources/source.go index 0603347..e82734c 100644 --- a/internal/sources/source.go +++ b/internal/sources/source.go @@ -27,8 +27,8 @@ const ( SourceKindExternal ) -// Sink receives one place at a time. Used by canonical sources. -type Sink func(context.Context, models.Place) error +// Sink receives one place and its optional accessibility profile. Used by canonical sources. +type Sink func(context.Context, models.Place, *models.AccessibilityProfile) error // RecordSink receives one identity.Record at a time. Used by external sources. type RecordSink func(context.Context, identity.Record) error