Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@
"count": 3,
"state_size_max_bytes": 256,
"state_size_mean_bytes": 254,
"state_size_median_bytes": 254
"state_size_median_bytes": 254,
"state_compressed_size_max_bytes": COMPRESSED_SIZE,
"state_compressed_size_mean_bytes": COMPRESSED_SIZE,
"state_compressed_size_median_bytes": COMPRESSED_SIZE
},
{
"resource_type": "pipelines",
"count": 2,
"state_size_max_bytes": 205,
"state_size_mean_bytes": 205,
"state_size_median_bytes": 205
"state_size_median_bytes": 205,
"state_compressed_size_max_bytes": COMPRESSED_SIZE,
"state_compressed_size_mean_bytes": COMPRESSED_SIZE,
"state_compressed_size_median_bytes": COMPRESSED_SIZE
}
]
}
16 changes: 16 additions & 0 deletions acceptance/bundle/telemetry/test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,19 @@ New = '[OS]'
[[Repls]]
Old = '"local_cache_measurements_ms": \[[^\]]*\]'
New = '"local_cache_measurements_ms": [...redacted...]'

# zstd-compressed state sizes depend on the klauspost/compress version, so they
# are not stable across dependency bumps (same reasoning that drops
# state_file_size_bytes from the golden). Assert the fields are present and
# numeric, not their exact value. The dstate unit test covers real compression.
[[Repls]]
Old = '"state_compressed_size_max_bytes": \d+'
New = '"state_compressed_size_max_bytes": COMPRESSED_SIZE'

[[Repls]]
Old = '"state_compressed_size_mean_bytes": \d+'
New = '"state_compressed_size_mean_bytes": COMPRESSED_SIZE'

[[Repls]]
Old = '"state_compressed_size_median_bytes": \d+'
New = '"state_compressed_size_median_bytes": COMPRESSED_SIZE'
28 changes: 25 additions & 3 deletions bundle/direct/dstate/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,29 @@ import (
"github.com/databricks/cli/internal/build"
"github.com/databricks/cli/libs/log"
"github.com/google/uuid"
"github.com/klauspost/compress/zstd"
)

// telemetryEncoder measures the zstd-compressed size of each resource's state
// for deploy telemetry. zstd.Encoder.EncodeAll is safe for concurrent use.
// NewWriter with default options does not error in practice; on the off chance
// it does, telemetryEncoderErr makes compressedStateSize degrade to zero rather
// than crash a deploy over a telemetry measurement.
var telemetryEncoder, telemetryEncoderErr = zstd.NewWriter(nil)

// compressedStateSize returns the zstd-compressed size in bytes of a resource's
// serialized state. It is used purely for deploy telemetry, to gauge how much
// resource state shrinks under compression (the deployment metadata service
// stores resource state zstd-compressed). Returns 0 for empty state or if the
// encoder is unavailable. The exact ratio is an approximation of the service's:
// both use zstd at default settings but different implementations.
func compressedStateSize(state []byte) int {
if len(state) == 0 || telemetryEncoderErr != nil {
return 0
}
return len(telemetryEncoder.EncodeAll(state, nil))
}

const (
currentStateVersion = 2
initialBufferSize = 64 * 1024
Expand Down Expand Up @@ -456,9 +477,10 @@ func ExportStateFromData(data Database) resourcestate.ExportedResourcesMap {
}

result[key] = resourcestate.ResourceState{
ID: entry.ID,
ETag: etag,
StateSizeBytes: len(entry.State),
ID: entry.ID,
ETag: etag,
StateSizeBytes: len(entry.State),
StateCompressedSizeBytes: compressedStateSize(entry.State),
}
}
return result
Expand Down
15 changes: 15 additions & 0 deletions bundle/direct/dstate/state_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dstate

import (
"bytes"
"os"
"path/filepath"
"testing"
Expand All @@ -15,6 +16,20 @@ func mustFinalize(t *testing.T, db *DeploymentState) {
require.NoError(t, err)
}

func TestCompressedStateSize(t *testing.T) {
require.NoError(t, telemetryEncoderErr)

// Empty state has no compressed size.
assert.Equal(t, 0, compressedStateSize(nil))
assert.Equal(t, 0, compressedStateSize([]byte{}))

// A highly compressible blob shrinks: positive size, smaller than raw.
blob := bytes.Repeat([]byte(`{"key":"value"}`), 1000)
got := compressedStateSize(blob)
assert.Positive(t, got)
assert.Less(t, got, len(blob))
}

func TestOpenSaveFinalizeRoundTrip(t *testing.T) {
path := filepath.Join(t.TempDir(), "state.json")

Expand Down
24 changes: 16 additions & 8 deletions bundle/phases/resources_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ const directEngine = "direct"
// Only direct deploys are measured. b.Metrics.ResourceState is the direct
// engine's finalized state, populated in deployCore from the WAL replay the
// deploy already performs; each entry carries StateSizeBytes (len of the JSON
// blob stored in resources.json). So no marshalling, file read, or JSON parsing
// blob stored in resources.json) and StateCompressedSizeBytes (its zstd-compressed
// length, computed during export). So no marshalling, file read, or JSON parsing
// happens here — sizes are read straight off the in-memory map. The whole-file
// size comes from a single os.Stat (no parse). Returns nil for terraform
// deploys (ResourceState is nil) and when no resources are in state.
Expand All @@ -44,16 +45,18 @@ func collectResourcesMetadata(ctx context.Context, b *bundle.Bundle) *protos.Bun
}

// resourceMetadataFromState groups the deployment state by resource type and
// computes per-type count and max/mean/median state size. Sizes are sorted per
// type (needed for the median).
// computes per-type count and max/mean/median state size, both raw and
// zstd-compressed. Sizes are sorted per type (needed for the median).
func resourceMetadataFromState(state resourcestate.ExportedResourcesMap) []protos.ResourceMetadata {
sizesByType := make(map[string][]int64)
compressedByType := make(map[string][]int64)
for key, rs := range state {
t := config.GetResourceTypeFromKey(key)
if t == "" {
continue
}
sizesByType[t] = append(sizesByType[t], int64(rs.StateSizeBytes))
compressedByType[t] = append(compressedByType[t], int64(rs.StateCompressedSizeBytes))
}

types := make([]string, 0, len(sizesByType))
Expand All @@ -66,12 +69,17 @@ func resourceMetadataFromState(state resourcestate.ExportedResourcesMap) []proto
for _, t := range types {
sizes := sizesByType[t]
slices.Sort(sizes)
compressed := compressedByType[t]
slices.Sort(compressed)
resources = append(resources, protos.ResourceMetadata{
ResourceType: t,
Count: int64(len(sizes)),
StateSizeMaxBytes: statMax(sizes),
StateSizeMeanBytes: statMean(sizes),
StateSizeMedianBytes: statMedian(sizes),
ResourceType: t,
Count: int64(len(sizes)),
StateSizeMaxBytes: statMax(sizes),
StateSizeMeanBytes: statMean(sizes),
StateSizeMedianBytes: statMedian(sizes),
StateCompressedSizeMaxBytes: statMax(compressed),
StateCompressedSizeMeanBytes: statMean(compressed),
StateCompressedSizeMedianBytes: statMedian(compressed),
})
}
return resources
Expand Down
40 changes: 29 additions & 11 deletions bundle/phases/resources_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,35 @@ import (

func TestResourceMetadataFromState_GroupsByType(t *testing.T) {
state := resourcestate.ExportedResourcesMap{
"resources.jobs.foo": {StateSizeBytes: 20},
"resources.jobs.bar": {StateSizeBytes: 10},
"resources.jobs.foo.permissions": {StateSizeBytes: 2},
"resources.pipelines.qux": {StateSizeBytes: 14},
"resources.jobs.foo": {StateSizeBytes: 20, StateCompressedSizeBytes: 12},
"resources.jobs.bar": {StateSizeBytes: 10, StateCompressedSizeBytes: 8},
"resources.jobs.foo.permissions": {StateSizeBytes: 2, StateCompressedSizeBytes: 3},
"resources.pipelines.qux": {StateSizeBytes: 14, StateCompressedSizeBytes: 9},
}

got := resourceMetadataFromState(state)

// Sorted by resource type. Sub-resources (permissions) group under
// "<parent>.permissions" per config.GetResourceTypeFromKey. jobs median is
// the lower-middle of sorted [10,20] -> index (2-1)/2 = 0 -> 10.
// the lower-middle of sorted [10,20] -> index (2-1)/2 = 0 -> 10. Raw and
// compressed stats are computed independently (each slice sorted on its own),
// so a resource's raw and compressed values need not share a rank.
assert.Equal(t, []protos.ResourceMetadata{
{ResourceType: "jobs", Count: 2, StateSizeMaxBytes: 20, StateSizeMeanBytes: 15, StateSizeMedianBytes: 10},
{ResourceType: "jobs.permissions", Count: 1, StateSizeMaxBytes: 2, StateSizeMeanBytes: 2, StateSizeMedianBytes: 2},
{ResourceType: "pipelines", Count: 1, StateSizeMaxBytes: 14, StateSizeMeanBytes: 14, StateSizeMedianBytes: 14},
{
ResourceType: "jobs", Count: 2,
StateSizeMaxBytes: 20, StateSizeMeanBytes: 15, StateSizeMedianBytes: 10,
StateCompressedSizeMaxBytes: 12, StateCompressedSizeMeanBytes: 10, StateCompressedSizeMedianBytes: 8,
},
{
ResourceType: "jobs.permissions", Count: 1,
StateSizeMaxBytes: 2, StateSizeMeanBytes: 2, StateSizeMedianBytes: 2,
StateCompressedSizeMaxBytes: 3, StateCompressedSizeMeanBytes: 3, StateCompressedSizeMedianBytes: 3,
},
{
ResourceType: "pipelines", Count: 1,
StateSizeMaxBytes: 14, StateSizeMeanBytes: 14, StateSizeMedianBytes: 14,
StateCompressedSizeMaxBytes: 9, StateCompressedSizeMeanBytes: 9, StateCompressedSizeMedianBytes: 9,
},
}, got)
}

Expand All @@ -39,12 +53,16 @@ func TestStatHelpers(t *testing.T) {

func TestResourceMetadataFromState_SkipsNonResourceKeys(t *testing.T) {
state := resourcestate.ExportedResourcesMap{
"resources.jobs.foo": {StateSizeBytes: 5},
"bogus": {StateSizeBytes: 99},
"resources.jobs.foo": {StateSizeBytes: 5, StateCompressedSizeBytes: 4},
"bogus": {StateSizeBytes: 99, StateCompressedSizeBytes: 50},
}
got := resourceMetadataFromState(state)
assert.Equal(t, []protos.ResourceMetadata{
{ResourceType: "jobs", Count: 1, StateSizeMaxBytes: 5, StateSizeMeanBytes: 5, StateSizeMedianBytes: 5},
{
ResourceType: "jobs", Count: 1,
StateSizeMaxBytes: 5, StateSizeMeanBytes: 5, StateSizeMedianBytes: 5,
StateCompressedSizeMaxBytes: 4, StateCompressedSizeMeanBytes: 4, StateCompressedSizeMedianBytes: 4,
},
}, got)
}

Expand Down
7 changes: 7 additions & 0 deletions bundle/statemgmt/resourcestate/resourcestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ type ResourceState struct {
// direct engine (len of the JSON stored in resources.json) for deploy
// telemetry; left zero by the terraform path.
StateSizeBytes int

// Size in bytes of the resource's serialized state blob after zstd
// compression. Populated by the direct engine alongside StateSizeBytes for
// deploy telemetry; left zero by the terraform path. Used to gauge how much
// resource state shrinks under compression (the deployment metadata service
// stores state zstd-compressed).
StateCompressedSizeBytes int
}

// ExportedResourcesMap stores relevant attributes from terraform/direct state for all resources
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ require (
gopkg.in/ini.v1 v1.67.2 // Apache-2.0
)

require github.com/klauspost/compress v1.18.6

require (
cloud.google.com/go/auth v0.18.1 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOl
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4=
github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
github.com/klauspost/compress v1.18.6 h1:2jupLlAwFm95+YDR+NwD2MEfFO9d4z4Prjl1XXDjuao=
github.com/klauspost/compress v1.18.6/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand Down
9 changes: 9 additions & 0 deletions libs/telemetry/protos/bundle_deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,15 @@ type ResourceMetadata struct {
StateSizeMaxBytes int64 `json:"state_size_max_bytes,omitempty"`
StateSizeMeanBytes int64 `json:"state_size_mean_bytes,omitempty"`
StateSizeMedianBytes int64 `json:"state_size_median_bytes,omitempty"`

// zstd-compressed state-size statistics across resources of this type, each
// measured as the zstd-compressed length of the same per-resource state
// blob. The deployment metadata service stores resource state compressed,
// so these capture how much it shrinks under compression rather than just
// the raw sizes above.
StateCompressedSizeMaxBytes int64 `json:"state_compressed_size_max_bytes,omitempty"`
StateCompressedSizeMeanBytes int64 `json:"state_compressed_size_mean_bytes,omitempty"`
StateCompressedSizeMedianBytes int64 `json:"state_compressed_size_median_bytes,omitempty"`
}

type BoolMapEntry struct {
Expand Down
Loading