From 149e10399c1de2468655083e22a33202d8053dc9 Mon Sep 17 00:00:00 2001 From: kilhyeonjun Date: Mon, 15 Dec 2025 21:02:23 +0900 Subject: [PATCH] =?UTF-8?q?docs:=20Chapter=2013=20=EB=A9=94=EC=8B=9C?= =?UTF-8?q?=EC=A7=95=EA=B3=BC=20=ED=86=B5=ED=95=A9=20=ED=8C=A8=ED=84=B4=20?= =?UTF-8?q?=EB=B0=9C=ED=91=9C=20=EC=9E=90=EB=A3=8C=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - PRESENTATION.md: 발표 슬라이드 (Pub/Sub, 작업 분배, 요청/응답) - README.md: 챕터 요약 및 Mermaid 다이어그램 - DEEP-DIVE.md: At-least-once 처리와 중복 처리 전략 딥다이브 - code/: 예제 코드 및 연습문제 구현 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- chapter13/kilhyeonjun/DEEP-DIVE.md | 518 +++++++++++++++ chapter13/kilhyeonjun/PRESENTATION.md | 278 ++++++++ chapter13/kilhyeonjun/README.md | 617 ++++++++++++++++++ .../code/01-request-reply-process.js | 164 +++++ .../code/02-correlation-id-pattern.js | 175 +++++ chapter13/kilhyeonjun/code/README.md | 65 ++ .../code/exercises/13.3-stop-workers.js | 228 +++++++ .../code/exercises/13.5-data-collector.js | 304 +++++++++ 8 files changed, 2349 insertions(+) create mode 100644 chapter13/kilhyeonjun/DEEP-DIVE.md create mode 100644 chapter13/kilhyeonjun/PRESENTATION.md create mode 100644 chapter13/kilhyeonjun/README.md create mode 100644 chapter13/kilhyeonjun/code/01-request-reply-process.js create mode 100644 chapter13/kilhyeonjun/code/02-correlation-id-pattern.js create mode 100644 chapter13/kilhyeonjun/code/README.md create mode 100644 chapter13/kilhyeonjun/code/exercises/13.3-stop-workers.js create mode 100644 chapter13/kilhyeonjun/code/exercises/13.5-data-collector.js diff --git a/chapter13/kilhyeonjun/DEEP-DIVE.md b/chapter13/kilhyeonjun/DEEP-DIVE.md new file mode 100644 index 0000000..8b0a82d --- /dev/null +++ b/chapter13/kilhyeonjun/DEEP-DIVE.md @@ -0,0 +1,518 @@ +# 딥다이브: At-least-once 처리와 중복 처리 전략 + +> **발표자**: 길현준 +> **주제**: SQS + Lambda 환경에서의 메시지 중복 처리 방지 + +--- + +## "왜 메시지는 두 번 처리될 수 있을까?" + +--- + +## 이 딥다이브의 목표 + +> **"SQS + Lambda 환경에서 메시지가 왜 중복 처리될 수 있는지 이해하고, +> 그걸 어떻게 안전하게 설계해야 하는지 안다."** + +--- + +## 먼저 결론부터 + +> **메시지는 한 번만 처리된다고 보장되지 않는다** +> 대신 **최소 한 번(at-least-once)** 처리된다 + +그래서 **중복 처리를 전제로 설계해야 한다** + +--- + +## 일반적인 이벤트 처리 구조 + +```mermaid +flowchart LR + E[이벤트 발생] --> SQS[[SQS]] + SQS --> L[Lambda] + L --> DB[(Database)] +``` + +이 구조에서 중복 처리는: +- ❌ 버그가 아니라 +- ✅ **정상 동작** + +--- + +## Part 1: At-least-once란? + +### 의미 + +> **"최소 한 번은 실행된다"** + +- 한 번일 수도 있고 +- 두 번일 수도 있고 +- (드물지만) 그 이상일 수도 있다 + +**중요**: "정확히 한 번(exactly-once)"이 아님 + +--- + +## 왜 중복 처리가 발생할까? + +```mermaid +sequenceDiagram + participant SQS + participant Lambda + participant DB + + SQS->>Lambda: 메시지 전달 + Lambda->>DB: 처리 시도 + Note over Lambda: 타임아웃 발생! + Lambda--xSQS: (응답 없음) + SQS->>Lambda: 같은 메시지 재전달 + Lambda->>DB: 중복 처리 위험! +``` + +--- + +### 원인 1: Lambda가 처리 중 실패하는 경우 + +- 실행 중 에러 발생 +- 타임아웃 +- 메모리 초과 +- 외부 API 장애 + +SQS 입장에서는: +> "이 메시지, 처리 안 된 것 같은데?" + +→ 다시 전달 + +--- + +### 원인 2: Visibility Timeout 문제 + +- Lambda가 메시지를 가져가면 +- 해당 메시지는 잠시 **보이지 않게(hidden)** 됨 +- 이 시간이 **Visibility Timeout** + +이 시간 안에: +- 처리 완료 ❌ +- 메시지 삭제 ❌ + +→ **다른 Lambda가 같은 메시지를 다시 가져감** + +--- + +### 원인 3: Lambda는 재실행될 수 있다 + +- 같은 메시지 +- 같은 코드 +- 다른 실행 환경 + +→ **중복 실행은 설계상 허용된 동작** + +--- + +## 실제로 벌어지는 사고들 + +- 결제 두 번 처리 +- 이메일 두 번 발송 +- 포인트 두 번 적립 +- 쿠폰 중복 사용 + +로그는 정상, 비즈니스만 망가짐 + +--- + +## 흔한 오해 + +> "에러 안 나게 잘 짜면 되지 않나요?" + +❌ **불가능** + +- 네트워크는 항상 실패함 +- Lambda는 언제든 종료됨 +- AWS도 완벽하지 않음 + +→ **실패는 전제 조건** +→ **중복을 견디는 설계가 필요** + +--- + +## Part 1.5: "FIFO 쓰면 해결 아닌가요?" + +### 잠깐, SQS에는 두 가지 타입이 있다 + +| 특성 | Standard | FIFO | +|------|----------|------| +| 처리량 | **무제한** | 300-3,000 TPS | +| 전달 보장 | At-least-once | **Exactly-once** | +| 순서 | Best-effort | First-In-First-Out | +| 중복 제거 | 없음 | **5분 deduplication** | +| 큐 이름 | `my-queue` | `my-queue.fifo` | + +--- + +### FIFO의 Exactly-once, 어떻게 작동할까? + +**핵심 메커니즘:** + +1. **MessageDeduplicationId** + - 발행 시 고유 ID 지정 + - 5분 내 동일 ID 메시지 → **발행 자체가 차단됨** + +2. **Content-based Deduplication** + - 메시지 본문의 SHA-256 해시로 자동 중복 제거 + - 설정으로 활성화 가능 + +3. **Message Group ID** + - 그룹 내 순서 보장 + - 다른 그룹은 병렬 처리 가능 + +```mermaid +flowchart LR + subgraph Standard["Standard Queue"] + P1[Producer] --> |메시지 A| Q1[[Queue]] + P1 --> |메시지 A 재전송| Q1 + Q1 --> |메시지 A| C1[Consumer] + Q1 --> |메시지 A 중복!| C1 + end +``` + +```mermaid +flowchart LR + subgraph FIFO["FIFO Queue"] + P2[Producer] --> |"메시지 A (ID: 123)"| Q2[["Queue.fifo"]] + P2 -.-> |"메시지 A 재전송 (ID: 123)"| Q2 + Q2 --> |메시지 A 한 번만| C2[Consumer] + end +``` + +--- + +### 그럼 FIFO 쓰면 중복 걱정 끝? + +## ❌ 아니다! + +**AWS 공식 문서:** +> "if the function encounters an error while processing a batch, all messages become visible again after the visibility timeout expires, **which can result in duplicate processing**" + +--- + +### FIFO의 exactly-once는 **발행자 측**만! + +```mermaid +sequenceDiagram + participant Producer + participant FIFO as FIFO Queue + participant Lambda + participant DB + + Producer->>FIFO: 메시지 발행 (ID: 123) + Producer->>FIFO: 메시지 재발행 (ID: 123) + Note over FIFO: 5분 내 중복 차단!
두 번째 발행 무시 + + FIFO->>Lambda: 메시지 전달 + Lambda->>DB: 처리 시도 + Note over Lambda: 타임아웃 발생! + Lambda--xFIFO: (응답 없음) + + Note over FIFO: visibility timeout 후
메시지 다시 보임 + FIFO->>Lambda: 같은 메시지 재전달 + Note over Lambda,DB: 소비자 측 중복 처리!
→ 멱등성 여전히 필요 +``` + +--- + +### 정리: FIFO가 해결하는 것 vs 안 하는 것 + +| 구간 | FIFO가 해결? | 설명 | +|------|-------------|------| +| **발행자 → 큐** | ✅ Yes | 5분 내 동일 ID 중복 발행 차단 | +| **큐 → 소비자** | ❌ No | Lambda 실패 시 재전달 (at-least-once) | +| **소비자 내부** | ❌ No | 비즈니스 로직 중복 실행 가능 | + +--- + +### 언제 FIFO를 선택할까? + +| 선택 | 상황 | +|------|------| +| **Standard** | 높은 처리량 필요 (수천 TPS 이상), 멱등성 구현 가능 | +| **FIFO** | 순서가 중요, 처리량 낮음, **발행 측** 중복 방지 필요 | + +**결론:** +> **FIFO를 써도 소비자 측 멱등성 구현은 필수!** +> Insert First 패턴, Deduplication은 여전히 필요하다. + +--- + +## Part 2: 해결 전략 + +--- + +## 해결 전략 1: Idempotency (멱등성) + +### 정의 + +> **"같은 작업을 여러 번 실행해도 +> 결과는 한 번 실행한 것과 같아야 한다"** + +--- + +### 예시 (결제 처리) + +**멱등하지 않은 방식** + +``` +결제 요청 → 결제 처리 +``` + +메시지가 두 번 오면 → 결제도 두 번 됨 + +--- + +**멱등한 방식** + +``` +결제 요청 (paymentId) +→ 이 paymentId를 이미 처리했는지 확인 +→ 처음이면 처리 +→ 이미 처리했으면 무시 +``` + +핵심은 **고유 ID** + +--- + +### 구현 아이디어 + +- 메시지에 `eventId`, `paymentId`, `requestId` +- DynamoDB / Redis / RDB에 처리 기록 저장 +- 처리 전 반드시 "이미 처리됨?" 확인 + +--- + +## 해결 전략 2: Deduplication (중복 제거) + +### 개념 + +> **"이미 처리한 메시지는 다시 처리하지 않는다"** + +--- + +### 처리 흐름 + +1. 메시지 수신 +2. 고유 ID 확인 +3. 이미 처리됨? + - Yes → skip + - No → 처리 + 기록 + +모든 작업의 **시작 지점**에서 수행 + +--- + +## 해결 전략 3: 메시지를 믿지 말고 결과를 믿기 + +> ❌ "이 메시지는 한 번만 올 거야" +> ✅ "여러 번 와도 결과는 한 번만 반영되면 된다" + +사고방식 전환이 핵심 + +--- + +## Part 3: 실무 구현 가이드 + +--- + +## Insert First 패턴 (핵심!) + +> **"먼저 삽입을 시도하고, 실패하면 기존 레코드를 확인한다"** + +### 왜 이 패턴인가? + +일반적인 방식 (Select → Insert): +``` +1. SELECT로 존재 여부 확인 +2. 없으면 INSERT +``` +문제: 동시에 두 요청이 오면 **둘 다 없다고 판단**하고 INSERT 시도 + +--- + +### Insert First 패턴 + +```mermaid +flowchart TB + Start[메시지 수신] --> Insert["1. INSERT 시도
(eventId + PROCESSING)"] + Insert --> Success{성공?} + Success -->|Yes| Process[비즈니스 로직 실행] + Success -->|"No
(중복 키 에러)"| Query["2. 기존 레코드 조회
(pessimistic lock)"] + Query --> Check{상태 확인} + Check -->|"PROCESSING
COMPLETED"| Skip[이미 처리됨 - Skip] + Check -->|FAILED| Retry[재처리 로직] + Process --> Complete["상태 → COMPLETED"] +``` + +--- + +### 구현 예시 (의사 코드) + +```typescript +async function processMessage(eventId: string, messageId: string) { + // 1. INSERT 시도 (Insert First) + try { + await db.insert({ + eventId, + messageId, + status: 'PROCESSING', + }); + } catch (error) { + // 2. 중복 키 에러인 경우 + if (isDuplicateKeyError(error)) { + const existing = await db.findOne({ + eventId, + lock: 'FOR UPDATE', // Pessimistic Lock + }); + + // 이미 처리 중이거나 완료됨 + if (['PROCESSING', 'COMPLETED'].includes(existing.status)) { + throw new AlreadyProcessedError(eventId); + } + + // FAILED 상태면 재처리 가능 + // (정책에 따라 다름) + } + throw error; + } + + // 3. 비즈니스 로직 실행 + try { + await executeBusinessLogic(); + await db.update(eventId, { status: 'COMPLETED' }); + } catch (error) { + await db.update(eventId, { + status: 'FAILED', + failureReason: error.message, + }); + throw error; + } +} +``` + +--- + +### 핵심 포인트 + +| 요소 | 설명 | +|------|------| +| **Unique 제약** | `eventId`에 UNIQUE 인덱스 필수 | +| **Insert First** | SELECT 없이 바로 INSERT 시도 | +| **Pessimistic Lock** | 기존 레코드 조회 시 `FOR UPDATE` | +| **상태 머신** | PROCESSING → COMPLETED / FAILED | + +--- + +### 상태 전이 + +```mermaid +stateDiagram-v2 + [*] --> PROCESSING: INSERT 성공 + PROCESSING --> COMPLETED: 처리 성공 + PROCESSING --> FAILED: 처리 실패 + FAILED --> PROCESSING: 재처리 시도 + COMPLETED --> [*]: 완료 +``` + +--- + +## 중복 처리 방지 아키텍처 + +```mermaid +flowchart LR + E[이벤트 발생] --> Q[[SQS]] + + subgraph Consumer["Consumer (소비자)"] + Q --> L[Lambda] + L --> |Insert First| SUB[(처리 기록)] + SUB --> |중복 체크 통과| P[비즈니스 로직] + P --> |완료| UPDATE[상태 업데이트] + end +``` + +| 단계 | 동작 | 목적 | +|------|------|------| +| **1. Insert First** | 처리 기록 삽입 시도 | 동시 요청 차단 | +| **2. 중복 체크** | 기존 상태 확인 | 이미 처리된 메시지 건너뛰기 | +| **3. 비즈니스 로직** | 실제 작업 수행 | 핵심 처리 | +| **4. 상태 업데이트** | COMPLETED로 변경 | 완료 기록 | + +--- + +## Part 4: 스터디 토론 + +--- + +## 핵심 요약 + +```markdown +## At-least-once 핵심 정리 + +- SQS + Lambda = at-least-once +- 중복 처리는 정상 동작 +- FIFO 써도 소비자 측은 여전히 at-least-once! +- 반드시 필요한 설계: + 1. Idempotency (멱등성) + 2. Deduplication (중복 제거) + 3. Insert First 패턴 (동시성 제어) +- 메시지를 믿지 말고, 결과를 믿자 +``` + +--- + +## 핵심 메시지 + +> **분산 시스템에서 가장 무서운 버그는 +> 실패가 아니라 '중복 성공'이다** + +- 실패 → 재시도로 해결 가능 +- 중복 성공 → 돈, 데이터, 신뢰 손실 + +--- + +## 스터디 질문 + +1. **위험한 작업 식별** + - 두 번 실행되면 위험한 작업은? + - (결제, 포인트 적립, 외부 API 호출 등) + +2. **현재 코드 점검** + - 멱등하지 않은 로직은 어디? + - Insert First 패턴이 적용되어 있는가? + +3. **고유 ID 전략** + - 어떤 ID를 기준으로 중복을 판단할까? + - messageId vs eventId vs 비즈니스 ID + +4. **FIFO vs Standard 선택** + - 현재 서비스에서 순서가 중요한 이벤트는? + - 처리량 요구사항은 FIFO 한계(3,000 TPS) 이내인가? + +--- + +## Chapter 13과의 연결 + +| 발표 내용 | 딥다이브 연결 | +|----------|--------------| +| 메시지 전달 보장 | At-least-once 개념 | +| 작업 분배 패턴 | SQS + Lambda 구조 | +| 상관 식별자 | eventId 기반 중복 방지 | +| 브로커 기반 아키텍처 | SQS가 메시지 브로커 역할 | + +--- + +## 마지막 한 문장 + +> **"한 번만 실행된다는 가정이 +> 가장 위험한 가정이다."** +> +> **"FIFO를 쓴다고 안심하지 마라. +> 소비자 측 멱등성은 여전히 필수다."** diff --git a/chapter13/kilhyeonjun/PRESENTATION.md b/chapter13/kilhyeonjun/PRESENTATION.md new file mode 100644 index 0000000..56087e1 --- /dev/null +++ b/chapter13/kilhyeonjun/PRESENTATION.md @@ -0,0 +1,278 @@ +# Chapter 13: 메시징과 통합 패턴 + +> **발표자**: 길현준 +> **발표일**: 2025-12-15 +> **주제**: 분산 시스템을 위한 메시징 시스템과 통합 패턴 + +## 메시징과 통합 패턴 + +### 분산 시스템은 어떻게 대화할까? + +--- + +## 오늘 발표의 목표 + +> **"여러 컴퓨터가 어떻게 서로 일을 나눠서 처리하는지 이해한다"** + +- 메시징이 왜 필요한지 +- 대표적인 3가지 대화 방식 +- 언제 어떤 방식을 쓰면 좋은지 + +❌ 기술 암기 +✅ 상황에 맞는 선택 기준 이해 + +--- + +## 왜 메시징이 필요한가? + +### 문제 상황 + +- 서비스가 하나일 때 → 함수 호출 +- 서비스가 여러 개일 때 → ? + +> A 서비스가 B 서비스에게 계속 직접 물어본다면? + +- B가 느리면 A도 멈춤 +- B가 죽으면 A도 멈춤 +- 여러 서비스면 관리 지옥 + +--- + +## 해결 방법: 메시지 + +### 메시지란? + +> **컴퓨터가 주고받는 편지** + +- 상대가 지금 없어도 보낼 수 있음 +- 바로 답장 안 와도 괜찮음 +- 누가 받을지 몰라도 됨 + +📞 전화 ❌ +✉️ 편지 ⭕ + +--- + +## 메시지의 3가지 목적 + +### 1️⃣ 명령 (Command) + +> "이거 해줘" + +- 한 명만 실행 +- 예: 주문 생성, 작업 실행 + +--- + +### 2️⃣ 이벤트 (Event) + +> "이런 일이 있었다" + +- 여러 명이 알아도 됨 +- 예: 결제 완료, 로그인 성공 + +--- + +### 3️⃣ 문서 (Document) + +> "이 데이터야" + +- 그냥 정보 전달 +- 예: 조회 결과 + +⚠️ 이 구분이 무너지면 설계가 꼬임 + +--- + +## 메시지는 어떻게 전달될까? + +### 방법은 두 가지 + +--- + +## ① 직접 통신 (P2P) + +- 서비스끼리 직접 연결 +- 빠름 +- 하지만: + - 연결 관리 어려움 + - 한 곳 문제 나면 영향 큼 + +👉 소규모, 고성능 환경에 적합 + +--- + +## ② 중간에 우체국 두기 (브로커) + +- 메시지를 대신 전달해주는 중간 시스템 +- 장점: + - 저장 가능 + - 여러 명에게 전달 가능 + - 시스템 분리 쉬움 + +👉 대부분의 실무 시스템 + +--- + +# 핵심 패턴 ① + +## 게시 / 구독 (Publish / Subscribe) + +--- + +## Pub/Sub이란? + +> **방송국 모델** + +- 보내는 쪽: 방송만 함 +- 받는 쪽: 듣고 싶은 사람만 듣기 + +보내는 쪽은 +❌ 누가 듣는지 모름 +❌ 몇 명이 듣는지 모름 + +```mermaid +flowchart LR + P[📻 방송국] --> S1[📱 청취자] + P --> S2[📱 청취자] + P --> S3[📱 청취자] +``` + +--- + +## Pub/Sub 특징 + +- 이벤트 전달에 최적 +- 느슨한 결합 +- 확장 쉬움 + +### 예시 + +- 결제 완료 알림 +- 실시간 알림 +- 상태 변경 전파 + +👉 **이벤트 = Pub/Sub** + +--- + +# 핵심 패턴 ② + +## 작업 분배 (Task Distribution) + +--- + +## 작업 분배란? + +> **일감을 여러 사람에게 나눠주는 방식** + +- 작업은 줄에 쌓임 +- 작업자는 하나씩 가져감 +- 한 작업은 한 명만 처리 + +🍕 배달 주문 분배와 동일 + +```mermaid +flowchart LR + Q[["🍕 주문 대기열"]] --> W1[🛵 배달원 1] + Q --> W2[🛵 배달원 2] + Q --> W3[🛵 배달원 3] +``` + +--- + +## 작업 분배 특징 + +- 병렬 처리 +- 자동 분산 +- 처리량 증가 + +### 예시 + +- 이미지 변환 +- 이메일 발송 +- 대량 계산 + +👉 **작업 = 경쟁 소비자** + +--- + +# 핵심 패턴 ③ + +## 요청 / 응답 (Request / Reply) + +--- + +## 요청 / 응답의 문제 + +> "질문 여러 개 보냈는데, +> 답이 뒤섞여서 온다" + +비동기 환경에서는: + +- 응답 순서 보장 ❌ +- 누가 누구의 답인지 헷갈림 + +--- + +## 해결 방법: 번호표 + +> **요청마다 ID를 붙인다** + +- 요청: "이건 1번 질문" +- 응답: "1번 질문의 답" + +이 번호를 +👉 **상관 ID (Correlation ID)** + +```mermaid +sequenceDiagram + participant C as 🙋 고객 + participant S as 🏪 창구 + C->>S: 질문 (번호표 1) + C->>S: 질문 (번호표 2) + S-->>C: 답변 (2) + S-->>C: 답변 (1) + Note over C,S: 순서가 뒤바뀌어도
번호표로 매칭! +``` + +--- + +## 요청 / 응답 정리 + +- 겉보기엔 동기 +- 실제로는 비동기 +- 질서를 만들어주는 패턴 + +👉 **응답이 꼭 필요할 때 사용** + +--- + +## 기술은 왜 이렇게 많을까? + +중요한 건 기술 ❌ +중요한 건 **상황** + +--- + +## 상황별 선택 가이드 + +| 상황 | 추천 | +|------|------| +| 그냥 알림 | Pub/Sub | +| 작업 나누기 | 작업 분배 | +| 응답 필요 | Request/Reply | +| 메시지 저장 필요 | 스트림 기반 | +| 대규모 처리 | Kafka 계열 | + +--- + +## 오늘의 핵심 요약 + +1. 이벤트 → Pub/Sub +2. 작업 → 분배 +3. 응답 → Request/Reply + +> **분산 시스템은 +> 코드를 잘 짜는 문제가 아니라 +> 대화를 잘 설계하는 문제다** \ No newline at end of file diff --git a/chapter13/kilhyeonjun/README.md b/chapter13/kilhyeonjun/README.md new file mode 100644 index 0000000..15dadfd --- /dev/null +++ b/chapter13/kilhyeonjun/README.md @@ -0,0 +1,617 @@ +# Chapter 13: 메시징과 통합 패턴 + +> **발표자**: 길현준 +> **발표일**: 2025-12-15 +> **주제**: 분산 시스템을 위한 메시징 시스템과 통합 패턴 + +--- + +## 📌 목차 + +1. [개요](#개요) +2. [메시징 시스템 기초](#1-메시징-시스템-기초) +3. [게시/구독 패턴](#2-게시구독-패턴-publishsubscribe) +4. [작업 분배 패턴](#3-작업-분배-패턴-task-distribution) +5. [요청/응답 패턴](#4-요청응답-패턴-requestreply) +6. [요약](#요약) +7. [연습문제](#연습문제) + +--- + +## 개요 + +### 왜 이 챕터가 중요한가? + +**확장성**이 시스템 배포에 관한 것이라면, **통합**은 시스템들을 연결하는 것입니다. 분산 애플리케이션을 통합하는 두 가지 주요 기술: + +1. **공유 스토리지**: 중앙의 중재자와 정보 관리자로 사용 +2. **메시징**: 시스템 노드들에게 데이터, 이벤트 및 명령을 전파 + +메시지는 컴포넌트와 시스템 간에 정보를 교환하는 개별적이고 구조화된 데이터입니다. + +### 핵심 키워드 + +- **Pub/Sub**: 발행자가 메시지를 발행하면 모든 구독자가 수신 +- **작업 분배**: 작업을 여러 워커에게 분산하여 병렬 처리 +- **요청/응답**: 비동기 채널 위에 동기식 통신 추상화 +- **브로커**: 메시지를 중재하는 중앙 집중식 시스템 +- **피어 투 피어**: 노드 간 직접 통신 + +### 이 장은 책의 마지막 장입니다 + +> "이 장은 이 책의 마지막 장입니다. 이제 프로젝트에 적용할 수 있는 패턴과 기술로 가득 찬 일련의 도구들이 여러분의 지식 창고에 존재해야 합니다." +> — *Mario Casciaro와 Luciano Mammino* + +--- + +## 1. 메시징 시스템 기초 + +메시지 및 메시징 시스템에서 고려해야 할 **네 가지 기본 요소**: + +1. **통신 방향**: 단방향 또는 요청/응답 +2. **메시지 목적**: 명령, 이벤트, 문서 +3. **메시지 타이밍**: 동기식 또는 비동기식 +4. **메시지 전달**: 직접(P2P) 또는 브로커 + +### 1-1. 단방향 vs 요청/응답 패턴 + +**단방향 통신** + +```mermaid +flowchart LR + A[메시지 생성자] -->|전달| B[메시지 소비자] +``` + +- 메시지가 소스에서 대상으로 한 방향으로 푸시 +- 예: WebSocket 알림, 작업 배포 + +**요청/응답 교환** + +```mermaid +sequenceDiagram + participant A as 메시지 생성자 + participant B as 메시지 소비자 + A->>B: (1) 요청 + B->>A: (2) 응답 +``` + +- 한 방향의 메시지가 항상 반대 방향의 메시지와 쌍을 이룸 +- 예: 웹 서비스 호출, 데이터베이스 쿼리 + +**멀티노드 요청/응답** + +```mermaid +sequenceDiagram + participant A as 메시지 생성자 + participant B as 중간 노드 + participant C as 최종 노드 + A->>B: (1) 요청 + B->>C: (2) 요청 + C->>B: (3) 응답 + B->>A: (4) 응답 +``` + +### 1-2. 메시지 유형 + +| 유형 | 목적 | 예시 | +|------|------|------| +| **명령(Command)** | 수신자에서 작업 실행 트리거 | RPC, RESTful HTTP 호출 | +| **이벤트(Event)** | 무언가 발생했음을 알림 | WebSocket 알림, 상태 변경 | +| **문서(Document)** | 데이터 전송 | DB 쿼리 결과, 응답 데이터 | + +**명령 메시지** +- 직렬화된 명령 객체 +- 작업 이름과 인자 목록 포함 +- RESTful HTTP: GET(조회), POST(생성), PUT/PATCH(수정), DELETE(삭제) + +**이벤트 메시지** +- 이벤트 유형과 컨텍스트 포함 +- 시스템의 모든 노드를 동기화 + +**문서 메시지** +- 수행할 작업 정보 없음 +- 특정 사건과의 연관성 없음 + +### 1-3. 비동기 메시징, 큐 및 스트림 + +**동기 vs 비동기 통신** + +| 구분 | 동기식 | 비동기식 | +|------|--------|----------| +| 비유 | 전화 통화 | SMS | +| 연결 | 실시간으로 연결 필요 | 수신자가 연결 안되어도 됨 | +| 병렬성 | 제한적 | 더 나은 병렬 처리 | + +**메시지 큐** + +```mermaid +flowchart LR + P[메시지 생성자] --> Q[["📨📨 메시지 큐"]] + Q --> C[메시지 소비자] +``` + +- 메시지를 저장 후 전달 +- 소비자가 오프라인이어도 메시지 보존 +- 소비자가 온라인 되면 발송 + +**데이터 스트림** + +```mermaid +flowchart TB + P[생산자] --> S[["📨₁ 📨₂ 📨₃ 📨₄ 스트림"]] + S --> CA[소비자 A] + S --> CB[소비자 B] +``` + +| 구분 | 큐 | 스트림 | +|------|-----|--------| +| 메시지 제거 | 처리 시 제거 | 제거 안 됨 | +| 접근 방식 | 한 번에 하나 | 언제든지 질의 가능 | +| 소비자 공유 | 한 소비자만 | 여러 소비자 공유 가능 | + +### 1-4. 피어 투 피어 vs 브로커 기반 + +**피어 투 피어 아키텍처** + +```mermaid +flowchart TB + subgraph P2P["피어 투 피어"] + N1[노드 1] <--> N2[노드 2] + N1 <--> N3[노드 3] + N1 <--> N4[노드 4] + N2 <--> N3 + N2 <--> N4 + N3 <--> N4 + end +``` + +**브로커 기반 아키텍처** + +```mermaid +flowchart TB + subgraph Broker["브로커 기반"] + NA[노드 A] --> B[메시지 브로커] + NB[노드 B] --> B + NC[노드 C] --> B + B --> ND[노드 D] + B --> NE[노드 E] + end +``` + +**브로커의 장점** +- 발신자와 수신자 분리 +- 서로 다른 프로토콜 간 연결 (AMQP, MQTT, STOMP) +- 영구 대기열, 라우팅, 메시지 변환, 모니터링 + +**피어 투 피어의 장점** +- 단일 장애 지점 제거 +- 확장 시 브로커 확장 불필요 +- 통신 대기 시간 감소 +- 특정 기술/프로토콜에 종속 안됨 + +--- + +## 2. 게시/구독 패턴 (Publish/Subscribe) + +### 2-1. Pub/Sub란? + +**분산된 관찰자(Observer) 패턴**입니다. 구독자는 특정 메시지 수신을 등록하고, 게시자는 메시지를 생성하여 모든 관련 구독자에게 배포합니다. + +**피어 투 피어 Pub/Sub** + +```mermaid +flowchart LR + P[발행자] --> S1[구독자 1] + P --> S2[구독자 2] + P --> S3[구독자 3] +``` + +**브로커 기반 Pub/Sub** + +```mermaid +flowchart LR + P[발행자] --> B[브로커/중개자] + B --> S1[구독자 1] + B --> S2[구독자 2] + B --> S3[구독자 3] +``` + +**핵심 특징**: +- 게시자는 수신자가 누구인지 미리 알지 못함 +- 구독자가 메시지 수신 등록 +- 양면이 느슨하게 결합 + +### 2-2. 구현 기술 비교 + +| 기술 | 유형 | 특징 | +|------|------|------| +| **Redis Pub/Sub** | 브로커 | 간단, 메시지 영속성 없음 | +| **ZeroMQ** | P2P | 직접 연결, 고성능, 네트워크 자동 복구 | +| **AMQP (RabbitMQ)** | 브로커 | 익스체인지/대기열, 고급 라우팅 | +| **Redis Streams** | 브로커 | 영속성, 히스토리, 소비자 그룹 | + +### 2-3. Redis Pub/Sub 개념 + +```javascript +// 발행자 (Publisher) +import Redis from 'ioredis' +const pub = new Redis() +pub.publish('chat', JSON.stringify({ message: 'Hello!' })) + +// 구독자 (Subscriber) +const sub = new Redis() +sub.subscribe('chat') +sub.on('message', (channel, message) => { + console.log(`Received: ${message}`) +}) +``` + +**단점**: 메시지 영속성 없음 - 구독자가 오프라인이면 메시지 손실 + +### 2-4. ZeroMQ P2P Pub/Sub 개념 + +```javascript +// 발행자 (PUB 소켓) +import zeromq from 'zeromq' +const socket = new zeromq.Publisher() +await socket.bind('tcp://127.0.0.1:5000') +await socket.send(['chat', JSON.stringify({ message: 'Hello!' })]) + +// 구독자 (SUB 소켓) +const socket = new zeromq.Subscriber() +socket.connect('tcp://127.0.0.1:5000') +socket.subscribe('chat') +for await (const [topic, msg] of socket) { + console.log(`Received: ${msg}`) +} +``` + +**특징**: +- 브로커 없이 직접 연결 +- 연결 끊어져도 자동 재연결 +- 고성능, 낮은 대기 시간 + +### 2-5. AMQP (RabbitMQ) 개념 + +```mermaid +flowchart LR + subgraph Broker["브로커"] + E[익스체인지] --> Q1[대기열 1] + E --> Q2[대기열 2] + end + P[발행자] --> E + Q1 --> C1[소비자 1] + Q2 --> C2[소비자 2] +``` + +**익스체인지 유형**: +- **direct**: 라우팅 키가 정확히 일치하는 대기열로 +- **topic**: 패턴 매칭 (예: `chat.*`) +- **fanout**: 바인딩된 모든 대기열로 브로드캐스트 + +### 2-6. Redis Streams 개념 + +```javascript +// 메시지 추가 +await redis.xadd('chat_stream', '*', 'message', 'Hello!') + +// 히스토리 조회 +const logs = await redis.xrange('chat_stream', '-', '+') + +// 새 메시지 대기 (블로킹) +const [[, records]] = await redis.xread( + 'BLOCK', '0', 'STREAMS', 'chat_stream', lastRecordId) +``` + +**장점**: +- 메시지 영속성 +- 히스토리 질의 가능 +- 소비자 그룹 지원 + +--- + +## 3. 작업 분배 패턴 (Task Distribution) + +### 3-1. 작업 분배란? + +네트워크의 모든 곳에 위치한 원격 작업자를 사용하여 작업을 분산합니다. + +```mermaid +flowchart LR + P[생산자] -->|작업 1| C1[소비자 1] + P -->|작업 2| C2[소비자 2] + P -->|작업 3| C3[소비자 3] +``` + +**패턴 별칭**: +- 경쟁 소비자 (Competing Consumers) +- 팬아웃 배포 (Fanout Distribution) +- 벤틸레이터 (Ventilator) + +### 3-2. 파이프라인 (Fanout/Fanin) + +```mermaid +flowchart LR + subgraph Fanout["분배 (팬아웃)"] + V[벤틸레이터] + end + + subgraph Workers["워커"] + W1[워커 1] + W2[워커 2] + W3[워커 3] + end + + subgraph Fanin["수집 (팬인)"] + S[싱크] + end + + V --> W1 + V --> W2 + V --> W3 + W1 --> S + W2 --> S + W3 --> S +``` + +**장점**: +- 동기식 요청/응답 오버헤드 없음 +- 낮은 지연시간, 높은 처리량 + +### 3-3. ZeroMQ PUSH/PULL 소켓 + +```mermaid +flowchart TB + subgraph Producer["생산자/벤틸레이터"] + PUSH["PUSH 소켓
(바인딩)"] + end + + subgraph Workers["작업자"] + PULL1["PULL 소켓
(연결)"] + PULL2["PULL 소켓
(연결)"] + PULL3["PULL 소켓
(연결)"] + end + + subgraph Sink["싱크/수집기"] + SINK_PULL["PULL 소켓
(바인딩)"] + end + + PUSH --> PULL1 + PUSH --> PULL2 + PUSH --> PULL3 + PULL1 --> SINK_PULL + PULL2 --> SINK_PULL + PULL3 --> SINK_PULL +``` + +**특징**: +- PUSH: 연결된 PULL 소켓에 메시지를 라운드 로빈으로 분배 +- PULL: 여러 PUSH 소켓에서 메시지 수신 + +### 3-4. AMQP 경쟁 소비자 패턴 + +```mermaid +flowchart LR + subgraph Broker["브로커"] + Q[["작업 대기열"]] + end + + P[생산자] --> Q + Q -->|라운드 로빈| C1[소비자 1] + Q -->|라운드 로빈| C2[소비자 2] +``` + +여러 소비자가 같은 대기열에서 메시지를 가져가면, **라운드 로빈**으로 분배됩니다. + +### 3-5. Redis Streams 소비자 그룹 + +```javascript +// 소비자 그룹 생성 +await redis.xgroup('CREATE', 'tasks_stream', 'workers_group', '$', 'MKSTREAM') + +// 새 작업 읽기 (그룹 내에서 분배됨) +const result = await redis.xreadgroup( + 'GROUP', 'workers_group', consumerName, + 'BLOCK', '0', + 'COUNT', '1', + 'STREAMS', 'tasks_stream', '>' +) + +// 처리 완료 확인 +await redis.xack('tasks_stream', 'workers_group', recordId) +``` + +**특징**: +- 소비자 그룹 내에서 메시지가 분배됨 +- 각 메시지는 하나의 소비자만 처리 +- ACK로 처리 완료 확인 + +--- + +## 4. 요청/응답 패턴 (Request/Reply) + +단방향 비동기 채널 위에 요청/응답 통신 추상화를 구현합니다. + +### 4-1. 상관 식별자 (Correlation Identifier) + +비동기 채널에서 요청과 응답을 매칭시키는 기본 패턴입니다. + +```mermaid +sequenceDiagram + participant R as Requestor + participant P as Replier + + R->>P: 요청 (ID: 1) + R->>P: 요청 (ID: 2) + P-->>R: 응답 (ID: 2) + R->>P: 요청 (ID: 3) + P-->>R: 응답 (ID: 1) + P-->>R: 응답 (ID: 3) + + Note over R,P: 응답 순서가 요청 순서와
다를 수 있음 +``` + +**동작 방식**: +1. 각 요청에 고유 ID 부여 +2. 응답에 해당 ID 첨부 +3. 요청자가 ID로 응답 매칭 + +### 4-2. 요청 추상화 구현 (child_process 예제) + +**createRequestChannel.js** +```javascript +import { nanoid } from 'nanoid' + +export function createRequestChannel(channel) { + const correlationMap = new Map() + + function sendRequest(data) { + console.log('Sending request', data) + return new Promise((resolve, reject) => { + const correlationId = nanoid() + + // 타임아웃 설정 + const replyTimeout = setTimeout(() => { + correlationMap.delete(correlationId) + reject(new Error('Request timeout')) + }, 10000) + + // 응답 핸들러 등록 + correlationMap.set(correlationId, (replyData) => { + correlationMap.delete(correlationId) + clearTimeout(replyTimeout) + resolve(replyData) + }) + + // 요청 전송 + channel.send({ + type: 'request', + data, + id: correlationId + }) + }) + } + + // 응답 수신 리스너 + channel.on('message', message => { + const callback = correlationMap.get(message.inReplyTo) + if (callback) { + callback(message.data) + } + }) + + return sendRequest +} +``` + +### 4-3. 응답 추상화 구현 + +**createReplyChannel.js** +```javascript +export function createReplyChannel(channel) { + return function registerHandler(handler) { + channel.on('message', async message => { + if (message.type !== 'request') { + return + } + + const replyData = await handler(message.data) + channel.send({ + type: 'response', + data: replyData, + inReplyTo: message.id + }) + }) + } +} +``` + +### 4-4. 반환 주소 (Return Address) + +여러 요청자가 있는 경우, 응답자가 **어디로** 응답을 보내야 하는지도 알아야 합니다. + +```mermaid +flowchart TB + subgraph Broker["브로커"] + RQ[["요청 대기열"]] + RA[["요청자 A 응답 대기열"]] + RB[["요청자 B 응답 대기열"]] + end + + A[요청자 A] -->|"요청 + replyTo:A"| RQ + B[요청자 B] -->|"요청 + replyTo:B"| RQ + RQ --> RP[응답자] + RP -->|응답| RA + RP -->|응답| RB + RA --> A + RB --> B +``` + +**AMQP에서 구현**: +- 각 요청자는 자신만의 임시 응답 대기열 생성 +- 요청에 `replyTo` 속성으로 대기열 이름 첨부 +- 응답자는 `replyTo` 대기열로 응답 전송 + +```javascript +// 요청 전송 시 +this.channel.sendToQueue(queue, + Buffer.from(JSON.stringify(message)), + { + correlationId: id, + replyTo: this.replyQueue // 반환 주소 + } +) + +// 응답 전송 시 +this.channel.sendToQueue( + msg.properties.replyTo, // 반환 주소로 전송 + Buffer.from(JSON.stringify(replyData)), + { correlationId: msg.properties.correlationId } +) +``` + +--- + +## 요약 + +### 메시지 교환 패턴 비교 + +| 패턴 | 방향 | 용도 | 특징 | +|------|------|------|------| +| **Pub/Sub** | 1:N | 이벤트 브로드캐스트 | 느슨한 결합, 구독 기반 | +| **작업 분배** | 1:N (분배) | 병렬 처리 | 로드 밸런싱, 경쟁 소비자 | +| **파이프라인** | 단방향 체인 | 복잡한 처리 | 팬아웃/팬인, 싱크 | +| **요청/응답** | 양방향 | 동기식 추상화 | 상관 ID, 반환 주소 | + +### 아키텍처 비교 + +| 구분 | 피어 투 피어 | 브로커 기반 | +|------|--------------|-------------| +| **장점** | 낮은 지연, 단일 장애점 없음 | 분리, 영속성, 라우팅 | +| **단점** | 복잡한 구현 | 추가 인프라 필요 | +| **예시** | ZeroMQ | Redis, RabbitMQ | + +### 기술별 특징 + +| 기술 | 유형 | 영속성 | 패턴 | +|------|------|--------|------| +| **Redis Pub/Sub** | 브로커 | ❌ | Pub/Sub | +| **Redis Streams** | 브로커 | ✅ | Pub/Sub, 소비자 그룹 | +| **ZeroMQ** | P2P | ❌ | Pub/Sub, Push/Pull | +| **AMQP** | 브로커 | ✅ | 모든 패턴 | + +### 패턴 선택 가이드 + +- **여러 구독자에게 이벤트 전파** → Pub/Sub +- **작업을 여러 워커에게 분산** → 경쟁 소비자 +- **복잡한 다단계 처리** → 파이프라인 +- **비동기 채널에서 동기식 통신** → 요청/응답 + 상관 ID +- **여러 요청자, 하나의 응답자** → 반환 주소 + +--- + +### 관련 링크 +- [Redis Streams](https://redis.io/docs/data-types/streams/) +- [ZeroMQ Guide](https://zguide.zeromq.org/) +- [RabbitMQ Tutorials](https://www.rabbitmq.com/tutorials) +- [Apache Kafka](https://kafka.apache.org/) diff --git a/chapter13/kilhyeonjun/code/01-request-reply-process.js b/chapter13/kilhyeonjun/code/01-request-reply-process.js new file mode 100644 index 0000000..e409e67 --- /dev/null +++ b/chapter13/kilhyeonjun/code/01-request-reply-process.js @@ -0,0 +1,164 @@ +/** + * Chapter 13: 요청/응답 패턴 - child_process 예제 + * + * child_process.fork()를 사용하여 부모-자식 프로세스 간 + * 요청/응답 통신을 구현합니다. + * + * 이 예제는 단일 파일로 작성되어 있어 requestor와 replier 역할을 + * 모두 시뮬레이션합니다. + */ + +import { fork } from 'child_process' +import { fileURLToPath } from 'url' + +// nanoid 대신 간단한 ID 생성 함수 +function generateId() { + return Math.random().toString(36).substring(2, 15) +} + +/** + * 요청 채널 생성 함수 + * 주어진 채널(process 또는 child)을 감싸서 요청/응답 추상화를 제공합니다. + */ +function createRequestChannel(channel) { + const correlationMap = new Map() + + function sendRequest(data) { + console.log('[Requestor] Sending request:', data) + + return new Promise((resolve, reject) => { + const correlationId = generateId() + + // 10초 타임아웃 + const replyTimeout = setTimeout(() => { + correlationMap.delete(correlationId) + reject(new Error('Request timeout')) + }, 10000) + + // 응답 핸들러 등록 + correlationMap.set(correlationId, (replyData) => { + correlationMap.delete(correlationId) + clearTimeout(replyTimeout) + resolve(replyData) + }) + + // 요청 전송 + channel.send({ + type: 'request', + data, + id: correlationId + }) + }) + } + + // 응답 수신 리스너 + channel.on('message', (message) => { + if (message.type === 'response') { + const callback = correlationMap.get(message.inReplyTo) + if (callback) { + callback(message.data) + } + } + }) + + return sendRequest +} + +/** + * 응답 채널 생성 함수 + * 요청을 수신하고 핸들러를 실행한 후 응답을 전송합니다. + */ +function createReplyChannel(channel) { + return function registerHandler(handler) { + channel.on('message', async (message) => { + if (message.type !== 'request') { + return + } + + console.log('[Replier] Received request:', message.data) + + // 핸들러 실행 + const replyData = await handler(message.data) + + console.log('[Replier] Sending response:', replyData) + + // 응답 전송 + channel.send({ + type: 'response', + data: replyData, + inReplyTo: message.id + }) + }) + } +} + +// 자식 프로세스로 실행될 때 (CHILD_PROCESS 환경변수로 판단) +if (process.env.CHILD_PROCESS === 'true') { + console.log('[Replier] Child process started') + + const registerReplyHandler = createReplyChannel(process) + + // 요청 핸들러: 두 숫자의 합계 계산 (지연 시뮬레이션) + registerReplyHandler((req) => { + return new Promise((resolve) => { + setTimeout(() => { + resolve({ sum: req.a + req.b }) + }, req.delay || 100) + }) + }) + + // 준비 완료 알림 + process.send({ type: 'ready' }) +} +// 부모 프로세스로 실행될 때 +else { + async function main() { + const __filename = fileURLToPath(import.meta.url) + + console.log('[Requestor] Starting child process...') + + // 자식 프로세스 생성 (같은 파일을 CHILD_PROCESS=true로 실행) + const child = fork(__filename, [], { + env: { ...process.env, CHILD_PROCESS: 'true' } + }) + + const request = createRequestChannel(child) + + // 자식 프로세스 준비 대기 + await new Promise((resolve) => { + child.once('message', (msg) => { + if (msg.type === 'ready') { + console.log('[Requestor] Child process is ready') + resolve() + } + }) + }) + + try { + // 두 개의 요청을 병렬로 전송 + // 첫 번째 요청은 500ms 지연, 두 번째는 100ms 지연 + // 응답은 보낸 순서와 다르게 도착할 수 있음 + const p1 = request({ a: 1, b: 2, delay: 500 }) + .then((res) => { + console.log(`[Requestor] Reply: 1 + 2 = ${res.sum}`) + return res + }) + + const p2 = request({ a: 6, b: 1, delay: 100 }) + .then((res) => { + console.log(`[Requestor] Reply: 6 + 1 = ${res.sum}`) + return res + }) + + await Promise.all([p1, p2]) + + console.log('[Requestor] All requests completed') + } finally { + // 자식 프로세스 종료 + child.disconnect() + console.log('[Requestor] Child process disconnected') + } + } + + main().catch((err) => console.error(err)) +} diff --git a/chapter13/kilhyeonjun/code/02-correlation-id-pattern.js b/chapter13/kilhyeonjun/code/02-correlation-id-pattern.js new file mode 100644 index 0000000..a1790e4 --- /dev/null +++ b/chapter13/kilhyeonjun/code/02-correlation-id-pattern.js @@ -0,0 +1,175 @@ +/** + * Chapter 13: 상관 식별자(Correlation ID) 패턴 + * + * 비동기 채널에서 요청과 응답을 매칭시키는 핵심 패턴입니다. + * EventEmitter를 사용하여 채널을 시뮬레이션합니다. + */ + +import { EventEmitter } from 'events' + +// 간단한 ID 생성 함수 +function generateId() { + return Math.random().toString(36).substring(2, 15) +} + +/** + * 상관 식별자 패턴을 구현한 요청/응답 추상화 + * + * 이 클래스는 비동기 채널 위에 동기식 요청/응답 통신을 추상화합니다. + */ +class CorrelationChannel { + constructor() { + // 상관관계 맵: correlationId -> { resolve, reject, timeout } + this.correlationMap = new Map() + + // 시뮬레이션용 채널 (실제로는 WebSocket, child_process 등) + this.channel = new EventEmitter() + + // 응답 수신 리스너 설정 + this.channel.on('response', (message) => { + this._handleResponse(message) + }) + } + + /** + * 요청 전송 + * @param {any} data - 요청 데이터 + * @param {number} timeout - 타임아웃 (ms) + * @returns {Promise} 응답 데이터 + */ + sendRequest(data, timeout = 10000) { + return new Promise((resolve, reject) => { + const correlationId = generateId() + + console.log(`[Request] ID: ${correlationId}, Data:`, data) + + // 타임아웃 설정 + const timeoutId = setTimeout(() => { + this.correlationMap.delete(correlationId) + reject(new Error(`Request timeout: ${correlationId}`)) + }, timeout) + + // 상관관계 맵에 저장 + this.correlationMap.set(correlationId, { + resolve, + reject, + timeout: timeoutId + }) + + // 요청 이벤트 발생 + this.channel.emit('request', { + id: correlationId, + data + }) + }) + } + + /** + * 응답 핸들러 등록 + * @param {Function} handler - 요청을 처리하고 응답 데이터를 반환하는 함수 + */ + registerHandler(handler) { + this.channel.on('request', async (message) => { + console.log(`[Handler] Processing request ID: ${message.id}`) + + try { + // 핸들러 실행 + const responseData = await handler(message.data) + + // 응답 전송 + this.channel.emit('response', { + inReplyTo: message.id, + data: responseData + }) + } catch (error) { + // 에러 응답 전송 + this.channel.emit('response', { + inReplyTo: message.id, + error: error.message + }) + } + }) + } + + /** + * 응답 처리 + * @private + */ + _handleResponse(message) { + const pending = this.correlationMap.get(message.inReplyTo) + + if (!pending) { + console.log(`[Response] Unknown correlation ID: ${message.inReplyTo}`) + return + } + + // 정리 + this.correlationMap.delete(message.inReplyTo) + clearTimeout(pending.timeout) + + console.log(`[Response] ID: ${message.inReplyTo}, Data:`, message.data) + + // 에러 또는 성공 처리 + if (message.error) { + pending.reject(new Error(message.error)) + } else { + pending.resolve(message.data) + } + } +} + +// 예제 실행 +async function main() { + const channel = new CorrelationChannel() + + // 핸들러 등록: 두 숫자의 합계 계산 (비동기 지연 시뮬레이션) + channel.registerHandler(async (data) => { + // 요청별 다른 지연 시간 시뮬레이션 + await new Promise((resolve) => setTimeout(resolve, data.delay || 100)) + return { sum: data.a + data.b } + }) + + console.log('=== 상관 식별자 패턴 데모 ===\n') + + // 여러 요청을 병렬로 전송 + // 응답 순서가 요청 순서와 다를 수 있음을 보여줍니다 + const requests = [ + channel.sendRequest({ a: 1, b: 2, delay: 300 }), // 느린 요청 + channel.sendRequest({ a: 3, b: 4, delay: 100 }), // 빠른 요청 + channel.sendRequest({ a: 5, b: 6, delay: 200 }) // 중간 속도 + ] + + console.log('\n=== 요청 전송 완료, 응답 대기 중... ===\n') + + const results = await Promise.all(requests) + + console.log('\n=== 모든 응답 수신 완료 ===') + console.log('Results:', results) + + // 응답 순서 확인 + // 빠른 요청(3+4)이 먼저 응답하고, 느린 요청(1+2)이 마지막에 응답 + // 하지만 Promise.all의 결과는 요청 순서를 유지 +} + +main().catch(console.error) + +/* +예상 출력: +=== 상관 식별자 패턴 데모 === + +[Request] ID: abc123, Data: { a: 1, b: 2, delay: 300 } +[Handler] Processing request ID: abc123 +[Request] ID: def456, Data: { a: 3, b: 4, delay: 100 } +[Handler] Processing request ID: def456 +[Request] ID: ghi789, Data: { a: 5, b: 6, delay: 200 } +[Handler] Processing request ID: ghi789 + +=== 요청 전송 완료, 응답 대기 중... === + +[Response] ID: def456, Data: { sum: 7 } <- 빠른 요청이 먼저 응답 +[Response] ID: ghi789, Data: { sum: 11 } <- 중간 속도 +[Response] ID: abc123, Data: { sum: 3 } <- 느린 요청이 마지막 + +=== 모든 응답 수신 완료 === +Results: [{ sum: 3 }, { sum: 7 }, { sum: 11 }] +*/ diff --git a/chapter13/kilhyeonjun/code/README.md b/chapter13/kilhyeonjun/code/README.md new file mode 100644 index 0000000..cdf8c82 --- /dev/null +++ b/chapter13/kilhyeonjun/code/README.md @@ -0,0 +1,65 @@ +# Chapter 13 코드 예제 + +## 예제 목록 + +| 파일명 | 설명 | +|--------|------| +| `01-request-reply-process.js` | child_process를 사용한 요청/응답 패턴 구현 | +| `02-correlation-id-pattern.js` | 상관 식별자(Correlation ID) 패턴 추상화 | + +## 연습문제 + +| 파일명 | 연습문제 | +|--------|----------| +| `exercises/13.3-stop-workers.js` | 작업 중지 로직 (핵심 구조) | +| `exercises/13.5-data-collector.js` | 데이터 수집기 추상화 (핵심 구조) | + +## 실행 방법 + +### 01-request-reply-process.js + +요청자(requestor)와 응답자(replier)가 child_process를 통해 통신하는 예제입니다. + +```bash +node 01-request-reply-process.js +``` + +### 02-correlation-id-pattern.js + +상관 식별자 패턴의 핵심 추상화를 보여줍니다. 실제 채널 대신 EventEmitter를 사용하여 동작을 시뮬레이션합니다. + +```bash +node 02-correlation-id-pattern.js +``` + +### exercises/13.3-stop-workers.js + +해시썸 크래커에서 일치 항목 발견 시 모든 작업자를 중지하는 로직입니다. + +```bash +node exercises/13.3-stop-workers.js +``` + +**핵심 개념:** +- 브로드캐스트 메시지로 중지 신호 전파 +- 각 작업자의 상태 플래그 관리 +- EventEmitter 기반 메시지 버스 + +### exercises/13.5-data-collector.js + +모든 노드에 요청을 보내고 응답을 집계하는 추상화입니다. + +```bash +node exercises/13.5-data-collector.js +``` + +**핵심 개념:** +- 게시/구독으로 요청 브로드캐스트 +- 상관 식별자로 응답 매칭 +- 타임아웃 기반 집계 완료 + +## 참고사항 + +- 이 예제들은 **외부 의존성 없이** 순수 Node.js로 작성되었습니다 +- Redis, ZeroMQ, RabbitMQ 등의 외부 서비스가 필요한 예제는 README.md에서 개념만 설명합니다 +- 연습문제는 핵심 로직과 구조만 포함합니다 diff --git a/chapter13/kilhyeonjun/code/exercises/13.3-stop-workers.js b/chapter13/kilhyeonjun/code/exercises/13.3-stop-workers.js new file mode 100644 index 0000000..616b065 --- /dev/null +++ b/chapter13/kilhyeonjun/code/exercises/13.3-stop-workers.js @@ -0,0 +1,228 @@ +/** + * 연습문제 13.3: 작업 중지 (Stop Workers) + * + * 일치 항목이 발견되면 모든 작업자 노드에서 계산을 중지하는 로직을 구현합니다. + * 이 예제는 EventEmitter를 사용하여 분산 시스템의 동작을 시뮬레이션합니다. + * + * 핵심 개념: + * - 브로드캐스트 메시지를 통한 작업 중지 신호 전파 + * - 각 작업자가 중지 신호를 수신하고 처리하는 로직 + * - 작업 완료 또는 중지 상태 관리 + */ + +import { EventEmitter } from 'events' + +/** + * 작업자 노드 시뮬레이션 + * 해시썸 크래커의 작업자를 간단화하여 구현 + */ +class Worker extends EventEmitter { + constructor(id, messageBus) { + super() + this.id = id + this.messageBus = messageBus + this.isRunning = false + this.isStopped = false // 전역 중지 상태 + this.currentTask = null + + // 중지 신호 수신 리스너 + this.messageBus.on('stop', () => { + this._handleStop() + }) + + // 새 작업 수신 리스너 + this.messageBus.on(`task:${this.id}`, (task) => { + // 이미 중지 상태면 작업 거부 + if (this.isStopped) { + console.log(`[Worker ${this.id}] 중지 상태 - 작업 거부`) + return + } + this._processTask(task) + }) + } + + /** + * 중지 신호 처리 + * @private + */ + _handleStop() { + this.isStopped = true // 전역 중지 상태 설정 + if (this.isRunning) { + console.log(`[Worker ${this.id}] 중지 신호 수신, 작업 중단`) + this.isRunning = false + this.currentTask = null + this.emit('stopped') + } + } + + /** + * 작업 처리 시뮬레이션 + * @param {Object} task - 처리할 작업 + * @private + */ + async _processTask(task) { + this.isRunning = true + this.currentTask = task + console.log(`[Worker ${this.id}] 작업 시작: ${JSON.stringify(task)}`) + + // 작업 처리 시뮬레이션 (100ms 단위로 체크) + const startRange = task.startRange + const endRange = task.endRange + const targetHash = task.targetHash + + for (let i = startRange; i <= endRange && this.isRunning; i++) { + // 매 100번째 반복마다 이벤트 루프에 제어권 양보 + if (i % 100 === 0) { + await new Promise((resolve) => setImmediate(resolve)) + } + + // 일치 여부 확인 (시뮬레이션) + const hash = this._computeHash(i) + if (hash === targetHash) { + console.log(`[Worker ${this.id}] 일치 항목 발견: ${i}`) + + // 결과 보고 및 전체 중지 신호 발송 + this.messageBus.emit('match-found', { + workerId: this.id, + value: i, + hash + }) + + // 전체 작업자에게 중지 신호 브로드캐스트 + this.messageBus.emit('stop') + return + } + } + + if (this.isRunning) { + console.log(`[Worker ${this.id}] 작업 완료, 일치 항목 없음`) + this.isRunning = false + this.emit('completed') + } + } + + /** + * 해시 계산 시뮬레이션 + * @param {number} value - 해시할 값 + * @returns {string} 해시 값 + * @private + */ + _computeHash(value) { + // 실제로는 crypto.createHash 사용 + // 여기서는 간단한 시뮬레이션 + return `hash_${value % 1000}` + } +} + +/** + * 코디네이터 노드 + * 작업을 분배하고 결과를 수집 + */ +class Coordinator { + constructor(messageBus, workerCount) { + this.messageBus = messageBus + this.workerCount = workerCount + this.result = null + this.isComplete = false + + // 일치 항목 발견 리스너 + this.messageBus.on('match-found', (result) => { + this._handleMatchFound(result) + }) + } + + /** + * 일치 항목 발견 처리 + * @param {Object} result - 발견된 결과 + * @private + */ + _handleMatchFound(result) { + if (!this.isComplete) { + this.isComplete = true + this.result = result + console.log(`[Coordinator] 일치 항목 수신: Worker ${result.workerId}에서 ${result.value} 발견`) + } + } + + /** + * 작업 분배 + * @param {string} targetHash - 찾을 해시 + * @param {number} totalRange - 전체 검색 범위 + */ + distributeWork(targetHash, totalRange) { + const rangePerWorker = Math.ceil(totalRange / this.workerCount) + + for (let i = 0; i < this.workerCount; i++) { + const startRange = i * rangePerWorker + const endRange = Math.min((i + 1) * rangePerWorker - 1, totalRange - 1) + + console.log(`[Coordinator] Worker ${i}에 작업 할당: ${startRange} ~ ${endRange}`) + + this.messageBus.emit(`task:${i}`, { + targetHash, + startRange, + endRange + }) + } + } +} + +// 예제 실행 +async function main() { + console.log('=== 연습문제 13.3: 작업 중지 패턴 ===\n') + + // 메시지 버스 (실제로는 Redis Pub/Sub, ZeroMQ 등) + const messageBus = new EventEmitter() + + // 작업자 수 + const WORKER_COUNT = 3 + + // 작업자 생성 + const workers = [] + for (let i = 0; i < WORKER_COUNT; i++) { + workers.push(new Worker(i, messageBus)) + } + + // 코디네이터 생성 + const coordinator = new Coordinator(messageBus, WORKER_COUNT) + + // 검색할 대상 해시 (hash_350 = 350 % 1000) + const targetHash = 'hash_350' + const totalRange = 1000 + + console.log(`대상 해시: ${targetHash}`) + console.log(`검색 범위: 0 ~ ${totalRange - 1}`) + console.log(`작업자 수: ${WORKER_COUNT}\n`) + + // 결과 대기 Promise 설정 (작업 분배 전에 리스너 등록) + const resultPromise = new Promise((resolve) => { + messageBus.once('stop', () => { + // 모든 작업자가 중지될 시간을 줌 + setTimeout(() => { + console.log('\n=== 최종 결과 ===') + if (coordinator.result) { + console.log(`발견된 값: ${coordinator.result.value}`) + console.log(`발견한 작업자: Worker ${coordinator.result.workerId}`) + } + resolve() + }, 100) + }) + }) + + // 작업 분배 시작 + coordinator.distributeWork(targetHash, totalRange) + + // 결과 대기 + await resultPromise +} + +main().catch(console.error) + +/* +핵심 포인트: +1. 메시지 버스를 통한 브로드캐스트 'stop' 이벤트 +2. 각 작업자는 isRunning 플래그로 상태 관리 +3. 일치 항목 발견 시 즉시 전체 중지 신호 발송 +4. 비동기 루프에서 주기적으로 이벤트 루프에 제어권 양보 (setImmediate) +5. 코디네이터가 결과를 수집하고 최종 상태 관리 +*/ diff --git a/chapter13/kilhyeonjun/code/exercises/13.5-data-collector.js b/chapter13/kilhyeonjun/code/exercises/13.5-data-collector.js new file mode 100644 index 0000000..a472eba --- /dev/null +++ b/chapter13/kilhyeonjun/code/exercises/13.5-data-collector.js @@ -0,0 +1,304 @@ +/** + * 연습문제 13.5: 데이터 수집기 (Data Collector) + * + * 시스템에 연결된 모든 노드에 요청을 보내고 + * 모든 응답의 집계를 반환하는 추상화를 구현합니다. + * + * 핵심 개념: + * - 게시/구독을 사용하여 모든 노드에 요청 브로드캐스트 + * - 단방향 채널을 통해 응답 수집 + * - 타임아웃 기반 집계 완료 또는 모든 노드 응답 대기 + */ + +import { EventEmitter } from 'events' + +/** + * 데이터 수집기 + * 모든 노드에 요청을 보내고 응답을 집계 + */ +class DataCollector { + constructor(messageBus, options = {}) { + this.messageBus = messageBus + this.timeout = options.timeout || 5000 // 기본 타임아웃 5초 + this.requestChannel = options.requestChannel || 'collector:request' + this.responseChannel = options.responseChannel || 'collector:response' + + // 상관 식별자 -> 수집 상태 맵 + this.pendingCollections = new Map() + + // 응답 리스너 설정 + this.messageBus.on(this.responseChannel, (response) => { + this._handleResponse(response) + }) + } + + /** + * ID 생성 + * @private + */ + _generateId() { + return Math.random().toString(36).substring(2, 15) + } + + /** + * 모든 노드에 요청을 보내고 응답 집계 + * @param {any} data - 요청 데이터 + * @param {Object} options - 옵션 (expectedNodes, timeout) + * @returns {Promise} 집계된 응답 배열 + */ + collect(data, options = {}) { + return new Promise((resolve, reject) => { + const correlationId = this._generateId() + const timeout = options.timeout || this.timeout + const expectedNodes = options.expectedNodes || null // null이면 타임아웃까지 대기 + + console.log(`[Collector] 요청 발송 (ID: ${correlationId})`) + + // 수집 상태 초기화 + const collectionState = { + responses: [], + expectedNodes, + resolve, + reject, + timeoutId: null + } + + // 타임아웃 설정 + collectionState.timeoutId = setTimeout(() => { + this._finalizeCollection(correlationId, false) + }, timeout) + + this.pendingCollections.set(correlationId, collectionState) + + // 요청 브로드캐스트 + this.messageBus.emit(this.requestChannel, { + correlationId, + data, + replyTo: this.responseChannel + }) + }) + } + + /** + * 응답 처리 + * @param {Object} response - 노드 응답 + * @private + */ + _handleResponse(response) { + const { correlationId, nodeId, data, error } = response + const state = this.pendingCollections.get(correlationId) + + if (!state) { + console.log(`[Collector] 알 수 없는 상관 ID: ${correlationId}`) + return + } + + console.log(`[Collector] 응답 수신 (Node: ${nodeId}, ID: ${correlationId})`) + + // 응답 저장 + state.responses.push({ + nodeId, + data, + error, + timestamp: Date.now() + }) + + // 모든 예상 노드로부터 응답을 받았는지 확인 + if (state.expectedNodes && state.responses.length >= state.expectedNodes) { + this._finalizeCollection(correlationId, true) + } + } + + /** + * 수집 완료 처리 + * @param {string} correlationId - 상관 ID + * @param {boolean} allReceived - 모든 응답 수신 여부 + * @private + */ + _finalizeCollection(correlationId, allReceived) { + const state = this.pendingCollections.get(correlationId) + + if (!state) return + + // 정리 + clearTimeout(state.timeoutId) + this.pendingCollections.delete(correlationId) + + if (allReceived) { + console.log(`[Collector] 수집 완료 (ID: ${correlationId}) - 모든 응답 수신`) + } else { + console.log(`[Collector] 수집 완료 (ID: ${correlationId}) - 타임아웃`) + } + + // 결과 반환 + state.resolve({ + correlationId, + allReceived, + responses: state.responses, + totalCount: state.responses.length + }) + } +} + +/** + * 데이터 노드 시뮬레이션 + * 요청을 받고 자신의 데이터를 응답 + */ +class DataNode { + constructor(nodeId, messageBus, options = {}) { + this.nodeId = nodeId + this.messageBus = messageBus + this.data = options.data || {} // 노드가 보유한 데이터 + this.responseDelay = options.responseDelay || 0 // 응답 지연 시뮬레이션 + + // 요청 리스너 설정 + this.messageBus.on('collector:request', (request) => { + this._handleRequest(request) + }) + } + + /** + * 요청 처리 + * @param {Object} request - 수신된 요청 + * @private + */ + async _handleRequest(request) { + const { correlationId, data, replyTo } = request + + console.log(`[Node ${this.nodeId}] 요청 수신 (ID: ${correlationId})`) + + // 응답 지연 시뮬레이션 + if (this.responseDelay > 0) { + await new Promise((resolve) => setTimeout(resolve, this.responseDelay)) + } + + try { + // 요청 데이터에 따라 응답 생성 + const responseData = this._processRequest(data) + + // 응답 전송 + this.messageBus.emit(replyTo, { + correlationId, + nodeId: this.nodeId, + data: responseData, + error: null + }) + } catch (error) { + // 에러 응답 전송 + this.messageBus.emit(replyTo, { + correlationId, + nodeId: this.nodeId, + data: null, + error: error.message + }) + } + } + + /** + * 요청 데이터 처리 + * @param {Object} requestData - 요청 데이터 + * @returns {any} 응답 데이터 + * @private + */ + _processRequest(requestData) { + // 예: 상태 정보 요청 + if (requestData.type === 'status') { + return { + nodeId: this.nodeId, + status: 'running', + uptime: process.uptime(), + data: this.data + } + } + + // 예: 특정 키 값 요청 + if (requestData.type === 'get' && requestData.key) { + return { + nodeId: this.nodeId, + key: requestData.key, + value: this.data[requestData.key] + } + } + + // 기본: 전체 데이터 반환 + return { + nodeId: this.nodeId, + data: this.data + } + } +} + +// 예제 실행 +async function main() { + console.log('=== 연습문제 13.5: 데이터 수집기 패턴 ===\n') + + // 메시지 버스 (실제로는 Redis Pub/Sub, ZeroMQ 등) + const messageBus = new EventEmitter() + + // 노드 생성 (각각 다른 데이터와 응답 지연) + const nodes = [ + new DataNode('node-1', messageBus, { + data: { cpu: 45, memory: 60 }, + responseDelay: 100 + }), + new DataNode('node-2', messageBus, { + data: { cpu: 80, memory: 75 }, + responseDelay: 200 + }), + new DataNode('node-3', messageBus, { + data: { cpu: 30, memory: 40 }, + responseDelay: 50 + }) + ] + + // 데이터 수집기 생성 + const collector = new DataCollector(messageBus, { + timeout: 3000 + }) + + console.log('노드 수:', nodes.length) + console.log('') + + // 예제 1: 모든 노드의 상태 수집 + console.log('--- 예제 1: 상태 정보 수집 ---\n') + + const statusResult = await collector.collect( + { type: 'status' }, + { expectedNodes: 3 } + ) + + console.log('\n수집 결과:') + console.log(`- 총 응답 수: ${statusResult.totalCount}`) + console.log(`- 전체 수신 완료: ${statusResult.allReceived}`) + console.log('- 응답 데이터:') + statusResult.responses.forEach((r) => { + console.log(` ${r.nodeId}: CPU ${r.data.data.cpu}%, Memory ${r.data.data.memory}%`) + }) + + console.log('\n--- 예제 2: 타임아웃으로 수집 완료 ---\n') + + // 예제 2: 짧은 타임아웃 (일부 응답만 수집) + const partialResult = await collector.collect( + { type: 'status' }, + { timeout: 150 } // 150ms 타임아웃 - node-2는 응답 못함 + ) + + console.log('\n수집 결과:') + console.log(`- 총 응답 수: ${partialResult.totalCount}`) + console.log(`- 전체 수신 완료: ${partialResult.allReceived}`) + + console.log('\n=== 완료 ===') +} + +main().catch(console.error) + +/* +핵심 포인트: +1. DataCollector는 게시/구독 패턴으로 모든 노드에 요청 브로드캐스트 +2. 상관 식별자(correlationId)로 요청-응답 매칭 +3. 두 가지 완료 조건: + - 예상 노드 수만큼 응답 수신 (expectedNodes) + - 타임아웃 발생 (timeout) +4. DataNode는 요청을 수신하고 replyTo 채널로 응답 +5. 집계 결과에는 모든 응답 + 메타데이터 포함 +*/