From 762867bd18192486174b364515126254b01f7063 Mon Sep 17 00:00:00 2001 From: venkata kishore <61379748+venkata-kishore@users.noreply.github.com> Date: Fri, 28 Mar 2025 23:13:44 +0530 Subject: [PATCH 1/6] bump pom version --- pom.xml | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index 47a7450..1814602 100644 --- a/pom.xml +++ b/pom.xml @@ -7,18 +7,18 @@ today.bonfire.oss bonfire-oss-parent - 1.1.8 + 1.1.11 today.bonfire.oss bth4j - 2.3.0 + 2.4.1 jar UTF-8 bonfire.oss.bth4j - 5.2.1 + 5.2.2 9.2.1 @@ -63,6 +63,7 @@ org.apache.commons commons-lang3 + ${commons-lang3.version} @@ -82,7 +83,7 @@ com.alibaba.fastjson2 fastjson2 - 2.0.53 + 2.0.56 test From 34cbd1910ad26f1373d8a62073cc58b5f8af84f5 Mon Sep 17 00:00:00 2001 From: venkata kishore <61379748+venkata-kishore@users.noreply.github.com> Date: Fri, 28 Mar 2025 23:16:24 +0530 Subject: [PATCH 2/6] added a new method --- .../bonfire/oss/bth4j/service/TaskOps.java | 142 +++++++++--------- 1 file changed, 75 insertions(+), 67 deletions(-) diff --git a/src/main/java/today/bonfire/oss/bth4j/service/TaskOps.java b/src/main/java/today/bonfire/oss/bth4j/service/TaskOps.java index 5153633..0ec317b 100644 --- a/src/main/java/today/bonfire/oss/bth4j/service/TaskOps.java +++ b/src/main/java/today/bonfire/oss/bth4j/service/TaskOps.java @@ -58,10 +58,6 @@ Task getTaskFromQueue(String queueName) { return new Task(t, eventParser); } - public void addTaskToQueue(Task task, Object data) { - addTaskToQueue(task, data, false); - } - /** * */ @@ -86,65 +82,10 @@ void addTaskToQueue(Task task, Object data, boolean isDataRaw) { log.info("Added {} to queue: {}", task, queueName); } - public String addRecurringTask(Task task) { - jedis.hset(keys.RECURRING_TASK_SET, task.taskString(), Instant.now().getEpochSecond() + ""); - log.info("Added recurring task: {}", task); - return task.taskString(); - } - - public void deleteRecurringTask(Task task) { - jedis.hdel(keys.RECURRING_TASK_SET, task.taskString()); - log.info("Deleted recurring task: {}", task); - } - - /** - * @param uniqueId unique identifier for the task - * @return data associated with the given task id. - * Will return null if task id is blank or no data is associated with the given task id. - * This will not do any conversion of the data to the desired type. just the pure string value. - * Use {@link #getDataForTask(String, Class)} for type conversion - */ - public String getDataForTask(String uniqueId) { - if (StringUtils.isBlank(uniqueId)) return null; - return jedis.get(keys.DATA + uniqueId); - } - - /** - * Gets the data associated with the given task id. - * This will try to convert the data to the given type. - * If the task id is blank or no data is associated with the given task id or - * if the type conversion fails, the method will return throw an exception. - * - * @param uniqueId unique identifier for the task - * @param clazz the desired type of the data - * @return data associated with the given task id, converted to the given type. - * May return null. - */ - public T getDataForTask(String uniqueId, Class clazz) { - if (StringUtils.isBlank(uniqueId) || clazz == null) { - return null; - } - try { - var r = jedis.get(keys.DATA + uniqueId); - return mapper.fromJson(r, clazz); - } catch (Exception e) { - log.error("Error getting data for task: {}", uniqueId, e); - throw new TaskDataException("Error getting data for task: " + uniqueId); - } - } - - public T getDataForTask(Task task, Class clazz) { - return getDataForTask(task.uniqueId(), clazz); - } - int incrementRetryCount(String uniqueId) { return (int) jedis.hincrBy(keys.TASK_RETRY_COUNT, uniqueId, 1); } - public void updateExecutionTimeForRecurringTasks(String key, Instant instant) { - jedis.hset(keys.RECURRING_TASK_SET, key, String.valueOf(instant.getEpochSecond())); - } - String getLock(String lockName) { return jedis.get(lockName); } @@ -163,10 +104,6 @@ void releaseLock(String lockDelayedTasks) { jedis.del(lockDelayedTasks); } - public long queueSize(String queueName) { - return jedis.llen(queueName); - } - void deleteTaskFromInProgressQueue(Task task) { try (var pipeline = jedis.pipelined()) { pipeline.del(keys.DATA + task.uniqueId()); @@ -208,10 +145,6 @@ void deleteFromRotationListAndAddToQueue(List items) { } } - public long getNumberOfTaskInProgress() { - return jedis.zcard(keys.IN_PROGRESS_TASKS); - } - ScanResult scanSortedSet(String key, String cursor) { return jedis.zscan(key, cursor); } @@ -277,4 +210,79 @@ void removeTasksFromSortedSetBasedOnTime(String key, Instant time) { }); } while (true); } + + public void addTaskToQueue(Task task, Object data) { + addTaskToQueue(task, data, false); + } + + public String addRecurringTask(Task task) { + jedis.hset(keys.RECURRING_TASK_SET, task.taskString(), Instant.now().getEpochSecond() + ""); + log.info("Added recurring task: {}", task); + return task.taskString(); + } + + public void deleteRecurringTask(Task task) { + jedis.hdel(keys.RECURRING_TASK_SET, task.taskString()); + log.info("Deleted recurring task: {}", task); + } + + public void deleteAllRecurringTasks() { + log.info("{} recurring tasks found", jedis.hlen(keys.RECURRING_TASK_SET)); + jedis.del(keys.RECURRING_TASK_SET); + log.info("Deleted all recurring tasks"); + } + + /** + * @param uniqueId unique identifier for the task + * + * @return data associated with the given task id. + * Will return null if task id is blank or no data is associated with the given task id. + * This will not do any conversion of the data to the desired type. just the pure string value. + * Use {@link #getDataForTask(String, Class)} for type conversion + */ + public String getDataForTask(String uniqueId) { + if (StringUtils.isBlank(uniqueId)) return null; + return jedis.get(keys.DATA + uniqueId); + } + + /** + * Gets the data associated with the given task id. + * This will try to convert the data to the given type. + * If the task id is blank or no data is associated with the given task id or + * if the type conversion fails, the method will return throw an exception. + * + * @param uniqueId unique identifier for the task + * @param clazz the desired type of the data + * + * @return data associated with the given task id, converted to the given type. + * May return null. + */ + public T getDataForTask(String uniqueId, Class clazz) { + if (StringUtils.isBlank(uniqueId) || clazz == null) { + return null; + } + try { + var r = jedis.get(keys.DATA + uniqueId); + return mapper.fromJson(r, clazz); + } catch (Exception e) { + log.error("Error getting data for task: {}", uniqueId, e); + throw new TaskDataException("Error getting data for task: " + uniqueId); + } + } + + public T getDataForTask(Task task, Class clazz) { + return getDataForTask(task.uniqueId(), clazz); + } + + public void updateExecutionTimeForRecurringTasks(String key, Instant instant) { + jedis.hset(keys.RECURRING_TASK_SET, key, String.valueOf(instant.getEpochSecond())); + } + + public long queueSize(String queueName) { + return jedis.llen(queueName); + } + + public long getNumberOfTaskInProgress() { + return jedis.zcard(keys.IN_PROGRESS_TASKS); + } } From 1c97611d2353ccd13cd4a158cbf35255a0fe0931 Mon Sep 17 00:00:00 2001 From: venkata kishore <61379748+venkata-kishore@users.noreply.github.com> Date: Tue, 1 Apr 2025 00:34:59 +0530 Subject: [PATCH 3/6] release alpha version --- pom.xml | 2 +- .../oss/bth4j/service/TaskRunnerWrapper.java | 52 +++++++++---------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/pom.xml b/pom.xml index 1814602..c30bda8 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ today.bonfire.oss bth4j - 2.4.1 + 2.4.1-ALPHA jar diff --git a/src/main/java/today/bonfire/oss/bth4j/service/TaskRunnerWrapper.java b/src/main/java/today/bonfire/oss/bth4j/service/TaskRunnerWrapper.java index 6fea063..729b96f 100644 --- a/src/main/java/today/bonfire/oss/bth4j/service/TaskRunnerWrapper.java +++ b/src/main/java/today/bonfire/oss/bth4j/service/TaskRunnerWrapper.java @@ -27,6 +27,29 @@ public class TaskRunnerWrapper implements Runnable { this.taskOps = taskOps; } + private void rescheduleTask(Task task, long delay) { + var newTask = Task.Builder.newTask() + .event(task.event()) + .accountId(task.accountId()) + .queueName(task.queueName()) + .executeAfter(delay) + .build(); + taskOps.addTaskToQueue(newTask, taskOps.getDataForTask(task.uniqueId()), true); + log.info("Rescheduling old task {} to run after {} seconds, new task id {}", + task.uniqueId(), delay, newTask.uniqueId()); + deleteTask(task); + } + + private void deleteTask(Task task) { + // delete the task and data if present + taskOps.deleteTaskFromInProgressQueue(task); + } + + private void moveToDeadQueue(Task task) { + // move to dead list if task failed because of BGTaskUnrecoverableException + taskOps.moveToDeadQueue(task); + } + /** * if task fails it may be retired automatically by maintenance service * unless the exception is UnrecoverableException in which case the task is marked @@ -41,44 +64,21 @@ public void run() { callbacks.onSuccess().accept(task); } catch (Exception e) { callbacks.onError().accept(task, e); - log.error("Task {} failed", task.taskString(), e); if (e instanceof TaskUnrecoverableException || e instanceof TaskDataException) { + log.error("Task {} failed", task.taskString(), e); // task will not be retried moveToDeadQueue(task); } else if (e instanceof TaskErrorException) { // task can be retried - log.info("Task will be retried"); + log.info("Task {} failed", task.taskString(), e); } else if (e instanceof TaskRescheduleException ex) { rescheduleTask(task, ex.delay()); } else { - // unknown exception task can be retried + // unknown exception task may be retried log.info("Unhandled exception. Task will be retried"); } } finally { callbacks.afterTask().accept(task); } } - - private void rescheduleTask(Task task, long delay) { - var newTask = Task.Builder.newTask() - .event(task.event()) - .accountId(task.accountId()) - .queueName(task.queueName()) - .executeAfter(delay) - .build(); - taskOps.addTaskToQueue(newTask, taskOps.getDataForTask(task.uniqueId()), true); - log.info("Rescheduling old task {} to run after {} seconds, new task id {}", - task.uniqueId(), delay, newTask.uniqueId()); - deleteTask(task); - } - - private void deleteTask(Task task) { - // delete the task and data if present - taskOps.deleteTaskFromInProgressQueue(task); - } - - private void moveToDeadQueue(Task task) { - // move to dead list if task failed because of BGTaskUnrecoverableException - taskOps.moveToDeadQueue(task); - } } From 56504bddcbcb1604290c13c309ee6a1d8a97090c Mon Sep 17 00:00:00 2001 From: venkata kishore <61379748+venkata-kishore@users.noreply.github.com> Date: Sun, 6 Apr 2025 00:13:07 +0530 Subject: [PATCH 4/6] minor ref --- .../today/bonfire/oss/bth4j/service/TaskRunnerWrapper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/today/bonfire/oss/bth4j/service/TaskRunnerWrapper.java b/src/main/java/today/bonfire/oss/bth4j/service/TaskRunnerWrapper.java index 729b96f..5018d88 100644 --- a/src/main/java/today/bonfire/oss/bth4j/service/TaskRunnerWrapper.java +++ b/src/main/java/today/bonfire/oss/bth4j/service/TaskRunnerWrapper.java @@ -70,12 +70,12 @@ public void run() { moveToDeadQueue(task); } else if (e instanceof TaskErrorException) { // task can be retried - log.info("Task {} failed", task.taskString(), e); + log.warn("Task {} failed", task.taskString(), e); } else if (e instanceof TaskRescheduleException ex) { rescheduleTask(task, ex.delay()); } else { // unknown exception task may be retried - log.info("Unhandled exception. Task will be retried"); + log.warn("Unhandled exception. Task will be retried", e); } } finally { callbacks.afterTask().accept(task); From 5ae3fc064111d31927a46bb31a08870f65c2bd90 Mon Sep 17 00:00:00 2001 From: venkata kishore <61379748+venkata-kishore@users.noreply.github.com> Date: Tue, 8 Apr 2025 02:34:00 +0530 Subject: [PATCH 5/6] bump version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index c30bda8..b32958e 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ today.bonfire.oss bth4j - 2.4.1-ALPHA + 2.4.2-ALPHA jar From 54a7fd15b3d4aa65eb2c07b4e5a0d98b5d1329f6 Mon Sep 17 00:00:00 2001 From: venkata kishore <61379748+venkata-kishore@users.noreply.github.com> Date: Tue, 8 Apr 2025 03:18:59 +0530 Subject: [PATCH 6/6] bug fix for delete in progress task --- pom.xml | 2 +- src/main/java/today/bonfire/oss/bth4j/service/TaskOps.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index b32958e..e80854e 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ today.bonfire.oss bth4j - 2.4.2-ALPHA + 2.4.3-ALPHA jar diff --git a/src/main/java/today/bonfire/oss/bth4j/service/TaskOps.java b/src/main/java/today/bonfire/oss/bth4j/service/TaskOps.java index 0ec317b..99edb10 100644 --- a/src/main/java/today/bonfire/oss/bth4j/service/TaskOps.java +++ b/src/main/java/today/bonfire/oss/bth4j/service/TaskOps.java @@ -176,6 +176,7 @@ void deleteFromInProgressQueueAndAddToQueue( transaction.expire(keys.DATA + t.uniqueId(), THC.Time.T_30_DAYS); transaction.hdel(keys.TASK_RETRY_COUNT, t.uniqueId()); }); + transaction.exec(); } }