@@ -5,12 +5,15 @@ package fs
55
66import (
77 "fmt"
8+ "runtime"
89 "sync"
910 "sync/atomic"
1011 "syscall"
1112 "time"
1213
1314 "github.com/Meesho/BharatMLStack/flashring/pkg/metrics"
15+ "github.com/rs/zerolog/log"
16+ "golang.org/x/sys/unix"
1417)
1518
1619// batchReadResult holds the outcome of a single batched pread.
@@ -47,11 +50,25 @@ var batchReqPool = sync.Pool{
4750//
4851// CQEs are dispatched individually as they complete (no head-of-line blocking).
4952type BatchIoUringReader struct {
50- ring * IoUring
51- reqCh chan * batchReadRequest
52- maxBatch int
53- closeCh chan struct {}
54- wg sync.WaitGroup
53+ ring * IoUring
54+ reqCh chan * batchReadRequest
55+ maxBatch int
56+ closeCh chan struct {}
57+ wg sync.WaitGroup
58+ pinToCore int // -1 = do not pin; >= 0 = CPU core index to pin this loop's thread to
59+ }
60+
61+ // pinThreadToCore pins the current OS thread to the given CPU core.
62+ // Must be called from a goroutine that has already called runtime.LockOSThread()
63+ // so that the same thread is used for the rest of the goroutine's lifetime.
64+ // cpu is the core index (e.g. 0, 1, 2, ...). No-op if cpu < 0.
65+ func pinThreadToCore (cpu int ) error {
66+ if cpu < 0 {
67+ return nil
68+ }
69+ var set unix.CPUSet
70+ set .Set (cpu )
71+ return unix .SchedSetaffinity (0 , & set )
5572}
5673
5774// BatchIoUringConfig configures the batch reader.
@@ -60,6 +77,13 @@ type BatchIoUringConfig struct {
6077 MaxBatch int // max requests per batch (capped to RingDepth)
6178 Window time.Duration // unused, kept for config compatibility
6279 QueueSize int // channel buffer size (default 1024)
80+ // PinToCore pins this reader's loop goroutine to the given CPU core index.
81+ // -1 or negative = do not pin. Requires runtime.LockOSThread for the loop.
82+ PinToCore int
83+ // PinToCores is used by NewParallelBatchIoUringReader: if non-nil and
84+ // len(PinToCores) >= numRings, ring i is pinned to core PinToCores[i].
85+ // Ignored when creating a single BatchIoUringReader.
86+ PinToCores []int
6387}
6488
6589// NewBatchIoUringReader creates a batch reader with its own io_uring ring
@@ -81,10 +105,11 @@ func NewBatchIoUringReader(cfg BatchIoUringConfig) (*BatchIoUringReader, error)
81105 }
82106
83107 b := & BatchIoUringReader {
84- ring : ring ,
85- reqCh : make (chan * batchReadRequest , cfg .QueueSize ),
86- maxBatch : cfg .MaxBatch ,
87- closeCh : make (chan struct {}),
108+ ring : ring ,
109+ reqCh : make (chan * batchReadRequest , cfg .QueueSize ),
110+ maxBatch : cfg .MaxBatch ,
111+ closeCh : make (chan struct {}),
112+ pinToCore : cfg .PinToCore ,
88113 }
89114 b .wg .Add (1 )
90115 go b .loop ()
@@ -113,6 +138,7 @@ func (b *BatchIoUringReader) Submit(fd int, buf []byte, offset uint64) (int, err
113138 result := <- req .done
114139 n , err := result .N , result .Err
115140 if metrics .Enabled () {
141+ metrics .Incr (metrics .KEY_PREAD_COUNT , []string {})
116142 metrics .Timing (metrics .KEY_PREAD_LATENCY , time .Since (startTime ), []string {})
117143 }
118144
@@ -140,6 +166,13 @@ func (b *BatchIoUringReader) Close() {
140166func (b * BatchIoUringReader ) loop () {
141167 defer b .wg .Done ()
142168
169+ if b .pinToCore >= 0 {
170+ runtime .LockOSThread ()
171+ if err := pinThreadToCore (b .pinToCore ); err != nil {
172+ log .Warn ().Err (err ).Int ("core" , b .pinToCore ).Msg ("failed to pin io_uring loop to core, continuing without pinning" )
173+ }
174+ }
175+
143176 batch := make ([]* batchReadRequest , 0 , b .maxBatch )
144177
145178 for {
@@ -268,13 +301,19 @@ type ParallelBatchIoUringReader struct {
268301
269302// NewParallelBatchIoUringReader creates numRings independent batch readers.
270303// Each ring gets its own io_uring instance and background goroutine.
304+ // If cfg.PinToCores has at least numRings elements, ring i is pinned to core PinToCores[i].
271305func NewParallelBatchIoUringReader (cfg BatchIoUringConfig , numRings int ) (* ParallelBatchIoUringReader , error ) {
272306 if numRings <= 0 {
273307 numRings = 1
274308 }
275309 readers := make ([]* BatchIoUringReader , numRings )
276310 for i := 0 ; i < numRings ; i ++ {
277- r , err := NewBatchIoUringReader (cfg )
311+ ringCfg := cfg
312+ ringCfg .PinToCore = - 1
313+ if len (cfg .PinToCores ) > i {
314+ ringCfg .PinToCore = cfg .PinToCores [i ]
315+ }
316+ r , err := NewBatchIoUringReader (ringCfg )
278317 if err != nil {
279318 for j := 0 ; j < i ; j ++ {
280319 readers [j ].Close ()
0 commit comments