|
| 1 | +--- |
| 2 | +title: "Event‑Driven Transcription D‑App Integration" |
| 3 | +description: "How to integrate an external transcription service (e.g., DreamTrans) with PCAS via the Event Bus." |
| 4 | +tags: ["dapp", "transcription", "event-bus", "integration", "guide"] |
| 5 | +version: "0.1.0" |
| 6 | +--- |
| 7 | + |
| 8 | +# Event‑Driven Transcription D‑App Integration |
| 9 | + |
| 10 | +This guide describes how to integrate an external transcription service (e.g., DreamTrans) as a D‑App using PCAS's event‑driven model. The D‑App connects to the PCAS gRPC Event Bus, subscribes to request events, performs the transcription, and publishes response events back to PCAS. |
| 11 | + |
| 12 | +PCAS does not require the D‑App to host a gRPC server or listen on TCP ports. The D‑App is a gRPC client of PCAS (default: `127.0.0.1:50051` or your server address). |
| 13 | + |
| 14 | +## High‑Level Flow |
| 15 | + |
| 16 | +1) PCAS publishes a transcription request event and broadcasts it to subscribers. |
| 17 | +2) Your D‑App is subscribed to the bus, filters for the request type, and performs transcription. |
| 18 | +3) Your D‑App publishes a response event with `correlation_id` pointing to the original request `id`. |
| 19 | +4) PCAS stores the response event and broadcasts it to subscribers (including the original caller if it is listening). |
| 20 | + |
| 21 | +## Event Types |
| 22 | + |
| 23 | +Use the following event types (you can change the prefix to match your org, but keep the structure): |
| 24 | + |
| 25 | +- Request: `capability.audio.transcribe.request.v1` |
| 26 | +- Response: `capability.audio.transcribe.response.v1` |
| 27 | +- Error (optional): `capability.audio.transcribe.error.v1` |
| 28 | + |
| 29 | +## Request Event (PCAS → D‑App) |
| 30 | + |
| 31 | +- `type`: `capability.audio.transcribe.request.v1` |
| 32 | +- Core fields |
| 33 | + - `id`: globally unique ID (set by PCAS) |
| 34 | + - `trace_id`: tracing ID (propagated along the chain) |
| 35 | + - `user_id` (optional): end‑user context |
| 36 | + - `session_id` (optional): logical session |
| 37 | + - `attributes` (optional): key/value metadata |
| 38 | + - Recommended keys: `language`, `format`, `sample_rate`, `source` |
| 39 | +- `data` (one of) |
| 40 | + - `audio_base64` (string): base64‑encoded audio payload |
| 41 | + - `audio_url` (string): where to fetch the audio (preferred for large media) |
| 42 | + |
| 43 | +Example JSON (data payload only): |
| 44 | + |
| 45 | +```json |
| 46 | +{ |
| 47 | + "audio_base64": "<base64-bytes>", |
| 48 | + "language": "en", |
| 49 | + "format": "wav", |
| 50 | + "sample_rate": 16000 |
| 51 | +} |
| 52 | +``` |
| 53 | + |
| 54 | +Notes: |
| 55 | +- PCAS broadcasts inbound request events; your D‑App will receive them after subscribing. |
| 56 | +- For large audio, prefer `audio_url` and have the D‑App fetch the content. |
| 57 | + |
| 58 | +## Response Event (D‑App → PCAS) |
| 59 | + |
| 60 | +- `type`: `capability.audio.transcribe.response.v1` |
| 61 | +- Core fields |
| 62 | + - `correlation_id`: MUST equal the original request event `id` |
| 63 | + - `trace_id`: SHOULD copy the original `trace_id` |
| 64 | + - `user_id`/`session_id`: MAY be copied for filtering/analytics |
| 65 | + - `source`: set to your D‑App identifier, e.g., `dapp.dreamtrans` |
| 66 | +- `data` |
| 67 | + - `text` (string): transcription result (required) |
| 68 | + - `language` (string, optional) |
| 69 | + - `segments` (array, optional): detailed segmentation if available |
| 70 | + |
| 71 | +Example JSON (data payload only): |
| 72 | + |
| 73 | +```json |
| 74 | +{ |
| 75 | + "text": "Hello world, this is a demo.", |
| 76 | + "language": "en" |
| 77 | +} |
| 78 | +``` |
| 79 | + |
| 80 | +## Error Event (optional) |
| 81 | + |
| 82 | +- `type`: `capability.audio.transcribe.error.v1` |
| 83 | +- `correlation_id`: original request `id` |
| 84 | +- `data`: `{ "code": "...", "message": "..." }` |
| 85 | + |
| 86 | +## Best‑Practice Policy |
| 87 | + |
| 88 | +To keep PCAS from handling transcription itself (so your D‑App does it), DO NOT route the request type to an internal provider in `policy.yaml`. |
| 89 | + |
| 90 | +```yaml |
| 91 | +# policy.yaml (excerpt) |
| 92 | +providers: |
| 93 | + - name: mock-provider |
| 94 | + type: mock |
| 95 | + |
| 96 | +rules: |
| 97 | + # Intentionally no rule routing capability.audio.transcribe.request.v1 |
| 98 | + # so PCAS broadcasts it and your D-App processes it. |
| 99 | +``` |
| 100 | + |
| 101 | +## Minimal Go Example (Subscribe + Respond) |
| 102 | + |
| 103 | +This sample uses the generated gRPC client directly for full control of `correlation_id` and `trace_id`. |
| 104 | + |
| 105 | +```go |
| 106 | +package main |
| 107 | + |
| 108 | +import ( |
| 109 | + "context" |
| 110 | + "encoding/base64" |
| 111 | + "log" |
| 112 | + "time" |
| 113 | + |
| 114 | + "google.golang.org/grpc" |
| 115 | + "google.golang.org/grpc/credentials/insecure" |
| 116 | + "google.golang.org/protobuf/types/known/anypb" |
| 117 | + "google.golang.org/protobuf/types/known/structpb" |
| 118 | + "google.golang.org/protobuf/types/known/timestamppb" |
| 119 | + |
| 120 | + busv1 "github.com/soaringjerry/pcas/gen/go/pcas/bus/v1" |
| 121 | + eventsv1 "github.com/soaringjerry/pcas/gen/go/pcas/events/v1" |
| 122 | +) |
| 123 | + |
| 124 | +func main() { |
| 125 | + addr := "127.0.0.1:50051" // PCAS server address |
| 126 | + conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) |
| 127 | + if err != nil { log.Fatal(err) } |
| 128 | + defer conn.Close() |
| 129 | + |
| 130 | + client := busv1.NewEventBusServiceClient(conn) |
| 131 | + |
| 132 | + // Subscribe to events |
| 133 | + sub, err := client.Subscribe(context.Background(), &busv1.SubscribeRequest{ClientId: "dreamtrans-dapp"}) |
| 134 | + if err != nil { log.Fatal(err) } |
| 135 | + |
| 136 | + for { |
| 137 | + evt, err := sub.Recv() |
| 138 | + if err != nil { log.Fatal(err) } |
| 139 | + if evt.GetType() != "capability.audio.transcribe.request.v1" { continue } |
| 140 | + |
| 141 | + // Extract audio |
| 142 | + var audioB64 string |
| 143 | + var language string |
| 144 | + if evt.Data != nil { |
| 145 | + val := &structpb.Value{} |
| 146 | + if evt.Data.UnmarshalTo(val) == nil { |
| 147 | + if m, ok := val.AsInterface().(map[string]interface{}); ok { |
| 148 | + if s, ok := m["audio_base64"].(string); ok { audioB64 = s } |
| 149 | + if s, ok := m["language"].(string); ok { language = s } |
| 150 | + } |
| 151 | + } |
| 152 | + } |
| 153 | + if audioB64 == "" { log.Println("missing audio_base64; skipping"); continue } |
| 154 | + |
| 155 | + // Decode and transcribe (replace with real DreamTrans call) |
| 156 | + audioBytes, _ := base64.StdEncoding.DecodeString(audioB64) |
| 157 | + _ = audioBytes // use bytes in your transcription API |
| 158 | + text := "<transcribed text>" // TODO: call DreamTrans here |
| 159 | + |
| 160 | + // Build response data |
| 161 | + respMap := map[string]interface{}{"text": text, "language": language} |
| 162 | + respVal, _ := structpb.NewValue(respMap) |
| 163 | + respAny, _ := anypb.New(respVal) |
| 164 | + |
| 165 | + // Publish response event |
| 166 | + resp := &eventsv1.Event{ |
| 167 | + Id: "", // let server assign or generate your own UUID |
| 168 | + Type: "capability.audio.transcribe.response.v1", |
| 169 | + Source: "dapp.dreamtrans", |
| 170 | + Specversion: "1.0", |
| 171 | + Time: timestamppb.Now(), |
| 172 | + TraceId: evt.GetTraceId(), |
| 173 | + CorrelationId: evt.GetId(), |
| 174 | + UserId: evt.GetUserId(), |
| 175 | + SessionId: evt.GetSessionId(), |
| 176 | + Data: respAny, |
| 177 | + } |
| 178 | + if _, err := client.Publish(context.Background(), resp); err != nil { |
| 179 | + log.Printf("publish response failed: %v", err) |
| 180 | + } |
| 181 | + } |
| 182 | +} |
| 183 | +``` |
| 184 | + |
| 185 | +## Operational Notes |
| 186 | + |
| 187 | +- Delivery semantics: subscription is a live stream; if disconnected, events may be missed. Keep your D‑App online and handle reconnects. |
| 188 | +- Backpressure: PCAS uses buffered channels; if your D‑App is slow, older events may drop. Consume promptly. |
| 189 | +- Large media: for large audio, prefer `audio_url` to avoid bloating event payloads. |
| 190 | +- Correlation: always set `correlation_id` on responses; a client can match it to the original request. |
| 191 | + |
| 192 | +## Testing Locally |
| 193 | + |
| 194 | +1) Start PCAS (compose or binary). |
| 195 | +2) Run your D‑App. |
| 196 | +3) Publish a test request from anywhere: |
| 197 | + |
| 198 | +```bash |
| 199 | +./bin/pcasctl emit \ |
| 200 | + --type capability.audio.transcribe.request.v1 \ |
| 201 | + --data '{"audio_base64":"<base64 sample>", "language":"en"}' |
| 202 | +``` |
| 203 | + |
| 204 | +4) Observe your D‑App logs and the broadcast response event. You can also run a second subscriber to verify response delivery. |
| 205 | + |
0 commit comments