Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 10 additions & 38 deletions config/distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"encoding/json"
"fmt"
"math"
"math/rand/v2"
mrand "math/rand/v2"
"sync"

"github.com/sei-protocol/sei-load/utils/rng"
)

var (
Expand All @@ -18,7 +16,7 @@ var (

// indexSampler draws an index in [0, n) from some keyspace distribution.
type indexSampler interface {
SampleIndex(n uint64) (uint64, error)
SampleIndex(rng *mrand.Rand, n uint64) (uint64, error)
}

// Distribution is a tagged keyspace index sampler selected by a "Name"
Expand All @@ -31,25 +29,13 @@ type Distribution struct {

func (d *Distribution) Name() string { return d.name }

// SetStream binds the sampler to a deterministic sub-stream (nil = unseeded
// global RNG); a zero-value Distribution draws nothing, so it no-ops. See
// package doc for the reproducibility contract.
func (d *Distribution) SetStream(s *rng.Stream) {
switch delegate := d.delegate.(type) {
case *UniformDistribution:
delegate.stream = s
case *ZipfianDistribution:
delegate.stream = s
}
}

// SampleIndex delegates to the selected sampler; a zero-value (no Name)
// Distribution returns 0.
func (d *Distribution) SampleIndex(n uint64) (uint64, error) {
func (d *Distribution) SampleIndex(rng *mrand.Rand, n uint64) (uint64, error) {
if d.delegate == nil {
return 0, nil
}
return d.delegate.SampleIndex(n)
return d.delegate.SampleIndex(rng, n)
}

func (d *Distribution) UnmarshalJSON(data []byte) error {
Expand All @@ -64,7 +50,7 @@ func (d *Distribution) UnmarshalJSON(data []byte) error {
case "":
return nil
case "uniform":
// No JSON parameters; the stream is bound later via SetStream.
// No JSON parameters; the PRNG is supplied at draw time.
d.delegate = &UniformDistribution{}
return nil
case "zipfian":
Expand All @@ -83,20 +69,13 @@ func (d *Distribution) UnmarshalJSON(data []byte) error {
}

// UniformDistribution draws each index with equal probability.
//
// copy-safe: holds no mutex; the *rng.Stream pointer aliases on copy.
type UniformDistribution struct {
stream *rng.Stream
}
type UniformDistribution struct{}

func (u *UniformDistribution) SampleIndex(n uint64) (uint64, error) {
func (u *UniformDistribution) SampleIndex(rng *mrand.Rand, n uint64) (uint64, error) {
if n == 0 {
return 0, fmt.Errorf("uniform sample: empty keyspace (n == 0)")
}
if u.stream != nil {
return u.stream.Uint64N(n), nil
}
return rand.Uint64N(n), nil
return rng.Uint64N(n), nil
}

// ZipfianDistribution is the YCSB precomputed-zeta generator: zeta(n, theta) is
Expand All @@ -107,8 +86,6 @@ func (u *UniformDistribution) SampleIndex(n uint64) (uint64, error) {
type ZipfianDistribution struct {
Theta float64 `json:"theta"`

stream *rng.Stream

mu sync.Mutex
state *zipfState // memoized for state.n; recomputed when n changes.
}
Expand Down Expand Up @@ -169,7 +146,7 @@ func (z *ZipfianDistribution) validate() error {
// SampleIndex draws a Zipf-skewed index in [0, n). n must be stable per sampler:
// the zeta cache is keyed on n, so a changing n recomputes O(n) every draw. See
// package doc.
func (z *ZipfianDistribution) SampleIndex(n uint64) (uint64, error) {
func (z *ZipfianDistribution) SampleIndex(rng *mrand.Rand, n uint64) (uint64, error) {
if n == 0 {
return 0, fmt.Errorf("zipfian sample: empty keyspace (n == 0)")
}
Expand All @@ -181,12 +158,7 @@ func (z *ZipfianDistribution) SampleIndex(n uint64) (uint64, error) {
st := z.state
z.mu.Unlock()

var u float64
if z.stream != nil {
u = z.stream.Float64()
} else {
u = rand.Float64()
}
u := rng.Float64()
uz := u * st.zetaN
if uz < 1.0 {
return 0, nil
Expand Down
51 changes: 18 additions & 33 deletions config/distribution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config_test
import (
"encoding/json"
"fmt"
mrand "math/rand/v2"
"os"
"path/filepath"
"testing"
Expand All @@ -19,7 +20,7 @@ func TestDistribution(t *testing.T) {
var subject config.Distribution
require.NoError(t, subject.UnmarshalJSON([]byte(`{}`)))
require.Empty(t, subject.Name())
idx, err := subject.SampleIndex(100)
idx, err := subject.SampleIndex(rng.NewSource(1).Rand("config:distribution:test"), 100)
require.NoError(t, err)
require.Zero(t, idx)
})
Expand Down Expand Up @@ -55,13 +56,12 @@ func distribution(t *testing.T, raw string) *config.Distribution {
return &d
}

// sample binds d to stream and pulls count draws over keyspace n.
func sample(t *testing.T, d *config.Distribution, s *rng.Stream, n uint64, count int) []uint64 {
// sample binds d to a PRNG and pulls count draws over keyspace n.
func sample(t *testing.T, d *config.Distribution, rng *mrand.Rand, n uint64, count int) []uint64 {
t.Helper()
d.SetStream(s)
out := make([]uint64, count)
for i := range out {
v, err := d.SampleIndex(n)
v, err := d.SampleIndex(rng, n)
require.NoError(t, err)
require.Less(t, v, n, "draw out of range [0, n)")
out[i] = v
Expand All @@ -74,7 +74,7 @@ func sample(t *testing.T, d *config.Distribution, s *rng.Stream, n uint64, count
func TestSampleIndexEmptyKeyspace(t *testing.T) {
t.Parallel()
for _, raw := range []string{`{"Name":"uniform"}`, `{"Name":"zipfian","theta":0.9}`} {
_, err := distribution(t, raw).SampleIndex(0)
_, err := distribution(t, raw).SampleIndex(rng.NewSource(1).Rand("config:distribution:test"), 0)
require.Error(t, err, raw)
}
}
Expand All @@ -85,27 +85,12 @@ func TestSampleIndexDeterminism(t *testing.T) {
t.Parallel()
const seed, n, count = 99, 1000, 256
for _, raw := range []string{`{"Name":"uniform"}`, `{"Name":"zipfian","theta":0.8}`} {
a := sample(t, distribution(t, raw), rng.NewSource(seed).Stream(rng.KeyDistributionStream(0)), n, count)
b := sample(t, distribution(t, raw), rng.NewSource(seed).Stream(rng.KeyDistributionStream(0)), n, count)
a := sample(t, distribution(t, raw), rng.NewSource(seed).Rand(rng.KeyDistributionStream(0)), n, count)
b := sample(t, distribution(t, raw), rng.NewSource(seed).Rand(rng.KeyDistributionStream(0)), n, count)
require.Equal(t, a, b, "same seed must reproduce the draw sequence: %s", raw)
}
}

// TestSampleIndexSeededDiffersFromUnseeded guards the binding the way
// TestRandomGasPickerStreamSeeds does for gas: a bound sampler draws
// seed-determined values that differ from the unseeded global RNG path. If a
// refactor silently broke the binding, the seeded and unseeded sequences would
// match by accident only with probability ~0.
func TestSampleIndexSeededDiffersFromUnseeded(t *testing.T) {
t.Parallel()
const seed, n, count = 7, 1000, 128
for _, raw := range []string{`{"Name":"uniform"}`, `{"Name":"zipfian","theta":0.8}`} {
seeded := sample(t, distribution(t, raw), rng.NewSource(seed).Stream(rng.KeyDistributionStream(0)), n, count)
unseeded := sample(t, distribution(t, raw), nil, n, count)
require.NotEqual(t, seeded, unseeded, "seeded draws must differ from the unseeded global RNG: %s", raw)
}
}

// TestUniformIsUniform: a chi-square goodness-of-fit test over evenly-sized
// buckets. With B buckets and N draws the statistic should sit well under the
// upper critical value; a badly skewed "uniform" would blow far past it.
Expand All @@ -114,7 +99,7 @@ func TestUniformIsUniform(t *testing.T) {
const n, buckets, perBucket = 1000, 20, 5000
const draws = buckets * perBucket // 100k draws, expected 5k per bucket.

got := sample(t, distribution(t, `{"Name":"uniform"}`), rng.NewSource(1).Stream("x"), n, draws)
got := sample(t, distribution(t, `{"Name":"uniform"}`), rng.NewSource(1).Rand("x"), n, draws)
counts := make([]float64, buckets)
width := uint64(n / buckets)
for _, v := range got {
Expand All @@ -140,7 +125,7 @@ func TestZipfianSkewRisesWithTheta(t *testing.T) {

topKMass := func(theta float64) float64 {
raw := fmt.Sprintf(`{"Name":"zipfian","theta":%v}`, theta)
got := sample(t, distribution(t, raw), rng.NewSource(5).Stream("x"), n, draws)
got := sample(t, distribution(t, raw), rng.NewSource(5).Rand("x"), n, draws)
var hot int
for _, v := range got {
if v < topK {
Expand Down Expand Up @@ -176,17 +161,17 @@ func TestZipfianInitCostBounded(t *testing.T) {
t.Parallel()
const n = 1_000_000
d := distribution(t, `{"Name":"zipfian","theta":0.99}`)
d.SetStream(rng.NewSource(1).Stream("x"))
rand := rng.NewSource(1).Rand("x")

// Warmup outside the timer: pay the one-time O(n) zeta precompute here so the
// timed window measures only steady-state per-draw cost.
warm, err := d.SampleIndex(n)
warm, err := d.SampleIndex(rand, n)
require.NoError(t, err)
require.Less(t, warm, uint64(n))

start := time.Now()
for i := 0; i < 1000; i++ {
v, err := d.SampleIndex(n)
v, err := d.SampleIndex(rand, n)
require.NoError(t, err)
require.Less(t, v, uint64(n))
}
Expand All @@ -202,18 +187,18 @@ func TestZipfianInitCostBounded(t *testing.T) {
func TestZipfianRecomputesOnNChange(t *testing.T) {
t.Parallel()
d := distribution(t, `{"Name":"zipfian","theta":0.9}`)
d.SetStream(rng.NewSource(1).Stream("x"))
rand := rng.NewSource(1).Rand("x")

// Same seed + same draw index against two different keyspaces: if the cache
// ignored the n change, the second n would reuse the first's zetaN/eta and the
// draw could fall outside [0, n2). The in-range check is the recompute witness.
const n1, n2 = 1_000_000, 10
v1, err := d.SampleIndex(n1)
v1, err := d.SampleIndex(rand, n1)
require.NoError(t, err)
require.Less(t, v1, uint64(n1))

for i := 0; i < 1000; i++ {
v2, err := d.SampleIndex(n2)
v2, err := d.SampleIndex(rand, n2)
require.NoError(t, err)
require.Less(t, v2, uint64(n2), "draw must be in [0, n2) after n change; stale cache would overshoot")
}
Expand All @@ -228,9 +213,9 @@ func TestZipfianNoNaNAcrossThetaRange(t *testing.T) {
for _, n := range []uint64{2, 3, 100, 1000} {
raw := fmt.Sprintf(`{"Name":"zipfian","theta":%v}`, theta)
d := distribution(t, raw)
d.SetStream(rng.NewSource(1).Stream("x"))
rand := rng.NewSource(1).Rand("x")
for i := 0; i < 100; i++ {
v, err := d.SampleIndex(n)
v, err := d.SampleIndex(rand, n)
require.NoError(t, err)
// v is a uint64 index; the in-range check is the real guard that
// the internal zeta/eta math never produced a bad (NaN-derived) draw.
Expand Down
4 changes: 2 additions & 2 deletions config/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@
//
// # Seeded-stream reproducibility (FROZEN inputs)
//
// Draws go through a bound *rng.Stream (see SetStream): a per-scenario
// substream derived from the run seed. This is what gives the workload its
// Draws go through an explicitly supplied *rand.Rand: a per-scenario
// substream-derived PRNG from the run seed. This is what gives the workload its
// reproducibility contract — same seed + same config yields the same per-stream
// draw multiset (see package utils/rng for the precise contract and its limits
// above one worker).
Expand Down
33 changes: 7 additions & 26 deletions config/gas.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package config
import (
"encoding/json"
"fmt"
"math/rand/v2"

"github.com/sei-protocol/sei-load/utils/rng"
mrand "math/rand/v2"
)

var (
Expand All @@ -15,7 +13,7 @@ var (
)

type gasGenerator interface {
GenerateGas() (uint64, error)
GenerateGas(rng *mrand.Rand) (uint64, error)
}

type GasPicker struct {
Expand All @@ -25,23 +23,11 @@ type GasPicker struct {

func (g *GasPicker) Name() string { return g.name }

// SetStream binds the picker's random delegate to a deterministic sub-stream. A
// nil stream leaves the picker on the unseeded global RNG.
//
// Only a random delegate has anything to seed: fixed and empty pickers draw no
// randomness, so the type assertion intentionally no-ops for them rather than
// erroring.
func (g *GasPicker) SetStream(s *rng.Stream) {
if r, ok := g.delegate.(*RandomGasGenerator); ok {
r.stream = s
}
}

func (g *GasPicker) GenerateGas() (uint64, error) {
func (g *GasPicker) GenerateGas(rng *mrand.Rand) (uint64, error) {
if g.delegate == nil {
return 0, nil
}
return g.delegate.GenerateGas()
return g.delegate.GenerateGas(rng)
}

func (g *GasPicker) UnmarshalJSON(data []byte) error {
Expand Down Expand Up @@ -78,24 +64,19 @@ type FixedGasGenerator struct {
Gas uint64 `json:"Gas"`
}

func (f *FixedGasGenerator) GenerateGas() (uint64, error) {
func (f *FixedGasGenerator) GenerateGas(rng *mrand.Rand) (uint64, error) {
return f.Gas, nil
}

type RandomGasGenerator struct {
Min uint64 `json:"Min"`
Max uint64 `json:"Max"`

stream *rng.Stream
}

func (r *RandomGasGenerator) GenerateGas() (uint64, error) {
func (r *RandomGasGenerator) GenerateGas(rng *mrand.Rand) (uint64, error) {
if r.Min >= r.Max {
return 0, fmt.Errorf("invalid random gas range: min %d must be less than max %d", r.Min, r.Max)
}
span := r.Max - r.Min + 1
if r.stream != nil {
return r.Min + r.stream.Uint64N(span), nil
}
return r.Min + rand.Uint64N(span), nil
return r.Min + rng.Uint64N(span), nil
}
Loading
Loading