From 361353e8fd82ef5205af5b272b27e9dae1872463 Mon Sep 17 00:00:00 2001 From: Aleksei Pleshakov Date: Thu, 4 Dec 2025 15:22:50 +0100 Subject: [PATCH 1/3] node hinting workload --- tests/slo/go.mod | 2 + tests/slo/go.sum | 2 + tests/slo/internal/config/config.go | 4 +- tests/slo/internal/generator/generator.go | 16 +- tests/slo/internal/generator/seeded.go | 57 ++++ tests/slo/internal/node_hints/node_hints.go | 171 +++++++++++ tests/slo/internal/workers/read.go | 34 ++- tests/slo/internal/workers/workers.go | 1 + tests/slo/internal/workers/write.go | 4 +- .../slo/native/node_hints/dynnode_traffic.go | 137 +++++++++ tests/slo/native/node_hints/main.go | 161 ++++++++++ tests/slo/native/node_hints/storage.go | 277 ++++++++++++++++++ 12 files changed, 852 insertions(+), 14 deletions(-) create mode 100644 tests/slo/internal/generator/seeded.go create mode 100644 tests/slo/internal/node_hints/node_hints.go create mode 100644 tests/slo/native/node_hints/dynnode_traffic.go create mode 100644 tests/slo/native/node_hints/main.go create mode 100644 tests/slo/native/node_hints/storage.go diff --git a/tests/slo/go.mod b/tests/slo/go.mod index 323c080e5..71a12b8b1 100644 --- a/tests/slo/go.mod +++ b/tests/slo/go.mod @@ -5,6 +5,8 @@ go 1.24.3 toolchain go1.24.10 require ( + github.com/prometheus/client_golang v1.3.0 + github.com/prometheus/common v0.7.0 github.com/ydb-platform/gorm-driver v0.1.3 github.com/ydb-platform/ydb-go-sdk-auth-environ v0.3.0 github.com/ydb-platform/ydb-go-sdk/v3 v3.67.0 diff --git a/tests/slo/go.sum b/tests/slo/go.sum index 9038ee624..1c9da9350 100644 --- a/tests/slo/go.sum +++ b/tests/slo/go.sum @@ -1905,6 +1905,7 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_golang v1.3.0 h1:miYCvYqFXtl/J9FIy8eNpBfYthAEFg+Ys0XyUVEcDsc= github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= @@ -1918,6 +1919,7 @@ github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8= github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/common v0.7.0 h1:L+1lyG48J1zAQXA3RBX/nG/B3gjlHq0zTt2tlbJLyCY= github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= diff --git a/tests/slo/internal/config/config.go b/tests/slo/internal/config/config.go index 36648fa7d..58fb4d8c8 100644 --- a/tests/slo/internal/config/config.go +++ b/tests/slo/internal/config/config.go @@ -34,7 +34,8 @@ type Config struct { Time int 
ShutdownTime int - BatchSize int + BatchSize int + PrometheusEndpoint string } func New() (*Config, error) { @@ -95,6 +96,7 @@ func New() (*Config, error) { fs.StringVar(&cfg.OTLPEndpoint, "otlp-endpoint", "", "OTLP HTTP endpoint for metrics") fs.IntVar(&cfg.ReportPeriod, "report-period", 250, "metrics reporting period in milliseconds") + fs.StringVar(&cfg.PrometheusEndpoint, "prometheus-endpoint", "", "Prometheus endpoint") fs.IntVar(&cfg.ReadRPS, "read-rps", 1000, "read RPS") fs.IntVar(&cfg.WriteRPS, "write-rps", 100, "write RPS") diff --git a/tests/slo/internal/generator/generator.go b/tests/slo/internal/generator/generator.go index 85efc23fc..d60cc4d3b 100755 --- a/tests/slo/internal/generator/generator.go +++ b/tests/slo/internal/generator/generator.go @@ -13,18 +13,22 @@ const ( MaxLength = 40 ) -type Generator struct { +type Generator interface { + Generate() (Row, error) +} + +type GeneratorImpl struct { currentID RowID mu sync.Mutex } -func New(id RowID) *Generator { - return &Generator{ +func New(id RowID) *GeneratorImpl { + return &GeneratorImpl{ currentID: id, } } -func (g *Generator) Generate() (Row, error) { +func (g *GeneratorImpl) Generate() (Row, error) { g.mu.Lock() id := g.currentID g.currentID++ @@ -37,7 +41,7 @@ func (g *Generator) Generate() (Row, error) { } var err error - e.PayloadStr, err = g.genPayloadString() + e.PayloadStr, err = genPayloadString() if err != nil { return Row{}, err } @@ -45,7 +49,7 @@ func (g *Generator) Generate() (Row, error) { return e, nil } -func (g *Generator) genPayloadString() (*string, error) { +func genPayloadString() (*string, error) { l := MinLength + rand.Intn(MaxLength-MinLength+1) //nolint:gosec // speed more important sl := make([]byte, l) diff --git a/tests/slo/internal/generator/seeded.go b/tests/slo/internal/generator/seeded.go new file mode 100644 index 000000000..c7a0dff14 --- /dev/null +++ b/tests/slo/internal/generator/seeded.go @@ -0,0 +1,57 @@ +package generator + +import ( + "math/rand" + "time" +) + +type Range struct { + Left uint64 + Right uint64 +} +type SeededGenerator struct { + rng *rand.Rand + setRange *Range +} + +func NewSeeded(seed int64) *SeededGenerator { + return &SeededGenerator{ + rng: rand.New(rand.NewSource(seed)), + } +} + +func (g *SeededGenerator) ConstructRow() (Row, error) { + e := Row{ + PayloadDouble: func(a float64) *float64 { return &a }(rand.Float64()), //nolint:gosec // speed more important + PayloadTimestamp: func(a time.Time) *time.Time { return &a }(time.Now()), + PayloadHash: func(a uint64) *uint64 { return &a }(rand.Uint64()), //nolint:gosec + } + + var err error + e.PayloadStr, err = genPayloadString() + if err != nil { + return Row{}, err + } + + return e, nil +} + +func (g *SeededGenerator) Generate() (Row, error) { + row, err := g.ConstructRow() + if err != nil { + return Row{}, err + } + if g.setRange == nil { + row.ID = g.rng.Uint64() + } else { + row.ID = g.setRange.Left + g.rng.Uint64()%(g.setRange.Right-g.setRange.Left) + } + return row, nil +} + +func (g *SeededGenerator) SetRange(l uint64, r uint64) { + g.setRange = &Range{ + Left: l, + Right: r, + } +} \ No newline at end of file diff --git a/tests/slo/internal/node_hints/node_hints.go b/tests/slo/internal/node_hints/node_hints.go new file mode 100644 index 000000000..dba13c83a --- /dev/null +++ b/tests/slo/internal/node_hints/node_hints.go @@ -0,0 +1,171 @@ +package node_hints + +import ( + "context" + "fmt" + "log" + "math/rand" + "slices" + "slo/internal/generator" + "sync/atomic" + "time" + + 
"github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/table/options" + "github.com/ydb-platform/ydb-go-sdk/v3/table/types" +) + +func describeTable(ctx context.Context, driver *ydb.Driver, tableName string) (desc options.Description, err error) { + err = driver.Table().Do(ctx, + func(ctx context.Context, session table.Session) (err error) { + desc, err = session.DescribeTable(ctx, tableName, + options.WithTableStats(), + options.WithPartitionStats(), + options.WithShardKeyBounds(), + options.WithShardNodesInfo(), + ) + return err + }, + table.WithIdempotent(), + ) + return desc, err +} + +type NodeSelector struct { + LowerBounds []uint64 + UpperBounds []uint64 + NodeIDs []uint32 +} + +func extractKey(v types.Value, side int) (uint64, error) { + if types.IsNull(v) { + if side == LEFT { + return 0, nil + } else { + return ^uint64(0), nil + } + } + parts, err := types.TupleItems(v) + if err != nil { + return 0, fmt.Errorf("extract tuple: %w", err) + } + + var res uint64 + if err := types.CastTo(parts[0], &res); err != nil { + return 0, fmt.Errorf("cast to uint64: %w", err) + } + + return res, nil +} + +const ( + LEFT = iota + RIGHT = iota +) + +func MakeNodeSelector(ctx context.Context, driver *ydb.Driver, tableName string) (*NodeSelector, error) { + dsc, err := describeTable(ctx, driver, tableName) + + if err != nil { + return nil, err + } + + s := NodeSelector{} + + for _, kr := range dsc.KeyRanges { + l, err := extractKey(kr.From, LEFT) + if err != nil { + return nil, err + } + s.LowerBounds = append(s.LowerBounds, l) + r, err := extractKey(kr.To, RIGHT) + if err != nil { + return nil, err + } + s.UpperBounds = append(s.UpperBounds, r) + } + + for i := range len(s.UpperBounds) - 1 { + if s.UpperBounds[i] >= s.UpperBounds[i+1] { + for _, b := range s.UpperBounds { + log.Println(b) + } + log.Fatalf("boundaries are not sorted") + } + } + + for _, ps := range dsc.Stats.PartitionStats { + s.NodeIDs = append(s.NodeIDs, ps.LeaderNodeID) + } + return &s, nil +} + +func (s *NodeSelector) findNodeID(key uint64) uint32 { + idx, found := slices.BinarySearch(s.UpperBounds, key) + if found { + idx++ + } + return s.NodeIDs[idx] +} + +func (s *NodeSelector) WithNodeHint(ctx context.Context, key uint64) context.Context { + if s == nil || len(s.NodeIDs) == 0 { + return ctx + } + return ydb.WithPreferredNodeID(ctx, s.findNodeID(key)) +} + +func (s *NodeSelector) GeneratePartitionKey(partitionId uint64) uint64 { + l := s.UpperBounds[partitionId] - s.LowerBounds[partitionId] + return s.LowerBounds[partitionId] + rand.Uint64()%l +} + +func RunUpdates(ctx context.Context, driver *ydb.Driver, tableName string, frequency time.Duration) (*atomic.Pointer[NodeSelector], error) { + var ns atomic.Pointer[NodeSelector] + updateSelector := func() error { + selector, err := MakeNodeSelector(ctx, driver, tableName) + if err != nil { + return err + } + ns.Store(selector) + return nil + } + + err := updateSelector() + if err != nil { + return nil, err + } else { + ticker := time.NewTicker(frequency) + go func() { + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + err = updateSelector() + if err != nil { + log.Printf("node hints update error: %v\n", err) + } + } + } + }() + } + return &ns, nil +} + +func (ns *NodeSelector) GetRandomNodeID(generator generator.Generator) (int, uint32) { + r, err := generator.Generate() + if err != nil { + log.Panicf("GetRandomNodeID: generator.Generate failed: %v", err) + } + 
shift := r.ID % uint64(len(ns.NodeIDs)) + for id, nodeID := range ns.NodeIDs { + if id == int(shift) { + return id, nodeID + } + } + log.Panicf("GetRandomNodeID: no nodeID found for shift: %d", shift) + return 0, 0 +} diff --git a/tests/slo/internal/workers/read.go b/tests/slo/internal/workers/read.go index 99a4264a6..743839e1a 100644 --- a/tests/slo/internal/workers/read.go +++ b/tests/slo/internal/workers/read.go @@ -33,19 +33,43 @@ func (w *Workers) Read(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter } } +func (w *Workers) ReadID() uint64 { + if w.Gen == nil { + return uint64(rand.Intn(int(w.cfg.InitialDataCount))) //nolint: gosec + } + row, err := w.Gen.Generate() + if err != nil { + log.Panicf("generate error: %v", err) + } + return row.ID +} + +func (w *Workers) ReadIDs() []uint64 { + ids := make([]uint64, 0, w.cfg.BatchSize) + for range w.cfg.BatchSize { + if w.Gen == nil { + ids = append(ids, uint64(rand.Intn(int(w.cfg.InitialDataCount)))) //nolint: gosec + } else { + row, err := w.Gen.Generate() + if err != nil { + log.Panicf("generate error: %v", err) + } + ids = append(ids, row.ID) + } + } + return ids +} + func (w *Workers) read(ctx context.Context) error { var m metrics.Span var attempts int var err error if w.s != nil { - id := uint64(rand.Intn(int(w.cfg.InitialDataCount))) //nolint:gosec // speed more important + id := w.ReadID() m = w.m.Start(metrics.OperationTypeRead) _, attempts, err = w.s.Read(ctx, id) } else { - ids := make([]uint64, 0, w.cfg.BatchSize) - for range w.cfg.BatchSize { - ids = append(ids, uint64(rand.Intn(int(w.cfg.InitialDataCount)))) //nolint:gosec - } + ids := w.ReadIDs() m = w.m.Start(metrics.OperationTypeRead) _, attempts, err = w.sb.ReadBatch(ctx, ids) } diff --git a/tests/slo/internal/workers/workers.go b/tests/slo/internal/workers/workers.go index f763502b9..717495735 100644 --- a/tests/slo/internal/workers/workers.go +++ b/tests/slo/internal/workers/workers.go @@ -24,6 +24,7 @@ type Workers struct { s ReadWriter sb BatchReadWriter m *metrics.Metrics + Gen generator.Generator } func New(cfg *config.Config, s ReadWriter, ref, label, jobName string) (*Workers, error) { diff --git a/tests/slo/internal/workers/write.go b/tests/slo/internal/workers/write.go index 814904024..c50290b1e 100644 --- a/tests/slo/internal/workers/write.go +++ b/tests/slo/internal/workers/write.go @@ -11,7 +11,7 @@ import ( "slo/internal/metrics" ) -func (w *Workers) Write(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter, gen *generator.Generator) { +func (w *Workers) Write(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter, gen generator.Generator) { defer wg.Done() for { select { @@ -33,7 +33,7 @@ func (w *Workers) Write(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limite } } -func (w *Workers) write(ctx context.Context, gen *generator.Generator) (finalErr error) { +func (w *Workers) write(ctx context.Context, gen generator.Generator) (finalErr error) { m := w.m.Start(metrics.OperationTypeWrite) var attempts int if w.s != nil { diff --git a/tests/slo/native/node_hints/dynnode_traffic.go b/tests/slo/native/node_hints/dynnode_traffic.go new file mode 100644 index 000000000..88b86be6b --- /dev/null +++ b/tests/slo/native/node_hints/dynnode_traffic.go @@ -0,0 +1,137 @@ +package main + +import ( + "context" + "fmt" + "slo/internal/config" + "slo/internal/log" + "strconv" + "strings" + "time" + + "github.com/prometheus/client_golang/api" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" +) + +type 
Estimator struct { + cfg *config.Config + NodeInstances map[uint32]string + NodeRequests map[uint32]float64 + ClusterCounter float64 +} + +func getMetric(ctx context.Context, cfg *config.Config, query string) model.Vector { + client, err := api.NewClient(api.Config{ + Address: cfg.PrometheusEndpoint, + }) + if err != nil { + log.Panicf("api.NewClient failed: %v", err) + } + + v1api := v1.NewAPI(client) + + result, warnings, err := v1api.Query( + ctx, + query, + time.Now(), + ) + if len(warnings) > 0 { + fmt.Println("Warnings: ", warnings) + } + vector, ok := result.(model.Vector) + if !ok || len(vector) == 0 { + log.Panicf("no data found for query: %s", query) + } + return vector +} + +func getMetricValue(ctx context.Context, cfg *config.Config, query string) float64 { + vector := getMetric(ctx, cfg, query) + return float64(vector[0].Value) +} + +func formatNodeID(v model.LabelValue) uint32 { + id, err := strconv.Atoi(string(v)) + if err != nil { + log.Panicf("formatNodeID failed: %v", err) + } + return uint32(id) +} + +func NewEstimator(ctx context.Context, storage *Storage) *Estimator { + e := &Estimator{ + cfg: storage.cfg, + } + vec := getMetric(ctx, e.cfg, `Traffic{}`) + allNodeIDs := make(map[uint32]bool) + instanceID := make(map[string]map[uint32]bool) + nodeInstance := make(map[uint32]string) + //get all node ids + for _, v := range vec { + allNodeIDs[formatNodeID(v.Metric["peer_node_id"])] = true + } + //for target instance, the only absent node id is correct + for _, v := range vec { + instance := string(v.Metric["instance"]) + instanceID[instance] = make(map[uint32]bool) + for nodeID, _ := range allNodeIDs { + instanceID[instance][nodeID] = true + } + } + for _, v := range vec { + instance := string(v.Metric["instance"]) + instanceID[instance][formatNodeID(v.Metric["peer_node_id"])] = false + } + //backwards mapping + for instance, nodeIDs := range instanceID { + if strings.Contains(instance, "storage") { + continue + } + for k, v := range nodeIDs { + if v { + nodeInstance[k] = instance + } + } + } + e.NodeInstances = nodeInstance + e.NodeRequests = make(map[uint32]float64) + //collect counters + for nodeID, _ := range e.NodeInstances { + e.NodeRequests[nodeID] = e.NodeRWCounter(ctx, nodeID) + } + e.ClusterCounter = e.ClusterRWCounter(ctx) + return e +} + +func (e *Estimator) NodeGrpcApiCounter(ctx context.Context, method string, nodeID uint32) float64 { + instance, ok := e.NodeInstances[nodeID] + if !ok { + log.Panicf("no instance found for nodeID: %d", nodeID) + } + return getMetricValue(ctx, e.cfg, fmt.Sprintf(`api_grpc_request_count{instance="%s",method="%s"}`, instance, method)) +} + +func (e *Estimator) ClusterGrpcApiCounter(ctx context.Context, method string) float64 { + return getMetricValue(ctx, e.cfg, fmt.Sprintf(`sum(api_grpc_request_count{method="%s"})`, method)) +} + +func (e *Estimator) NodeRWCounter(ctx context.Context, nodeID uint32) float64 { + return e.NodeGrpcApiCounter(ctx, "ReadRows", nodeID) + e.NodeGrpcApiCounter(ctx, "BulkUpsert", nodeID) +} + +func (e *Estimator) ClusterRWCounter(ctx context.Context) float64 { + return e.ClusterGrpcApiCounter(ctx, "ReadRows") + e.ClusterGrpcApiCounter(ctx, "BulkUpsert") +} + +func (e *Estimator) OnlyThisNode(ctx context.Context, nodeID uint32) { + clusterNow := e.ClusterRWCounter(ctx) + nodeNow := e.NodeRWCounter(ctx, nodeID) + if clusterNow-e.ClusterCounter > nodeNow-e.NodeRequests[nodeID] { + log.Panicf("requests were served by other nodes: cluster %f -> %f, node %d %f -> %f", + e.ClusterCounter, clusterNow, + nodeID, 
+ e.NodeRequests[nodeID], nodeNow, + ) + } +} diff --git a/tests/slo/native/node_hints/main.go b/tests/slo/native/node_hints/main.go new file mode 100644 index 000000000..f58642ebb --- /dev/null +++ b/tests/slo/native/node_hints/main.go @@ -0,0 +1,161 @@ +package main + +import ( + "context" + "fmt" + "os/signal" + "strconv" + "sync" + "syscall" + "time" + + "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" + + "slo/internal/config" + "slo/internal/generator" + "slo/internal/log" + "slo/internal/workers" +) + +var ( + ref string + label string + jobName = "slo_native_bulk_upsert" +) + +func main() { + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) + defer cancel() + + cfg, err := config.New() + if err != nil { + panic(fmt.Errorf("create config failed: %w", err)) + } + + log.Println("program started") + defer log.Println("program finished") + + ctx, cancel = context.WithTimeout(ctx, time.Duration(cfg.Time)*time.Second) + defer cancel() + + go func() { + <-ctx.Done() + log.Println("exiting...") + }() + + // pool size similar to query variant + s, err := NewStorage(ctx, cfg, cfg.ReadRPS+cfg.WriteRPS, "no_session") + if err != nil { + panic(fmt.Errorf("create storage failed: %w", err)) + } + defer func() { + var ( + shutdownCtx context.Context + shutdownCancel context.CancelFunc + ) + if cfg.ShutdownTime > 0 { + shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), + time.Duration(cfg.ShutdownTime)*time.Second) + } else { + shutdownCtx, shutdownCancel = context.WithCancel(context.Background()) + } + defer shutdownCancel() + + _ = s.Close(shutdownCtx) + }() + + log.Println("db init ok") + gen := generator.NewSeeded(120394832798) + + switch cfg.Mode { + case config.CreateMode: + err = s.CreateTable(ctx) + if err != nil { + panic(fmt.Errorf("create table failed: %w", err)) + } + log.Println("create table ok") + + g := errgroup.Group{} + + for i := uint64(0); i < cfg.InitialDataCount; i++ { + g.Go(func() (err error) { + e, err := gen.Generate() + if err != nil { + return err + } + + _, err = s.WriteBatch(ctx, []generator.Row{e}) + if err != nil { + return err + } + + return nil + }) + } + + err = g.Wait() + if err != nil { + panic(err) + } + + log.Println("entries write ok") + case config.CleanupMode: + err = s.DropTable(ctx) + if err != nil { + panic(fmt.Errorf("create table failed: %w", err)) + } + + log.Println("cleanup table ok") + case config.RunMode: + //to wait for correct partitions boundaries + time.Sleep(10 * time.Second) + w, err := workers.NewWithBatch(cfg, s, ref, label, jobName) + if err != nil { + panic(fmt.Errorf("create workers failed: %w", err)) + } + ns := s.nodeSelector.Load() + idx, nodeID := ns.GetRandomNodeID(gen) + log.Println("all requests to node id: ", nodeID) + gen.SetRange(ns.LowerBounds[idx], ns.UpperBounds[idx]) + w.Gen = gen + defer func() { + err := w.Close() + if err != nil { + panic(fmt.Errorf("workers close failed: %w", err)) + } + log.Println("workers close ok") + }() + + //collect metrics + estimator := NewEstimator(ctx, s) + //run workers + wg := sync.WaitGroup{} + readRL := rate.NewLimiter(rate.Limit(cfg.ReadRPS), 1) + wg.Add(cfg.ReadRPS) + for i := 0; i < cfg.ReadRPS; i++ { + go w.Read(ctx, &wg, readRL) + } + log.Println("started " + strconv.Itoa(cfg.ReadRPS) + " read workers") + + writeRL := rate.NewLimiter(rate.Limit(cfg.WriteRPS), 1) + wg.Add(cfg.WriteRPS) + for i := 0; i < cfg.WriteRPS; i++ { + go w.Write(ctx, &wg, writeRL, gen) + } + log.Println("started " + 
strconv.Itoa(cfg.WriteRPS) + " write workers") + + metricsRL := rate.NewLimiter(rate.Every(time.Duration(cfg.ReportPeriod)*time.Millisecond), 1) + wg.Add(1) + go w.Metrics(ctx, &wg, metricsRL) + + wg.Wait() + //check all load is sent to a single node + ectx, ecancel := context.WithTimeout(context.Background(), 1*time.Second) + defer ecancel() + estimator.OnlyThisNode(ectx, nodeID) + + default: + panic(fmt.Errorf("unknown mode: %v", cfg.Mode)) + } +} diff --git a/tests/slo/native/node_hints/storage.go b/tests/slo/native/node_hints/storage.go new file mode 100644 index 000000000..46706ae68 --- /dev/null +++ b/tests/slo/native/node_hints/storage.go @@ -0,0 +1,277 @@ +package main + +import ( + "context" + "fmt" + "path" + "slo/internal/node_hints" + "sync/atomic" + "time" + + ydb "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/query" + "github.com/ydb-platform/ydb-go-sdk/v3/retry" + "github.com/ydb-platform/ydb-go-sdk/v3/retry/budget" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/table/options" + "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" + "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" + + "slo/internal/config" + "slo/internal/generator" +) + +const createTableQuery = ` +CREATE TABLE IF NOT EXISTS` + " `%s` " + `( + id Uint64?, + payload_str Text?, + payload_double Double?, + payload_timestamp Timestamp?, + payload_hash Uint64?, + PRIMARY KEY (id) +) WITH ( + UNIFORM_PARTITIONS = %d, + AUTO_PARTITIONING_BY_SIZE = DISABLED, + AUTO_PARTITIONING_BY_LOAD = DISABLED +) +` + +const dropTableQuery = "DROP TABLE IF EXISTS `%s`;" + +type Storage struct { + db *ydb.Driver + cfg *config.Config + tablePath string + nodeSelector *atomic.Pointer[node_hints.NodeSelector] + retryBudget interface { + budget.Budget + Stop() + } +} + +func NewStorage(ctx context.Context, cfg *config.Config, poolSize int, label string) (*Storage, error) { + ctx, cancel := context.WithTimeout(ctx, time.Minute*5) //nolint:mnd + defer cancel() + + retryBudget := budget.Limited(int(float64(poolSize) * 0.1)) //nolint:mnd + + db, err := ydb.Open(ctx, + cfg.Endpoint+cfg.DB, + ydb.WithSessionPoolSizeLimit(poolSize), + ydb.WithRetryBudget(retryBudget), + ydb.WithInsecure(), + ydb.WithAnonymousCredentials(), + ydb.WithTLSSInsecureSkipVerify(), + ) + if err != nil { + return nil, err + } + + prefix := path.Join(db.Name(), label) + + tablePath := path.Join(prefix, cfg.Table) + + var nsPtr *atomic.Pointer[node_hints.NodeSelector] + if cfg.Mode == config.RunMode { + nsPtr, err = node_hints.RunUpdates(ctx, db, tablePath, time.Second*10) + if err != nil { + return nil, fmt.Errorf("create node selector: %w", err) + } + } + + s := &Storage{ + db: db, + cfg: cfg, + tablePath: path.Join(prefix, cfg.Table), + retryBudget: retryBudget, + nodeSelector: nsPtr, + } + + return s, nil +} + +func (s *Storage) WriteBatch(ctx context.Context, e []generator.Row) (attempts int, finalErr error) { + if err := ctx.Err(); err != nil { + return attempts, err + } + + rows := make([]types.Value, 0, len(e)) + + for _, row := range e { + rows = append(rows, types.StructValue( + types.StructFieldValue("id", types.Uint64Value(row.ID)), + types.StructFieldValue("payload_str", types.OptionalValue(types.TextValue(*row.PayloadStr))), + types.StructFieldValue("payload_double", types.OptionalValue(types.DoubleValue(*row.PayloadDouble))), + types.StructFieldValue( + "payload_timestamp", + 
types.OptionalValue(types.TimestampValue(uint64(row.PayloadTimestamp.UnixMicro()))), + ), + types.StructFieldValue("payload_hash", types.OptionalValue(types.Uint64Value(*row.PayloadHash))), + )) + } + + t := &trace.Retry{ + OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) { + return func(info trace.RetryLoopDoneInfo) { + attempts = info.Attempts + } + }, + } + + var reqCtx context.Context + if s.nodeSelector != nil { + reqCtx = s.nodeSelector.Load().WithNodeHint(ctx, e[0].ID) + } else { + reqCtx = ctx + } + reqCtx, cancel := context.WithTimeout(reqCtx, time.Duration(s.cfg.WriteTimeout)*time.Millisecond) + defer cancel() + + err := s.db.Table().BulkUpsert( + reqCtx, + s.tablePath, + table.BulkUpsertDataRows(types.ListValue(rows...)), + table.WithRetryOptions([]retry.Option{ //nolint:staticcheck + retry.WithTrace(t), + }), + table.WithIdempotent(), + table.WithLabel("WRITE"), + ) + + return attempts, err +} + +func (s *Storage) ReadBatch(ctx context.Context, rowIDs []generator.RowID) ( + _ []generator.Row, + attempts int, + finalErr error, +) { + if err := ctx.Err(); err != nil { + return []generator.Row{}, attempts, err + } + + t := &trace.Retry{ + OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) { + return func(info trace.RetryLoopDoneInfo) { + attempts = info.Attempts + } + }, + } + + keys := make([]types.Value, 0, len(rowIDs)) + for _, rowID := range rowIDs { + key := types.StructValue( + types.StructFieldValue("id", types.Uint64Value(rowID)), + ) + keys = append(keys, key) + } + + var reqCtx context.Context + if s.nodeSelector != nil { + reqCtx = s.nodeSelector.Load().WithNodeHint(ctx, rowIDs[0]) + } else { + reqCtx = ctx + } + //reqCtx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.ReadTimeout)*time.Millisecond) + //defer cancel() + + res, err := s.db.Table().ReadRows(reqCtx, s.tablePath, types.ListValue(keys...), []options.ReadRowsOption{}, + table.WithRetryOptions([]retry.Option{ //nolint:staticcheck + retry.WithTrace(t), + }), + table.WithIdempotent(), + table.WithLabel("READ"), + ) + if err != nil { + return nil, attempts, err + } + defer func() { + _ = res.Close() + }() + + readRows := make([]generator.Row, 0, len(rowIDs)) + + for res.NextResultSet(ctx) { + if err = res.Err(); err != nil { + return nil, attempts, err + } + + if res.CurrentResultSet().Truncated() { + return nil, attempts, fmt.Errorf("read rows result set truncated") + } + + for res.NextRow() { + readRow := generator.Row{} + scans := []named.Value{ + named.Required("id", &readRow.ID), + named.Optional("payload_str", &readRow.PayloadStr), + named.Optional("payload_double", &readRow.PayloadDouble), + named.Optional("payload_timestamp", &readRow.PayloadTimestamp), + named.Optional("payload_hash", &readRow.PayloadHash), + } + + err = res.ScanNamed(scans...) 
+ if err != nil { + return nil, attempts, err + } + + readRows = append(readRows, readRow) + } + } + + return readRows, attempts, nil +} + +func (s *Storage) CreateTable(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.WriteTimeout)*time.Millisecond) + defer cancel() + return s.db.Query().Do(ctx, + func(ctx context.Context, session query.Session) error { + fmt.Println(fmt.Sprintf(createTableQuery, s.tablePath, s.cfg.MinPartitionsCount)) + + return session.Exec(ctx, + fmt.Sprintf(createTableQuery, s.tablePath, s.cfg.MinPartitionsCount), + query.WithTxControl(query.EmptyTxControl()), + ) + }, query.WithIdempotent(), + query.WithLabel("CREATE TABLE"), + ) +} + +func (s *Storage) DropTable(ctx context.Context) error { + if err := ctx.Err(); err != nil { + return err + } + + ctx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.WriteTimeout)*time.Millisecond) + defer cancel() + + return s.db.Query().Do(ctx, + func(ctx context.Context, session query.Session) error { + return session.Exec(ctx, + fmt.Sprintf(dropTableQuery, s.tablePath), + query.WithTxControl(query.EmptyTxControl()), + ) + }, + query.WithIdempotent(), + query.WithLabel("DROP TABLE"), + ) +} + +func (s *Storage) Close(ctx context.Context) error { + s.retryBudget.Stop() + + var ( + shutdownCtx context.Context + shutdownCancel context.CancelFunc + ) + if s.cfg.ShutdownTime > 0 { + shutdownCtx, shutdownCancel = context.WithTimeout(ctx, time.Duration(s.cfg.ShutdownTime)*time.Second) + } else { + shutdownCtx, shutdownCancel = context.WithCancel(ctx) + } + defer shutdownCancel() + + return s.db.Close(shutdownCtx) +} From 0bb5ef279adb1c6d5a1a1723b757c729b56a33f1 Mon Sep 17 00:00:00 2001 From: Aleksei Pleshakov Date: Fri, 5 Dec 2025 15:24:59 +0100 Subject: [PATCH 2/3] lint, update action yml --- .github/workflows/slo.yml | 32 ++++++--- tests/slo/internal/generator/generator.go | 8 +-- tests/slo/internal/generator/seeded.go | 3 +- tests/slo/internal/node_hints/node_hints.go | 71 +++++++++++-------- tests/slo/internal/workers/read.go | 2 + .../slo/native/node_hints/dynnode_traffic.go | 33 +++++---- tests/slo/native/node_hints/main.go | 9 +-- tests/slo/native/node_hints/storage.go | 9 +-- 8 files changed, 105 insertions(+), 62 deletions(-) diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index b84bbf66a..0349263f8 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -45,26 +45,44 @@ jobs: name: database-sql-table path: ./database/sql/table label: database/sql/table + run_extra_args: '' + create_extra_args: '' - id: database_sql_query name: database-sql-query path: ./database/sql/query label: database/sql/query + run_extra_args: '' + create_extra_args: '' - id: native_query name: native-query path: ./native/query label: native/query + run_extra_args: '' + create_extra_args: '' - id: native_table name: native-table path: ./native/table label: native/table + run_extra_args: '' + create_extra_args: '' - id: native_table_over_query_service name: native-table-over-query-service path: ./native/table/over/query/service label: native/table/over/query/service + run_extra_args: '' + create_extra_args: '' - id: native_bulk_upsert name: native-bulk-upsert path: ./native/bulk-upsert label: native/bulk-upsert + run_extra_args: '-batch-size=10' + create_extra_args: '' + - id: native_node_hints + name: native-node-hints + path: ./native/node_hints + label: native/node_hints + run_extra_args: '-prometheus-endpoint http://172.28.0.2:9090 -batch-size=10' + create_extra_args: 
'-min-partitions-count 10' concurrency: group: slo-${{ github.ref }}-${{ matrix.sdk.name }} @@ -180,23 +198,25 @@ jobs: fi - name: Initialize YDB SLO - uses: ydb-platform/ydb-slo-action/init@main + uses: ydb-platform/ydb-slo-action/init@21473ae781c9bc8f16b42dedc79489c1867c6e50 with: github_issue: ${{ github.event.inputs.github_issue }} github_token: ${{ secrets.GITHUB_TOKEN }} workload_name: ${{ matrix.sdk.name }} workload_current_ref: ${{ github.head_ref || github.ref_name }} workload_baseline_ref: ${{ steps.baseline.outputs.ref }} + disable_compose_profiles: "${{ matrix.sdk.id == 'native_node_hints' && 'chaos' || '' }}" - name: Prepare SLO Database run: | echo "Preparing SLO database..." + CREATE_EXTRA_ARGS="${{ matrix.sdk.create_extra_args }}" docker run --rm --network ydb_ydb-net \ --add-host "ydb:172.28.0.11" \ --add-host "ydb:172.28.0.12" \ --add-host "ydb:172.28.0.13" \ --add-host "ydb:172.28.0.99" \ - ydb-app-current create grpc://ydb:2136 /Root/testdb + ydb-app-current create grpc://ydb:2136 /Root/testdb $CREATE_EXTRA_ARGS - name: Run SLO Tests (parallel) timeout-minutes: 15 @@ -204,11 +224,7 @@ jobs: DURATION=${{ inputs.slo_workload_duration_seconds || 600 }} READ_RPS=${{ inputs.slo_workload_read_max_rps || 1000 }} WRITE_RPS=${{ inputs.slo_workload_write_max_rps || 1000 }} - - EXTRA_ARGS="" - if [ "${{ matrix.sdk.id }}" = "native_bulk_upsert" ]; then - EXTRA_ARGS="--batch-size=10" - fi + RUN_EXTRA_ARGS="${{ matrix.sdk.run_extra_args }}" ARGS="run grpc://ydb:2136 /Root/testdb \ -otlp-endpoint prometheus:9090 \ @@ -218,7 +234,7 @@ jobs: -write-rps $WRITE_RPS \ -read-timeout 100 \ -write-timeout 100 \ - $EXTRA_ARGS" + $RUN_EXTRA_ARGS" echo "Starting ydb-app-current..." docker run -d \ diff --git a/tests/slo/internal/generator/generator.go b/tests/slo/internal/generator/generator.go index d60cc4d3b..48d1f0652 100755 --- a/tests/slo/internal/generator/generator.go +++ b/tests/slo/internal/generator/generator.go @@ -17,18 +17,18 @@ type Generator interface { Generate() (Row, error) } -type GeneratorImpl struct { +type Impl struct { currentID RowID mu sync.Mutex } -func New(id RowID) *GeneratorImpl { - return &GeneratorImpl{ +func New(id RowID) *Impl { + return &Impl{ currentID: id, } } -func (g *GeneratorImpl) Generate() (Row, error) { +func (g *Impl) Generate() (Row, error) { g.mu.Lock() id := g.currentID g.currentID++ diff --git a/tests/slo/internal/generator/seeded.go b/tests/slo/internal/generator/seeded.go index c7a0dff14..dfa9b5e59 100644 --- a/tests/slo/internal/generator/seeded.go +++ b/tests/slo/internal/generator/seeded.go @@ -46,6 +46,7 @@ func (g *SeededGenerator) Generate() (Row, error) { } else { row.ID = g.setRange.Left + g.rng.Uint64()%(g.setRange.Right-g.setRange.Left) } + return row, nil } @@ -54,4 +55,4 @@ func (g *SeededGenerator) SetRange(l uint64, r uint64) { Left: l, Right: r, } -} \ No newline at end of file +} diff --git a/tests/slo/internal/node_hints/node_hints.go b/tests/slo/internal/node_hints/node_hints.go index dba13c83a..65d92acf6 100644 --- a/tests/slo/internal/node_hints/node_hints.go +++ b/tests/slo/internal/node_hints/node_hints.go @@ -6,7 +6,6 @@ import ( "log" "math/rand" "slices" - "slo/internal/generator" "sync/atomic" "time" @@ -14,6 +13,8 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/options" "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + + "slo/internal/generator" ) func describeTable(ctx context.Context, driver *ydb.Driver, tableName string) (desc options.Description, err error) 
{ @@ -25,10 +26,12 @@ func describeTable(ctx context.Context, driver *ydb.Driver, tableName string) (d options.WithShardKeyBounds(), options.WithShardNodesInfo(), ) + return err }, table.WithIdempotent(), ) + return desc, err } @@ -42,9 +45,9 @@ func extractKey(v types.Value, side int) (uint64, error) { if types.IsNull(v) { if side == LEFT { return 0, nil - } else { - return ^uint64(0), nil } + + return ^uint64(0), nil } parts, err := types.TupleItems(v) if err != nil { @@ -66,7 +69,6 @@ const ( func MakeNodeSelector(ctx context.Context, driver *ydb.Driver, tableName string) (*NodeSelector, error) { dsc, err := describeTable(ctx, driver, tableName) - if err != nil { return nil, err } @@ -98,30 +100,39 @@ func MakeNodeSelector(ctx context.Context, driver *ydb.Driver, tableName string) for _, ps := range dsc.Stats.PartitionStats { s.NodeIDs = append(s.NodeIDs, ps.LeaderNodeID) } + return &s, nil } -func (s *NodeSelector) findNodeID(key uint64) uint32 { - idx, found := slices.BinarySearch(s.UpperBounds, key) +func (ns *NodeSelector) findNodeID(key uint64) uint32 { + idx, found := slices.BinarySearch(ns.UpperBounds, key) if found { idx++ } - return s.NodeIDs[idx] + + return ns.NodeIDs[idx] } -func (s *NodeSelector) WithNodeHint(ctx context.Context, key uint64) context.Context { - if s == nil || len(s.NodeIDs) == 0 { +func (ns *NodeSelector) WithNodeHint(ctx context.Context, key uint64) context.Context { + if ns == nil || len(ns.NodeIDs) == 0 { return ctx } - return ydb.WithPreferredNodeID(ctx, s.findNodeID(key)) + + return ydb.WithPreferredNodeID(ctx, ns.findNodeID(key)) } -func (s *NodeSelector) GeneratePartitionKey(partitionId uint64) uint64 { - l := s.UpperBounds[partitionId] - s.LowerBounds[partitionId] - return s.LowerBounds[partitionId] + rand.Uint64()%l +func (ns *NodeSelector) GeneratePartitionKey(partitionID uint64) uint64 { + l := ns.UpperBounds[partitionID] - ns.LowerBounds[partitionID] + + return ns.LowerBounds[partitionID] + rand.Uint64()%l } -func RunUpdates(ctx context.Context, driver *ydb.Driver, tableName string, frequency time.Duration) (*atomic.Pointer[NodeSelector], error) { +func RunUpdates( + ctx context.Context, + driver *ydb.Driver, + tableName string, + frequency time.Duration, +) (*atomic.Pointer[NodeSelector], error) { var ns atomic.Pointer[NodeSelector] updateSelector := func() error { selector, err := MakeNodeSelector(ctx, driver, tableName) @@ -129,29 +140,30 @@ func RunUpdates(ctx context.Context, driver *ydb.Driver, tableName string, frequ return err } ns.Store(selector) + return nil } err := updateSelector() if err != nil { return nil, err - } else { - ticker := time.NewTicker(frequency) - go func() { - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - err = updateSelector() - if err != nil { - log.Printf("node hints update error: %v\n", err) - } + } + ticker := time.NewTicker(frequency) + go func() { + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + err = updateSelector() + if err != nil { + log.Printf("node hints update error: %v\n", err) } } - }() - } + } + }() + return &ns, nil } @@ -167,5 +179,6 @@ func (ns *NodeSelector) GetRandomNodeID(generator generator.Generator) (int, uin } } log.Panicf("GetRandomNodeID: no nodeID found for shift: %d", shift) + return 0, 0 } diff --git a/tests/slo/internal/workers/read.go b/tests/slo/internal/workers/read.go index 743839e1a..d62f2e272 100644 --- a/tests/slo/internal/workers/read.go +++ b/tests/slo/internal/workers/read.go @@ -41,6 +41,7 
@@ func (w *Workers) ReadID() uint64 { if err != nil { log.Panicf("generate error: %v", err) } + return row.ID } @@ -57,6 +58,7 @@ func (w *Workers) ReadIDs() []uint64 { ids = append(ids, row.ID) } } + return ids } diff --git a/tests/slo/native/node_hints/dynnode_traffic.go b/tests/slo/native/node_hints/dynnode_traffic.go index 88b86be6b..3eb731bea 100644 --- a/tests/slo/native/node_hints/dynnode_traffic.go +++ b/tests/slo/native/node_hints/dynnode_traffic.go @@ -3,8 +3,6 @@ package main import ( "context" "fmt" - "slo/internal/config" - "slo/internal/log" "strconv" "strings" "time" @@ -12,6 +10,9 @@ import ( "github.com/prometheus/client_golang/api" v1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/common/model" + + "slo/internal/config" + "slo/internal/log" ) type Estimator struct { @@ -36,6 +37,9 @@ func getMetric(ctx context.Context, cfg *config.Config, query string) model.Vect query, time.Now(), ) + if err != nil { + log.Panicf("query failed: %v", err) + } if len(warnings) > 0 { fmt.Println("Warnings: ", warnings) } @@ -43,11 +47,13 @@ func getMetric(ctx context.Context, cfg *config.Config, query string) model.Vect if !ok || len(vector) == 0 { log.Panicf("no data found for query: %s", query) } + return vector } func getMetricValue(ctx context.Context, cfg *config.Config, query string) float64 { vector := getMetric(ctx, cfg, query) + return float64(vector[0].Value) } @@ -56,6 +62,7 @@ func formatNodeID(v model.LabelValue) uint32 { if err != nil { log.Panicf("formatNodeID failed: %v", err) } + return uint32(id) } @@ -67,15 +74,15 @@ func NewEstimator(ctx context.Context, storage *Storage) *Estimator { allNodeIDs := make(map[uint32]bool) instanceID := make(map[string]map[uint32]bool) nodeInstance := make(map[uint32]string) - //get all node ids + // get all node ids for _, v := range vec { allNodeIDs[formatNodeID(v.Metric["peer_node_id"])] = true } - //for target instance, the only absent node id is correct + // for target instance, the only absent node id is correct for _, v := range vec { instance := string(v.Metric["instance"]) instanceID[instance] = make(map[uint32]bool) - for nodeID, _ := range allNodeIDs { + for nodeID := range allNodeIDs { instanceID[instance][nodeID] = true } } @@ -83,7 +90,7 @@ func NewEstimator(ctx context.Context, storage *Storage) *Estimator { instance := string(v.Metric["instance"]) instanceID[instance][formatNodeID(v.Metric["peer_node_id"])] = false } - //backwards mapping + // backwards mapping for instance, nodeIDs := range instanceID { if strings.Contains(instance, "storage") { continue @@ -96,32 +103,34 @@ func NewEstimator(ctx context.Context, storage *Storage) *Estimator { } e.NodeInstances = nodeInstance e.NodeRequests = make(map[uint32]float64) - //collect counters - for nodeID, _ := range e.NodeInstances { + // collect counters + for nodeID := range e.NodeInstances { e.NodeRequests[nodeID] = e.NodeRWCounter(ctx, nodeID) } e.ClusterCounter = e.ClusterRWCounter(ctx) + return e } -func (e *Estimator) NodeGrpcApiCounter(ctx context.Context, method string, nodeID uint32) float64 { +func (e *Estimator) NodeGrpcAPICounter(ctx context.Context, method string, nodeID uint32) float64 { instance, ok := e.NodeInstances[nodeID] if !ok { log.Panicf("no instance found for nodeID: %d", nodeID) } + return getMetricValue(ctx, e.cfg, fmt.Sprintf(`api_grpc_request_count{instance="%s",method="%s"}`, instance, method)) } -func (e *Estimator) ClusterGrpcApiCounter(ctx context.Context, method string) float64 { +func (e *Estimator) 
ClusterGrpcAPICounter(ctx context.Context, method string) float64 { return getMetricValue(ctx, e.cfg, fmt.Sprintf(`sum(api_grpc_request_count{method="%s"})`, method)) } func (e *Estimator) NodeRWCounter(ctx context.Context, nodeID uint32) float64 { - return e.NodeGrpcApiCounter(ctx, "ReadRows", nodeID) + e.NodeGrpcApiCounter(ctx, "BulkUpsert", nodeID) + return e.NodeGrpcAPICounter(ctx, "ReadRows", nodeID) + e.NodeGrpcAPICounter(ctx, "BulkUpsert", nodeID) } func (e *Estimator) ClusterRWCounter(ctx context.Context) float64 { - return e.ClusterGrpcApiCounter(ctx, "ReadRows") + e.ClusterGrpcApiCounter(ctx, "BulkUpsert") + return e.ClusterGrpcAPICounter(ctx, "ReadRows") + e.ClusterGrpcAPICounter(ctx, "BulkUpsert") } func (e *Estimator) OnlyThisNode(ctx context.Context, nodeID uint32) { diff --git a/tests/slo/native/node_hints/main.go b/tests/slo/native/node_hints/main.go index f58642ebb..a875a3519 100644 --- a/tests/slo/native/node_hints/main.go +++ b/tests/slo/native/node_hints/main.go @@ -108,7 +108,7 @@ func main() { log.Println("cleanup table ok") case config.RunMode: - //to wait for correct partitions boundaries + // to wait for correct partitions boundaries time.Sleep(10 * time.Second) w, err := workers.NewWithBatch(cfg, s, ref, label, jobName) if err != nil { @@ -127,9 +127,9 @@ func main() { log.Println("workers close ok") }() - //collect metrics + // collect metrics estimator := NewEstimator(ctx, s) - //run workers + // run workers wg := sync.WaitGroup{} readRL := rate.NewLimiter(rate.Limit(cfg.ReadRPS), 1) wg.Add(cfg.ReadRPS) @@ -150,7 +150,8 @@ func main() { go w.Metrics(ctx, &wg, metricsRL) wg.Wait() - //check all load is sent to a single node + w.FailOnError() + // check all load is sent to a single node ectx, ecancel := context.WithTimeout(context.Background(), 1*time.Second) defer ecancel() estimator.OnlyThisNode(ectx, nodeID) diff --git a/tests/slo/native/node_hints/storage.go b/tests/slo/native/node_hints/storage.go index 46706ae68..1d240a5e5 100644 --- a/tests/slo/native/node_hints/storage.go +++ b/tests/slo/native/node_hints/storage.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "path" - "slo/internal/node_hints" "sync/atomic" "time" @@ -20,6 +19,7 @@ import ( "slo/internal/config" "slo/internal/generator" + "slo/internal/node_hints" ) const createTableQuery = ` @@ -74,7 +74,7 @@ func NewStorage(ctx context.Context, cfg *config.Config, poolSize int, label str var nsPtr *atomic.Pointer[node_hints.NodeSelector] if cfg.Mode == config.RunMode { - nsPtr, err = node_hints.RunUpdates(ctx, db, tablePath, time.Second*10) + nsPtr, err = node_hints.RunUpdates(ctx, db, tablePath, time.Second*5) if err != nil { return nil, fmt.Errorf("create node selector: %w", err) } @@ -173,8 +173,8 @@ func (s *Storage) ReadBatch(ctx context.Context, rowIDs []generator.RowID) ( } else { reqCtx = ctx } - //reqCtx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.ReadTimeout)*time.Millisecond) - //defer cancel() + reqCtx, cancel := context.WithTimeout(reqCtx, time.Duration(s.cfg.ReadTimeout)*time.Millisecond) + defer cancel() res, err := s.db.Table().ReadRows(reqCtx, s.tablePath, types.ListValue(keys...), []options.ReadRowsOption{}, table.WithRetryOptions([]retry.Option{ //nolint:staticcheck @@ -226,6 +226,7 @@ func (s *Storage) ReadBatch(ctx context.Context, rowIDs []generator.RowID) ( func (s *Storage) CreateTable(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.WriteTimeout)*time.Millisecond) defer cancel() + return s.db.Query().Do(ctx, func(ctx 
context.Context, session query.Session) error { fmt.Println(fmt.Sprintf(createTableQuery, s.tablePath, s.cfg.MinPartitionsCount)) From 89fb2e2df78492a0f647712faabbcfc9154e6863 Mon Sep 17 00:00:00 2001 From: Aleksei Pleshakov Date: Fri, 5 Dec 2025 16:13:15 +0100 Subject: [PATCH 3/3] CodeQL fix --- tests/slo/native/node_hints/dynnode_traffic.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/slo/native/node_hints/dynnode_traffic.go b/tests/slo/native/node_hints/dynnode_traffic.go index 3eb731bea..d2fc5e293 100644 --- a/tests/slo/native/node_hints/dynnode_traffic.go +++ b/tests/slo/native/node_hints/dynnode_traffic.go @@ -58,12 +58,12 @@ func getMetricValue(ctx context.Context, cfg *config.Config, query string) float } func formatNodeID(v model.LabelValue) uint32 { - id, err := strconv.Atoi(string(v)) + i64, err := strconv.ParseUint(string(v), 10, 32) if err != nil { log.Panicf("formatNodeID failed: %v", err) } - return uint32(id) + return uint32(i64) } func NewEstimator(ctx context.Context, storage *Storage) *Estimator {
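
---

Summary of the pattern this series exercises: describe the table to learn which dynnode currently leads each partition, then attach that node ID to the request context with `ydb.WithPreferredNodeID` before issuing `ReadRows`/`BulkUpsert`. The sketch below is a condensed, illustrative restatement of the flow in `node_hints.go` and `storage.go` above, not an independent API reference; the function name `pinToLeader`, the `tablePath` parameter, and the "first partition" shortcut are assumptions made for brevity (the workload itself selects the partition by key range).

```go
package main

import (
	"context"
	"fmt"

	ydb "github.com/ydb-platform/ydb-go-sdk/v3"
	"github.com/ydb-platform/ydb-go-sdk/v3/table"
	"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
)

// pinToLeader condenses the MakeNodeSelector + WithNodeHint flow from the patch:
// it looks up the leader node of one partition and returns a context that
// prefers that node for subsequent requests.
func pinToLeader(ctx context.Context, driver *ydb.Driver, tablePath string) (context.Context, error) {
	var desc options.Description
	err := driver.Table().Do(ctx,
		func(ctx context.Context, session table.Session) (err error) {
			// Same DescribeTable options the workload uses to obtain
			// shard key bounds and per-partition leader node IDs.
			desc, err = session.DescribeTable(ctx, tablePath,
				options.WithTableStats(),
				options.WithPartitionStats(),
				options.WithShardKeyBounds(),
				options.WithShardNodesInfo(),
			)

			return err
		},
		table.WithIdempotent(),
	)
	if err != nil {
		return ctx, err
	}
	if len(desc.Stats.PartitionStats) == 0 {
		return ctx, fmt.Errorf("no partition stats for %s", tablePath)
	}

	// Illustrative shortcut: pin to the leader of the first partition.
	// The workload instead binary-searches the key into UpperBounds.
	nodeID := desc.Stats.PartitionStats[0].LeaderNodeID

	return ydb.WithPreferredNodeID(ctx, nodeID), nil
}
```

A caller would pass the returned context into `BulkUpsert` or `ReadRows` exactly as `WriteBatch`/`ReadBatch` in `storage.go` do, so the SDK can prefer the hinted node; the Prometheus-based `Estimator` then checks that the read/write counters only advance on that node.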