diff --git a/pkg/sources/forward/applier/sourcetransformer.go b/pkg/sources/forward/applier/sourcetransformer.go index bd81436190..795cd4c5a2 100644 --- a/pkg/sources/forward/applier/sourcetransformer.go +++ b/pkg/sources/forward/applier/sourcetransformer.go @@ -35,12 +35,3 @@ type ApplySourceTransformFunc func(ctx context.Context, message *isb.ReadMessage func (f ApplySourceTransformFunc) ApplyTransform(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error) { return f(ctx, message) } - -var ( - // Terminal Applier do not make any change to the message - Terminal = ApplySourceTransformFunc(func(ctx context.Context, msg *isb.ReadMessage) ([]*isb.WriteMessage, error) { - return []*isb.WriteMessage{{ - Message: msg.Message, - }}, nil - }) -) diff --git a/pkg/sources/forward/data_forward.go b/pkg/sources/forward/data_forward.go index f3e3f5c712..4f90e4ab6c 100644 --- a/pkg/sources/forward/data_forward.go +++ b/pkg/sources/forward/data_forward.go @@ -164,7 +164,10 @@ func (df *DataForward) Start() <-chan struct{} { for _, buffer := range df.toBuffers { for _, partition := range buffer { if err := partition.Close(); err != nil { - log.Errorw("Failed to close partition writer, shutdown anyways...", zap.Error(err), zap.String("bufferTo", partition.GetName())) + log.Errorw("Failed to close partition writer, shutdown anyways...", + zap.Error(err), + zap.String("bufferTo", partition.GetName()), + ) } else { log.Infow("Closed partition writer", zap.String("bufferTo", partition.GetName())) } @@ -192,6 +195,17 @@ type readWriteMessagePair struct { transformerError error } +// emitMessageSizeMetric emits the message size metric +func emitMessageSizeMetric(df *DataForward, m *isb.ReadMessage) { + metrics.ReadBytesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Add(float64(len(m.Payload))) +} + // forwardAChunk forwards a chunk of message from the reader to the toBuffers. It does the Read -> Process -> Forward -> Ack chain // for a chunk of messages returned by the first Read call. It will return only if only we are successfully able to ack // the message after forwarding, barring any platform errors. The platform errors include buffer-full, @@ -204,7 +218,13 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { readMessages, err := df.reader.Read(ctx, df.opts.readBatchSize) if err != nil { df.opts.logger.Warnw("failed to read from source", zap.Error(err)) - metrics.ReadMessagesError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Inc() + metrics.ReadMessagesError.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Inc() } // if there are no read messages, we return early. @@ -236,7 +256,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { publisher = df.createToVertexWatermarkPublisher(toVertexName, sp) vertexPublishers[sp] = publisher } - idlehandler.PublishIdleWatermark(ctx, wmb.PARTITION_0, df.toBuffers[toVertexName][index], publisher, df.idleManager, df.opts.logger, df.vertexName, df.pipelineName, dfv1.VertexTypeSource, df.vertexReplica, fetchedWm) + idlehandler.PublishIdleWatermark(ctx, wmb.PARTITION_0, df.toBuffers[toVertexName][index], publisher, + df.idleManager, df.opts.logger, df.vertexName, df.pipelineName, dfv1.VertexTypeSource, df.vertexReplica, fetchedWm) } } } @@ -248,9 +269,21 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { // reset the idle handler because we have read messages df.srcIdleHandler.Reset() - - metrics.ReadDataMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Add(float64(len(readMessages))) - metrics.ReadMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Add(float64(len(readMessages))) + metrics.ReadDataMessagesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName()}, + ).Add(float64(len(readMessages))) + + metrics.ReadMessagesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Add(float64(len(readMessages))) // store the offsets of the messages we read from source var readOffsets = make([]isb.Offset, len(readMessages)) @@ -270,51 +303,66 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { messageToStep[toVertex] = make([][]isb.Message, len(df.toBuffers[toVertex])) } - // FIXME: when the transformer is not defined, we should avoid doing this. - // user-defined transformer concurrent processing request channel - transformerCh := make(chan *readWriteMessagePair) - // transformerResults stores the results after user-defined transformer processing for all read messages. It indexes - // a read message to the corresponding write message - transformerResults := make([]readWriteMessagePair, len(readMessages)) - // applyTransformer, if there is an Internal error, it is a blocking call and - // will return only if shutdown has been initiated. - - // create a pool of Transformer Processors - var wg sync.WaitGroup - for i := 0; i < df.opts.transformerConcurrency; i++ { - wg.Add(1) - go func() { - defer wg.Done() - df.concurrentApplyTransformer(ctx, transformerCh) - }() - } - concurrentTransformerProcessingStart := time.Now() - - // send to transformer only the data messages - for idx, m := range readMessages { - // emit message size metric - metrics.ReadBytesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Add(float64(len(m.Payload))) - // assign watermark to the message - m.Watermark = time.Time(processorWM) - // send transformer processing work to the channel - transformerResults[idx].readMessage = m - transformerCh <- &transformerResults[idx] + readWriteMessagePairs := make([]readWriteMessagePair, len(readMessages)) + + // If a user-defined transformer exists, apply it + if df.transformer != nil { + // user-defined transformer concurrent processing request channel + transformerCh := make(chan *readWriteMessagePair) + + // create a pool of Transformer Processors + var wg sync.WaitGroup + for i := 0; i < df.opts.transformerConcurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + df.concurrentApplyTransformer(ctx, transformerCh) + }() + } + concurrentTransformerProcessingStart := time.Now() + for idx, m := range readMessages { + emitMessageSizeMetric(df, m) + // assign watermark to the message + m.Watermark = time.Time(processorWM) + readWriteMessagePairs[idx].readMessage = m + // send transformer processing work to the channel. Thus, the results of the transformer + // application on a read message will be stored as the corresponding writeMessage in readWriteMessagePairs + transformerCh <- &readWriteMessagePairs[idx] + } + // let the go routines know that there is no more work + close(transformerCh) + // wait till the processing is done. this will not be an infinite wait because the transformer processing will exit if + // context.Done() is closed. + wg.Wait() + df.opts.logger.Debugw("concurrent applyTransformer completed", + zap.Int("concurrency", df.opts.transformerConcurrency), + zap.Duration("took", time.Since(concurrentTransformerProcessingStart)), + ) + + metrics.SourceTransformerConcurrentProcessingTime.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Observe(float64(time.Since(concurrentTransformerProcessingStart).Microseconds())) + } else { + for idx, m := range readMessages { + emitMessageSizeMetric(df, m) + // assign watermark to the message + m.Watermark = time.Time(processorWM) + readWriteMessagePairs[idx].readMessage = m + // if no user-defined transformer exists, then the messages to write will be identical to the message read from source + // thus, the unmodified read message will be stored as the corresponding writeMessage in readWriteMessagePairs + readWriteMessagePairs[idx].writeMessages = []*isb.WriteMessage{{Message: m.Message}} + } } - // let the go routines know that there is no more work - close(transformerCh) - // wait till the processing is done. this will not be an infinite wait because the transformer processing will exit if - // context.Done() is closed. - wg.Wait() - df.opts.logger.Debugw("concurrent applyTransformer completed", zap.Int("concurrency", df.opts.transformerConcurrency), zap.Duration("took", time.Since(concurrentTransformerProcessingStart))) - metrics.SourceTransformerConcurrentProcessingTime.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Observe(float64(time.Since(concurrentTransformerProcessingStart).Microseconds())) - // transformer processing is done. // publish source watermark and assign IsLate attribute based on new event time. var writeMessages []*isb.WriteMessage var transformedReadMessages []*isb.ReadMessage latestEtMap := make(map[int32]int64) - for _, m := range transformerResults { + for _, m := range readWriteMessagePairs { writeMessages = append(writeMessages, m.writeMessages...) for _, message := range m.writeMessages { message.Headers = m.readMessage.Headers @@ -345,11 +393,17 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { var sourcePartitionsIndices = make(map[int32]bool) // let's figure out which vertex to send the results to. // update the toBuffer(s) with writeMessages. - for _, m := range transformerResults { + for _, m := range readWriteMessagePairs { // Look for errors in transformer processing if we see even 1 error we return. // Handling partial retrying is not worth ATM. if m.transformerError != nil { - metrics.SourceTransformerError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Inc() + metrics.SourceTransformerError.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Inc() + df.opts.logger.Errorw("failed to apply source transformer", zap.Error(m.transformerError)) return } @@ -418,7 +472,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { publisher = df.createToVertexWatermarkPublisher(toVertexName, sp) vertexPublishers[sp] = publisher } - idlehandler.PublishIdleWatermark(ctx, wmb.PARTITION_0, df.toBuffers[toVertexName][index], publisher, df.idleManager, df.opts.logger, df.vertexName, df.pipelineName, dfv1.VertexTypeSource, df.vertexReplica, processorWM) + idlehandler.PublishIdleWatermark(ctx, wmb.PARTITION_0, df.toBuffers[toVertexName][index], publisher, + df.idleManager, df.opts.logger, df.vertexName, df.pipelineName, dfv1.VertexTypeSource, df.vertexReplica, processorWM) } } } @@ -431,13 +486,31 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { // implicit return for posterity :-) if err != nil { df.opts.logger.Errorw("failed to ack from source", zap.Error(err)) - metrics.AckMessageError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Add(float64(len(readOffsets))) + metrics.AckMessageError.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Add(float64(len(readOffsets))) + return } - metrics.AckMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Add(float64(len(readOffsets))) + metrics.AckMessagesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Add(float64(len(readOffsets))) // ProcessingTimes of the entire forwardAChunk - metrics.ForwardAChunkProcessingTime.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica))}).Observe(float64(time.Since(start).Microseconds())) + metrics.ForwardAChunkProcessingTime.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + }).Observe(float64(time.Since(start).Microseconds())) } func (df *DataForward) ackFromSource(ctx context.Context, offsets []isb.Offset) error { @@ -509,15 +582,32 @@ func (df *DataForward) writeToBuffer(ctx context.Context, toBufferPartition isb. metrics.LabelReason: err.Error(), }).Add(float64(len(msg.Payload))) - df.opts.logger.Infow("Dropped message", zap.String("reason", err.Error()), zap.String("partition", toBufferPartition.GetName()), zap.String("vertex", df.vertexName), zap.String("pipeline", df.pipelineName)) + df.opts.logger.Infow("Dropped message", + zap.String("reason", err.Error()), + zap.String("partition", toBufferPartition.GetName()), + zap.String("vertex", df.vertexName), zap.String("pipeline", df.pipelineName), + ) } else { needRetry = true // we retry only failed messages failedMessages = append(failedMessages, msg) - metrics.WriteMessagesError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Inc() + metrics.WriteMessagesError.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: toBufferPartition.GetName(), + }).Inc() + // a shutdown can break the blocking loop caused due to InternalErr if ok, _ := df.IsShuttingDown(); ok { - metrics.PlatformError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica))}).Inc() + metrics.PlatformError.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + }).Inc() + return writeOffsets, fmt.Errorf("writeToBuffer failed, Stop called while stuck on an internal error with failed messages:%d, %v", len(failedMessages), errs) } } @@ -547,8 +637,22 @@ func (df *DataForward) writeToBuffer(ctx context.Context, toBufferPartition isb. } } - metrics.WriteMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(float64(writeCount)) - metrics.WriteBytesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(writeBytes) + metrics.WriteMessagesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: toBufferPartition.GetName(), + }).Add(float64(writeCount)) + + metrics.WriteBytesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: toBufferPartition.GetName(), + }).Add(writeBytes) + return writeOffsets, nil } @@ -556,12 +660,29 @@ func (df *DataForward) writeToBuffer(ctx context.Context, toBufferPartition isb. func (df *DataForward) concurrentApplyTransformer(ctx context.Context, readMessagePair <-chan *readWriteMessagePair) { for message := range readMessagePair { start := time.Now() - metrics.SourceTransformerReadMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Inc() + metrics.SourceTransformerReadMessagesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Inc() + writeMessages, err := df.applyTransformer(ctx, message.readMessage) - metrics.SourceTransformerWriteMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Add(float64(len(writeMessages))) + metrics.SourceTransformerWriteMessagesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Add(float64(len(writeMessages))) + message.writeMessages = append(message.writeMessages, writeMessages...) message.transformerError = err - metrics.SourceTransformerProcessingTime.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Observe(float64(time.Since(start).Microseconds())) + metrics.SourceTransformerProcessingTime.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Observe(float64(time.Since(start).Microseconds())) } } @@ -580,7 +701,13 @@ func (df *DataForward) applyTransformer(ctx context.Context, readMessage *isb.Re // this does not mean we should prohibit this from a shutdown. if ok, _ := df.IsShuttingDown(); ok { df.opts.logger.Errorw("Transformer.Apply, Stop called while stuck on an internal error", zap.Error(err)) - metrics.PlatformError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica))}).Inc() + metrics.PlatformError.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + }).Inc() + return nil, err } continue @@ -598,11 +725,23 @@ func (df *DataForward) whereToStep(writeMessage *isb.WriteMessage, messageToStep // call WhereTo and drop it on errors to, err := df.toWhichStepDecider.WhereTo(writeMessage.Keys, writeMessage.Tags, writeMessage.ID) if err != nil { - df.opts.logger.Errorw("failed in whereToStep", zap.Error(isb.MessageWriteErr{Name: df.reader.GetName(), Header: readMessage.Header, Body: readMessage.Body, Message: fmt.Sprintf("WhereTo failed, %s", err)})) + df.opts.logger.Errorw("failed in whereToStep", zap.Error(isb.MessageWriteErr{ + Name: df.reader.GetName(), + Header: readMessage.Header, + Body: readMessage.Body, + Message: fmt.Sprintf("WhereTo failed, %s", err), + })) + // a shutdown can break the blocking loop caused due to InternalErr if ok, _ := df.IsShuttingDown(); ok { err := fmt.Errorf("whereToStep, Stop called while stuck on an internal error, %v", err) - metrics.PlatformError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica))}).Inc() + metrics.PlatformError.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + }).Inc() + return err } return err @@ -610,7 +749,12 @@ func (df *DataForward) whereToStep(writeMessage *isb.WriteMessage, messageToStep for _, t := range to { if _, ok := messageToStep[t.ToVertexName]; !ok { - df.opts.logger.Errorw("failed in whereToStep", zap.Error(isb.MessageWriteErr{Name: df.reader.GetName(), Header: readMessage.Header, Body: readMessage.Body, Message: fmt.Sprintf("no such destination (%s)", t.ToVertexName)})) + df.opts.logger.Errorw("failed in whereToStep", zap.Error(isb.MessageWriteErr{ + Name: df.reader.GetName(), + Header: readMessage.Header, + Body: readMessage.Body, + Message: fmt.Sprintf("no such destination (%s)", t.ToVertexName), + })) } messageToStep[t.ToVertexName][t.ToVertexPartitionIdx] = append(messageToStep[t.ToVertexName][t.ToVertexPartitionIdx], writeMessage.Message) } diff --git a/pkg/sources/generator/tickgen_test.go b/pkg/sources/generator/tickgen_test.go index 7eec6543cb..96eed7a12c 100644 --- a/pkg/sources/generator/tickgen_test.go +++ b/pkg/sources/generator/tickgen_test.go @@ -49,6 +49,7 @@ func TestRead(t *testing.T) { Replica: 0, } + mGen, err := NewMemGen(ctx, m, WithReadTimeout(3*time.Second)) assert.NoError(t, err) messages, err := mGen.Read(ctx, 5) @@ -66,4 +67,4 @@ func TestTimeForInvalidTime(t *testing.T) { nanotime := int64(-1) parsedtime := timeFromNanos(nanotime, 0) assert.True(t, parsedtime.UnixNano() > 0) -} +} \ No newline at end of file diff --git a/pkg/sources/http/http_test.go b/pkg/sources/http/http_test.go index c812cf9a0b..9e5b17177e 100644 --- a/pkg/sources/http/http_test.go +++ b/pkg/sources/http/http_test.go @@ -61,6 +61,7 @@ func Test_NewHTTP(t *testing.T) { Hostname: "test-host", Replica: 0, } + h, err := NewHttpSource(ctx, vi) assert.NoError(t, err) assert.NotNil(t, h.(*httpSource).shutdown) diff --git a/pkg/sources/source.go b/pkg/sources/source.go index 682a7e3f63..1bad4b66e9 100644 --- a/pkg/sources/source.go +++ b/pkg/sources/source.go @@ -41,7 +41,6 @@ import ( sharedutil "github.com/numaproj/numaflow/pkg/shared/util" "github.com/numaproj/numaflow/pkg/shuffle" sourceforward "github.com/numaproj/numaflow/pkg/sources/forward" - "github.com/numaproj/numaflow/pkg/sources/forward/applier" "github.com/numaproj/numaflow/pkg/sources/generator" "github.com/numaproj/numaflow/pkg/sources/http" "github.com/numaproj/numaflow/pkg/sources/kafka" @@ -147,7 +146,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error { } // created watermark related components only if watermark is enabled - // otherwise no op will used + // otherwise no op will be used if !sp.VertexInstance.Vertex.Spec.Watermark.Disabled { // build watermark stores for from vertex sourceWmStores, err = jetstream.BuildFromVertexWatermarkStores(ctx, sp.VertexInstance, natsClientPool.NextAvailableClient()) @@ -271,7 +270,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error { if sp.VertexInstance.Vertex.HasUDTransformer() { sourceForwarder, err = sourceforward.NewDataForward(sp.VertexInstance, sourceReader, writersMap, sp.getTransformerGoWhereDecider(shuffleFuncMap), srcTransformerGRPCClient, fetchWatermark, sourceWmPublisher, toVertexWatermarkStores, idleManager, forwardOpts...) } else { - sourceForwarder, err = sourceforward.NewDataForward(sp.VertexInstance, sourceReader, writersMap, sp.getSourceGoWhereDecider(shuffleFuncMap), applier.Terminal, fetchWatermark, sourceWmPublisher, toVertexWatermarkStores, idleManager, forwardOpts...) + sourceForwarder, err = sourceforward.NewDataForward(sp.VertexInstance, sourceReader, writersMap, sp.getSourceGoWhereDecider(shuffleFuncMap), nil, fetchWatermark, sourceWmPublisher, toVertexWatermarkStores, idleManager, forwardOpts...) } if err != nil { return fmt.Errorf("failed to create source forwarder, error: %w", err)