@@ -6,14 +6,15 @@ import (
66 "log"
77 "math/rand"
88 "slices"
9- "slo/internal/generator"
109 "sync/atomic"
1110 "time"
1211
1312 "github.com/ydb-platform/ydb-go-sdk/v3"
1413 "github.com/ydb-platform/ydb-go-sdk/v3/table"
1514 "github.com/ydb-platform/ydb-go-sdk/v3/table/options"
1615 "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
16+
17+ "slo/internal/generator"
1718)
1819
1920func describeTable (ctx context.Context , driver * ydb.Driver , tableName string ) (desc options.Description , err error ) {
@@ -25,10 +26,12 @@ func describeTable(ctx context.Context, driver *ydb.Driver, tableName string) (d
2526 options .WithShardKeyBounds (),
2627 options .WithShardNodesInfo (),
2728 )
29+
2830 return err
2931 },
3032 table .WithIdempotent (),
3133 )
34+
3235 return desc , err
3336}
3437
@@ -42,9 +45,9 @@ func extractKey(v types.Value, side int) (uint64, error) {
4245 if types .IsNull (v ) {
4346 if side == LEFT {
4447 return 0 , nil
45- } else {
46- return ^ uint64 (0 ), nil
4748 }
49+
50+ return ^ uint64 (0 ), nil
4851 }
4952 parts , err := types .TupleItems (v )
5053 if err != nil {
@@ -66,7 +69,6 @@ const (
6669
6770func MakeNodeSelector (ctx context.Context , driver * ydb.Driver , tableName string ) (* NodeSelector , error ) {
6871 dsc , err := describeTable (ctx , driver , tableName )
69-
7072 if err != nil {
7173 return nil , err
7274 }
@@ -98,60 +100,70 @@ func MakeNodeSelector(ctx context.Context, driver *ydb.Driver, tableName string)
98100 for _ , ps := range dsc .Stats .PartitionStats {
99101 s .NodeIDs = append (s .NodeIDs , ps .LeaderNodeID )
100102 }
103+
101104 return & s , nil
102105}
103106
104- func (s * NodeSelector ) findNodeID (key uint64 ) uint32 {
105- idx , found := slices .BinarySearch (s .UpperBounds , key )
107+ func (ns * NodeSelector ) findNodeID (key uint64 ) uint32 {
108+ idx , found := slices .BinarySearch (ns .UpperBounds , key )
106109 if found {
107110 idx ++
108111 }
109- return s .NodeIDs [idx ]
112+
113+ return ns .NodeIDs [idx ]
110114}
111115
112- func (s * NodeSelector ) WithNodeHint (ctx context.Context , key uint64 ) context.Context {
113- if s == nil || len (s .NodeIDs ) == 0 {
116+ func (ns * NodeSelector ) WithNodeHint (ctx context.Context , key uint64 ) context.Context {
117+ if ns == nil || len (ns .NodeIDs ) == 0 {
114118 return ctx
115119 }
116- return ydb .WithPreferredNodeID (ctx , s .findNodeID (key ))
120+
121+ return ydb .WithPreferredNodeID (ctx , ns .findNodeID (key ))
117122}
118123
119- func (s * NodeSelector ) GeneratePartitionKey (partitionId uint64 ) uint64 {
120- l := s .UpperBounds [partitionId ] - s .LowerBounds [partitionId ]
121- return s .LowerBounds [partitionId ] + rand .Uint64 ()% l
124+ func (ns * NodeSelector ) GeneratePartitionKey (partitionID uint64 ) uint64 {
125+ l := ns .UpperBounds [partitionID ] - ns .LowerBounds [partitionID ]
126+
127+ return ns .LowerBounds [partitionID ] + rand .Uint64 ()% l
122128}
123129
124- func RunUpdates (ctx context.Context , driver * ydb.Driver , tableName string , frequency time.Duration ) (* atomic.Pointer [NodeSelector ], error ) {
130+ func RunUpdates (
131+ ctx context.Context ,
132+ driver * ydb.Driver ,
133+ tableName string ,
134+ frequency time.Duration ,
135+ ) (* atomic.Pointer [NodeSelector ], error ) {
125136 var ns atomic.Pointer [NodeSelector ]
126137 updateSelector := func () error {
127138 selector , err := MakeNodeSelector (ctx , driver , tableName )
128139 if err != nil {
129140 return err
130141 }
131142 ns .Store (selector )
143+
132144 return nil
133145 }
134146
135147 err := updateSelector ()
136148 if err != nil {
137149 return nil , err
138- } else {
139- ticker := time .NewTicker (frequency )
140- go func () {
141- defer ticker .Stop ()
142- for {
143- select {
144- case <- ctx .Done ():
145- return
146- case <- ticker .C :
147- err = updateSelector ()
148- if err != nil {
149- log .Printf ("node hints update error: %v\n " , err )
150- }
150+ }
151+ ticker := time .NewTicker (frequency )
152+ go func () {
153+ defer ticker .Stop ()
154+ for {
155+ select {
156+ case <- ctx .Done ():
157+ return
158+ case <- ticker .C :
159+ err = updateSelector ()
160+ if err != nil {
161+ log .Printf ("node hints update error: %v\n " , err )
151162 }
152163 }
153- }()
154- }
164+ }
165+ }()
166+
155167 return & ns , nil
156168}
157169
@@ -167,5 +179,6 @@ func (ns *NodeSelector) GetRandomNodeID(generator generator.Generator) (int, uin
167179 }
168180 }
169181 log .Panicf ("GetRandomNodeID: no nodeID found for shift: %d" , shift )
182+
170183 return 0 , 0
171184}
0 commit comments