diff --git a/cmd/pulsar-consumer/consumer.go b/cmd/pulsar-consumer/consumer.go index d23ed61675..9f647eca00 100644 --- a/cmd/pulsar-consumer/consumer.go +++ b/cmd/pulsar-consumer/consumer.go @@ -99,26 +99,24 @@ func newConsumer(ctx context.Context, option *option) *consumer { } func (c *consumer) readMessage(ctx context.Context) error { - msgChan := c.pulsarConsumer.Chan() defer func() { c.pulsarConsumer.Close() c.client.Close() }() for { - select { - case <-ctx.Done(): - log.Info("terminating: context cancelled") - return errors.Trace(ctx.Err()) - case consumerMsg := <-msgChan: - log.Debug("Received message", zap.Stringer("msgId", consumerMsg.ID()), zap.ByteString("content", consumerMsg.Payload())) - needCommit := c.writer.WriteMessage(ctx, consumerMsg) - if !needCommit { - continue - } - err := c.pulsarConsumer.AckID(consumerMsg.Message.ID()) - if err != nil { - log.Panic("Error ack message", zap.Error(err)) - } + consumerMsg, err := c.pulsarConsumer.Receive(ctx) + if err != nil { + log.Error("Receive message error", zap.Error(err)) + return errors.Trace(err) + } + log.Debug("Received message", zap.Stringer("msgId", consumerMsg.ID()), zap.ByteString("content", consumerMsg.Payload())) + needCommit := c.writer.WriteMessage(ctx, consumerMsg) + if !needCommit { + continue + } + err = c.pulsarConsumer.AckID(consumerMsg.ID()) + if err != nil { + log.Panic("Error ack message", zap.Error(err)) } } }