From bcf2d6e13bbb1e2634c9f7c8e97d83992affa841 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 15 Jun 2026 12:10:58 +0000 Subject: [PATCH] Track zstd-compressed resource state sizes in deploy telemetry (direct engine) Deploy telemetry already reports per-resource-type raw state-size statistics (state_size_{max,mean,median}_bytes). The deployment metadata service stores that same per-resource state zstd-compressed, so this adds the compressed-size counterparts to gauge how much resource state shrinks under compression rather than just the raw sizes: - state_compressed_size_max_bytes - state_compressed_size_mean_bytes - state_compressed_size_median_bytes The compressed length is computed once per resource at state-export time (alongside the existing raw length), using github.com/klauspost/compress/zstd. Only the direct engine is measured, matching the existing raw-size behavior. The acceptance golden masks the compressed values (COMPRESSED_SIZE) because they depend on the klauspost/compress version, like state_file_size_bytes is dropped for embedding the CLI version; the dstate unit test covers real compression behavior. Co-authored-by: Isaac --- .../deploy/out.resources_metadata.direct.txt | 10 ++++- acceptance/bundle/telemetry/test.toml | 16 ++++++++ bundle/direct/dstate/state.go | 28 +++++++++++-- bundle/direct/dstate/state_test.go | 15 +++++++ bundle/phases/resources_metadata.go | 24 +++++++---- bundle/phases/resources_metadata_test.go | 40 ++++++++++++++----- .../statemgmt/resourcestate/resourcestate.go | 7 ++++ go.mod | 2 + go.sum | 2 + libs/telemetry/protos/bundle_deploy.go | 9 +++++ 10 files changed, 129 insertions(+), 24 deletions(-) diff --git a/acceptance/bundle/telemetry/deploy/out.resources_metadata.direct.txt b/acceptance/bundle/telemetry/deploy/out.resources_metadata.direct.txt index 551a48fa89b..f1a750864ee 100644 --- a/acceptance/bundle/telemetry/deploy/out.resources_metadata.direct.txt +++ b/acceptance/bundle/telemetry/deploy/out.resources_metadata.direct.txt @@ -6,14 +6,20 @@ "count": 3, "state_size_max_bytes": 256, "state_size_mean_bytes": 254, - "state_size_median_bytes": 254 + "state_size_median_bytes": 254, + "state_compressed_size_max_bytes": COMPRESSED_SIZE, + "state_compressed_size_mean_bytes": COMPRESSED_SIZE, + "state_compressed_size_median_bytes": COMPRESSED_SIZE }, { "resource_type": "pipelines", "count": 2, "state_size_max_bytes": 205, "state_size_mean_bytes": 205, - "state_size_median_bytes": 205 + "state_size_median_bytes": 205, + "state_compressed_size_max_bytes": COMPRESSED_SIZE, + "state_compressed_size_mean_bytes": COMPRESSED_SIZE, + "state_compressed_size_median_bytes": COMPRESSED_SIZE } ] } diff --git a/acceptance/bundle/telemetry/test.toml b/acceptance/bundle/telemetry/test.toml index 14453c07f92..6f927d4df4f 100644 --- a/acceptance/bundle/telemetry/test.toml +++ b/acceptance/bundle/telemetry/test.toml @@ -20,3 +20,19 @@ New = '[OS]' [[Repls]] Old = '"local_cache_measurements_ms": \[[^\]]*\]' New = '"local_cache_measurements_ms": [...redacted...]' + +# zstd-compressed state sizes depend on the klauspost/compress version, so they +# are not stable across dependency bumps (same reasoning that drops +# state_file_size_bytes from the golden). Assert the fields are present and +# numeric, not their exact value. The dstate unit test covers real compression. +[[Repls]] +Old = '"state_compressed_size_max_bytes": \d+' +New = '"state_compressed_size_max_bytes": COMPRESSED_SIZE' + +[[Repls]] +Old = '"state_compressed_size_mean_bytes": \d+' +New = '"state_compressed_size_mean_bytes": COMPRESSED_SIZE' + +[[Repls]] +Old = '"state_compressed_size_median_bytes": \d+' +New = '"state_compressed_size_median_bytes": COMPRESSED_SIZE' diff --git a/bundle/direct/dstate/state.go b/bundle/direct/dstate/state.go index 54505677663..9fc82050182 100644 --- a/bundle/direct/dstate/state.go +++ b/bundle/direct/dstate/state.go @@ -18,8 +18,29 @@ import ( "github.com/databricks/cli/internal/build" "github.com/databricks/cli/libs/log" "github.com/google/uuid" + "github.com/klauspost/compress/zstd" ) +// telemetryEncoder measures the zstd-compressed size of each resource's state +// for deploy telemetry. zstd.Encoder.EncodeAll is safe for concurrent use. +// NewWriter with default options does not error in practice; on the off chance +// it does, telemetryEncoderErr makes compressedStateSize degrade to zero rather +// than crash a deploy over a telemetry measurement. +var telemetryEncoder, telemetryEncoderErr = zstd.NewWriter(nil) + +// compressedStateSize returns the zstd-compressed size in bytes of a resource's +// serialized state. It is used purely for deploy telemetry, to gauge how much +// resource state shrinks under compression (the deployment metadata service +// stores resource state zstd-compressed). Returns 0 for empty state or if the +// encoder is unavailable. The exact ratio is an approximation of the service's: +// both use zstd at default settings but different implementations. +func compressedStateSize(state []byte) int { + if len(state) == 0 || telemetryEncoderErr != nil { + return 0 + } + return len(telemetryEncoder.EncodeAll(state, nil)) +} + const ( currentStateVersion = 2 initialBufferSize = 64 * 1024 @@ -456,9 +477,10 @@ func ExportStateFromData(data Database) resourcestate.ExportedResourcesMap { } result[key] = resourcestate.ResourceState{ - ID: entry.ID, - ETag: etag, - StateSizeBytes: len(entry.State), + ID: entry.ID, + ETag: etag, + StateSizeBytes: len(entry.State), + StateCompressedSizeBytes: compressedStateSize(entry.State), } } return result diff --git a/bundle/direct/dstate/state_test.go b/bundle/direct/dstate/state_test.go index bbfd2559951..9a240d22279 100644 --- a/bundle/direct/dstate/state_test.go +++ b/bundle/direct/dstate/state_test.go @@ -1,6 +1,7 @@ package dstate import ( + "bytes" "os" "path/filepath" "testing" @@ -15,6 +16,20 @@ func mustFinalize(t *testing.T, db *DeploymentState) { require.NoError(t, err) } +func TestCompressedStateSize(t *testing.T) { + require.NoError(t, telemetryEncoderErr) + + // Empty state has no compressed size. + assert.Equal(t, 0, compressedStateSize(nil)) + assert.Equal(t, 0, compressedStateSize([]byte{})) + + // A highly compressible blob shrinks: positive size, smaller than raw. + blob := bytes.Repeat([]byte(`{"key":"value"}`), 1000) + got := compressedStateSize(blob) + assert.Positive(t, got) + assert.Less(t, got, len(blob)) +} + func TestOpenSaveFinalizeRoundTrip(t *testing.T) { path := filepath.Join(t.TempDir(), "state.json") diff --git a/bundle/phases/resources_metadata.go b/bundle/phases/resources_metadata.go index ae45f6ad46a..d6dc42a9aa5 100644 --- a/bundle/phases/resources_metadata.go +++ b/bundle/phases/resources_metadata.go @@ -21,7 +21,8 @@ const directEngine = "direct" // Only direct deploys are measured. b.Metrics.ResourceState is the direct // engine's finalized state, populated in deployCore from the WAL replay the // deploy already performs; each entry carries StateSizeBytes (len of the JSON -// blob stored in resources.json). So no marshalling, file read, or JSON parsing +// blob stored in resources.json) and StateCompressedSizeBytes (its zstd-compressed +// length, computed during export). So no marshalling, file read, or JSON parsing // happens here — sizes are read straight off the in-memory map. The whole-file // size comes from a single os.Stat (no parse). Returns nil for terraform // deploys (ResourceState is nil) and when no resources are in state. @@ -44,16 +45,18 @@ func collectResourcesMetadata(ctx context.Context, b *bundle.Bundle) *protos.Bun } // resourceMetadataFromState groups the deployment state by resource type and -// computes per-type count and max/mean/median state size. Sizes are sorted per -// type (needed for the median). +// computes per-type count and max/mean/median state size, both raw and +// zstd-compressed. Sizes are sorted per type (needed for the median). func resourceMetadataFromState(state resourcestate.ExportedResourcesMap) []protos.ResourceMetadata { sizesByType := make(map[string][]int64) + compressedByType := make(map[string][]int64) for key, rs := range state { t := config.GetResourceTypeFromKey(key) if t == "" { continue } sizesByType[t] = append(sizesByType[t], int64(rs.StateSizeBytes)) + compressedByType[t] = append(compressedByType[t], int64(rs.StateCompressedSizeBytes)) } types := make([]string, 0, len(sizesByType)) @@ -66,12 +69,17 @@ func resourceMetadataFromState(state resourcestate.ExportedResourcesMap) []proto for _, t := range types { sizes := sizesByType[t] slices.Sort(sizes) + compressed := compressedByType[t] + slices.Sort(compressed) resources = append(resources, protos.ResourceMetadata{ - ResourceType: t, - Count: int64(len(sizes)), - StateSizeMaxBytes: statMax(sizes), - StateSizeMeanBytes: statMean(sizes), - StateSizeMedianBytes: statMedian(sizes), + ResourceType: t, + Count: int64(len(sizes)), + StateSizeMaxBytes: statMax(sizes), + StateSizeMeanBytes: statMean(sizes), + StateSizeMedianBytes: statMedian(sizes), + StateCompressedSizeMaxBytes: statMax(compressed), + StateCompressedSizeMeanBytes: statMean(compressed), + StateCompressedSizeMedianBytes: statMedian(compressed), }) } return resources diff --git a/bundle/phases/resources_metadata_test.go b/bundle/phases/resources_metadata_test.go index bb34e43519a..c5fac1d2c43 100644 --- a/bundle/phases/resources_metadata_test.go +++ b/bundle/phases/resources_metadata_test.go @@ -11,21 +11,35 @@ import ( func TestResourceMetadataFromState_GroupsByType(t *testing.T) { state := resourcestate.ExportedResourcesMap{ - "resources.jobs.foo": {StateSizeBytes: 20}, - "resources.jobs.bar": {StateSizeBytes: 10}, - "resources.jobs.foo.permissions": {StateSizeBytes: 2}, - "resources.pipelines.qux": {StateSizeBytes: 14}, + "resources.jobs.foo": {StateSizeBytes: 20, StateCompressedSizeBytes: 12}, + "resources.jobs.bar": {StateSizeBytes: 10, StateCompressedSizeBytes: 8}, + "resources.jobs.foo.permissions": {StateSizeBytes: 2, StateCompressedSizeBytes: 3}, + "resources.pipelines.qux": {StateSizeBytes: 14, StateCompressedSizeBytes: 9}, } got := resourceMetadataFromState(state) // Sorted by resource type. Sub-resources (permissions) group under // ".permissions" per config.GetResourceTypeFromKey. jobs median is - // the lower-middle of sorted [10,20] -> index (2-1)/2 = 0 -> 10. + // the lower-middle of sorted [10,20] -> index (2-1)/2 = 0 -> 10. Raw and + // compressed stats are computed independently (each slice sorted on its own), + // so a resource's raw and compressed values need not share a rank. assert.Equal(t, []protos.ResourceMetadata{ - {ResourceType: "jobs", Count: 2, StateSizeMaxBytes: 20, StateSizeMeanBytes: 15, StateSizeMedianBytes: 10}, - {ResourceType: "jobs.permissions", Count: 1, StateSizeMaxBytes: 2, StateSizeMeanBytes: 2, StateSizeMedianBytes: 2}, - {ResourceType: "pipelines", Count: 1, StateSizeMaxBytes: 14, StateSizeMeanBytes: 14, StateSizeMedianBytes: 14}, + { + ResourceType: "jobs", Count: 2, + StateSizeMaxBytes: 20, StateSizeMeanBytes: 15, StateSizeMedianBytes: 10, + StateCompressedSizeMaxBytes: 12, StateCompressedSizeMeanBytes: 10, StateCompressedSizeMedianBytes: 8, + }, + { + ResourceType: "jobs.permissions", Count: 1, + StateSizeMaxBytes: 2, StateSizeMeanBytes: 2, StateSizeMedianBytes: 2, + StateCompressedSizeMaxBytes: 3, StateCompressedSizeMeanBytes: 3, StateCompressedSizeMedianBytes: 3, + }, + { + ResourceType: "pipelines", Count: 1, + StateSizeMaxBytes: 14, StateSizeMeanBytes: 14, StateSizeMedianBytes: 14, + StateCompressedSizeMaxBytes: 9, StateCompressedSizeMeanBytes: 9, StateCompressedSizeMedianBytes: 9, + }, }, got) } @@ -39,12 +53,16 @@ func TestStatHelpers(t *testing.T) { func TestResourceMetadataFromState_SkipsNonResourceKeys(t *testing.T) { state := resourcestate.ExportedResourcesMap{ - "resources.jobs.foo": {StateSizeBytes: 5}, - "bogus": {StateSizeBytes: 99}, + "resources.jobs.foo": {StateSizeBytes: 5, StateCompressedSizeBytes: 4}, + "bogus": {StateSizeBytes: 99, StateCompressedSizeBytes: 50}, } got := resourceMetadataFromState(state) assert.Equal(t, []protos.ResourceMetadata{ - {ResourceType: "jobs", Count: 1, StateSizeMaxBytes: 5, StateSizeMeanBytes: 5, StateSizeMedianBytes: 5}, + { + ResourceType: "jobs", Count: 1, + StateSizeMaxBytes: 5, StateSizeMeanBytes: 5, StateSizeMedianBytes: 5, + StateCompressedSizeMaxBytes: 4, StateCompressedSizeMeanBytes: 4, StateCompressedSizeMedianBytes: 4, + }, }, got) } diff --git a/bundle/statemgmt/resourcestate/resourcestate.go b/bundle/statemgmt/resourcestate/resourcestate.go index ec98f1bc827..c31f6b56062 100644 --- a/bundle/statemgmt/resourcestate/resourcestate.go +++ b/bundle/statemgmt/resourcestate/resourcestate.go @@ -12,6 +12,13 @@ type ResourceState struct { // direct engine (len of the JSON stored in resources.json) for deploy // telemetry; left zero by the terraform path. StateSizeBytes int + + // Size in bytes of the resource's serialized state blob after zstd + // compression. Populated by the direct engine alongside StateSizeBytes for + // deploy telemetry; left zero by the terraform path. Used to gauge how much + // resource state shrinks under compression (the deployment metadata service + // stores state zstd-compressed). + StateCompressedSizeBytes int } // ExportedResourcesMap stores relevant attributes from terraform/direct state for all resources diff --git a/go.mod b/go.mod index cd414efbb9b..23d2b27a0c8 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,8 @@ require ( gopkg.in/ini.v1 v1.67.2 // Apache-2.0 ) +require github.com/klauspost/compress v1.18.6 + require ( cloud.google.com/go/auth v0.18.1 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect diff --git a/go.sum b/go.sum index d73bc16fa46..9b09ae1b74d 100644 --- a/go.sum +++ b/go.sum @@ -148,6 +148,8 @@ github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOl github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4= github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= +github.com/klauspost/compress v1.18.6 h1:2jupLlAwFm95+YDR+NwD2MEfFO9d4z4Prjl1XXDjuao= +github.com/klauspost/compress v1.18.6/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= diff --git a/libs/telemetry/protos/bundle_deploy.go b/libs/telemetry/protos/bundle_deploy.go index 73562782f1a..3bb019ff19e 100644 --- a/libs/telemetry/protos/bundle_deploy.go +++ b/libs/telemetry/protos/bundle_deploy.go @@ -123,6 +123,15 @@ type ResourceMetadata struct { StateSizeMaxBytes int64 `json:"state_size_max_bytes,omitempty"` StateSizeMeanBytes int64 `json:"state_size_mean_bytes,omitempty"` StateSizeMedianBytes int64 `json:"state_size_median_bytes,omitempty"` + + // zstd-compressed state-size statistics across resources of this type, each + // measured as the zstd-compressed length of the same per-resource state + // blob. The deployment metadata service stores resource state compressed, + // so these capture how much it shrinks under compression rather than just + // the raw sizes above. + StateCompressedSizeMaxBytes int64 `json:"state_compressed_size_max_bytes,omitempty"` + StateCompressedSizeMeanBytes int64 `json:"state_compressed_size_mean_bytes,omitempty"` + StateCompressedSizeMedianBytes int64 `json:"state_compressed_size_median_bytes,omitempty"` } type BoolMapEntry struct {