Skip to content

Commit 0f73166

Browse files
committed
lint
1 parent 361353e commit 0f73166

File tree

7 files changed

+79
-53
lines changed

7 files changed

+79
-53
lines changed

tests/slo/internal/generator/generator.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,18 @@ type Generator interface {
1717
Generate() (Row, error)
1818
}
1919

20-
type GeneratorImpl struct {
20+
type Impl struct {
2121
currentID RowID
2222
mu sync.Mutex
2323
}
2424

25-
func New(id RowID) *GeneratorImpl {
26-
return &GeneratorImpl{
25+
func New(id RowID) *Impl {
26+
return &Impl{
2727
currentID: id,
2828
}
2929
}
3030

31-
func (g *GeneratorImpl) Generate() (Row, error) {
31+
func (g *Impl) Generate() (Row, error) {
3232
g.mu.Lock()
3333
id := g.currentID
3434
g.currentID++

tests/slo/internal/generator/seeded.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func (g *SeededGenerator) Generate() (Row, error) {
4646
} else {
4747
row.ID = g.setRange.Left + g.rng.Uint64()%(g.setRange.Right-g.setRange.Left)
4848
}
49+
4950
return row, nil
5051
}
5152

@@ -54,4 +55,4 @@ func (g *SeededGenerator) SetRange(l uint64, r uint64) {
5455
Left: l,
5556
Right: r,
5657
}
57-
}
58+
}

tests/slo/internal/node_hints/node_hints.go

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ import (
66
"log"
77
"math/rand"
88
"slices"
9-
"slo/internal/generator"
109
"sync/atomic"
1110
"time"
1211

1312
"github.com/ydb-platform/ydb-go-sdk/v3"
1413
"github.com/ydb-platform/ydb-go-sdk/v3/table"
1514
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
1615
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
16+
17+
"slo/internal/generator"
1718
)
1819

1920
func describeTable(ctx context.Context, driver *ydb.Driver, tableName string) (desc options.Description, err error) {
@@ -25,10 +26,12 @@ func describeTable(ctx context.Context, driver *ydb.Driver, tableName string) (d
2526
options.WithShardKeyBounds(),
2627
options.WithShardNodesInfo(),
2728
)
29+
2830
return err
2931
},
3032
table.WithIdempotent(),
3133
)
34+
3235
return desc, err
3336
}
3437

@@ -42,9 +45,9 @@ func extractKey(v types.Value, side int) (uint64, error) {
4245
if types.IsNull(v) {
4346
if side == LEFT {
4447
return 0, nil
45-
} else {
46-
return ^uint64(0), nil
4748
}
49+
50+
return ^uint64(0), nil
4851
}
4952
parts, err := types.TupleItems(v)
5053
if err != nil {
@@ -66,7 +69,6 @@ const (
6669

6770
func MakeNodeSelector(ctx context.Context, driver *ydb.Driver, tableName string) (*NodeSelector, error) {
6871
dsc, err := describeTable(ctx, driver, tableName)
69-
7072
if err != nil {
7173
return nil, err
7274
}
@@ -98,60 +100,70 @@ func MakeNodeSelector(ctx context.Context, driver *ydb.Driver, tableName string)
98100
for _, ps := range dsc.Stats.PartitionStats {
99101
s.NodeIDs = append(s.NodeIDs, ps.LeaderNodeID)
100102
}
103+
101104
return &s, nil
102105
}
103106

104-
func (s *NodeSelector) findNodeID(key uint64) uint32 {
105-
idx, found := slices.BinarySearch(s.UpperBounds, key)
107+
func (ns *NodeSelector) findNodeID(key uint64) uint32 {
108+
idx, found := slices.BinarySearch(ns.UpperBounds, key)
106109
if found {
107110
idx++
108111
}
109-
return s.NodeIDs[idx]
112+
113+
return ns.NodeIDs[idx]
110114
}
111115

112-
func (s *NodeSelector) WithNodeHint(ctx context.Context, key uint64) context.Context {
113-
if s == nil || len(s.NodeIDs) == 0 {
116+
func (ns *NodeSelector) WithNodeHint(ctx context.Context, key uint64) context.Context {
117+
if ns == nil || len(ns.NodeIDs) == 0 {
114118
return ctx
115119
}
116-
return ydb.WithPreferredNodeID(ctx, s.findNodeID(key))
120+
121+
return ydb.WithPreferredNodeID(ctx, ns.findNodeID(key))
117122
}
118123

119-
func (s *NodeSelector) GeneratePartitionKey(partitionId uint64) uint64 {
120-
l := s.UpperBounds[partitionId] - s.LowerBounds[partitionId]
121-
return s.LowerBounds[partitionId] + rand.Uint64()%l
124+
func (ns *NodeSelector) GeneratePartitionKey(partitionID uint64) uint64 {
125+
l := ns.UpperBounds[partitionID] - ns.LowerBounds[partitionID]
126+
127+
return ns.LowerBounds[partitionID] + rand.Uint64()%l
122128
}
123129

124-
func RunUpdates(ctx context.Context, driver *ydb.Driver, tableName string, frequency time.Duration) (*atomic.Pointer[NodeSelector], error) {
130+
func RunUpdates(
131+
ctx context.Context,
132+
driver *ydb.Driver,
133+
tableName string,
134+
frequency time.Duration,
135+
) (*atomic.Pointer[NodeSelector], error) {
125136
var ns atomic.Pointer[NodeSelector]
126137
updateSelector := func() error {
127138
selector, err := MakeNodeSelector(ctx, driver, tableName)
128139
if err != nil {
129140
return err
130141
}
131142
ns.Store(selector)
143+
132144
return nil
133145
}
134146

135147
err := updateSelector()
136148
if err != nil {
137149
return nil, err
138-
} else {
139-
ticker := time.NewTicker(frequency)
140-
go func() {
141-
defer ticker.Stop()
142-
for {
143-
select {
144-
case <-ctx.Done():
145-
return
146-
case <-ticker.C:
147-
err = updateSelector()
148-
if err != nil {
149-
log.Printf("node hints update error: %v\n", err)
150-
}
150+
}
151+
ticker := time.NewTicker(frequency)
152+
go func() {
153+
defer ticker.Stop()
154+
for {
155+
select {
156+
case <-ctx.Done():
157+
return
158+
case <-ticker.C:
159+
err = updateSelector()
160+
if err != nil {
161+
log.Printf("node hints update error: %v\n", err)
151162
}
152163
}
153-
}()
154-
}
164+
}
165+
}()
166+
155167
return &ns, nil
156168
}
157169

@@ -167,5 +179,6 @@ func (ns *NodeSelector) GetRandomNodeID(generator generator.Generator) (int, uin
167179
}
168180
}
169181
log.Panicf("GetRandomNodeID: no nodeID found for shift: %d", shift)
182+
170183
return 0, 0
171184
}

tests/slo/internal/workers/read.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func (w *Workers) ReadID() uint64 {
4141
if err != nil {
4242
log.Panicf("generate error: %v", err)
4343
}
44+
4445
return row.ID
4546
}
4647

@@ -57,6 +58,7 @@ func (w *Workers) ReadIDs() []uint64 {
5758
ids = append(ids, row.ID)
5859
}
5960
}
61+
6062
return ids
6163
}
6264

tests/slo/native/node_hints/dynnode_traffic.go

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@ package main
33
import (
44
"context"
55
"fmt"
6-
"slo/internal/config"
7-
"slo/internal/log"
86
"strconv"
97
"strings"
108
"time"
119

1210
"github.com/prometheus/client_golang/api"
1311
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
1412
"github.com/prometheus/common/model"
13+
14+
"slo/internal/config"
15+
"slo/internal/log"
1516
)
1617

1718
type Estimator struct {
@@ -36,18 +37,23 @@ func getMetric(ctx context.Context, cfg *config.Config, query string) model.Vect
3637
query,
3738
time.Now(),
3839
)
40+
if err != nil {
41+
log.Panicf("query failed: %v", err)
42+
}
3943
if len(warnings) > 0 {
4044
fmt.Println("Warnings: ", warnings)
4145
}
4246
vector, ok := result.(model.Vector)
4347
if !ok || len(vector) == 0 {
4448
log.Panicf("no data found for query: %s", query)
4549
}
50+
4651
return vector
4752
}
4853

4954
func getMetricValue(ctx context.Context, cfg *config.Config, query string) float64 {
5055
vector := getMetric(ctx, cfg, query)
56+
5157
return float64(vector[0].Value)
5258
}
5359

@@ -56,6 +62,7 @@ func formatNodeID(v model.LabelValue) uint32 {
5662
if err != nil {
5763
log.Panicf("formatNodeID failed: %v", err)
5864
}
65+
5966
return uint32(id)
6067
}
6168

@@ -67,23 +74,23 @@ func NewEstimator(ctx context.Context, storage *Storage) *Estimator {
6774
allNodeIDs := make(map[uint32]bool)
6875
instanceID := make(map[string]map[uint32]bool)
6976
nodeInstance := make(map[uint32]string)
70-
//get all node ids
77+
// get all node ids
7178
for _, v := range vec {
7279
allNodeIDs[formatNodeID(v.Metric["peer_node_id"])] = true
7380
}
74-
//for target instance, the only absent node id is correct
81+
// for target instance, the only absent node id is correct
7582
for _, v := range vec {
7683
instance := string(v.Metric["instance"])
7784
instanceID[instance] = make(map[uint32]bool)
78-
for nodeID, _ := range allNodeIDs {
85+
for nodeID := range allNodeIDs {
7986
instanceID[instance][nodeID] = true
8087
}
8188
}
8289
for _, v := range vec {
8390
instance := string(v.Metric["instance"])
8491
instanceID[instance][formatNodeID(v.Metric["peer_node_id"])] = false
8592
}
86-
//backwards mapping
93+
// backwards mapping
8794
for instance, nodeIDs := range instanceID {
8895
if strings.Contains(instance, "storage") {
8996
continue
@@ -96,32 +103,34 @@ func NewEstimator(ctx context.Context, storage *Storage) *Estimator {
96103
}
97104
e.NodeInstances = nodeInstance
98105
e.NodeRequests = make(map[uint32]float64)
99-
//collect counters
100-
for nodeID, _ := range e.NodeInstances {
106+
// collect counters
107+
for nodeID := range e.NodeInstances {
101108
e.NodeRequests[nodeID] = e.NodeRWCounter(ctx, nodeID)
102109
}
103110
e.ClusterCounter = e.ClusterRWCounter(ctx)
111+
104112
return e
105113
}
106114

107-
func (e *Estimator) NodeGrpcApiCounter(ctx context.Context, method string, nodeID uint32) float64 {
115+
func (e *Estimator) NodeGrpcAPICounter(ctx context.Context, method string, nodeID uint32) float64 {
108116
instance, ok := e.NodeInstances[nodeID]
109117
if !ok {
110118
log.Panicf("no instance found for nodeID: %d", nodeID)
111119
}
120+
112121
return getMetricValue(ctx, e.cfg, fmt.Sprintf(`api_grpc_request_count{instance="%s",method="%s"}`, instance, method))
113122
}
114123

115-
func (e *Estimator) ClusterGrpcApiCounter(ctx context.Context, method string) float64 {
124+
func (e *Estimator) ClusterGrpcAPICounter(ctx context.Context, method string) float64 {
116125
return getMetricValue(ctx, e.cfg, fmt.Sprintf(`sum(api_grpc_request_count{method="%s"})`, method))
117126
}
118127

119128
func (e *Estimator) NodeRWCounter(ctx context.Context, nodeID uint32) float64 {
120-
return e.NodeGrpcApiCounter(ctx, "ReadRows", nodeID) + e.NodeGrpcApiCounter(ctx, "BulkUpsert", nodeID)
129+
return e.NodeGrpcAPICounter(ctx, "ReadRows", nodeID) + e.NodeGrpcAPICounter(ctx, "BulkUpsert", nodeID)
121130
}
122131

123132
func (e *Estimator) ClusterRWCounter(ctx context.Context) float64 {
124-
return e.ClusterGrpcApiCounter(ctx, "ReadRows") + e.ClusterGrpcApiCounter(ctx, "BulkUpsert")
133+
return e.ClusterGrpcAPICounter(ctx, "ReadRows") + e.ClusterGrpcAPICounter(ctx, "BulkUpsert")
125134
}
126135

127136
func (e *Estimator) OnlyThisNode(ctx context.Context, nodeID uint32) {

tests/slo/native/node_hints/main.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func main() {
108108

109109
log.Println("cleanup table ok")
110110
case config.RunMode:
111-
//to wait for correct partitions boundaries
111+
// to wait for correct partitions boundaries
112112
time.Sleep(10 * time.Second)
113113
w, err := workers.NewWithBatch(cfg, s, ref, label, jobName)
114114
if err != nil {
@@ -127,9 +127,9 @@ func main() {
127127
log.Println("workers close ok")
128128
}()
129129

130-
//collect metrics
130+
// collect metrics
131131
estimator := NewEstimator(ctx, s)
132-
//run workers
132+
// run workers
133133
wg := sync.WaitGroup{}
134134
readRL := rate.NewLimiter(rate.Limit(cfg.ReadRPS), 1)
135135
wg.Add(cfg.ReadRPS)
@@ -150,7 +150,7 @@ func main() {
150150
go w.Metrics(ctx, &wg, metricsRL)
151151

152152
wg.Wait()
153-
//check all load is sent to a single node
153+
// check all load is sent to a single node
154154
ectx, ecancel := context.WithTimeout(context.Background(), 1*time.Second)
155155
defer ecancel()
156156
estimator.OnlyThisNode(ectx, nodeID)

tests/slo/native/node_hints/storage.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66
"path"
7-
"slo/internal/node_hints"
87
"sync/atomic"
98
"time"
109

@@ -20,6 +19,7 @@ import (
2019

2120
"slo/internal/config"
2221
"slo/internal/generator"
22+
"slo/internal/node_hints"
2323
)
2424

2525
const createTableQuery = `
@@ -173,8 +173,8 @@ func (s *Storage) ReadBatch(ctx context.Context, rowIDs []generator.RowID) (
173173
} else {
174174
reqCtx = ctx
175175
}
176-
//reqCtx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.ReadTimeout)*time.Millisecond)
177-
//defer cancel()
176+
// reqCtx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.ReadTimeout)*time.Millisecond)
177+
// defer cancel()
178178

179179
res, err := s.db.Table().ReadRows(reqCtx, s.tablePath, types.ListValue(keys...), []options.ReadRowsOption{},
180180
table.WithRetryOptions([]retry.Option{ //nolint:staticcheck
@@ -226,6 +226,7 @@ func (s *Storage) ReadBatch(ctx context.Context, rowIDs []generator.RowID) (
226226
func (s *Storage) CreateTable(ctx context.Context) error {
227227
ctx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.WriteTimeout)*time.Millisecond)
228228
defer cancel()
229+
229230
return s.db.Query().Do(ctx,
230231
func(ctx context.Context, session query.Session) error {
231232
fmt.Println(fmt.Sprintf(createTableQuery, s.tablePath, s.cfg.MinPartitionsCount))

0 commit comments

Comments
 (0)