@@ -31,12 +31,28 @@ typedef struct
3131{
3232 /*
3333 * The clock-sweep counter is atomically updated by 1 at every tick. Use
34- * the macro CLOCKSWEEP_HAND () to find the location of the hand on the
35- * clock. Use CLOCKSWEEP_PASSES () to calculate the number of times the
34+ * the function ClockSweepHand () to find the location of the hand on the
35+ * clock. Use ClockSweepPasses () to calculate the number of times the
3636 * clock-sweep hand has made a complete pass around the clock.
3737 */
3838 pg_atomic_uint64 clockSweepCounter ;
3939
40+ /*
41+ * Division and modulo can be expensive to calculate repeatedly. Given
42+ * that the buffer manager is a very hot code path, we implement a more
43+ * efficient method based on "Division by Invariant Integers using
44+ * Multiplication" (https://gmplib.org/~tege/divcnst-pldi94.pdf) by
45+ * Granlund and Montgomery. Our implementation below was inspired by the
46+ * MIT-licensed "fastdiv" (https://github.com/jmtilli/fastdiv).
47+ */
48+ struct
49+ {
50+ uint32 mul ;
51+ uint32 mod ;
52+ uint8 shift1 :1 ;
53+ uint8 shift2 :7 ;
54+ } md ;
55+
4056 /*
4157 * Statistics. These counters should be wide enough that they can't
4258 * overflow during a single bgwriter cycle.
@@ -86,10 +102,67 @@ static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy,
86102static void AddBufferToRing (BufferAccessStrategy strategy ,
87103 BufferDesc * buf );
88104
89- #define CLOCKSWEEP_HAND (counter ) \
90- ((counter) & 0xFFFFFFFF) % NBuffers
91- #define CLOCKSWEEP_PASSES (counter ) \
92- (uint32) ((counter) / NBuffers)
105+ static inline uint32
106+ InvariantDivision (uint64 n )
107+ {
108+ /* Compute quotient using multiplication */
109+ uint64 product = n * StrategyControl -> md .mul ;
110+ uint32 quotient = (uint32 ) (product >> 32 );
111+
112+ /*
113+ * The invariant multiplication gives us an approximation that may be off
114+ * by 1.
115+ */
116+ n -= quotient ;
117+ n >>= StrategyControl -> md .shift1 ;
118+ n += quotient ;
119+ n >>= StrategyControl -> md .shift2 ;
120+
121+ return n ;
122+ }
123+
124+ static inline uint32
125+ InvariantModulo (uint64 n )
126+ {
127+ /* Compute quotient using multiplication */
128+ uint64 product = n * StrategyControl -> md .mul ;
129+ uint32 quotient = (uint32 ) (product >> 32 );
130+ uint32 on = n ;
131+
132+ /*
133+ * The invariant multiplication gives us an approximation that may be off
134+ * by 1.
135+ */
136+ n -= quotient ;
137+ n >>= StrategyControl -> md .shift1 ;
138+ n += quotient ;
139+ n >>= StrategyControl -> md .shift2 ;
140+
141+ quotient = StrategyControl -> md .mod * n ;
142+ return on - quotient ;
143+ }
144+
145+ static inline uint32
146+ ClockSweepHand (uint64 counter )
147+ {
148+ uint32 result = InvariantModulo (counter );
149+
150+ Assert (result < NBuffers );
151+ Assert (result == (uint32 ) counter % NBuffers );
152+
153+ return result ;
154+ }
155+
156+ static inline uint32
157+ ClockSweepPasses (uint64 counter )
158+ {
159+ uint32 result = InvariantDivision (counter );
160+
161+ /* Verify our result matches standard division */
162+ Assert (result == (uint32 ) (counter / NBuffers ));
163+
164+ return result ;
165+ }
93166
94167/*
95168 * ClockSweepTick - Helper routine for StrategyGetBuffer()
@@ -110,7 +183,7 @@ ClockSweepTick(void)
110183 */
111184 counter = pg_atomic_fetch_add_u64 (& StrategyControl -> clockSweepCounter , 1 );
112185
113- hand = CLOCKSWEEP_HAND (counter );
186+ hand = ClockSweepHand (counter );
114187 Assert (hand < NBuffers );
115188
116189 return hand ;
@@ -244,10 +317,10 @@ StrategySyncStart(uint32 *complete_passes, uint32 *num_buf_alloc)
244317 uint32 result ;
245318
246319 counter = pg_atomic_read_u64 (& StrategyControl -> clockSweepCounter );
247- result = CLOCKSWEEP_HAND (counter );
320+ result = ClockSweepHand (counter );
248321
249322 if (complete_passes )
250- * complete_passes = CLOCKSWEEP_PASSES (counter );
323+ * complete_passes = ClockSweepPasses (counter );
251324
252325 if (num_buf_alloc )
253326 * num_buf_alloc = pg_atomic_exchange_u32 (& StrategyControl -> numBufferAllocs , 0 );
@@ -326,11 +399,27 @@ StrategyInitialize(bool init)
326399
327400 if (!found )
328401 {
402+ uint8 shift2 = 0 ;
403+ uint32 divisor = NBuffers ;
404+ uint8 is_pow2 = (divisor & (divisor - 1 )) == 0 ? 0 : 1 ;
405+
329406 /*
330407 * Only done once, usually in postmaster
331408 */
332409 Assert (init );
333410
411+ /* Calculate the constants used for speeding up division and modulo */
412+ Assert (NBuffers > 0 && NBuffers < (1U << 31 ));
413+
414+ /* shift2 = ilog(NBuffers) */
415+ for (uint32 n = divisor ; n >>= 1 ;)
416+ shift2 ++ ;
417+
418+ StrategyControl -> md .shift1 = is_pow2 ;
419+ StrategyControl -> md .shift2 = shift2 ;
420+ StrategyControl -> md .mod = NBuffers ;
421+ StrategyControl -> md .mul = (1ULL << (32 + is_pow2 + shift2 )) / NBuffers + 1 ;
422+
334423 /* Initialize combined clock-sweep pointer/complete passes counter */
335424 pg_atomic_init_u64 (& StrategyControl -> clockSweepCounter , 0 );
336425
0 commit comments