diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index b84bbf66a..0349263f8 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -45,26 +45,44 @@ jobs: name: database-sql-table path: ./database/sql/table label: database/sql/table + run_extra_args: '' + create_extra_args: '' - id: database_sql_query name: database-sql-query path: ./database/sql/query label: database/sql/query + run_extra_args: '' + create_extra_args: '' - id: native_query name: native-query path: ./native/query label: native/query + run_extra_args: '' + create_extra_args: '' - id: native_table name: native-table path: ./native/table label: native/table + run_extra_args: '' + create_extra_args: '' - id: native_table_over_query_service name: native-table-over-query-service path: ./native/table/over/query/service label: native/table/over/query/service + run_extra_args: '' + create_extra_args: '' - id: native_bulk_upsert name: native-bulk-upsert path: ./native/bulk-upsert label: native/bulk-upsert + run_extra_args: '-batch-size=10' + create_extra_args: '' + - id: native_node_hints + name: native-node-hints + path: ./native/node_hints + label: native/node_hints + run_extra_args: '-prometheus-endpoint http://172.28.0.2:9090 -batch-size=10' + create_extra_args: '-min-partitions-count 10' concurrency: group: slo-${{ github.ref }}-${{ matrix.sdk.name }} @@ -180,23 +198,25 @@ jobs: fi - name: Initialize YDB SLO - uses: ydb-platform/ydb-slo-action/init@main + uses: ydb-platform/ydb-slo-action/init@21473ae781c9bc8f16b42dedc79489c1867c6e50 with: github_issue: ${{ github.event.inputs.github_issue }} github_token: ${{ secrets.GITHUB_TOKEN }} workload_name: ${{ matrix.sdk.name }} workload_current_ref: ${{ github.head_ref || github.ref_name }} workload_baseline_ref: ${{ steps.baseline.outputs.ref }} + disable_compose_profiles: "${{ matrix.sdk.id == 'native_node_hints' && 'chaos' || '' }}" - name: Prepare SLO Database run: | echo "Preparing SLO database..." + CREATE_EXTRA_ARGS="${{ matrix.sdk.create_extra_args }}" docker run --rm --network ydb_ydb-net \ --add-host "ydb:172.28.0.11" \ --add-host "ydb:172.28.0.12" \ --add-host "ydb:172.28.0.13" \ --add-host "ydb:172.28.0.99" \ - ydb-app-current create grpc://ydb:2136 /Root/testdb + ydb-app-current create grpc://ydb:2136 /Root/testdb $CREATE_EXTRA_ARGS - name: Run SLO Tests (parallel) timeout-minutes: 15 @@ -204,11 +224,7 @@ jobs: DURATION=${{ inputs.slo_workload_duration_seconds || 600 }} READ_RPS=${{ inputs.slo_workload_read_max_rps || 1000 }} WRITE_RPS=${{ inputs.slo_workload_write_max_rps || 1000 }} - - EXTRA_ARGS="" - if [ "${{ matrix.sdk.id }}" = "native_bulk_upsert" ]; then - EXTRA_ARGS="--batch-size=10" - fi + RUN_EXTRA_ARGS="${{ matrix.sdk.run_extra_args }}" ARGS="run grpc://ydb:2136 /Root/testdb \ -otlp-endpoint prometheus:9090 \ @@ -218,7 +234,7 @@ jobs: -write-rps $WRITE_RPS \ -read-timeout 100 \ -write-timeout 100 \ - $EXTRA_ARGS" + $RUN_EXTRA_ARGS" echo "Starting ydb-app-current..." 
docker run -d \ diff --git a/tests/slo/go.mod b/tests/slo/go.mod index 323c080e5..71a12b8b1 100644 --- a/tests/slo/go.mod +++ b/tests/slo/go.mod @@ -5,6 +5,8 @@ go 1.24.3 toolchain go1.24.10 require ( + github.com/prometheus/client_golang v1.3.0 + github.com/prometheus/common v0.7.0 github.com/ydb-platform/gorm-driver v0.1.3 github.com/ydb-platform/ydb-go-sdk-auth-environ v0.3.0 github.com/ydb-platform/ydb-go-sdk/v3 v3.67.0 diff --git a/tests/slo/go.sum b/tests/slo/go.sum index 9038ee624..1c9da9350 100644 --- a/tests/slo/go.sum +++ b/tests/slo/go.sum @@ -1905,6 +1905,7 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_golang v1.3.0 h1:miYCvYqFXtl/J9FIy8eNpBfYthAEFg+Ys0XyUVEcDsc= github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= @@ -1918,6 +1919,7 @@ github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8= github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/common v0.7.0 h1:L+1lyG48J1zAQXA3RBX/nG/B3gjlHq0zTt2tlbJLyCY= github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= diff --git a/tests/slo/internal/config/config.go b/tests/slo/internal/config/config.go index 36648fa7d..58fb4d8c8 100644 --- a/tests/slo/internal/config/config.go +++ b/tests/slo/internal/config/config.go @@ -34,7 +34,8 @@ type Config struct { Time int ShutdownTime int - BatchSize int + BatchSize int + PrometheusEndpoint string } func New() (*Config, error) { @@ -95,6 +96,7 @@ func New() (*Config, error) { fs.StringVar(&cfg.OTLPEndpoint, "otlp-endpoint", "", "OTLP HTTP endpoint for metrics") fs.IntVar(&cfg.ReportPeriod, "report-period", 250, "metrics reporting period in milliseconds") + fs.StringVar(&cfg.PrometheusEndpoint, "prometheus-endpoint", "", "Prometheus endpoint") fs.IntVar(&cfg.ReadRPS, "read-rps", 1000, "read RPS") fs.IntVar(&cfg.WriteRPS, "write-rps", 100, "write RPS") diff --git a/tests/slo/internal/generator/generator.go b/tests/slo/internal/generator/generator.go index 85efc23fc..48d1f0652 100755 --- a/tests/slo/internal/generator/generator.go +++ b/tests/slo/internal/generator/generator.go @@ -13,18 +13,22 @@ const ( MaxLength = 40 ) -type Generator struct { +type Generator interface { + Generate() (Row, error) +} + +type Impl struct { currentID RowID mu sync.Mutex } -func New(id RowID) *Generator { - return &Generator{ +func New(id RowID) *Impl { + return &Impl{ currentID: id, } } -func (g 
*Generator) Generate() (Row, error) { +func (g *Impl) Generate() (Row, error) { g.mu.Lock() id := g.currentID g.currentID++ @@ -37,7 +41,7 @@ func (g *Generator) Generate() (Row, error) { } var err error - e.PayloadStr, err = g.genPayloadString() + e.PayloadStr, err = genPayloadString() if err != nil { return Row{}, err } @@ -45,7 +49,7 @@ func (g *Generator) Generate() (Row, error) { return e, nil } -func (g *Generator) genPayloadString() (*string, error) { +func genPayloadString() (*string, error) { l := MinLength + rand.Intn(MaxLength-MinLength+1) //nolint:gosec // speed more important sl := make([]byte, l) diff --git a/tests/slo/internal/generator/seeded.go b/tests/slo/internal/generator/seeded.go new file mode 100644 index 000000000..dfa9b5e59 --- /dev/null +++ b/tests/slo/internal/generator/seeded.go @@ -0,0 +1,58 @@ +package generator + +import ( + "math/rand" + "time" +) + +type Range struct { + Left uint64 + Right uint64 +} +type SeededGenerator struct { + rng *rand.Rand + setRange *Range +} + +func NewSeeded(seed int64) *SeededGenerator { + return &SeededGenerator{ + rng: rand.New(rand.NewSource(seed)), + } +} + +func (g *SeededGenerator) ConstructRow() (Row, error) { + e := Row{ + PayloadDouble: func(a float64) *float64 { return &a }(rand.Float64()), //nolint:gosec // speed more important + PayloadTimestamp: func(a time.Time) *time.Time { return &a }(time.Now()), + PayloadHash: func(a uint64) *uint64 { return &a }(rand.Uint64()), //nolint:gosec + } + + var err error + e.PayloadStr, err = genPayloadString() + if err != nil { + return Row{}, err + } + + return e, nil +} + +func (g *SeededGenerator) Generate() (Row, error) { + row, err := g.ConstructRow() + if err != nil { + return Row{}, err + } + if g.setRange == nil { + row.ID = g.rng.Uint64() + } else { + row.ID = g.setRange.Left + g.rng.Uint64()%(g.setRange.Right-g.setRange.Left) + } + + return row, nil +} + +func (g *SeededGenerator) SetRange(l uint64, r uint64) { + g.setRange = &Range{ + Left: l, + Right: r, + } +} diff --git a/tests/slo/internal/node_hints/node_hints.go b/tests/slo/internal/node_hints/node_hints.go new file mode 100644 index 000000000..65d92acf6 --- /dev/null +++ b/tests/slo/internal/node_hints/node_hints.go @@ -0,0 +1,184 @@ +package node_hints + +import ( + "context" + "fmt" + "log" + "math/rand" + "slices" + "sync/atomic" + "time" + + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/table/options" + "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + + "slo/internal/generator" +) + +func describeTable(ctx context.Context, driver *ydb.Driver, tableName string) (desc options.Description, err error) { + err = driver.Table().Do(ctx, + func(ctx context.Context, session table.Session) (err error) { + desc, err = session.DescribeTable(ctx, tableName, + options.WithTableStats(), + options.WithPartitionStats(), + options.WithShardKeyBounds(), + options.WithShardNodesInfo(), + ) + + return err + }, + table.WithIdempotent(), + ) + + return desc, err +} + +type NodeSelector struct { + LowerBounds []uint64 + UpperBounds []uint64 + NodeIDs []uint32 +} + +func extractKey(v types.Value, side int) (uint64, error) { + if types.IsNull(v) { + if side == LEFT { + return 0, nil + } + + return ^uint64(0), nil + } + parts, err := types.TupleItems(v) + if err != nil { + return 0, fmt.Errorf("extract tuple: %w", err) + } + + var res uint64 + if err := types.CastTo(parts[0], &res); err != nil { + return 0, fmt.Errorf("cast to uint64: %w", err) + } + 
+ return res, nil +} + +const ( + LEFT = iota + RIGHT = iota +) + +func MakeNodeSelector(ctx context.Context, driver *ydb.Driver, tableName string) (*NodeSelector, error) { + dsc, err := describeTable(ctx, driver, tableName) + if err != nil { + return nil, err + } + + s := NodeSelector{} + + for _, kr := range dsc.KeyRanges { + l, err := extractKey(kr.From, LEFT) + if err != nil { + return nil, err + } + s.LowerBounds = append(s.LowerBounds, l) + r, err := extractKey(kr.To, RIGHT) + if err != nil { + return nil, err + } + s.UpperBounds = append(s.UpperBounds, r) + } + + for i := range len(s.UpperBounds) - 1 { + if s.UpperBounds[i] >= s.UpperBounds[i+1] { + for _, b := range s.UpperBounds { + log.Println(b) + } + log.Fatalf("boundaries are not sorted") + } + } + + for _, ps := range dsc.Stats.PartitionStats { + s.NodeIDs = append(s.NodeIDs, ps.LeaderNodeID) + } + + return &s, nil +} + +func (ns *NodeSelector) findNodeID(key uint64) uint32 { + idx, found := slices.BinarySearch(ns.UpperBounds, key) + if found { + idx++ + } + + return ns.NodeIDs[idx] +} + +func (ns *NodeSelector) WithNodeHint(ctx context.Context, key uint64) context.Context { + if ns == nil || len(ns.NodeIDs) == 0 { + return ctx + } + + return ydb.WithPreferredNodeID(ctx, ns.findNodeID(key)) +} + +func (ns *NodeSelector) GeneratePartitionKey(partitionID uint64) uint64 { + l := ns.UpperBounds[partitionID] - ns.LowerBounds[partitionID] + + return ns.LowerBounds[partitionID] + rand.Uint64()%l +} + +func RunUpdates( + ctx context.Context, + driver *ydb.Driver, + tableName string, + frequency time.Duration, +) (*atomic.Pointer[NodeSelector], error) { + var ns atomic.Pointer[NodeSelector] + updateSelector := func() error { + selector, err := MakeNodeSelector(ctx, driver, tableName) + if err != nil { + return err + } + ns.Store(selector) + + return nil + } + + err := updateSelector() + if err != nil { + return nil, err + } + ticker := time.NewTicker(frequency) + go func() { + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + err = updateSelector() + if err != nil { + log.Printf("node hints update error: %v\n", err) + } + } + } + }() + + return &ns, nil +} + +func (ns *NodeSelector) GetRandomNodeID(generator generator.Generator) (int, uint32) { + r, err := generator.Generate() + if err != nil { + log.Panicf("GetRandomNodeID: generator.Generate failed: %v", err) + } + shift := r.ID % uint64(len(ns.NodeIDs)) + for id, nodeID := range ns.NodeIDs { + if id == int(shift) { + return id, nodeID + } + } + log.Panicf("GetRandomNodeID: no nodeID found for shift: %d", shift) + + return 0, 0 +} diff --git a/tests/slo/internal/workers/read.go b/tests/slo/internal/workers/read.go index 99a4264a6..d62f2e272 100644 --- a/tests/slo/internal/workers/read.go +++ b/tests/slo/internal/workers/read.go @@ -33,19 +33,45 @@ func (w *Workers) Read(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter } } +func (w *Workers) ReadID() uint64 { + if w.Gen == nil { + return uint64(rand.Intn(int(w.cfg.InitialDataCount))) //nolint: gosec + } + row, err := w.Gen.Generate() + if err != nil { + log.Panicf("generate error: %v", err) + } + + return row.ID +} + +func (w *Workers) ReadIDs() []uint64 { + ids := make([]uint64, 0, w.cfg.BatchSize) + for range w.cfg.BatchSize { + if w.Gen == nil { + ids = append(ids, uint64(rand.Intn(int(w.cfg.InitialDataCount)))) //nolint: gosec + } else { + row, err := w.Gen.Generate() + if err != nil { + log.Panicf("generate error: %v", err) + } + ids = append(ids, row.ID) + } + } + + return ids +} 
+ func (w *Workers) read(ctx context.Context) error { var m metrics.Span var attempts int var err error if w.s != nil { - id := uint64(rand.Intn(int(w.cfg.InitialDataCount))) //nolint:gosec // speed more important + id := w.ReadID() m = w.m.Start(metrics.OperationTypeRead) _, attempts, err = w.s.Read(ctx, id) } else { - ids := make([]uint64, 0, w.cfg.BatchSize) - for range w.cfg.BatchSize { - ids = append(ids, uint64(rand.Intn(int(w.cfg.InitialDataCount)))) //nolint:gosec - } + ids := w.ReadIDs() m = w.m.Start(metrics.OperationTypeRead) _, attempts, err = w.sb.ReadBatch(ctx, ids) } diff --git a/tests/slo/internal/workers/workers.go b/tests/slo/internal/workers/workers.go index f763502b9..717495735 100644 --- a/tests/slo/internal/workers/workers.go +++ b/tests/slo/internal/workers/workers.go @@ -24,6 +24,7 @@ type Workers struct { s ReadWriter sb BatchReadWriter m *metrics.Metrics + Gen generator.Generator } func New(cfg *config.Config, s ReadWriter, ref, label, jobName string) (*Workers, error) { diff --git a/tests/slo/internal/workers/write.go b/tests/slo/internal/workers/write.go index 814904024..c50290b1e 100644 --- a/tests/slo/internal/workers/write.go +++ b/tests/slo/internal/workers/write.go @@ -11,7 +11,7 @@ import ( "slo/internal/metrics" ) -func (w *Workers) Write(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter, gen *generator.Generator) { +func (w *Workers) Write(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter, gen generator.Generator) { defer wg.Done() for { select { @@ -33,7 +33,7 @@ func (w *Workers) Write(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limite } } -func (w *Workers) write(ctx context.Context, gen *generator.Generator) (finalErr error) { +func (w *Workers) write(ctx context.Context, gen generator.Generator) (finalErr error) { m := w.m.Start(metrics.OperationTypeWrite) var attempts int if w.s != nil { diff --git a/tests/slo/native/node_hints/dynnode_traffic.go b/tests/slo/native/node_hints/dynnode_traffic.go new file mode 100644 index 000000000..d2fc5e293 --- /dev/null +++ b/tests/slo/native/node_hints/dynnode_traffic.go @@ -0,0 +1,146 @@ +package main + +import ( + "context" + "fmt" + "strconv" + "strings" + "time" + + "github.com/prometheus/client_golang/api" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" + + "slo/internal/config" + "slo/internal/log" +) + +type Estimator struct { + cfg *config.Config + NodeInstances map[uint32]string + NodeRequests map[uint32]float64 + ClusterCounter float64 +} + +func getMetric(ctx context.Context, cfg *config.Config, query string) model.Vector { + client, err := api.NewClient(api.Config{ + Address: cfg.PrometheusEndpoint, + }) + if err != nil { + log.Panicf("api.NewClient failed: %v", err) + } + + v1api := v1.NewAPI(client) + + result, warnings, err := v1api.Query( + ctx, + query, + time.Now(), + ) + if err != nil { + log.Panicf("query failed: %v", err) + } + if len(warnings) > 0 { + fmt.Println("Warnings: ", warnings) + } + vector, ok := result.(model.Vector) + if !ok || len(vector) == 0 { + log.Panicf("no data found for query: %s", query) + } + + return vector +} + +func getMetricValue(ctx context.Context, cfg *config.Config, query string) float64 { + vector := getMetric(ctx, cfg, query) + + return float64(vector[0].Value) +} + +func formatNodeID(v model.LabelValue) uint32 { + i64, err := strconv.ParseUint(string(v), 10, 32) + if err != nil { + log.Panicf("formatNodeID failed: %v", err) + } + + return uint32(i64) +} + +func NewEstimator(ctx 
context.Context, storage *Storage) *Estimator { + e := &Estimator{ + cfg: storage.cfg, + } + vec := getMetric(ctx, e.cfg, `Traffic{}`) + allNodeIDs := make(map[uint32]bool) + instanceID := make(map[string]map[uint32]bool) + nodeInstance := make(map[uint32]string) + // get all node ids + for _, v := range vec { + allNodeIDs[formatNodeID(v.Metric["peer_node_id"])] = true + } + // for target instance, the only absent node id is correct + for _, v := range vec { + instance := string(v.Metric["instance"]) + instanceID[instance] = make(map[uint32]bool) + for nodeID := range allNodeIDs { + instanceID[instance][nodeID] = true + } + } + for _, v := range vec { + instance := string(v.Metric["instance"]) + instanceID[instance][formatNodeID(v.Metric["peer_node_id"])] = false + } + // backwards mapping + for instance, nodeIDs := range instanceID { + if strings.Contains(instance, "storage") { + continue + } + for k, v := range nodeIDs { + if v { + nodeInstance[k] = instance + } + } + } + e.NodeInstances = nodeInstance + e.NodeRequests = make(map[uint32]float64) + // collect counters + for nodeID := range e.NodeInstances { + e.NodeRequests[nodeID] = e.NodeRWCounter(ctx, nodeID) + } + e.ClusterCounter = e.ClusterRWCounter(ctx) + + return e +} + +func (e *Estimator) NodeGrpcAPICounter(ctx context.Context, method string, nodeID uint32) float64 { + instance, ok := e.NodeInstances[nodeID] + if !ok { + log.Panicf("no instance found for nodeID: %d", nodeID) + } + + return getMetricValue(ctx, e.cfg, fmt.Sprintf(`api_grpc_request_count{instance="%s",method="%s"}`, instance, method)) +} + +func (e *Estimator) ClusterGrpcAPICounter(ctx context.Context, method string) float64 { + return getMetricValue(ctx, e.cfg, fmt.Sprintf(`sum(api_grpc_request_count{method="%s"})`, method)) +} + +func (e *Estimator) NodeRWCounter(ctx context.Context, nodeID uint32) float64 { + return e.NodeGrpcAPICounter(ctx, "ReadRows", nodeID) + e.NodeGrpcAPICounter(ctx, "BulkUpsert", nodeID) +} + +func (e *Estimator) ClusterRWCounter(ctx context.Context) float64 { + return e.ClusterGrpcAPICounter(ctx, "ReadRows") + e.ClusterGrpcAPICounter(ctx, "BulkUpsert") +} + +func (e *Estimator) OnlyThisNode(ctx context.Context, nodeID uint32) { + clusterNow := e.ClusterRWCounter(ctx) + nodeNow := e.NodeRWCounter(ctx, nodeID) + if clusterNow-e.ClusterCounter > nodeNow-e.NodeRequests[nodeID] { + log.Panicf("requests were served by other nodes: cluster %f -> %f, node %d %f -> %f", + e.ClusterCounter, clusterNow, + nodeID, + e.NodeRequests[nodeID], nodeNow, + ) + } +} diff --git a/tests/slo/native/node_hints/main.go b/tests/slo/native/node_hints/main.go new file mode 100644 index 000000000..a875a3519 --- /dev/null +++ b/tests/slo/native/node_hints/main.go @@ -0,0 +1,162 @@ +package main + +import ( + "context" + "fmt" + "os/signal" + "strconv" + "sync" + "syscall" + "time" + + "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" + + "slo/internal/config" + "slo/internal/generator" + "slo/internal/log" + "slo/internal/workers" +) + +var ( + ref string + label string + jobName = "slo_native_bulk_upsert" +) + +func main() { + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) + defer cancel() + + cfg, err := config.New() + if err != nil { + panic(fmt.Errorf("create config failed: %w", err)) + } + + log.Println("program started") + defer log.Println("program finished") + + ctx, cancel = context.WithTimeout(ctx, time.Duration(cfg.Time)*time.Second) + defer cancel() + + go func() { + <-ctx.Done() + 
log.Println("exiting...") + }() + + // pool size similar to query variant + s, err := NewStorage(ctx, cfg, cfg.ReadRPS+cfg.WriteRPS, "no_session") + if err != nil { + panic(fmt.Errorf("create storage failed: %w", err)) + } + defer func() { + var ( + shutdownCtx context.Context + shutdownCancel context.CancelFunc + ) + if cfg.ShutdownTime > 0 { + shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), + time.Duration(cfg.ShutdownTime)*time.Second) + } else { + shutdownCtx, shutdownCancel = context.WithCancel(context.Background()) + } + defer shutdownCancel() + + _ = s.Close(shutdownCtx) + }() + + log.Println("db init ok") + gen := generator.NewSeeded(120394832798) + + switch cfg.Mode { + case config.CreateMode: + err = s.CreateTable(ctx) + if err != nil { + panic(fmt.Errorf("create table failed: %w", err)) + } + log.Println("create table ok") + + g := errgroup.Group{} + + for i := uint64(0); i < cfg.InitialDataCount; i++ { + g.Go(func() (err error) { + e, err := gen.Generate() + if err != nil { + return err + } + + _, err = s.WriteBatch(ctx, []generator.Row{e}) + if err != nil { + return err + } + + return nil + }) + } + + err = g.Wait() + if err != nil { + panic(err) + } + + log.Println("entries write ok") + case config.CleanupMode: + err = s.DropTable(ctx) + if err != nil { + panic(fmt.Errorf("create table failed: %w", err)) + } + + log.Println("cleanup table ok") + case config.RunMode: + // to wait for correct partitions boundaries + time.Sleep(10 * time.Second) + w, err := workers.NewWithBatch(cfg, s, ref, label, jobName) + if err != nil { + panic(fmt.Errorf("create workers failed: %w", err)) + } + ns := s.nodeSelector.Load() + idx, nodeID := ns.GetRandomNodeID(gen) + log.Println("all requests to node id: ", nodeID) + gen.SetRange(ns.LowerBounds[idx], ns.UpperBounds[idx]) + w.Gen = gen + defer func() { + err := w.Close() + if err != nil { + panic(fmt.Errorf("workers close failed: %w", err)) + } + log.Println("workers close ok") + }() + + // collect metrics + estimator := NewEstimator(ctx, s) + // run workers + wg := sync.WaitGroup{} + readRL := rate.NewLimiter(rate.Limit(cfg.ReadRPS), 1) + wg.Add(cfg.ReadRPS) + for i := 0; i < cfg.ReadRPS; i++ { + go w.Read(ctx, &wg, readRL) + } + log.Println("started " + strconv.Itoa(cfg.ReadRPS) + " read workers") + + writeRL := rate.NewLimiter(rate.Limit(cfg.WriteRPS), 1) + wg.Add(cfg.WriteRPS) + for i := 0; i < cfg.WriteRPS; i++ { + go w.Write(ctx, &wg, writeRL, gen) + } + log.Println("started " + strconv.Itoa(cfg.WriteRPS) + " write workers") + + metricsRL := rate.NewLimiter(rate.Every(time.Duration(cfg.ReportPeriod)*time.Millisecond), 1) + wg.Add(1) + go w.Metrics(ctx, &wg, metricsRL) + + wg.Wait() + w.FailOnError() + // check all load is sent to a single node + ectx, ecancel := context.WithTimeout(context.Background(), 1*time.Second) + defer ecancel() + estimator.OnlyThisNode(ectx, nodeID) + + default: + panic(fmt.Errorf("unknown mode: %v", cfg.Mode)) + } +} diff --git a/tests/slo/native/node_hints/storage.go b/tests/slo/native/node_hints/storage.go new file mode 100644 index 000000000..1d240a5e5 --- /dev/null +++ b/tests/slo/native/node_hints/storage.go @@ -0,0 +1,278 @@ +package main + +import ( + "context" + "fmt" + "path" + "sync/atomic" + "time" + + ydb "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/query" + "github.com/ydb-platform/ydb-go-sdk/v3/retry" + "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + 
"github.com/ydb-platform/ydb-go-sdk/v3/table/options" + "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" + "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" + + "slo/internal/config" + "slo/internal/generator" + "slo/internal/node_hints" +) + +const createTableQuery = ` +CREATE TABLE IF NOT EXISTS` + " `%s` " + `( + id Uint64?, + payload_str Text?, + payload_double Double?, + payload_timestamp Timestamp?, + payload_hash Uint64?, + PRIMARY KEY (id) +) WITH ( + UNIFORM_PARTITIONS = %d, + AUTO_PARTITIONING_BY_SIZE = DISABLED, + AUTO_PARTITIONING_BY_LOAD = DISABLED +) +` + +const dropTableQuery = "DROP TABLE IF EXISTS `%s`;" + +type Storage struct { + db *ydb.Driver + cfg *config.Config + tablePath string + nodeSelector *atomic.Pointer[node_hints.NodeSelector] + retryBudget interface { + budget.Budget + Stop() + } +} + +func NewStorage(ctx context.Context, cfg *config.Config, poolSize int, label string) (*Storage, error) { + ctx, cancel := context.WithTimeout(ctx, time.Minute*5) //nolint:mnd + defer cancel() + + retryBudget := budget.Limited(int(float64(poolSize) * 0.1)) //nolint:mnd + + db, err := ydb.Open(ctx, + cfg.Endpoint+cfg.DB, + ydb.WithSessionPoolSizeLimit(poolSize), + ydb.WithRetryBudget(retryBudget), + ydb.WithInsecure(), + ydb.WithAnonymousCredentials(), + ydb.WithTLSSInsecureSkipVerify(), + ) + if err != nil { + return nil, err + } + + prefix := path.Join(db.Name(), label) + + tablePath := path.Join(prefix, cfg.Table) + + var nsPtr *atomic.Pointer[node_hints.NodeSelector] + if cfg.Mode == config.RunMode { + nsPtr, err = node_hints.RunUpdates(ctx, db, tablePath, time.Second*5) + if err != nil { + return nil, fmt.Errorf("create node selector: %w", err) + } + } + + s := &Storage{ + db: db, + cfg: cfg, + tablePath: path.Join(prefix, cfg.Table), + retryBudget: retryBudget, + nodeSelector: nsPtr, + } + + return s, nil +} + +func (s *Storage) WriteBatch(ctx context.Context, e []generator.Row) (attempts int, finalErr error) { + if err := ctx.Err(); err != nil { + return attempts, err + } + + rows := make([]types.Value, 0, len(e)) + + for _, row := range e { + rows = append(rows, types.StructValue( + types.StructFieldValue("id", types.Uint64Value(row.ID)), + types.StructFieldValue("payload_str", types.OptionalValue(types.TextValue(*row.PayloadStr))), + types.StructFieldValue("payload_double", types.OptionalValue(types.DoubleValue(*row.PayloadDouble))), + types.StructFieldValue( + "payload_timestamp", + types.OptionalValue(types.TimestampValue(uint64(row.PayloadTimestamp.UnixMicro()))), + ), + types.StructFieldValue("payload_hash", types.OptionalValue(types.Uint64Value(*row.PayloadHash))), + )) + } + + t := &trace.Retry{ + OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) { + return func(info trace.RetryLoopDoneInfo) { + attempts = info.Attempts + } + }, + } + + var reqCtx context.Context + if s.nodeSelector != nil { + reqCtx = s.nodeSelector.Load().WithNodeHint(ctx, e[0].ID) + } else { + reqCtx = ctx + } + reqCtx, cancel := context.WithTimeout(reqCtx, time.Duration(s.cfg.WriteTimeout)*time.Millisecond) + defer cancel() + + err := s.db.Table().BulkUpsert( + reqCtx, + s.tablePath, + table.BulkUpsertDataRows(types.ListValue(rows...)), + table.WithRetryOptions([]retry.Option{ //nolint:staticcheck + retry.WithTrace(t), + }), + table.WithIdempotent(), + table.WithLabel("WRITE"), + ) + + return attempts, err +} + +func (s *Storage) ReadBatch(ctx context.Context, rowIDs []generator.RowID) ( + _ []generator.Row, + 
attempts int, + finalErr error, +) { + if err := ctx.Err(); err != nil { + return []generator.Row{}, attempts, err + } + + t := &trace.Retry{ + OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) { + return func(info trace.RetryLoopDoneInfo) { + attempts = info.Attempts + } + }, + } + + keys := make([]types.Value, 0, len(rowIDs)) + for _, rowID := range rowIDs { + key := types.StructValue( + types.StructFieldValue("id", types.Uint64Value(rowID)), + ) + keys = append(keys, key) + } + + var reqCtx context.Context + if s.nodeSelector != nil { + reqCtx = s.nodeSelector.Load().WithNodeHint(ctx, rowIDs[0]) + } else { + reqCtx = ctx + } + reqCtx, cancel := context.WithTimeout(reqCtx, time.Duration(s.cfg.ReadTimeout)*time.Millisecond) + defer cancel() + + res, err := s.db.Table().ReadRows(reqCtx, s.tablePath, types.ListValue(keys...), []options.ReadRowsOption{}, + table.WithRetryOptions([]retry.Option{ //nolint:staticcheck + retry.WithTrace(t), + }), + table.WithIdempotent(), + table.WithLabel("READ"), + ) + if err != nil { + return nil, attempts, err + } + defer func() { + _ = res.Close() + }() + + readRows := make([]generator.Row, 0, len(rowIDs)) + + for res.NextResultSet(ctx) { + if err = res.Err(); err != nil { + return nil, attempts, err + } + + if res.CurrentResultSet().Truncated() { + return nil, attempts, fmt.Errorf("read rows result set truncated") + } + + for res.NextRow() { + readRow := generator.Row{} + scans := []named.Value{ + named.Required("id", &readRow.ID), + named.Optional("payload_str", &readRow.PayloadStr), + named.Optional("payload_double", &readRow.PayloadDouble), + named.Optional("payload_timestamp", &readRow.PayloadTimestamp), + named.Optional("payload_hash", &readRow.PayloadHash), + } + + err = res.ScanNamed(scans...) + if err != nil { + return nil, attempts, err + } + + readRows = append(readRows, readRow) + } + } + + return readRows, attempts, nil +} + +func (s *Storage) CreateTable(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.WriteTimeout)*time.Millisecond) + defer cancel() + + return s.db.Query().Do(ctx, + func(ctx context.Context, session query.Session) error { + fmt.Println(fmt.Sprintf(createTableQuery, s.tablePath, s.cfg.MinPartitionsCount)) + + return session.Exec(ctx, + fmt.Sprintf(createTableQuery, s.tablePath, s.cfg.MinPartitionsCount), + query.WithTxControl(query.EmptyTxControl()), + ) + }, query.WithIdempotent(), + query.WithLabel("CREATE TABLE"), + ) +} + +func (s *Storage) DropTable(ctx context.Context) error { + if err := ctx.Err(); err != nil { + return err + } + + ctx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.WriteTimeout)*time.Millisecond) + defer cancel() + + return s.db.Query().Do(ctx, + func(ctx context.Context, session query.Session) error { + return session.Exec(ctx, + fmt.Sprintf(dropTableQuery, s.tablePath), + query.WithTxControl(query.EmptyTxControl()), + ) + }, + query.WithIdempotent(), + query.WithLabel("DROP TABLE"), + ) +} + +func (s *Storage) Close(ctx context.Context) error { + s.retryBudget.Stop() + + var ( + shutdownCtx context.Context + shutdownCancel context.CancelFunc + ) + if s.cfg.ShutdownTime > 0 { + shutdownCtx, shutdownCancel = context.WithTimeout(ctx, time.Duration(s.cfg.ShutdownTime)*time.Second) + } else { + shutdownCtx, shutdownCancel = context.WithCancel(ctx) + } + defer shutdownCancel() + + return s.db.Close(shutdownCtx) +}
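
For reference, a minimal sketch (not part of the change) of how the new pieces compose: discover partition boundaries and leader nodes via slo/internal/node_hints, pick one partition, pin the seeded generator to that partition's key range, and attach a node hint per request. Endpoint, database and table path below are placeholders; in the SLO job they come from config and the Storage wrapper.

package main

import (
	"context"
	"log"

	ydb "github.com/ydb-platform/ydb-go-sdk/v3"

	"slo/internal/generator"
	"slo/internal/node_hints"
)

// pinToOnePartition builds a NodeSelector for the table, chooses one
// partition, and restricts the seeded generator to that partition's key
// range so every generated ID maps to the same leader node.
func pinToOnePartition(ctx context.Context, driver *ydb.Driver, tablePath string) (generator.Generator, uint32, *node_hints.NodeSelector) {
	ns, err := node_hints.MakeNodeSelector(ctx, driver, tablePath)
	if err != nil {
		log.Panicf("make node selector: %v", err)
	}

	gen := generator.NewSeeded(42) // seed is arbitrary in this sketch
	idx, nodeID := ns.GetRandomNodeID(gen)

	// Only generate IDs inside the chosen partition's [Lower, Upper) range.
	gen.SetRange(ns.LowerBounds[idx], ns.UpperBounds[idx])

	return gen, nodeID, ns
}

func main() {
	ctx := context.Background()

	// Placeholder connection string; the workload uses cfg.Endpoint + cfg.DB.
	driver, err := ydb.Open(ctx, "grpc://localhost:2136/local", ydb.WithAnonymousCredentials())
	if err != nil {
		log.Panicf("connect: %v", err)
	}
	defer driver.Close(ctx)

	gen, nodeID, ns := pinToOnePartition(ctx, driver, "/local/testdb/slo_table")
	log.Println("load pinned to node", nodeID)

	row, err := gen.Generate()
	if err != nil {
		log.Panicf("generate: %v", err)
	}
	// Per request: attach the hint before ReadRows/BulkUpsert, as Storage does.
	_ = ns.WithNodeHint(ctx, row.ID)
}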
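
findNodeID treats UpperBounds as sorted, exclusive upper bounds (a NULL To is mapped to ^uint64(0)), so a key equal to a bound belongs to the next partition. A hypothetical in-package test, not part of the change, sketching that mapping on made-up boundaries:

package node_hints

import "testing"

// Sketch of the key -> leader-node mapping implemented by findNodeID:
// three partitions split at 100 and 200, last range unbounded.
func TestFindNodeIDBoundaries(t *testing.T) {
	ns := &NodeSelector{
		LowerBounds: []uint64{0, 100, 200},
		UpperBounds: []uint64{100, 200, ^uint64(0)},
		NodeIDs:     []uint32{1, 2, 3},
	}

	cases := []struct {
		key  uint64
		want uint32
	}{
		{0, 1},
		{99, 1},
		{100, 2}, // equal to an upper bound -> next partition
		{199, 2},
		{200, 3},
		{1 << 63, 3},
	}

	for _, c := range cases {
		if got := ns.findNodeID(c.key); got != c.want {
			t.Errorf("findNodeID(%d) = %d, want %d", c.key, got, c.want)
		}
	}
}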
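
NewEstimator derives the instance -> node mapping from the Traffic metric by relying on the fact that a node never reports itself as peer_node_id, so the one ID absent from an instance's series is its own. A standalone toy illustration of that inference on made-up instance names and node IDs:

package main

import "fmt"

// Toy data: peer_node_id values observed per dynnode instance.
// Each instance sees every node except itself.
func main() {
	seen := map[string][]uint32{
		"ydb-dyn-1:8765": {2, 3},
		"ydb-dyn-2:8765": {1, 3},
		"ydb-dyn-3:8765": {1, 2},
	}
	all := map[uint32]bool{1: true, 2: true, 3: true}

	nodeInstance := map[uint32]string{}
	for instance, peers := range seen {
		absent := make(map[uint32]bool, len(all))
		for id := range all {
			absent[id] = true
		}
		for _, id := range peers {
			absent[id] = false
		}
		for id, isAbsent := range absent {
			if isAbsent {
				nodeInstance[id] = instance
			}
		}
	}

	// Expected: map[1:ydb-dyn-1:8765 2:ydb-dyn-2:8765 3:ydb-dyn-3:8765]
	fmt.Println(nodeInstance)
}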