|
27 | 27 |
|
28 | 28 | import java.time.Duration; |
29 | 29 | import java.util.List; |
30 | | - |
31 | 30 | import org.springframework.core.convert.converter.Converter; |
32 | 31 | import org.springframework.data.redis.connection.RedisStreamCommands; |
33 | 32 | import org.springframework.data.redis.connection.RedisStreamCommands.MaxLenTrimStrategy; |
34 | 33 | import org.springframework.data.redis.connection.RedisStreamCommands.MinIdTrimStrategy; |
35 | 34 | import org.springframework.data.redis.connection.RedisStreamCommands.TrimOperator; |
36 | 35 | import org.springframework.data.redis.connection.RedisStreamCommands.TrimOptions; |
37 | 36 | import org.springframework.data.redis.connection.RedisStreamCommands.TrimStrategy; |
38 | | -import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; |
39 | 37 | import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; |
40 | | -import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions; |
| 38 | +import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; |
41 | 39 | import org.springframework.data.redis.connection.RedisStreamCommands.XDelOptions; |
| 40 | +import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions; |
42 | 41 | import org.springframework.data.redis.connection.stream.ByteRecord; |
43 | 42 | import org.springframework.data.redis.connection.stream.Consumer; |
44 | 43 | import org.springframework.data.redis.connection.stream.PendingMessagesSummary; |
@@ -82,15 +81,66 @@ static XClaimArgs toXClaimArgs(XClaimOptions options) { |
82 | 81 | } |
83 | 82 |
|
84 | 83 | static XAddArgs toXAddArgs(RecordId recordId, XAddOptions options) { |
85 | | - XAddArgs args = XAddOptionsToXAddArgsConverter.INSTANCE.convert(options); |
| 84 | + |
| 85 | + XAddArgs args = new XAddArgs(); |
| 86 | + |
| 87 | + if (options.isNoMkStream()) { |
| 88 | + args.nomkstream(); |
| 89 | + } |
| 90 | + |
| 91 | + if (options.hasTrimOptions()) { |
| 92 | + TrimOptions trimOptions = options.getTrimOptions(); |
| 93 | + TrimStrategy trimStrategy = trimOptions.getTrimStrategy(); |
| 94 | + if (trimStrategy instanceof MaxLenTrimStrategy maxLenTrimStrategy) { |
| 95 | + args.maxlen(maxLenTrimStrategy.threshold()); |
| 96 | + } else if (trimStrategy instanceof MinIdTrimStrategy minIdTrimStrategy) { |
| 97 | + args.minId(minIdTrimStrategy.threshold().getValue()); |
| 98 | + } |
| 99 | + |
| 100 | + if (trimOptions.hasLimit()) { |
| 101 | + args.limit(trimOptions.getLimit()); |
| 102 | + } |
| 103 | + |
| 104 | + args.exactTrimming(trimOptions.getTrimOperator() == TrimOperator.EXACT); |
| 105 | + args.approximateTrimming(trimOptions.getTrimOperator() == TrimOperator.APPROXIMATE); |
| 106 | + |
| 107 | + if (trimOptions.hasDeletionPolicy()) { |
| 108 | + args.trimmingMode(toStreamDeletionPolicy(trimOptions.getPendingReferences())); |
| 109 | + } |
| 110 | + } |
| 111 | + |
86 | 112 | if (!recordId.shouldBeAutoGenerated()) { |
87 | 113 | args.id(recordId.getValue()); |
88 | 114 | } |
| 115 | + |
89 | 116 | return args; |
90 | 117 | } |
91 | 118 |
|
92 | 119 | static XTrimArgs toXTrimArgs(XTrimOptions options) { |
93 | | - return XTrimOptionsToXTrimArgsConverter.INSTANCE.convert(options); |
| 120 | + |
| 121 | + XTrimArgs args = new XTrimArgs(); |
| 122 | + |
| 123 | + TrimOptions trimOptions = options.getTrimOptions(); |
| 124 | + TrimStrategy trimStrategy = trimOptions.getTrimStrategy(); |
| 125 | + if (trimStrategy instanceof MaxLenTrimStrategy maxLenTrimStrategy) { |
| 126 | + args.maxlen(maxLenTrimStrategy.threshold()); |
| 127 | + } |
| 128 | + else if (trimStrategy instanceof MinIdTrimStrategy minIdTrimStrategy) { |
| 129 | + args.minId(minIdTrimStrategy.threshold().getValue()); |
| 130 | + } |
| 131 | + |
| 132 | + if (trimOptions.hasLimit()) { |
| 133 | + args.limit(trimOptions.getLimit()); |
| 134 | + } |
| 135 | + |
| 136 | + args.exactTrimming(trimOptions.getTrimOperator() == TrimOperator.EXACT); |
| 137 | + args.approximateTrimming(trimOptions.getTrimOperator() == TrimOperator.APPROXIMATE); |
| 138 | + |
| 139 | + if (trimOptions.hasDeletionPolicy()) { |
| 140 | + args.trimmingMode(toStreamDeletionPolicy(trimOptions.getPendingReferences())); |
| 141 | + } |
| 142 | + |
| 143 | + return args; |
94 | 144 | } |
95 | 145 |
|
96 | 146 | static StreamDeletionPolicy toXDelArgs(XDelOptions options) { |
@@ -200,84 +250,6 @@ public XClaimArgs convert(XClaimOptions source) { |
200 | 250 | } |
201 | 251 | } |
202 | 252 |
|
203 | | - /** |
204 | | - * {@link Converter} to convert {@link XAddOptions} to Lettuce's {@link XAddArgs}. |
205 | | - * |
206 | | - * @since 4.1 |
207 | | - */ |
208 | | - enum XAddOptionsToXAddArgsConverter implements Converter<XAddOptions, XAddArgs> { |
209 | | - |
210 | | - INSTANCE; |
211 | | - |
212 | | - @Override |
213 | | - public XAddArgs convert(XAddOptions source) { |
214 | | - |
215 | | - XAddArgs args = new XAddArgs(); |
216 | | - |
217 | | - if (source.isNoMkStream()) { |
218 | | - args.nomkstream(); |
219 | | - } |
220 | | - |
221 | | - if (!source.hasTrimOptions()) { |
222 | | - return args; |
223 | | - } |
224 | | - |
225 | | - TrimOptions trimOptions = source.getTrimOptions(); |
226 | | - TrimStrategy trimStrategy = trimOptions.getTrimStrategy(); |
227 | | - if (trimStrategy instanceof MaxLenTrimStrategy maxLenTrimStrategy) { |
228 | | - args.maxlen(maxLenTrimStrategy.threshold()); |
229 | | - } |
230 | | - else if (trimStrategy instanceof MinIdTrimStrategy minIdTrimStrategy) { |
231 | | - args.minId(minIdTrimStrategy.threshold().getValue()); |
232 | | - } |
233 | | - |
234 | | - if (trimOptions.hasLimit()) { |
235 | | - args.limit(trimOptions.getLimit()); |
236 | | - } |
237 | | - |
238 | | - args.exactTrimming(trimOptions.getTrimOperator() == TrimOperator.EXACT); |
239 | | - args.approximateTrimming(trimOptions.getTrimOperator() == TrimOperator.APPROXIMATE); |
240 | | - |
241 | | - if (trimOptions.hasDeletionPolicy()) { |
242 | | - args.trimmingMode(toStreamDeletionPolicy(trimOptions.getPendingReferences())); |
243 | | - } |
244 | | - |
245 | | - return args; |
246 | | - } |
247 | | - } |
248 | | - |
249 | | - enum XTrimOptionsToXTrimArgsConverter implements Converter<XTrimOptions, XTrimArgs> { |
250 | | - INSTANCE; |
251 | | - |
252 | | - @Override |
253 | | - public XTrimArgs convert(XTrimOptions source) { |
254 | | - |
255 | | - XTrimArgs args = new XTrimArgs(); |
256 | | - |
257 | | - TrimOptions trimOptions = source.getTrimOptions(); |
258 | | - TrimStrategy trimStrategy = trimOptions.getTrimStrategy(); |
259 | | - if (trimStrategy instanceof MaxLenTrimStrategy maxLenTrimStrategy) { |
260 | | - args.maxlen(maxLenTrimStrategy.threshold()); |
261 | | - } |
262 | | - else if (trimStrategy instanceof MinIdTrimStrategy minIdTrimStrategy) { |
263 | | - args.minId(minIdTrimStrategy.threshold().getValue()); |
264 | | - } |
265 | | - |
266 | | - if (trimOptions.hasLimit()) { |
267 | | - args.limit(trimOptions.getLimit()); |
268 | | - } |
269 | | - |
270 | | - args.exactTrimming(trimOptions.getTrimOperator() == TrimOperator.EXACT); |
271 | | - args.approximateTrimming(trimOptions.getTrimOperator() == TrimOperator.APPROXIMATE); |
272 | | - |
273 | | - if (trimOptions.hasDeletionPolicy()) { |
274 | | - args.trimmingMode(toStreamDeletionPolicy(trimOptions.getPendingReferences())); |
275 | | - } |
276 | | - |
277 | | - return args; |
278 | | - } |
279 | | - } |
280 | | - |
281 | 253 | public static StreamDeletionPolicy toStreamDeletionPolicy(RedisStreamCommands.StreamDeletionPolicy deletionPolicy) { |
282 | 254 |
|
283 | 255 | return switch (deletionPolicy) { |
|
0 commit comments