-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathsse_parser.go
More file actions
124 lines (102 loc) · 2.36 KB
/
sse_parser.go
File metadata and controls
124 lines (102 loc) · 2.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package aibridge
import (
"bufio"
"io"
"strconv"
"strings"
"sync"
)
const (
SSEEventTypeMessage = "message"
SSEEventTypeError = "error"
SSEEventTypePing = "ping"
)
type SSEEvent struct {
Type string
Data string
ID string
Retry int
}
type SSEParser struct {
events map[string][]SSEEvent
mu sync.RWMutex
}
func NewSSEParser() *SSEParser {
return &SSEParser{
events: make(map[string][]SSEEvent),
}
}
func (p *SSEParser) Parse(reader io.Reader) error {
scanner := bufio.NewScanner(reader)
var currentEvent SSEEvent
var dataLines []string
for scanner.Scan() {
line := scanner.Text()
// Empty line indicates end of event
if line == "" {
if len(dataLines) > 0 {
currentEvent.Data = strings.Join(dataLines, "\n")
}
// Default to message type if no event type specified
if currentEvent.Type == "" {
currentEvent.Type = SSEEventTypeMessage
}
// Store the event
p.mu.Lock()
p.events[currentEvent.Type] = append(p.events[currentEvent.Type], currentEvent)
p.mu.Unlock()
// Reset for next event
currentEvent = SSEEvent{}
dataLines = nil
continue
}
// Skip comments
if strings.HasPrefix(line, ":") {
continue
}
// Parse field:value format
if colonIndex := strings.Index(line, ":"); colonIndex != -1 {
field := line[:colonIndex]
value := line[colonIndex+1:]
// Remove leading space from value if present
if len(value) > 0 && value[0] == ' ' {
value = value[1:]
}
switch field {
case "event":
currentEvent.Type = value
case "data":
dataLines = append(dataLines, value)
case "id":
currentEvent.ID = value
case "retry":
if retryMs, err := strconv.Atoi(value); err == nil {
currentEvent.Retry = retryMs
}
}
}
}
return scanner.Err()
}
func (p *SSEParser) EventsByType(eventType string) []SSEEvent {
p.mu.RLock()
defer p.mu.RUnlock()
events := p.events[eventType]
result := make([]SSEEvent, len(events))
copy(result, events)
return result
}
func (p *SSEParser) MessageEvents() []SSEEvent {
return p.EventsByType(SSEEventTypeMessage)
}
func (p *SSEParser) AllEvents() map[string][]SSEEvent {
p.mu.RLock()
defer p.mu.RUnlock()
result := make(map[string][]SSEEvent)
for eventType, events := range p.events {
eventsCopy := make([]SSEEvent, len(events))
copy(eventsCopy, events)
result[eventType] = eventsCopy
}
return result
}