Skip to content

Commit 9d49f83

Browse files
author
Brian Vandenberg
committed
fix(test): enable template support in test command
Templates loaded via the -t flag were not available during test execution because the manager defaulted to bundle.GlobalEnvironment instead of using the environment where templates were registered. This fix explicitly passes the environment to the manager, matching the pattern used by the run command (internal/cli/common/manager.go:107). The issue only affects RPK, which uses a cloned environment rather than bundle.GlobalEnvironment directly. Standalone Benthos is unaffected. Changes: - Add manager.OptSetEnvironment(p.env) in processors_provider.go - Add integration test verifying templates work with ProcessorsProvider
1 parent 480745b commit 9d49f83

2 files changed

Lines changed: 62 additions & 1 deletion

File tree

internal/cli/test/processors_provider.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,12 @@ func (b *bloblangProc) Close(context.Context) error {
135135
//------------------------------------------------------------------------------
136136

137137
func (p *ProcessorsProvider) initProcs(confs cachedConfig) ([]processor.V1, error) {
138-
mgr, err := manager.New(confs.mgr, manager.OptSetLogger(p.logger))
138+
// Explicitly pass the environment to ensure templates registered via -t flag
139+
// are available. This matches the pattern used in the run command.
140+
mgr, err := manager.New(confs.mgr,
141+
manager.OptSetLogger(p.logger),
142+
manager.OptSetEnvironment(p.env),
143+
)
139144
if err != nil {
140145
return nil, fmt.Errorf("failed to initialise resources: %v", err)
141146
}

internal/cli/test/processors_provider_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ import (
1313
"github.com/stretchr/testify/require"
1414
yaml "gopkg.in/yaml.v3"
1515

16+
"github.com/redpanda-data/benthos/v4/internal/bloblang"
1617
"github.com/redpanda-data/benthos/v4/internal/bundle"
1718
"github.com/redpanda-data/benthos/v4/internal/cli/test"
1819
"github.com/redpanda-data/benthos/v4/internal/component/processor"
1920
"github.com/redpanda-data/benthos/v4/internal/config"
2021
"github.com/redpanda-data/benthos/v4/internal/log"
2122
"github.com/redpanda-data/benthos/v4/internal/message"
23+
"github.com/redpanda-data/benthos/v4/internal/template"
2224

2325
_ "github.com/redpanda-data/benthos/v4/public/components/io"
2426
_ "github.com/redpanda-data/benthos/v4/public/components/pure"
@@ -413,3 +415,57 @@ pipeline:
413415
_, err = provider.Provide("/pipeline/processors", nil, nil)
414416
require.EqualError(t, err, "failed to initialise resources: cache resource label 'barcache' collides with a previously defined resource")
415417
}
418+
419+
func TestProcessorsProviderWithTemplate(t *testing.T) {
420+
// Create a cloned environment to simulate RPK's behavior where templates
421+
// are registered to a cloned environment, not bundle.GlobalEnvironment
422+
env := bundle.GlobalEnvironment.Clone()
423+
bloblEnv := bloblang.GlobalEnvironment()
424+
425+
// Register a simple test template
426+
templateYAML := []byte(`
427+
name: test_uppercase_processor
428+
type: processor
429+
fields:
430+
- name: prefix
431+
type: string
432+
default: "PREFIX: "
433+
mapping: |
434+
root.mapping = """root = "%s" + content().string().uppercase()""".format(this.prefix)
435+
`)
436+
437+
err := template.RegisterTemplateYAML(env, bloblEnv, templateYAML)
438+
require.NoError(t, err)
439+
440+
// Create test config using the template
441+
files := map[string]string{
442+
"config.yaml": `
443+
pipeline:
444+
processors:
445+
- test_uppercase_processor:
446+
prefix: "TEST: "
447+
`,
448+
}
449+
450+
testDir, err := initTestFiles(t, files)
451+
require.NoError(t, err)
452+
defer os.RemoveAll(testDir)
453+
454+
// Create ProcessorsProvider with the environment containing the template
455+
configPath := filepath.Join(testDir, "config.yaml")
456+
provider := test.NewProcessorsProvider(configPath, nil, config.Spec(), env, log.Noop())
457+
458+
// Extract processors - this should work now that we pass the environment
459+
procs, err := provider.Provide("/pipeline/processors", nil, nil)
460+
require.NoError(t, err)
461+
require.Len(t, procs, 1)
462+
463+
// Test the processor works correctly
464+
msg := message.Batch{message.NewPart([]byte("hello"))}
465+
results, err := procs[0].ProcessBatch(context.Background(), msg)
466+
require.NoError(t, err)
467+
require.Len(t, results, 1)
468+
require.Len(t, results[0], 1)
469+
470+
assert.Equal(t, "TEST: HELLO", string(results[0][0].AsBytes()))
471+
}

0 commit comments

Comments
 (0)