Skip to content

Commit 0884bf0

Browse files
add worker heart beat (#290)
* add worker heart beat * Apply Palantir Java Format * wip * fixed: failing test * Apply Palantir Java Format --------- Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
1 parent aff8ae7 commit 0884bf0

File tree

75 files changed

+4099
-242
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+4099
-242
lines changed

docs/CHANGELOG.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,30 @@ layout: default
88

99
All notable user-facing changes to this project are documented in this file.
1010

11+
## Release [4.0.0.RC2] 24-Mar-2026
12+
13+
{: .highlight}
14+
This is a release candidate for 4.0.0. It targets Spring Boot 4.x and Spring Framework 7.x.
15+
Please test thoroughly before using in production.
16+
17+
### Features
18+
* **Pluggable message ID generation** — added `RqueueMessageIdGenerator` with a
19+
default UUIDv4 implementation so applications can override message ID generation
20+
with a custom bean, including time-ordered strategies such as UUIDv7.
21+
* **Worker registry for dashboard visibility** — added an optional
22+
`rqueue.worker.registry.enabled` registry that tracks worker metadata and
23+
queue-level poller activity for dashboard use.
24+
* **Workers dashboard page** — added a dedicated workers view showing worker
25+
identity, queue pollers, last poll activity, and recent capacity exhaustion.
26+
* **Queue and workers pagination** — added server-side pagination for dashboard
27+
queue and worker listings, with configurable page sizes.
28+
* **Dashboard enqueue controls for scheduled messages** — messages in scheduled
29+
queues can now be moved back to the main queue from the dashboard, including
30+
explicit front/rear enqueue options for non-periodic messages.
31+
* **Dashboard refresh and usability improvements** — refreshed queue, worker, and
32+
explorer UI with improved layouts, duration formatting, feedback modals, and
33+
more readable queue metadata.
34+
1135
## Release [4.0.0.RC1] 18-Mar-2026
1236

1337
{: .highlight}

docs/configuration/configuration.md

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,10 +242,49 @@ The serialized form encodes both the envelope class and the type parameter:
242242
`List<Event<Order>>` are not supported.
243243
- Multi-level nesting (e.g. `Wrapper<Event<T>>`) is not supported.
244244

245+
## Message ID Generator
246+
247+
Rqueue now resolves message IDs through the `RqueueMessageIdGenerator` abstraction.
248+
By default, Rqueue registers a UUIDv4-based implementation, but applications can
249+
override it by defining their own bean.
250+
251+
This is useful when you need:
252+
253+
- time-ordered IDs such as UUIDv7
254+
- custom prefixes or tenant-aware IDs
255+
- IDs generated by an external system
256+
257+
```java
258+
import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator;
259+
260+
@Configuration
261+
public class RqueueConfiguration {
262+
263+
@Bean
264+
public RqueueMessageIdGenerator rqueueMessageIdGenerator() {
265+
return () -> java.util.UUID.randomUUID().toString();
266+
}
267+
}
268+
```
269+
270+
{: .note}
271+
The default implementation is still UUIDv4. Custom generators are applied to the
272+
normal enqueue APIs such as `enqueue`, `enqueueIn`, `enqueueAt`, and `enqueuePeriodic`
273+
whenever Rqueue generates the message ID internally.
274+
245275
## Additional Configuration
246276

247277
- **`rqueue.retry.per.poll`**: Determines how many times a polled message is retried
248278
immediately if processing fails, before it is moved back to the queue for a
249279
subsequent poll. The default value is `1`. If increased to `N`, the message will
250280
be retried `N` times consecutively within the same polling cycle.
251-
281+
- **`rqueue.worker.registry.enabled`**: Enables worker and queue-poller tracking for
282+
the dashboard. Default: `true`.
283+
- **`rqueue.worker.registry.worker.ttl`**: TTL in seconds for worker metadata stored
284+
in Redis. Default: `300`.
285+
- **`rqueue.worker.registry.worker.heartbeat.interval`**: Interval in seconds for
286+
refreshing worker metadata. Default: `60`.
287+
- **`rqueue.worker.registry.queue.ttl`**: TTL in seconds for queue poller hashes.
288+
Default: `3600`.
289+
- **`rqueue.worker.registry.queue.heartbeat.interval`**: Interval in seconds for
290+
queue poller heartbeats. Default: `15`.

docs/dashboard.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ message processing.
2525

2626
* **Task Operations**: Facilitates moving tasks between different queues.
2727

28+
* **Worker Visibility**: Shows which worker is polling a queue, when it last polled,
29+
and whether the queue recently ran out of execution capacity.
30+
31+
* **Scheduled Message Recovery**: Allows non-periodic scheduled messages to be moved
32+
back to the main queue from the dashboard, either at the front or rear.
33+
2834
Access the dashboard at: [http://localhost:8080/rqueue](http://localhost:8080/rqueue)
2935

3036
## Configuration
@@ -71,6 +77,8 @@ Example URL with a configured prefix:
7177
* `rqueue.web.enable`: Enable or disable the web dashboard (default: `true`).
7278
* `rqueue.web.max.message.move.count`: Maximum number of messages to move in a single request
7379
from the utility tab (default: `1000`).
80+
* `rqueue.web.queue.page.size`: Number of queue cards shown per page (default: `12`).
81+
* `rqueue.web.worker.page.size`: Number of worker cards shown per page (default: `10`).
7482
* `rqueue.web.collect.listener.stats`: Enable collection of task execution statistics
7583
(default: `false`).
7684
* `rqueue.web.collect.listener.stats.thread.count`: Number of threads used for metrics aggregation.
@@ -82,6 +90,40 @@ Example URL with a configured prefix:
8290
* `rqueue.web.collect.statistic.aggregate.shutdown.wait.time`: Wait time in milliseconds for
8391
forced aggregation of pending events during application shutdown.
8492

93+
## Worker Registry
94+
95+
The dashboard can optionally maintain lightweight worker metadata in Redis to show:
96+
97+
- worker host and process ID
98+
- queue-level polling activity
99+
- recent queue capacity exhaustion
100+
- worker and queue drill-down views
101+
102+
This feature is controlled by the following properties:
103+
104+
- `rqueue.worker.registry.enabled`
105+
- `rqueue.worker.registry.worker.ttl`
106+
- `rqueue.worker.registry.worker.heartbeat.interval`
107+
- `rqueue.worker.registry.queue.ttl`
108+
- `rqueue.worker.registry.queue.heartbeat.interval`
109+
110+
{: .note}
111+
The worker registry is intended for dashboard visibility. Instance-level liveness
112+
should still be monitored through your infrastructure or platform health checks.
113+
114+
## Queue Explorer Actions
115+
116+
The queue explorer supports queue-specific administrative actions:
117+
118+
- delete pending, running, dead-letter, or scheduled messages
119+
- move messages between Redis collections from the Utility tab
120+
- enqueue scheduled messages back to the main queue
121+
122+
For scheduled messages:
123+
124+
- periodic messages can be deleted, but are not offered queue-to-front or queue-to-rear actions
125+
- non-periodic scheduled messages can be queued to the front or rear of the main queue
126+
85127
### Dashboard Screenshots
86128

87129
#### Latency Graph

docs/index.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ model through annotation-driven APIs and minimal setup.
5252
* Use callbacks for dead-letter, discard, and related flows
5353
* Subscribe to bootstrap and task execution events
5454
* Monitor in-flight, queued, and scheduled messages with metrics
55-
* Use the built-in web dashboard for queue visibility and monitoring
55+
* Use the built-in web dashboard for queue visibility, worker activity, and message operations
56+
* Override message ID generation with a custom `RqueueMessageIdGenerator` bean
5657

5758
* **Redis and platform support**
5859
* Support Redis standalone, Sentinel, and Cluster setups

rqueue-core/src/main/java/com/github/sonus21/rqueue/common/RqueueRedisTemplate.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ public Long rpush(String listName, V val) {
6666
return redisTemplate.opsForList().rightPush(listName, val);
6767
}
6868

69+
public Long lpush(String listName, V val) {
70+
return redisTemplate.opsForList().leftPush(listName, val);
71+
}
72+
6973
public Long addToSet(String setName, V... values) {
7074
return redisTemplate.opsForSet().add(setName, values);
7175
}
@@ -98,6 +102,23 @@ public void set(String key, V val, Duration duration) {
98102
redisTemplate.opsForValue().set(key, val, duration.toMillis(), TimeUnit.MILLISECONDS);
99103
}
100104

105+
public void putHashValue(String key, String hashKey, V val) {
106+
redisTemplate.opsForHash().put(key, hashKey, val);
107+
}
108+
109+
@SuppressWarnings("unchecked")
110+
public Map<String, V> getHashEntries(String key) {
111+
return (Map<String, V>) (Map<?, ?>) redisTemplate.opsForHash().entries(key);
112+
}
113+
114+
public Long deleteHashValues(String key, String... hashKeys) {
115+
return redisTemplate.opsForHash().delete(key, (Object[]) hashKeys);
116+
}
117+
118+
public Boolean expire(String key, Duration duration) {
119+
return redisTemplate.expire(key, duration.toMillis(), TimeUnit.MILLISECONDS);
120+
}
121+
101122
public Boolean setIfAbsent(String lockKey, V val, Duration duration) {
102123
boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, val);
103124
if (result) {

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@
4343
@Configuration
4444
public class RqueueConfig {
4545

46+
@Getter
4647
private static final String brokerId = UUID.randomUUID().toString();
48+
4749
private static final AtomicLong counter = new AtomicLong(1);
4850
private final RedisConnectionFactory connectionFactory;
4951
private final ReactiveRedisConnectionFactory reactiveRedisConnectionFactory;
@@ -139,9 +141,26 @@ public class RqueueConfig {
139141
@Value("${rqueue.completed.job.cleanup.interval:30000}")
140142
private long completedJobCleanupIntervalInMs;
141143

142-
public static String getBrokerId() {
143-
return brokerId;
144-
}
144+
@Value("${rqueue.worker.registry.enabled:true}")
145+
private boolean workerRegistryEnabled;
146+
147+
@Value("${rqueue.worker.registry.worker.ttl:300}")
148+
private long workerRegistryWorkerTtlInSeconds;
149+
150+
@Value("${rqueue.worker.registry.worker.heartbeat.interval:60}")
151+
private long workerRegistryWorkerHeartbeatIntervalInSeconds;
152+
153+
@Value("${rqueue.worker.registry.queue.ttl:3600}")
154+
private long workerRegistryQueueTtlInSeconds;
155+
156+
@Value("${rqueue.worker.registry.queue.heartbeat.interval:15}")
157+
private long workerRegistryQueueHeartbeatIntervalInSeconds;
158+
159+
@Value("${rqueue.worker.registry.key.prefix:worker::}")
160+
private String workerRegistryKeyPrefix;
161+
162+
@Value("${rqueue.worker.registry.queue.key.prefix:q-pollers::}")
163+
private String workerRegistryQueueKeyPrefix;
145164

146165
public boolean messageInTerminalStateShouldBeStored() {
147166
return getMessageDurabilityInTerminalStateInSecond() > 0;
@@ -294,6 +313,14 @@ public String getJobsKey(String messageId) {
294313
return prefix + jobsCollectionNamePrefix + messageId;
295314
}
296315

316+
public String getWorkerRegistryKey(String workerId) {
317+
return prefix + workerRegistryKeyPrefix + workerId;
318+
}
319+
320+
public String getWorkerRegistryQueueKey(String queueName) {
321+
return prefix + workerRegistryQueueKeyPrefix + getTaggedName(queueName);
322+
}
323+
297324
public String getDelDataName(String queueName) {
298325
return prefix
299326
+ delPrefix
@@ -307,6 +334,22 @@ public Duration getJobDurabilityInTerminalState() {
307334
return Duration.ofSeconds(jobDurabilityInTerminalStateInSecond);
308335
}
309336

337+
public Duration getWorkerRegistryWorkerTtl() {
338+
return Duration.ofSeconds(workerRegistryWorkerTtlInSeconds);
339+
}
340+
341+
public Duration getWorkerRegistryWorkerHeartbeatInterval() {
342+
return Duration.ofSeconds(workerRegistryWorkerHeartbeatIntervalInSeconds);
343+
}
344+
345+
public Duration getWorkerRegistryQueueTtl() {
346+
return Duration.ofSeconds(workerRegistryQueueTtlInSeconds);
347+
}
348+
349+
public Duration getWorkerRegistryQueueHeartbeatInterval() {
350+
return Duration.ofSeconds(workerRegistryQueueHeartbeatIntervalInSeconds);
351+
}
352+
310353
public String getLibVersion() {
311354
if (StringUtils.isEmpty(version)) {
312355
ClassPathResource resource =

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,23 @@
2525
import com.github.sonus21.rqueue.core.ProcessingQueueMessageScheduler;
2626
import com.github.sonus21.rqueue.core.RqueueBeanProvider;
2727
import com.github.sonus21.rqueue.core.RqueueInternalPubSubChannel;
28+
import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator;
2829
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
2930
import com.github.sonus21.rqueue.core.RqueueRedisListenerContainerFactory;
3031
import com.github.sonus21.rqueue.core.ScheduledQueueMessageScheduler;
3132
import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl;
33+
import com.github.sonus21.rqueue.core.impl.UuidV4RqueueMessageIdGenerator;
3234
import com.github.sonus21.rqueue.dao.RqueueStringDao;
3335
import com.github.sonus21.rqueue.dao.impl.RqueueStringDaoImpl;
3436
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
3537
import com.github.sonus21.rqueue.metrics.RqueueQueueMetrics;
3638
import com.github.sonus21.rqueue.utils.RedisUtils;
39+
import com.github.sonus21.rqueue.utils.condition.MissingRqueueMessageIdGenerator;
3740
import com.github.sonus21.rqueue.utils.condition.ReactiveEnabled;
3841
import com.github.sonus21.rqueue.utils.pebble.ResourceLoader;
3942
import com.github.sonus21.rqueue.utils.pebble.RqueuePebbleExtension;
43+
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistry;
44+
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistryImpl;
4045
import io.pebbletemplates.pebble.PebbleEngine;
4146
import io.pebbletemplates.spring.extension.SpringExtension;
4247
import io.pebbletemplates.spring.reactive.PebbleReactiveViewResolver;
@@ -151,6 +156,12 @@ public RqueueWebConfig rqueueWebConfig() {
151156
return new RqueueWebConfig();
152157
}
153158

159+
@Bean
160+
@Conditional(MissingRqueueMessageIdGenerator.class)
161+
public RqueueMessageIdGenerator rqueueMessageIdGenerator() {
162+
return new UuidV4RqueueMessageIdGenerator();
163+
}
164+
154165
@Bean
155166
public RqueueSchedulerConfig rqueueSchedulerConfig() {
156167
return new RqueueSchedulerConfig();
@@ -215,6 +226,11 @@ public RqueueStringDao rqueueStringDao(RqueueConfig rqueueConfig) {
215226
return new RqueueStringDaoImpl(rqueueConfig);
216227
}
217228

229+
@Bean
230+
public RqueueWorkerRegistry rqueueWorkerRegistry(RqueueConfig rqueueConfig) {
231+
return new RqueueWorkerRegistryImpl(rqueueConfig);
232+
}
233+
218234
@Bean
219235
public RqueueLockManager rqueueLockManager(RqueueStringDao rqueueStringDao) {
220236
return new RqueueLockManagerImpl(rqueueStringDao);

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueWebConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ public class RqueueWebConfig {
4848
@Value("${rqueue.web.max.message.move.count:1000}")
4949
private int maxMessageMoveCount;
5050

51+
@Value("${rqueue.web.queue.page.size:12}")
52+
private int queuePageSize;
53+
54+
@Value("${rqueue.web.worker.page.size:10}")
55+
private int workerPageSize;
56+
5157
/**
5258
* Whether queue stats should be collected or not. When this flag is disabled, metric data won't
5359
* be available in the dashboard.

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueBeanProvider.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
2626
import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter;
2727
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
28+
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistry;
2829
import lombok.Getter;
2930
import lombok.Setter;
3031
import org.springframework.beans.factory.annotation.Autowired;
@@ -55,6 +56,9 @@ public class RqueueBeanProvider {
5556
@Autowired(required = false)
5657
private RqueueMetricsCounter rqueueMetricsCounter;
5758

59+
@Autowired(required = false)
60+
private RqueueWorkerRegistry rqueueWorkerRegistry;
61+
5862
@Autowired
5963
private RqueueMessageHandler rqueueMessageHandler;
6064

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright (c) 2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.core;
18+
19+
@FunctionalInterface
20+
public interface RqueueMessageIdGenerator {
21+
22+
String generate();
23+
}

0 commit comments

Comments
 (0)