Skip to content

Commit ec29410

Browse files
author
Alok Kumar Singh
authored
Merge pull request #171 from practo/fix-162
Applying the learnings of ctx handling and fixing #170
2 parents 0f16d43 + 31461a8 commit ec29410

6 files changed

Lines changed: 94 additions & 59 deletions

File tree

redshiftsink/cmd/redshiftbatcher/main.go

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func run(cmd *cobra.Command, args []string) {
7373
}
7474

7575
ctx, cancel := context.WithCancel(context.Background())
76+
defer cancel()
7677

7778
consumerGroups := make(map[string]kafka.ConsumerGroupInterface)
7879
var consumersReady []chan bool
@@ -105,43 +106,39 @@ func run(cmd *cobra.Command, args []string) {
105106
groupID,
106107
groupConfig.TopicRegexes,
107108
)
109+
108110
wg.Add(1)
109111
go manager.SyncTopics(ctx, wg)
112+
110113
wg.Add(1)
111114
go manager.Consume(ctx, wg)
112115
}
116+
klog.V(2).Infof("consumerGroups: %v", len(consumersReady))
113117

114-
sigterm := make(chan os.Signal, 1)
115-
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
116-
ready := 0
117-
118-
klog.V(2).Infof("ConsumerGroups: %v", len(consumersReady))
119-
for ready >= 0 {
118+
go func() {
119+
sigterm := make(chan os.Signal, 1)
120+
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
120121
select {
121-
default:
122122
case <-sigterm:
123123
klog.V(2).Info("SIGTERM signal received")
124-
ready = -1
124+
cancel()
125+
klog.V(2).Info("Cancelled main context")
125126
}
127+
}()
126128

127-
if ready == -1 || ready == len(consumersReady) {
128-
time.Sleep(3 * time.Second)
129-
continue
130-
}
131-
132-
for _, channel := range consumersReady {
129+
go func() {
130+
for i, c := range consumersReady {
133131
select {
134-
case <-channel:
135-
ready += 1
136-
klog.V(2).Infof("ConsumerGroup #%d is up and running", ready)
132+
case <-c:
133+
klog.V(2).Infof(
134+
"#%d consumerGroup is up and running",
135+
i,
136+
)
137137
}
138138
}
139-
}
140-
141-
klog.V(2).Info("Cancelled main context")
142-
cancel()
139+
}()
143140

144-
klog.V(2).Info("Waiting for all goroutines to shutdown...")
141+
klog.V(2).Info("wg wait()")
145142
wg.Wait()
146143

147144
var closeErr error

redshiftsink/cmd/redshiftloader/main.go

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -125,43 +125,39 @@ func run(cmd *cobra.Command, args []string) {
125125
groupConfig.TopicRegexes,
126126
// cancel,
127127
)
128+
128129
wg.Add(1)
129130
go manager.SyncTopics(ctx, wg)
131+
130132
wg.Add(1)
131133
go manager.Consume(ctx, wg)
132134
}
135+
klog.V(2).Infof("consumerGroups: %v", len(consumersReady))
133136

134-
sigterm := make(chan os.Signal, 1)
135-
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
136-
ready := 0
137-
138-
klog.V(2).Infof("ConsumerGroups: %v", len(consumersReady))
139-
for ready >= 0 {
137+
go func() {
138+
sigterm := make(chan os.Signal, 1)
139+
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
140140
select {
141-
default:
142141
case <-sigterm:
143142
klog.V(2).Info("SIGTERM signal received")
144-
ready = -1
143+
cancel()
144+
klog.V(2).Info("Cancelled main context")
145145
}
146+
}()
146147

147-
if ready == -1 || ready == len(consumersReady) {
148-
time.Sleep(3 * time.Second)
149-
continue
150-
}
151-
152-
for _, channel := range consumersReady {
148+
go func() {
149+
for i, c := range consumersReady {
153150
select {
154-
case <-channel:
155-
ready += 1
156-
klog.V(2).Infof("ConsumerGroup #%d is up and running", ready)
151+
case <-c:
152+
klog.V(2).Infof(
153+
"#%d consumerGroup is up and running",
154+
i,
155+
)
157156
}
158157
}
159-
}
160-
161-
klog.V(2).Info("Cancelling context to trigger graceful shutdown...")
162-
cancel()
158+
}()
163159

164-
klog.V(2).Info("Waiting for waitgroups to shutdown...")
160+
klog.V(2).Info("wg wait()")
165161
wg.Wait()
166162

167163
var closeErr error

redshiftsink/pkg/kafka/manager.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ func (c *Manager) SyncTopics(
192192

193193
select {
194194
case <-ctx.Done():
195+
klog.V(2).Info("ctx cancelled bye")
195196
return
196197
case <-ticker.C:
197198
continue

redshiftsink/pkg/redshiftbatcher/batch_processor.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -456,8 +456,8 @@ func (b *batchProcessor) Process(
456456
bodyBuf: bytes.NewBuffer(make([]byte, 0, 4096)),
457457
maskSchema: make(map[string]serializer.MaskInfo),
458458
}
459-
go b.processBatch(wg, session, msgBuf, resp)
460459
wg.Add(1)
460+
go b.processBatch(wg, session, msgBuf, resp)
461461
responses = append(responses, resp)
462462
}
463463
if len(responses) == 0 {
@@ -488,7 +488,18 @@ func (b *batchProcessor) Process(
488488
"%s, error(s) occured in processing (sending err)", b.topic,
489489
)
490490
b.handleShutdown()
491-
errChan <- errors
491+
492+
// send to channel with context check, fix #170
493+
select {
494+
case <-session.Context().Done():
495+
klog.V(2).Infof(
496+
"%s: processor returning, session ctx done",
497+
b.topic,
498+
)
499+
return
500+
case errChan <- errors:
501+
}
502+
492503
klog.Errorf(
493504
"%s, error(s) occured: %+v, processor shutdown.",
494505
b.topic,
@@ -501,9 +512,27 @@ func (b *batchProcessor) Process(
501512
// failure in between signal and marking the offset can lead to
502513
// duplicates in the loader topic, but it's ok as loader is idempotent
503514
for _, resp := range responses {
515+
select {
516+
default:
517+
case <-session.Context().Done():
518+
klog.V(2).Infof(
519+
"%s: processor returning, session ctx done",
520+
b.topic,
521+
)
522+
return
523+
}
504524
err := b.signalLoad(resp)
505525
if err != nil {
506-
errChan <- err
526+
// send to channel with context check, fix #170
527+
select {
528+
case <-session.Context().Done():
529+
klog.V(2).Infof(
530+
"%s: processor returning, session ctx done",
531+
b.topic,
532+
)
533+
return
534+
case errChan <- err:
535+
}
507536
klog.Errorf(
508537
"%s, error signalling: %v, processor shutdown.",
509538
b.topic,

redshiftsink/pkg/redshiftbatcher/batcher_handler.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,14 @@ func (h *batcherHandler) ConsumeClaim(
158158
)
159159

160160
wg := &sync.WaitGroup{}
161-
go processor.Process(wg, session, processChan, errChan)
162161
wg.Add(1)
163-
defer wg.Wait()
162+
go processor.Process(wg, session, processChan, errChan)
163+
164+
defer func() {
165+
klog.V(2).Infof("%s: wg wait() for processing to return", claim.Topic())
166+
wg.Wait()
167+
klog.V(2).Infof("%s: wg done. processing returned", claim.Topic())
168+
}()
164169

165170
klog.V(4).Infof("%s: read msgs", claim.Topic())
166171
// NOTE:
@@ -204,10 +209,10 @@ func (h *batcherHandler) ConsumeClaim(
204209
// Deserialize the message
205210
msg, err := h.serializer.Deserialize(message)
206211
if err != nil {
207-
return fmt.Errorf("error deserializing binary, err: %s\n", err)
212+
return fmt.Errorf("%s: consumeClaim returning, error deserializing binary, err: %s\n", claim.Topic(), err)
208213
}
209214
if msg == nil || msg.Value == nil {
210-
return fmt.Errorf("got message as nil, message: %+v\n", msg)
215+
return fmt.Errorf("%s: consumeClaim returning, error, got message as nil, message: %+v\n", claim.Topic(), msg)
211216
}
212217

213218
if lastSchemaId == nil {
@@ -220,22 +225,22 @@ func (h *batcherHandler) ConsumeClaim(
220225
msg.SchemaId,
221226
)
222227
// Flush the batch due to schema change
223-
msgBatch.Flush()
228+
msgBatch.Flush(session.Context())
224229
}
225230
// Flush the batch by size or insert in batch
226-
msgBatch.Insert(msg)
231+
msgBatch.Insert(session.Context(), msg)
227232
*lastSchemaId = msg.SchemaId
228233
case <-maxWaitTicker.C:
229234
// Flush the batch by time
230235
klog.V(2).Infof(
231236
"%s: maxWaitSeconds hit",
232237
claim.Topic(),
233238
)
234-
msgBatch.Flush()
239+
msgBatch.Flush(session.Context())
235240
case err := <-errChan:
236241
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
237242
klog.Errorf(
238-
"%s: error occured in processing, err: %v, triggered shutdown",
243+
"consumeClaim returning, %s: error occured in processing, err: %v, triggered shutdown",
239244
claim.Topic(),
240245
err,
241246
)

redshiftsink/pkg/serializer/message.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package serializer
22

33
import (
4+
"context"
45
"github.com/Shopify/sarama"
56
"github.com/practo/klog/v2"
67
"sync"
@@ -54,10 +55,16 @@ func NewMessageAsyncBatch(
5455
}
5556
}
5657

57-
func (b *MessageAsyncBatch) Flush() {
58+
func (b *MessageAsyncBatch) Flush(ctx context.Context) {
5859
size := len(b.msgBuf)
5960
if size > 0 {
60-
b.processChan <- b.msgBuf
61+
// write to channel with context check, fixes #170
62+
select {
63+
case <-ctx.Done():
64+
klog.V(2).Infof("%s: flush cancelled, ctx done, return", b.topic)
65+
return
66+
case b.processChan <- b.msgBuf:
67+
}
6168
b.msgBuf = make([]*Message, 0, b.maxSize)
6269
klog.V(4).Infof(
6370
"%s: flushed:%d, processChan:%v",
@@ -75,14 +82,14 @@ func (b *MessageAsyncBatch) Flush() {
7582

7683
// insert makes the batch and also flushes to the processor
7784
// if batchSize >= maxSize
78-
func (b *MessageAsyncBatch) Insert(msg *Message) {
85+
func (b *MessageAsyncBatch) Insert(ctx context.Context, msg *Message) {
7986
b.msgBuf = append(b.msgBuf, msg)
8087
if len(b.msgBuf) >= b.maxSize {
8188
klog.V(2).Infof(
8289
"%s: maxSize hit",
8390
msg.Topic,
8491
)
85-
b.Flush()
92+
b.Flush(ctx)
8693
}
8794
}
8895

0 commit comments

Comments
 (0)