From e0cd51cd4276f8f8ebcbbb4fcc5f15ec35941433 Mon Sep 17 00:00:00 2001 From: nhsmw Date: Fri, 24 Apr 2026 15:13:56 +0800 Subject: [PATCH] Update consumer.go --- cmd/pulsar-consumer/consumer.go | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/cmd/pulsar-consumer/consumer.go b/cmd/pulsar-consumer/consumer.go index d23ed61675..9f647eca00 100644 --- a/cmd/pulsar-consumer/consumer.go +++ b/cmd/pulsar-consumer/consumer.go @@ -99,26 +99,24 @@ func newConsumer(ctx context.Context, option *option) *consumer { } func (c *consumer) readMessage(ctx context.Context) error { - msgChan := c.pulsarConsumer.Chan() defer func() { c.pulsarConsumer.Close() c.client.Close() }() for { - select { - case <-ctx.Done(): - log.Info("terminating: context cancelled") - return errors.Trace(ctx.Err()) - case consumerMsg := <-msgChan: - log.Debug("Received message", zap.Stringer("msgId", consumerMsg.ID()), zap.ByteString("content", consumerMsg.Payload())) - needCommit := c.writer.WriteMessage(ctx, consumerMsg) - if !needCommit { - continue - } - err := c.pulsarConsumer.AckID(consumerMsg.Message.ID()) - if err != nil { - log.Panic("Error ack message", zap.Error(err)) - } + consumerMsg, err := c.pulsarConsumer.Receive(ctx) + if err != nil { + log.Error("Receive message error", zap.Error(err)) + return errors.Trace(err) + } + log.Debug("Received message", zap.Stringer("msgId", consumerMsg.ID()), zap.ByteString("content", consumerMsg.Payload())) + needCommit := c.writer.WriteMessage(ctx, consumerMsg) + if !needCommit { + continue + } + err = c.pulsarConsumer.AckID(consumerMsg.ID()) + if err != nil { + log.Panic("Error ack message", zap.Error(err)) } } }