Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@ Library catalog search uses a facade pattern that routes to either Elasticsearch
- **`elasticsearch.client.ts`** -- Singleton ES client. Returns `null` when `ELASTICSEARCH_URL` is unset (graceful degradation).
- **`elasticsearch.indices.ts`** -- Index mapping and lifecycle (`ensureLibraryIndex()` called at startup).
- **`elasticsearch.search.ts`** -- ES query implementations (`searchLibraryES`, `findSimilarArtistES`, `searchAlbumsByTitleES`, `searchByArtistES`). All return `LibraryArtistViewEntry[]`.
- **`elasticsearch.sync.ts`** -- Stub for dual-write sync and bulk reindex (PR 2).
- **`elasticsearch.sync.ts`** -- Dual-write sync and bulk reindex. `indexLibraryDocumentById` is called fire-and-forget from `library.service.ts` on album insert, rotation add, and rotation kill. `bulkIndexLibrary` reads all rows from `library_artist_view` and bulk-indexes into ES in batches of 500. `removeLibraryDocument` deletes a single document by ID (ignores 404).
- **`index.ts`** -- Facade: tries ES first, falls back to pg_trgm on error. Exports `searchLibrary`, `findSimilarArtist`, `searchAlbumsByTitle`, `searchByArtist`.

The original pg_trgm implementations in `library.service.ts` are renamed with `pgTrgm` prefix (e.g., `pgTrgmSearchLibrary`) and re-exported via the facade under the original names. No callers need to change imports.

Dual-write sync errors are logged but never propagated -- library operations (insert, rotation add/kill) succeed even if ES is temporarily unavailable. The sync module has a safe circular dependency with `library.service.ts` (both imports are function-scoped, not evaluated at module load time).

`POST /library/reindex` triggers a full bulk reindex. Requires `catalog: ['write']` permission (musicDirector or stationManager). Should be called after ETL jobs that modify library data outside the normal API flow.

Feature flag: set `ELASTICSEARCH_URL` to enable ES, unset to disable. Instant rollback by unsetting the env var.

### Auth Server (`apps/auth`)
Expand Down
12 changes: 12 additions & 0 deletions apps/backend/controllers/library.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
RotationRelease,
} from '@wxyc/database';
import * as libraryService from '../services/library.service.js';
import { bulkIndexLibrary } from '../services/search/elasticsearch.sync.js';

type NewAlbumRequest = {
album_title: string;
Expand Down Expand Up @@ -318,6 +319,17 @@ export const addGenre: RequestHandler = async (req, res, next) => {
}
};

export const reindexLibrary: RequestHandler = async (req, res, next) => {
try {
const { indexed, errors } = await bulkIndexLibrary();
res.status(200).json({ message: 'Reindex complete', indexed, errors });
} catch (e) {
console.error('Error: Failed to reindex library');
console.error(e);
next(e);
}
};

export const getAlbum: RequestHandler<object, unknown, unknown, { album_id: string }> = async (req, res, next) => {
const { query } = req;
if (query.album_id === undefined) {
Expand Down
2 changes: 2 additions & 0 deletions apps/backend/routes/library.route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ library_route.get('/genres', requirePermissions({ catalog: ['read'] }), libraryC
library_route.post('/genres', requirePermissions({ catalog: ['write'] }), libraryController.addGenre);

library_route.get('/info', requirePermissions({ catalog: ['read'] }), libraryController.getAlbum);

library_route.post('/reindex', requirePermissions({ catalog: ['write'] }), libraryController.reindexLibrary);
17 changes: 14 additions & 3 deletions apps/backend/services/library.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import {
} from '@wxyc/database';
import { LibraryResult, EnrichedLibraryResult, enrichLibraryResult } from './requestLine/types.js';
import { extractSignificantWords } from './requestLine/matching/index.js';
// Circular dependency with elasticsearch.sync is safe — both imports are used
// only inside function bodies, not at module evaluation time.
import { indexLibraryDocumentById } from './search/elasticsearch.sync.js';

export const getFormatsFromDB = async () => {
const formats = await db
Expand Down Expand Up @@ -90,7 +93,9 @@ export const getRotationFromDB = async (): Promise<Rotation[]> => {

export const addToRotation = async (newRotation: RotationAddRequest) => {
const insertedRotation: RotationRelease[] = await db.insert(rotation).values(newRotation).returning();
return insertedRotation[0];
const result = insertedRotation[0];
indexLibraryDocumentById(result.album_id).catch(() => {});
return result;
};

export const killRotationInDB = async (rotationId: number, updatedKillDate?: string) => {
Expand All @@ -99,12 +104,18 @@ export const killRotationInDB = async (rotationId: number, updatedKillDate?: str
.set({ kill_date: updatedKillDate || sql`CURRENT_DATE` })
.where(eq(rotation.id, rotationId))
.returning();
return updatedRotation[0];
const result = updatedRotation[0];
if (result) {
indexLibraryDocumentById(result.album_id).catch(() => {});
}
return result;
};

export const insertAlbum = async (newAlbum: NewAlbum) => {
const response = await db.insert(library).values(newAlbum).returning();
return response[0];
const inserted = response[0];
indexLibraryDocumentById(inserted.id).catch(() => {});
return inserted;
};

//based on artist name and album title, retrieve n best matches from db
Expand Down
139 changes: 130 additions & 9 deletions apps/backend/services/search/elasticsearch.sync.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,146 @@
import type { LibraryArtistViewEntry } from '@wxyc/database';
import { db } from '@wxyc/database';
import { sql } from 'drizzle-orm';
import { getElasticsearchClient } from './elasticsearch.client.js';
import { ensureLibraryIndex, getLibraryIndexName } from './elasticsearch.indices.js';

const BULK_BATCH_SIZE = 500;

/**
* Convert a view entry to an ES document body, serializing dates for the ES date mapping.
*/
function toEsDocument(doc: LibraryArtistViewEntry): Record<string, unknown> {
return {
id: doc.id,
artist_name: doc.artist_name,
alphabetical_name: doc.alphabetical_name,
album_title: doc.album_title,
label: doc.label,
genre_name: doc.genre_name,
format_name: doc.format_name,
rotation_bin: doc.rotation_bin,
code_letters: doc.code_letters,
code_artist_number: doc.code_artist_number,
code_number: doc.code_number,
add_date: doc.add_date instanceof Date
? doc.add_date.toISOString()
: new Date(String(doc.add_date)).toISOString(),
};
}

// Use raw SQL for view queries to avoid Drizzle's column name confusion:
// Drizzle references underlying table columns (e.g. artist_genre_code) instead
// of view column aliases (e.g. code_artist_number) when using db.select().from(view).

/**
* Query `library_artist_view` by album ID to get the full denormalized row.
*/
async function getLibraryViewEntryById(albumId: number): Promise<LibraryArtistViewEntry | null> {
const rows = await db.execute(
sql`SELECT * FROM wxyc_schema.library_artist_view WHERE id = ${albumId} LIMIT 1`
);
return (rows[0] as LibraryArtistViewEntry | undefined) ?? null;
}

/**
* Index a single library document into Elasticsearch.
* Stub — implemented in PR 2.
* No-ops when ES is disabled. Errors are logged, never thrown.
*/
async function indexLibraryDocument(doc: LibraryArtistViewEntry): Promise<void> {
const client = getElasticsearchClient();
if (!client) return;

await client.index({
index: getLibraryIndexName(),
id: String(doc.id),
body: toEsDocument(doc),
});
}

/**
* Index a library document by album ID. Queries the view, then indexes into ES.
* Entire body is wrapped in try/catch — logs errors, never throws.
* Safe to call fire-and-forget from dual-write callers.
*/
export async function indexLibraryDocument(_doc: LibraryArtistViewEntry): Promise<void> {
// TODO: PR 2 — dual-write sync
export async function indexLibraryDocumentById(albumId: number): Promise<void> {
try {
const client = getElasticsearchClient();
if (!client) return;

const doc = await getLibraryViewEntryById(albumId);
if (!doc) {
console.warn(`[Elasticsearch] Album ${albumId} not found in library_artist_view, skipping index`);
return;
}

await indexLibraryDocument(doc);
} catch (error) {
console.error(`[Elasticsearch] Failed to index album ${albumId}:`, error);
}
}

/**
* Remove a library document from the Elasticsearch index by ID.
* Stub — implemented in PR 2.
* Ignores 404 (document may not exist in ES yet). Never throws.
*/
export async function removeLibraryDocument(_id: number): Promise<void> {
// TODO: PR 2 — dual-write sync
export async function removeLibraryDocument(id: number): Promise<void> {
try {
const client = getElasticsearchClient();
if (!client) return;

await client.delete({
index: getLibraryIndexName(),
id: String(id),
});
} catch (error: unknown) {
const statusCode = (error as { meta?: { statusCode?: number } })?.meta?.statusCode;
if (statusCode === 404) return;
console.error(`[Elasticsearch] Failed to remove document ${id}:`, error);
}
}

/**
* Full reindex: read all rows from library_artist_view and bulk-index into ES.
* Stub — implemented in PR 2.
* Returns counts for observability. No-ops gracefully when ES is disabled.
*/
export async function bulkIndexLibrary(): Promise<void> {
// TODO: PR 2 — bulk reindex job
export async function bulkIndexLibrary(): Promise<{ indexed: number; errors: number }> {
const client = getElasticsearchClient();
if (!client) return { indexed: 0, errors: 0 };

await ensureLibraryIndex();

const rows = (await db.execute(
sql`SELECT * FROM wxyc_schema.library_artist_view`
)) as unknown as LibraryArtistViewEntry[];

if (rows.length === 0) {
return { indexed: 0, errors: 0 };
}

const indexName = getLibraryIndexName();
let totalErrors = 0;

for (let i = 0; i < rows.length; i += BULK_BATCH_SIZE) {
const batch = rows.slice(i, i + BULK_BATCH_SIZE);
const operations = batch.flatMap((doc) => [
{ index: { _index: indexName, _id: String(doc.id) } },
toEsDocument(doc),
]);

const response = await client.bulk({ operations });

if (response.errors) {
const errorItems = (response.items as Array<{ index?: { status?: number; error?: unknown } }>).filter(
(item) => item.index && item.index.status && item.index.status >= 400
);
totalErrors += errorItems.length;
console.error(`[Elasticsearch] Bulk batch had ${errorItems.length} errors`);
errorItems.forEach((item) =>
console.error('[Elasticsearch] Bulk item error:', JSON.stringify(item.index?.error))
);
}
}

console.log(`[Elasticsearch] Reindex complete: ${rows.length} documents, ${totalErrors} errors`);
return { indexed: rows.length, errors: totalErrors };
}
83 changes: 79 additions & 4 deletions tests/unit/services/library.service.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
// Mock dependencies before importing the service
import { jest } from '@jest/globals';

// Mock ES sync before importing the service
const mockIndexLibraryDocumentById = jest.fn().mockResolvedValue(undefined);
jest.mock('../../../apps/backend/services/search/elasticsearch.sync', () => ({
indexLibraryDocumentById: mockIndexLibraryDocumentById,
}));

// Build a chainable mock DB
const mockReturning = jest.fn();
const mockValues = jest.fn().mockReturnValue({ returning: mockReturning });
const mockInsert = jest.fn().mockReturnValue({ values: mockValues });

const mockSet = jest.fn();
const mockUpdateWhere = jest.fn().mockReturnValue({ returning: mockReturning });
mockSet.mockReturnValue({ where: mockUpdateWhere });
const mockUpdate = jest.fn().mockReturnValue({ set: mockSet });

jest.mock('@wxyc/database', () => ({
db: {
select: jest.fn().mockReturnThis(),
insert: jest.fn().mockReturnThis(),
update: jest.fn().mockReturnThis(),
insert: mockInsert,
update: mockUpdate,
delete: jest.fn().mockReturnThis(),
},
library: {},
Expand All @@ -16,16 +33,21 @@ jest.mock('@wxyc/database', () => ({

jest.mock('drizzle-orm', () => ({
eq: jest.fn((a, b) => ({ eq: [a, b] })),
and: jest.fn((...args: unknown[]) => ({ and: args })),
sql: Object.assign(
jest.fn((strings: TemplateStringsArray, ...values: unknown[]) => ({ sql: strings, values })),
{ raw: jest.fn((s: string) => ({ raw: s })) }
),
desc: jest.fn((col) => ({ desc: col })),
}));

import { isISODate } from '../../../apps/backend/services/library.service';
import { isISODate, insertAlbum, addToRotation, killRotationInDB } from '../../../apps/backend/services/library.service';

describe('library.service', () => {
beforeEach(() => {
jest.clearAllMocks();
});

describe('isISODate', () => {
it('returns true for valid ISO date format YYYY-MM-DD', () => {
expect(isISODate('2024-01-15')).toBe(true);
Expand Down Expand Up @@ -56,4 +78,57 @@ describe('library.service', () => {
expect(isISODate('2024-13-01')).toBe(true); // invalid month but correct format
});
});

describe('dual-write to Elasticsearch', () => {
it('insertAlbum calls indexLibraryDocumentById with the inserted album ID', async () => {
mockReturning.mockResolvedValue([{ id: 99, album_title: 'Segundo' }]);

await insertAlbum({ artist_id: 1, genre_id: 1, format_id: 1, album_title: 'Segundo', label: 'Domino', code_number: 1 });

expect(mockIndexLibraryDocumentById).toHaveBeenCalledWith(99);
});

it('addToRotation calls indexLibraryDocumentById with the album_id', async () => {
mockReturning.mockResolvedValue([{ id: 5, album_id: 42, rotation_bin: 'H' }]);

await addToRotation({ album_id: 42, rotation_bin: 'H' });

expect(mockIndexLibraryDocumentById).toHaveBeenCalledWith(42);
});

it('killRotationInDB calls indexLibraryDocumentById with the album_id', async () => {
mockReturning.mockResolvedValue([{ id: 5, album_id: 42, kill_date: '2024-06-01' }]);

await killRotationInDB(5, '2024-06-01');

expect(mockIndexLibraryDocumentById).toHaveBeenCalledWith(42);
});

it('insertAlbum still returns normally when sync fails', async () => {
mockReturning.mockResolvedValue([{ id: 100, album_title: 'Moon Pix' }]);
mockIndexLibraryDocumentById.mockRejectedValue(new Error('ES down'));

const result = await insertAlbum({ artist_id: 2, genre_id: 1, format_id: 1, album_title: 'Moon Pix', label: 'Matador Records', code_number: 1 });

expect(result).toEqual({ id: 100, album_title: 'Moon Pix' });
});

it('addToRotation still returns normally when sync fails', async () => {
mockReturning.mockResolvedValue([{ id: 6, album_id: 43, rotation_bin: 'S' }]);
mockIndexLibraryDocumentById.mockRejectedValue(new Error('ES down'));

const result = await addToRotation({ album_id: 43, rotation_bin: 'S' });

expect(result).toEqual({ id: 6, album_id: 43, rotation_bin: 'S' });
});

it('killRotationInDB still returns normally when sync fails', async () => {
mockReturning.mockResolvedValue([{ id: 7, album_id: 44, kill_date: '2024-06-01' }]);
mockIndexLibraryDocumentById.mockRejectedValue(new Error('ES down'));

const result = await killRotationInDB(7, '2024-06-01');

expect(result).toEqual({ id: 7, album_id: 44, kill_date: '2024-06-01' });
});
});
});
Loading