From 1abb4b4bf275e0367b0ee169047b1a26f4894b31 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Wed, 25 Mar 2026 16:39:20 -0700 Subject: [PATCH 1/7] Add checks for dependency corruption --- sdks/go/pkg/beam/artifact/materialize.go | 16 ++++++++-- sdks/go/pkg/beam/artifact/materialize_test.go | 28 +++++++++++++++++ .../runners/dataflow/internal/apiclient.py | 7 +++-- .../dataflow/internal/apiclient_test.py | 31 ++++++++++++------- 4 files changed, 64 insertions(+), 18 deletions(-) diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go index 624e30efcd2b..aef6fae99829 100644 --- a/sdks/go/pkg/beam/artifact/materialize.go +++ b/sdks/go/pkg/beam/artifact/materialize.go @@ -131,6 +131,7 @@ func newMaterializeWithClient(ctx context.Context, client jobpb.ArtifactRetrieva RoleUrn: URNStagingTo, RolePayload: rolePayload, }, + expectedSha256: filePayload.Sha256, }) } @@ -183,8 +184,9 @@ func MustExtractFilePayload(artifact *pipepb.ArtifactInformation) (string, strin } type artifact struct { - client jobpb.ArtifactRetrievalServiceClient - dep *pipepb.ArtifactInformation + client jobpb.ArtifactRetrievalServiceClient + dep *pipepb.ArtifactInformation + expectedSha256 string } func (a artifact) retrieve(ctx context.Context, dest string) error { @@ -231,7 +233,15 @@ func (a artifact) retrieve(ctx context.Context, dest string) error { stat, _ := fd.Stat() log.Printf("Downloaded: %v (sha256: %v, size: %v)", filename, sha256Hash, stat.Size()) - return fd.Close() + if err := fd.Close(); err != nil { + return err + } + + if a.expectedSha256 != "" && sha256Hash != a.expectedSha256 { + return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.expectedSha256) + } + + return nil } func writeChunks(stream jobpb.ArtifactRetrievalService_GetArtifactClient, w io.Writer) (string, error) { diff --git a/sdks/go/pkg/beam/artifact/materialize_test.go b/sdks/go/pkg/beam/artifact/materialize_test.go index 31890ed045cc..d1a3cc5d506f 100644 --- a/sdks/go/pkg/beam/artifact/materialize_test.go +++ b/sdks/go/pkg/beam/artifact/materialize_test.go @@ -266,6 +266,19 @@ func TestNewRetrieveWithResolution(t *testing.T) { checkStagedFiles(mds, dest, expected, t) } +func TestNewRetrieveWithBadShaFails(t *testing.T) { + expected := map[string]string{"a.txt": "a"} + client := &fakeRetrievalService{artifacts: expected} + dest := makeTempDir(t) + defer os.RemoveAll(dest) + ctx := grpcx.WriteWorkerID(context.Background(), "worker") + + _, err := newMaterializeWithClient(ctx, client, client.fileArtifactsWithBadSha(), dest) + if err == nil { + t.Fatalf("expected materialization to fail due to bad sha256 mismatch") + } +} + func checkStagedFiles(mds []*pipepb.ArtifactInformation, dest string, expected map[string]string, t *testing.T) { if len(mds) != len(expected) { t.Errorf("wrong number of artifacts staged %v vs %v", len(mds), len(expected)) @@ -323,6 +336,21 @@ func (fake *fakeRetrievalService) fileArtifactsWithoutStagingTo() []*pipepb.Arti return artifacts } +func (fake *fakeRetrievalService) fileArtifactsWithBadSha() []*pipepb.ArtifactInformation { + var artifacts []*pipepb.ArtifactInformation + for name := range fake.artifacts { + payload, _ := proto.Marshal(&pipepb.ArtifactFilePayload{ + Path: filepath.Join("/tmp", name), + Sha256: "badhash", + }) + artifacts = append(artifacts, &pipepb.ArtifactInformation{ + TypeUrn: URNFileArtifact, + TypePayload: payload, + }) + } + return artifacts +} + func (fake *fakeRetrievalService) urlArtifactsWithoutStagingTo() []*pipepb.ArtifactInformation { var artifacts []*pipepb.ArtifactInformation for name := range fake.artifacts { diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 164ace532b23..193061868f0a 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -595,11 +595,12 @@ def _stage_resources(self, pipeline, options): else: remote_name = os.path.basename(type_payload.path) is_staged_role = False - - if self._enable_caching and not type_payload.sha256: + # compute sha256 even if caching is disabled. + # This is used to check the payload integrity along with caching. + if not type_payload.sha256: type_payload.sha256 = self._compute_sha256(type_payload.path) - if type_payload.sha256 and type_payload.sha256 in staged_hashes: + if self._enable_caching and type_payload.sha256 and type_payload.sha256 in staged_hashes: _LOGGER.info( 'Found duplicated artifact sha256: %s (%s)', type_payload.path, diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index b767cef86b2e..8d1efd893b5b 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -1340,13 +1340,14 @@ def test_stage_resources(self): ]) })) client = apiclient.DataflowApplicationClient(pipeline_options) - with mock.patch.object(apiclient._LegacyDataflowStager, - 'stage_job_resources') as mock_stager: - client._stage_resources(pipeline, pipeline_options) + with mock.patch.object(apiclient.DataflowApplicationClient, '_compute_sha256', return_value='dummy_hash'): + with mock.patch.object(apiclient._LegacyDataflowStager, + 'stage_job_resources') as mock_stager: + client._stage_resources(pipeline, pipeline_options) mock_stager.assert_called_once_with( - [('/tmp/foo1', 'foo1', ''), ('/tmp/bar1', 'bar1', ''), - ('/tmp/baz', 'baz1', ''), ('/tmp/renamed1', 'renamed1', 'abcdefg'), - ('/tmp/foo2', 'foo2', ''), ('/tmp/bar2', 'bar2', '')], + [('/tmp/foo1', 'foo1', 'dummy_hash'), ('/tmp/bar1', 'bar1', 'dummy_hash'), + ('/tmp/baz', 'baz1', 'dummy_hash'), ('/tmp/renamed1', 'renamed1', 'abcdefg'), + ('/tmp/foo2', 'foo2', 'dummy_hash'), ('/tmp/bar2', 'bar2', 'dummy_hash')], staging_location='gs://test-location/staging') pipeline_expected = beam_runner_api_pb2.Pipeline( @@ -1357,7 +1358,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/foo1' + url='gs://test-location/staging/foo1', + sha256='dummy_hash' ).SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1366,7 +1368,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/bar1'). + url='gs://test-location/staging/bar1', + sha256='dummy_hash'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1375,7 +1378,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/baz1'). + url='gs://test-location/staging/baz1', + sha256='dummy_hash'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1396,7 +1400,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/foo2'). + url='gs://test-location/staging/foo2', + sha256='dummy_hash'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1405,7 +1410,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/bar2'). + url='gs://test-location/staging/bar2', + sha256='dummy_hash'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1414,7 +1420,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/baz1'). + url='gs://test-location/staging/baz1', + sha256='dummy_hash'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. From 106c037b4cc5d53e954663dca72d3189fe7b1f72 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 26 Mar 2026 08:13:12 -0700 Subject: [PATCH 2/7] Add experiment to disable checks --- sdks/go/container/boot.go | 3 ++ sdks/go/pkg/beam/artifact/materialize.go | 25 +++++++++- sdks/go/pkg/beam/artifact/materialize_test.go | 47 +++++++++++++++++++ sdks/java/container/boot.go | 3 ++ sdks/python/container/boot.go | 3 ++ sdks/typescript/container/boot.go | 3 ++ 6 files changed, 83 insertions(+), 1 deletion(-) diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go index b75201520f39..4e93f01b9d15 100644 --- a/sdks/go/container/boot.go +++ b/sdks/go/container/boot.go @@ -158,6 +158,9 @@ func main() { logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } + // Inject pipeline options into context + ctx = artifact.WithPipelineOptions(ctx, info.GetPipelineOptions()) + // (2) Retrieve the staged files. // // The Go SDK harness downloads the worker binary and invokes diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go index aef6fae99829..2a787a1adbcb 100644 --- a/sdks/go/pkg/beam/artifact/materialize.go +++ b/sdks/go/pkg/beam/artifact/materialize.go @@ -39,6 +39,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/util/errorx" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx" "google.golang.org/protobuf/proto" + structpb "google.golang.org/protobuf/types/known/structpb" ) // TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the pipeline_v1 proto @@ -237,7 +238,7 @@ func (a artifact) retrieve(ctx context.Context, dest string) error { return err } - if a.expectedSha256 != "" && sha256Hash != a.expectedSha256 { + if isArtifactValidationEnabled(ctx) && a.expectedSha256 != "" && sha256Hash != a.expectedSha256 { return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.expectedSha256) } @@ -521,3 +522,25 @@ func queue2slice(q chan *jobpb.ArtifactMetadata) []*jobpb.ArtifactMetadata { } return ret } + +type contextKey string + +const pipelineOptionsKey contextKey = "pipeline_options" + +// Returns a new context carrying the full pipeline options struct. +func WithPipelineOptions(ctx context.Context, options *structpb.Struct) context.Context { + return context.WithValue(ctx, pipelineOptionsKey, options) +} + +// Parses pipeline options to check if "disable_integrity_checks" is enabled. +func isArtifactValidationEnabled(ctx context.Context) bool { + options, _ := ctx.Value(pipelineOptionsKey).(*structpb.Struct) + if options != nil { + for _, v := range options.GetFields()["options"].GetStructValue().GetFields()["experiments"].GetListValue().GetValues() { + if v.GetStringValue() == "disable_integrity_checks" { + return false + } + } + } + return true +} diff --git a/sdks/go/pkg/beam/artifact/materialize_test.go b/sdks/go/pkg/beam/artifact/materialize_test.go index d1a3cc5d506f..7901675e0f01 100644 --- a/sdks/go/pkg/beam/artifact/materialize_test.go +++ b/sdks/go/pkg/beam/artifact/materialize_test.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" + structpb "google.golang.org/protobuf/types/known/structpb" ) // TestRetrieve tests that we can successfully retrieve fresh files. @@ -266,6 +267,23 @@ func TestNewRetrieveWithResolution(t *testing.T) { checkStagedFiles(mds, dest, expected, t) } +func TestIsArtifactValidationEnabled(t *testing.T) { + ctx := context.Background() + if !isArtifactValidationEnabled(ctx) { + t.Errorf("empty context should have validation enabled") + } + + options, _ := structpb.NewStruct(map[string]interface{}{ + "options": map[string]interface{}{ + "experiments": []interface{}{"disable_integrity_checks"}, + }, + }) + ctx2 := WithPipelineOptions(ctx, options) + if isArtifactValidationEnabled(ctx2) { + t.Errorf("populated context should have validation disabled") + } +} + func TestNewRetrieveWithBadShaFails(t *testing.T) { expected := map[string]string{"a.txt": "a"} client := &fakeRetrievalService{artifacts: expected} @@ -279,6 +297,35 @@ func TestNewRetrieveWithBadShaFails(t *testing.T) { } } +func TestNewRetrieveWithBadShaAndExperimentSucceeds(t *testing.T) { + expected := map[string]string{"a.txt": "a"} + client := &fakeRetrievalService{artifacts: expected} + dest := makeTempDir(t) + defer os.RemoveAll(dest) + + options, _ := structpb.NewStruct(map[string]interface{}{ + "options": map[string]interface{}{ + "experiments": []interface{}{"disable_integrity_checks"}, + }, + }) + ctx := WithPipelineOptions(grpcx.WriteWorkerID(context.Background(), "worker"), options) + + mds, err := newMaterializeWithClient(ctx, client, client.fileArtifactsWithBadSha(), dest) + if err != nil { + t.Fatalf("materialize failed but should have succeeded because validation was disabled via experiment: %v", err) + } + + generated := make(map[string]string) + for _, md := range mds { + name, _ := MustExtractFilePayload(md) + payload, _ := proto.Marshal(&pipepb.ArtifactStagingToRolePayload{ + StagedName: name}) + generated[name] = string(payload) + } + + checkStagedFiles(mds, dest, generated, t) +} + func checkStagedFiles(mds []*pipepb.ArtifactInformation, dest string, expected map[string]string, t *testing.T) { if len(mds) != len(expected) { t.Errorf("wrong number of artifacts staged %v vs %v", len(mds), len(expected)) diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index f6c33b635d3c..a5005e4dc320 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -105,6 +105,9 @@ func main() { logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } + // Inject pipeline options into context + ctx = artifact.WithPipelineOptions(ctx, info.GetPipelineOptions()) + // (2) Retrieve the staged user jars. We ignore any disk limit, // because the staged jars are mandatory. diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index 7c0f22675daf..e0d2fd3d0937 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -184,6 +184,9 @@ func launchSDKProcess() error { logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } + // Inject pipeline options into context + ctx = artifact.WithPipelineOptions(ctx, info.GetPipelineOptions()) + experiments := getExperiments(options) pipNoBuildIsolation = false if slices.Contains(experiments, "pip_no_build_isolation") { diff --git a/sdks/typescript/container/boot.go b/sdks/typescript/container/boot.go index 44f94f804330..70c512d62b04 100644 --- a/sdks/typescript/container/boot.go +++ b/sdks/typescript/container/boot.go @@ -91,6 +91,9 @@ func main() { logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } + // Inject pipeline options into context + ctx = artifact.WithPipelineOptions(ctx, info.GetPipelineOptions()) + // (2) Retrieve and install the staged packages. dir := filepath.Join(*semiPersistDir, *id, "staged") From 082e3db68d828e1c9eea0903fae73cc8c8532a12 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 26 Mar 2026 09:50:08 -0700 Subject: [PATCH 3/7] Add flag for legacy function too --- sdks/go/pkg/beam/artifact/materialize.go | 2 +- sdks/go/pkg/beam/artifact/materialize_test.go | 51 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go index 2a787a1adbcb..4d117b049956 100644 --- a/sdks/go/pkg/beam/artifact/materialize.go +++ b/sdks/go/pkg/beam/artifact/materialize.go @@ -453,7 +453,7 @@ func retrieve(ctx context.Context, client jobpb.LegacyArtifactRetrievalServiceCl } // Artifact Sha256 hash is an optional field in metadata so we should only validate when its present. - if a.Sha256 != "" && sha256Hash != a.Sha256 { + if isArtifactValidationEnabled(ctx) && a.Sha256 != "" && sha256Hash != a.Sha256 { return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.Sha256) } return nil diff --git a/sdks/go/pkg/beam/artifact/materialize_test.go b/sdks/go/pkg/beam/artifact/materialize_test.go index 7901675e0f01..9d527569d1bc 100644 --- a/sdks/go/pkg/beam/artifact/materialize_test.go +++ b/sdks/go/pkg/beam/artifact/materialize_test.go @@ -83,6 +83,57 @@ func TestMultiRetrieve(t *testing.T) { } } +func TestRetrieveWithBadShaFails(t *testing.T) { + cc := startServer(t) + defer cc.Close() + + ctx := grpcx.WriteWorkerID(context.Background(), "idA") + keys := []string{"foo"} + st := "whatever" + rt, artifacts := populate(ctx, cc, t, keys, 300, st) + + dst := makeTempDir(t) + defer os.RemoveAll(dst) + + client := jobpb.NewLegacyArtifactRetrievalServiceClient(cc) + for _, a := range artifacts { + a.Sha256 = "badhash" // mutate hash + if err := Retrieve(ctx, client, a, rt, dst); err == nil { + t.Errorf("expected materialization to fail due to bad sha256 mismatch") + } + } +} + +func TestRetrieveWithBadShaAndExperimentSucceeds(t *testing.T) { + cc := startServer(t) + defer cc.Close() + + options, _ := structpb.NewStruct(map[string]interface{}{ + "options": map[string]interface{}{ + "experiments": []interface{}{"disable_integrity_checks"}, + }, + }) + ctx := WithPipelineOptions(grpcx.WriteWorkerID(context.Background(), "idA"), options) + keys := []string{"foo"} + st := "whatever" + rt, artifacts := populate(ctx, cc, t, keys, 300, st) + + dst := makeTempDir(t) + defer os.RemoveAll(dst) + + client := jobpb.NewLegacyArtifactRetrievalServiceClient(cc) + for _, a := range artifacts { + originalHash := a.Sha256 + a.Sha256 = "badhash" // mutate hash + filename := makeFilename(dst, a.Name) + if err := Retrieve(ctx, client, a, rt, dst); err != nil { + t.Errorf("materialize failed but should have succeeded because validation was disabled via experiment: %v", err) + continue + } + verifySHA256(t, filename, originalHash) + } +} + // populate stages a set of artifacts with the given keys, each with // slightly different sizes and chucksizes. func populate(ctx context.Context, cc *grpc.ClientConn, t *testing.T, keys []string, size int, st string) (string, []*jobpb.ArtifactMetadata) { From 3ad52e7687cef4611280daf60b5013093032120f Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 26 Mar 2026 10:49:01 -0700 Subject: [PATCH 4/7] fix lint --- sdks/go/pkg/beam/artifact/materialize.go | 4 +-- .../dataflow/internal/apiclient_test.py | 31 +++++++++---------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go index 4d117b049956..edd59a26da17 100644 --- a/sdks/go/pkg/beam/artifact/materialize.go +++ b/sdks/go/pkg/beam/artifact/materialize.go @@ -527,12 +527,12 @@ type contextKey string const pipelineOptionsKey contextKey = "pipeline_options" -// Returns a new context carrying the full pipeline options struct. +// WithPipelineOptions returns a new context carrying the full pipeline options struct. func WithPipelineOptions(ctx context.Context, options *structpb.Struct) context.Context { return context.WithValue(ctx, pipelineOptionsKey, options) } -// Parses pipeline options to check if "disable_integrity_checks" is enabled. +// isArtifactValidationEnabled parses pipeline options to check if "disable_integrity_checks" is enabled. func isArtifactValidationEnabled(ctx context.Context) bool { options, _ := ctx.Value(pipelineOptionsKey).(*structpb.Struct) if options != nil { diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 8d1efd893b5b..c6cecc3c66f6 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -1340,14 +1340,19 @@ def test_stage_resources(self): ]) })) client = apiclient.DataflowApplicationClient(pipeline_options) - with mock.patch.object(apiclient.DataflowApplicationClient, '_compute_sha256', return_value='dummy_hash'): + with mock.patch.object(apiclient.DataflowApplicationClient, + '_compute_sha256', + return_value='dummy_hash'): with mock.patch.object(apiclient._LegacyDataflowStager, 'stage_job_resources') as mock_stager: client._stage_resources(pipeline, pipeline_options) mock_stager.assert_called_once_with( - [('/tmp/foo1', 'foo1', 'dummy_hash'), ('/tmp/bar1', 'bar1', 'dummy_hash'), - ('/tmp/baz', 'baz1', 'dummy_hash'), ('/tmp/renamed1', 'renamed1', 'abcdefg'), - ('/tmp/foo2', 'foo2', 'dummy_hash'), ('/tmp/bar2', 'bar2', 'dummy_hash')], + [('/tmp/foo1', 'foo1', 'dummy_hash'), + ('/tmp/bar1', 'bar1', 'dummy_hash'), + ('/tmp/baz', 'baz1', 'dummy_hash'), + ('/tmp/renamed1', 'renamed1', 'abcdefg'), + ('/tmp/foo2', 'foo2', 'dummy_hash'), + ('/tmp/bar2', 'bar2', 'dummy_hash')], staging_location='gs://test-location/staging') pipeline_expected = beam_runner_api_pb2.Pipeline( @@ -1359,8 +1364,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/foo1', - sha256='dummy_hash' - ).SerializeToString(), + sha256='dummy_hash').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1369,8 +1373,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/bar1', - sha256='dummy_hash'). - SerializeToString(), + sha256='dummy_hash').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1379,8 +1382,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/baz1', - sha256='dummy_hash'). - SerializeToString(), + sha256='dummy_hash').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1401,8 +1403,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/foo2', - sha256='dummy_hash'). - SerializeToString(), + sha256='dummy_hash').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1411,8 +1412,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/bar2', - sha256='dummy_hash'). - SerializeToString(), + sha256='dummy_hash').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1421,8 +1421,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/baz1', - sha256='dummy_hash'). - SerializeToString(), + sha256='dummy_hash').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( From d0b0fbbb590a48b2c0f87e43a4783eb43466fae4 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 26 Mar 2026 12:38:50 -0700 Subject: [PATCH 5/7] fix python flag --- .../runners/dataflow/internal/apiclient.py | 2 +- .../dataflow/internal/apiclient_test.py | 42 ++++++++----------- 2 files changed, 19 insertions(+), 25 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 193061868f0a..fe36f56df427 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -600,7 +600,7 @@ def _stage_resources(self, pipeline, options): if not type_payload.sha256: type_payload.sha256 = self._compute_sha256(type_payload.path) - if self._enable_caching and type_payload.sha256 and type_payload.sha256 in staged_hashes: + if type_payload.sha256 and type_payload.sha256 in staged_hashes: _LOGGER.info( 'Found duplicated artifact sha256: %s (%s)', type_payload.path, diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index c6cecc3c66f6..b767cef86b2e 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -1340,19 +1340,13 @@ def test_stage_resources(self): ]) })) client = apiclient.DataflowApplicationClient(pipeline_options) - with mock.patch.object(apiclient.DataflowApplicationClient, - '_compute_sha256', - return_value='dummy_hash'): - with mock.patch.object(apiclient._LegacyDataflowStager, - 'stage_job_resources') as mock_stager: - client._stage_resources(pipeline, pipeline_options) + with mock.patch.object(apiclient._LegacyDataflowStager, + 'stage_job_resources') as mock_stager: + client._stage_resources(pipeline, pipeline_options) mock_stager.assert_called_once_with( - [('/tmp/foo1', 'foo1', 'dummy_hash'), - ('/tmp/bar1', 'bar1', 'dummy_hash'), - ('/tmp/baz', 'baz1', 'dummy_hash'), - ('/tmp/renamed1', 'renamed1', 'abcdefg'), - ('/tmp/foo2', 'foo2', 'dummy_hash'), - ('/tmp/bar2', 'bar2', 'dummy_hash')], + [('/tmp/foo1', 'foo1', ''), ('/tmp/bar1', 'bar1', ''), + ('/tmp/baz', 'baz1', ''), ('/tmp/renamed1', 'renamed1', 'abcdefg'), + ('/tmp/foo2', 'foo2', ''), ('/tmp/bar2', 'bar2', '')], staging_location='gs://test-location/staging') pipeline_expected = beam_runner_api_pb2.Pipeline( @@ -1363,8 +1357,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/foo1', - sha256='dummy_hash').SerializeToString(), + url='gs://test-location/staging/foo1' + ).SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1372,8 +1366,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/bar1', - sha256='dummy_hash').SerializeToString(), + url='gs://test-location/staging/bar1'). + SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1381,8 +1375,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/baz1', - sha256='dummy_hash').SerializeToString(), + url='gs://test-location/staging/baz1'). + SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1402,8 +1396,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/foo2', - sha256='dummy_hash').SerializeToString(), + url='gs://test-location/staging/foo2'). + SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1411,8 +1405,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/bar2', - sha256='dummy_hash').SerializeToString(), + url='gs://test-location/staging/bar2'). + SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1420,8 +1414,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/baz1', - sha256='dummy_hash').SerializeToString(), + url='gs://test-location/staging/baz1'). + SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( From 6581d90e321d48fcfb1a0e1fe64a827292b8ef6e Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 26 Mar 2026 13:21:04 -0700 Subject: [PATCH 6/7] fix test --- .../dataflow/internal/apiclient_test.py | 38 +++++++++++++------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index b767cef86b2e..adb1feab53f7 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -1340,13 +1340,21 @@ def test_stage_resources(self): ]) })) client = apiclient.DataflowApplicationClient(pipeline_options) - with mock.patch.object(apiclient._LegacyDataflowStager, - 'stage_job_resources') as mock_stager: - client._stage_resources(pipeline, pipeline_options) + with mock.patch.object(apiclient.DataflowApplicationClient, + '_compute_sha256', + side_effect=lambda path: 'hash' + path): + with mock.patch.object(apiclient._LegacyDataflowStager, + 'stage_job_resources') as mock_stager: + client._stage_resources(pipeline, pipeline_options) mock_stager.assert_called_once_with( - [('/tmp/foo1', 'foo1', ''), ('/tmp/bar1', 'bar1', ''), - ('/tmp/baz', 'baz1', ''), ('/tmp/renamed1', 'renamed1', 'abcdefg'), - ('/tmp/foo2', 'foo2', ''), ('/tmp/bar2', 'bar2', '')], + [ + ('/tmp/foo1', 'foo1', 'hash/tmp/foo1'), + ('/tmp/bar1', 'bar1', 'hash/tmp/bar1'), + ('/tmp/baz', 'baz1', 'hash/tmp/baz'), + ('/tmp/renamed1', 'renamed1', 'abcdefg'), + ('/tmp/foo2', 'foo2', 'hash/tmp/foo2'), + ('/tmp/bar2', 'bar2', 'hash/tmp/bar2') + ], staging_location='gs://test-location/staging') pipeline_expected = beam_runner_api_pb2.Pipeline( @@ -1357,7 +1365,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/foo1' + url='gs://test-location/staging/foo1', + sha256='hash/tmp/foo1' ).SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1366,7 +1375,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/bar1'). + url='gs://test-location/staging/bar1', + sha256='hash/tmp/bar1'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1375,7 +1385,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/baz1'). + url='gs://test-location/staging/baz1', + sha256='hash/tmp/baz'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1396,7 +1407,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/foo2'). + url='gs://test-location/staging/foo2', + sha256='hash/tmp/foo2'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1405,7 +1417,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/bar2'). + url='gs://test-location/staging/bar2', + sha256='hash/tmp/bar2'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1414,7 +1427,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/baz1'). + url='gs://test-location/staging/baz1', + sha256='hash/tmp/baz'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. From 643fe5a7e73afb4ce5d6ea02012f45972691b8cb Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 26 Mar 2026 13:49:27 -0700 Subject: [PATCH 7/7] fix formatter --- .../dataflow/internal/apiclient_test.py | 32 +++++++------------ 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index adb1feab53f7..51f4264d3e45 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -1347,14 +1347,12 @@ def test_stage_resources(self): 'stage_job_resources') as mock_stager: client._stage_resources(pipeline, pipeline_options) mock_stager.assert_called_once_with( - [ - ('/tmp/foo1', 'foo1', 'hash/tmp/foo1'), - ('/tmp/bar1', 'bar1', 'hash/tmp/bar1'), - ('/tmp/baz', 'baz1', 'hash/tmp/baz'), - ('/tmp/renamed1', 'renamed1', 'abcdefg'), - ('/tmp/foo2', 'foo2', 'hash/tmp/foo2'), - ('/tmp/bar2', 'bar2', 'hash/tmp/bar2') - ], + [('/tmp/foo1', 'foo1', 'hash/tmp/foo1'), + ('/tmp/bar1', 'bar1', 'hash/tmp/bar1'), + ('/tmp/baz', 'baz1', 'hash/tmp/baz'), + ('/tmp/renamed1', 'renamed1', 'abcdefg'), + ('/tmp/foo2', 'foo2', 'hash/tmp/foo2'), + ('/tmp/bar2', 'bar2', 'hash/tmp/bar2')], staging_location='gs://test-location/staging') pipeline_expected = beam_runner_api_pb2.Pipeline( @@ -1366,8 +1364,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/foo1', - sha256='hash/tmp/foo1' - ).SerializeToString(), + sha256='hash/tmp/foo1').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1376,8 +1373,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/bar1', - sha256='hash/tmp/bar1'). - SerializeToString(), + sha256='hash/tmp/bar1').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1386,8 +1382,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/baz1', - sha256='hash/tmp/baz'). - SerializeToString(), + sha256='hash/tmp/baz').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1408,8 +1403,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/foo2', - sha256='hash/tmp/foo2'). - SerializeToString(), + sha256='hash/tmp/foo2').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1418,8 +1412,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/bar2', - sha256='hash/tmp/bar2'). - SerializeToString(), + sha256='hash/tmp/bar2').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1428,8 +1421,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/baz1', - sha256='hash/tmp/baz'). - SerializeToString(), + sha256='hash/tmp/baz').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload(