Skip to content

Commit 23b81dd

Browse files
SOIVclaude
andcommitted
feat(core): Core Scheduler 구현 (node-cron 기반)
- plugins/scheduler/types.ts: TaskDefinition·TaskRecord·TaskInfo 타입 - plugins/scheduler/index.ts: Scheduler 싱글턴 엔진 - register / unregister / runNow / toggle / stopAll - 중복 실행 방지 (runningTasks Set) - 재시도 정책 (retries · retryDelay) - 실행 로그 DB 저장 (scheduler_logs) - 타임존 지원 (기본: Asia/Seoul) - graceful shutdown 연동 (index.ts) - 007_scheduler_logs.sql: 실행 로그 테이블 - AppServices에 scheduler 추가, initServices()에서 초기화 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 212e799 commit 23b81dd

7 files changed

Lines changed: 369 additions & 1 deletion

File tree

apps/api/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"devDependencies": {
1515
"@types/cors": "^2.8.17",
1616
"@types/express": "^5.0.3",
17+
"@types/node-cron": "^3.0.11",
1718
"tsx": "^4.21.0",
1819
"vitest": "^2.1.9"
1920
},
@@ -22,6 +23,7 @@
2223
"cors": "^2.8.5",
2324
"dotenv": "^17.3.1",
2425
"express": "^5.1.0",
26+
"node-cron": "^3.0.3",
2527
"zod": "^3.25.28"
2628
}
2729
}

apps/api/src/app.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { errorHandler } from './middleware/error';
2222
import { requestLogger } from './middleware/logger';
2323
import { EventBus } from './event-bus';
2424
import { ModuleRegistry } from './module-registry';
25+
import { Scheduler } from './plugins/scheduler/index';
2526
import { ServiceRegistry } from './service-registry';
2627
import { createAdminRouter } from './routes/admin';
2728
import { createAuthRouter } from './routes/auth';
@@ -45,6 +46,7 @@ export interface AppServices {
4546
db: DbProvider;
4647
eventBus: EventBus;
4748
serviceRegistry: ServiceRegistry;
49+
scheduler: Scheduler;
4850
}
4951

5052
export function createApp(services?: AppServices) {
@@ -196,6 +198,8 @@ export async function initServices(db: DbProvider): Promise<AppServices> {
196198

197199
const eventBus = EventBus.getInstance();
198200
const serviceRegistry = ServiceRegistry.getInstance();
201+
const scheduler = Scheduler.getInstance();
202+
scheduler.setDb(db);
199203

200-
return { jwtManager, whitelist, adminPin, totpService, userAuth, sharedLink, settings, db, eventBus, serviceRegistry };
204+
return { jwtManager, whitelist, adminPin, totpService, userAuth, sharedLink, settings, db, eventBus, serviceRegistry, scheduler };
201205
}

apps/api/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,9 @@ async function startApp() {
156156
// ── Graceful shutdown ─────────────────────────────────────────
157157
const shutdown = () => {
158158
log.info('api', `shutting down…`);
159+
if (services) {
160+
services.scheduler.stopAll();
161+
}
159162
server.close(() => {
160163
if (services) {
161164
services.db.disconnect().finally(() => process.exit(0));
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
-- ── 007_scheduler_logs.sql ───────────────────────────────────────
2+
-- Core Scheduler 실행 로그 테이블.
3+
-- 모든 스케줄 작업의 실행 결과(성공·실패·소요 시간·에러)를 기록한다.
4+
5+
CREATE TABLE IF NOT EXISTS scheduler_logs (
6+
id {{UUID_PRIMARY_KEY}},
7+
task_name TEXT NOT NULL,
8+
executed_at TIMESTAMPTZ NOT NULL DEFAULT {{NOW}},
9+
success BOOLEAN NOT NULL DEFAULT {{BOOLEAN_FALSE}},
10+
duration_ms INTEGER NOT NULL,
11+
error TEXT,
12+
created_at TIMESTAMPTZ NOT NULL DEFAULT {{NOW}}
13+
);
14+
15+
CREATE INDEX IF NOT EXISTS scheduler_logs_task_name_idx
16+
ON scheduler_logs (task_name);
17+
18+
CREATE INDEX IF NOT EXISTS scheduler_logs_executed_at_idx
19+
ON scheduler_logs (executed_at DESC);
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
import cron from 'node-cron';
2+
import type { ScheduledTask } from 'node-cron';
3+
4+
import type { DbProvider } from '@fieldstack/core' with { 'resolution-mode': 'import' };
5+
6+
import { log } from '../../middleware/logger';
7+
import type { TaskDefinition, TaskInfo, TaskRecord } from './types';
8+
9+
const DEFAULT_TIMEZONE = 'Asia/Seoul';
10+
11+
// ── Scheduler 싱글턴 ──────────────────────────────────────────────
12+
//
13+
// 모듈이 주기적 작업을 등록하고 실행할 수 있는 코어 인프라.
14+
// AppServices를 통해 주입되므로 모듈은 @fieldstack/core를 직접 import하지 않아도 된다.
15+
//
16+
// 사용 예:
17+
// // 모듈 초기화 시
18+
// services.scheduler.register({
19+
// name: 'subscription-payment-check',
20+
// cronExpr: '0 0 * * *',
21+
// handler: async () => { ... },
22+
// });
23+
//
24+
// // 모듈 종료 시
25+
// services.scheduler.unregister('subscription-payment-check');
26+
27+
export class Scheduler {
28+
private static _instance: Scheduler | null = null;
29+
30+
private readonly tasks = new Map<string, TaskRecord>();
31+
private readonly cronJobs = new Map<string, ScheduledTask>();
32+
private readonly runningTasks = new Set<string>();
33+
34+
private db: DbProvider | null = null;
35+
36+
private constructor() {}
37+
38+
public static getInstance(): Scheduler {
39+
if (!Scheduler._instance) {
40+
Scheduler._instance = new Scheduler();
41+
}
42+
return Scheduler._instance;
43+
}
44+
45+
/** DB 프로바이더 주입 (initServices에서 호출) */
46+
public setDb(db: DbProvider): void {
47+
this.db = db;
48+
}
49+
50+
// ── 등록 / 해제 ────────────────────────────────────────────────
51+
52+
public register(def: TaskDefinition): void {
53+
if (this.tasks.has(def.name)) {
54+
log.warn('scheduler', `task "${def.name}" already registered — skipping`);
55+
return;
56+
}
57+
58+
if (!cron.validate(def.cronExpr)) {
59+
log.error('scheduler', `task "${def.name}" has invalid cron expression: "${def.cronExpr}"`);
60+
return;
61+
}
62+
63+
const record: TaskRecord = {
64+
name: def.name,
65+
cronExpr: def.cronExpr,
66+
handler: def.handler,
67+
enabled: def.enabled ?? true,
68+
timezone: def.timezone ?? DEFAULT_TIMEZONE,
69+
retries: def.retries ?? 0,
70+
retryDelay: def.retryDelay ?? 5000,
71+
onError: def.onError,
72+
registeredAt: new Date().toISOString(),
73+
};
74+
75+
this.tasks.set(def.name, record);
76+
77+
if (record.enabled) {
78+
this.startJob(record);
79+
}
80+
81+
log.success('scheduler', `task "${def.name}" registered (${def.cronExpr})`);
82+
}
83+
84+
public unregister(name: string): boolean {
85+
const job = this.cronJobs.get(name);
86+
if (job) {
87+
job.stop();
88+
this.cronJobs.delete(name);
89+
}
90+
91+
const existed = this.tasks.delete(name);
92+
if (existed) {
93+
log.info('scheduler', `task "${name}" unregistered`);
94+
}
95+
return existed;
96+
}
97+
98+
// ── 제어 ───────────────────────────────────────────────────────
99+
100+
/** 스케줄과 무관하게 즉시 실행 */
101+
public async runNow(name: string): Promise<void> {
102+
const record = this.tasks.get(name);
103+
if (!record) {
104+
throw new Error(`scheduler: task "${name}" not found`);
105+
}
106+
await this.execute(record);
107+
}
108+
109+
/** 활성화/비활성화 토글 */
110+
public toggle(name: string, enabled: boolean): boolean {
111+
const record = this.tasks.get(name);
112+
if (!record) return false;
113+
114+
record.enabled = enabled;
115+
116+
if (enabled) {
117+
if (!this.cronJobs.has(name)) {
118+
this.startJob(record);
119+
}
120+
log.info('scheduler', `task "${name}" enabled`);
121+
} else {
122+
const job = this.cronJobs.get(name);
123+
if (job) {
124+
job.stop();
125+
this.cronJobs.delete(name);
126+
}
127+
log.info('scheduler', `task "${name}" disabled`);
128+
}
129+
130+
return true;
131+
}
132+
133+
/** graceful shutdown — 모든 cron 작업 중지 */
134+
public stopAll(): void {
135+
for (const [name, job] of this.cronJobs) {
136+
job.stop();
137+
log.info('scheduler', `task "${name}" stopped`);
138+
}
139+
this.cronJobs.clear();
140+
log.info('scheduler', `all tasks stopped`);
141+
}
142+
143+
// ── 조회 ───────────────────────────────────────────────────────
144+
145+
public list(): TaskInfo[] {
146+
return Array.from(this.tasks.values()).map((r) => ({
147+
name: r.name,
148+
cronExpr: r.cronExpr,
149+
enabled: r.enabled,
150+
timezone: r.timezone,
151+
registeredAt: r.registeredAt,
152+
}));
153+
}
154+
155+
public getTask(name: string): TaskInfo | undefined {
156+
const r = this.tasks.get(name);
157+
if (!r) return undefined;
158+
return {
159+
name: r.name,
160+
cronExpr: r.cronExpr,
161+
enabled: r.enabled,
162+
timezone: r.timezone,
163+
registeredAt: r.registeredAt,
164+
};
165+
}
166+
167+
/** 특정 작업의 최근 실행 로그 조회 */
168+
public async getLogs(name: string, limit = 100): Promise<unknown[]> {
169+
if (!this.db) return [];
170+
try {
171+
const rows = await this.db.query(
172+
`SELECT task_name, executed_at, success, duration_ms, error
173+
FROM scheduler_logs
174+
WHERE task_name = $1
175+
ORDER BY executed_at DESC
176+
LIMIT $2`,
177+
[name, limit],
178+
);
179+
return rows;
180+
} catch {
181+
return [];
182+
}
183+
}
184+
185+
// ── 내부 ───────────────────────────────────────────────────────
186+
187+
private startJob(record: TaskRecord): void {
188+
const job = cron.schedule(
189+
record.cronExpr,
190+
async () => {
191+
await this.execute(record);
192+
},
193+
{
194+
timezone: record.timezone,
195+
runOnInit: false,
196+
},
197+
);
198+
this.cronJobs.set(record.name, job);
199+
}
200+
201+
private async execute(record: TaskRecord): Promise<void> {
202+
if (this.runningTasks.has(record.name)) {
203+
log.warn('scheduler', `task "${record.name}" already running — skipping`);
204+
return;
205+
}
206+
207+
this.runningTasks.add(record.name);
208+
const startedAt = Date.now();
209+
let success = false;
210+
let errorMessage: string | null = null;
211+
212+
try {
213+
await this.executeWithRetry(record);
214+
success = true;
215+
log.success('scheduler', `task "${record.name}" completed (${Date.now() - startedAt}ms)`);
216+
} catch (err) {
217+
errorMessage = err instanceof Error ? err.message : String(err);
218+
log.error('scheduler', `task "${record.name}" failed: ${errorMessage}`);
219+
record.onError?.(err instanceof Error ? err : new Error(errorMessage));
220+
} finally {
221+
this.runningTasks.delete(record.name);
222+
await this.writeLog(record.name, startedAt, success, errorMessage);
223+
}
224+
}
225+
226+
private async executeWithRetry(record: TaskRecord): Promise<void> {
227+
let lastError: Error | undefined;
228+
229+
for (let attempt = 0; attempt <= record.retries; attempt++) {
230+
try {
231+
await record.handler();
232+
return;
233+
} catch (err) {
234+
lastError = err instanceof Error ? err : new Error(String(err));
235+
if (attempt < record.retries) {
236+
log.warn(
237+
'scheduler',
238+
`task "${record.name}" attempt ${attempt + 1}/${record.retries + 1} failed — retrying in ${record.retryDelay}ms`,
239+
);
240+
await delay(record.retryDelay);
241+
}
242+
}
243+
}
244+
245+
throw lastError;
246+
}
247+
248+
private async writeLog(
249+
taskName: string,
250+
startedAt: number,
251+
success: boolean,
252+
error: string | null,
253+
): Promise<void> {
254+
if (!this.db) return;
255+
try {
256+
const durationMs = Date.now() - startedAt;
257+
await this.db.query(
258+
`INSERT INTO scheduler_logs (task_name, success, duration_ms, error)
259+
VALUES ($1, $2, $3, $4)`,
260+
[taskName, success, durationMs, error],
261+
);
262+
} catch (err) {
263+
log.warn('scheduler', `failed to write log for task "${taskName}": ${String(err)}`);
264+
}
265+
}
266+
}
267+
268+
function delay(ms: number): Promise<void> {
269+
return new Promise((resolve) => setTimeout(resolve, ms));
270+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// ── Core Scheduler 타입 정의 ──────────────────────────────────────
2+
3+
export interface TaskDefinition {
4+
/** 고유 작업 이름 (중복 등록 불가) */
5+
name: string;
6+
/** Cron 표현식 (5-field: 분 시 일 월 요일) */
7+
cronExpr: string;
8+
/** 실행할 핸들러 */
9+
handler: () => Promise<void> | void;
10+
/** 활성화 여부 (기본: true) */
11+
enabled?: boolean;
12+
/** 타임존 (기본: Asia/Seoul) */
13+
timezone?: string;
14+
/** 실패 시 최대 재시도 횟수 (기본: 0) */
15+
retries?: number;
16+
/** 재시도 간 대기 시간 ms (기본: 5000) */
17+
retryDelay?: number;
18+
/** 모든 재시도 실패 후 호출되는 콜백 */
19+
onError?: (error: Error) => void;
20+
}
21+
22+
export interface TaskRecord {
23+
name: string;
24+
cronExpr: string;
25+
handler: () => Promise<void> | void;
26+
enabled: boolean;
27+
timezone: string;
28+
retries: number;
29+
retryDelay: number;
30+
onError?: (error: Error) => void;
31+
registeredAt: string;
32+
}
33+
34+
export interface SchedulerLogEntry {
35+
taskName: string;
36+
executedAt: string;
37+
success: boolean;
38+
durationMs: number;
39+
error: string | null;
40+
}
41+
42+
export interface TaskInfo {
43+
name: string;
44+
cronExpr: string;
45+
enabled: boolean;
46+
timezone: string;
47+
registeredAt: string;
48+
}

0 commit comments

Comments
 (0)