diff --git a/pom.xml b/pom.xml index 47a7450..e80854e 100644 --- a/pom.xml +++ b/pom.xml @@ -7,18 +7,18 @@ today.bonfire.oss bonfire-oss-parent - 1.1.8 + 1.1.11 today.bonfire.oss bth4j - 2.3.0 + 2.4.3-ALPHA jar UTF-8 bonfire.oss.bth4j - 5.2.1 + 5.2.2 9.2.1 @@ -63,6 +63,7 @@ org.apache.commons commons-lang3 + ${commons-lang3.version} @@ -82,7 +83,7 @@ com.alibaba.fastjson2 fastjson2 - 2.0.53 + 2.0.56 test diff --git a/src/main/java/today/bonfire/oss/bth4j/service/TaskOps.java b/src/main/java/today/bonfire/oss/bth4j/service/TaskOps.java index 5153633..99edb10 100644 --- a/src/main/java/today/bonfire/oss/bth4j/service/TaskOps.java +++ b/src/main/java/today/bonfire/oss/bth4j/service/TaskOps.java @@ -58,10 +58,6 @@ Task getTaskFromQueue(String queueName) { return new Task(t, eventParser); } - public void addTaskToQueue(Task task, Object data) { - addTaskToQueue(task, data, false); - } - /** * */ @@ -86,65 +82,10 @@ void addTaskToQueue(Task task, Object data, boolean isDataRaw) { log.info("Added {} to queue: {}", task, queueName); } - public String addRecurringTask(Task task) { - jedis.hset(keys.RECURRING_TASK_SET, task.taskString(), Instant.now().getEpochSecond() + ""); - log.info("Added recurring task: {}", task); - return task.taskString(); - } - - public void deleteRecurringTask(Task task) { - jedis.hdel(keys.RECURRING_TASK_SET, task.taskString()); - log.info("Deleted recurring task: {}", task); - } - - /** - * @param uniqueId unique identifier for the task - * @return data associated with the given task id. - * Will return null if task id is blank or no data is associated with the given task id. - * This will not do any conversion of the data to the desired type. just the pure string value. - * Use {@link #getDataForTask(String, Class)} for type conversion - */ - public String getDataForTask(String uniqueId) { - if (StringUtils.isBlank(uniqueId)) return null; - return jedis.get(keys.DATA + uniqueId); - } - - /** - * Gets the data associated with the given task id. - * This will try to convert the data to the given type. - * If the task id is blank or no data is associated with the given task id or - * if the type conversion fails, the method will return throw an exception. - * - * @param uniqueId unique identifier for the task - * @param clazz the desired type of the data - * @return data associated with the given task id, converted to the given type. - * May return null. - */ - public T getDataForTask(String uniqueId, Class clazz) { - if (StringUtils.isBlank(uniqueId) || clazz == null) { - return null; - } - try { - var r = jedis.get(keys.DATA + uniqueId); - return mapper.fromJson(r, clazz); - } catch (Exception e) { - log.error("Error getting data for task: {}", uniqueId, e); - throw new TaskDataException("Error getting data for task: " + uniqueId); - } - } - - public T getDataForTask(Task task, Class clazz) { - return getDataForTask(task.uniqueId(), clazz); - } - int incrementRetryCount(String uniqueId) { return (int) jedis.hincrBy(keys.TASK_RETRY_COUNT, uniqueId, 1); } - public void updateExecutionTimeForRecurringTasks(String key, Instant instant) { - jedis.hset(keys.RECURRING_TASK_SET, key, String.valueOf(instant.getEpochSecond())); - } - String getLock(String lockName) { return jedis.get(lockName); } @@ -163,10 +104,6 @@ void releaseLock(String lockDelayedTasks) { jedis.del(lockDelayedTasks); } - public long queueSize(String queueName) { - return jedis.llen(queueName); - } - void deleteTaskFromInProgressQueue(Task task) { try (var pipeline = jedis.pipelined()) { pipeline.del(keys.DATA + task.uniqueId()); @@ -208,10 +145,6 @@ void deleteFromRotationListAndAddToQueue(List items) { } } - public long getNumberOfTaskInProgress() { - return jedis.zcard(keys.IN_PROGRESS_TASKS); - } - ScanResult scanSortedSet(String key, String cursor) { return jedis.zscan(key, cursor); } @@ -243,6 +176,7 @@ void deleteFromInProgressQueueAndAddToQueue( transaction.expire(keys.DATA + t.uniqueId(), THC.Time.T_30_DAYS); transaction.hdel(keys.TASK_RETRY_COUNT, t.uniqueId()); }); + transaction.exec(); } } @@ -277,4 +211,79 @@ void removeTasksFromSortedSetBasedOnTime(String key, Instant time) { }); } while (true); } + + public void addTaskToQueue(Task task, Object data) { + addTaskToQueue(task, data, false); + } + + public String addRecurringTask(Task task) { + jedis.hset(keys.RECURRING_TASK_SET, task.taskString(), Instant.now().getEpochSecond() + ""); + log.info("Added recurring task: {}", task); + return task.taskString(); + } + + public void deleteRecurringTask(Task task) { + jedis.hdel(keys.RECURRING_TASK_SET, task.taskString()); + log.info("Deleted recurring task: {}", task); + } + + public void deleteAllRecurringTasks() { + log.info("{} recurring tasks found", jedis.hlen(keys.RECURRING_TASK_SET)); + jedis.del(keys.RECURRING_TASK_SET); + log.info("Deleted all recurring tasks"); + } + + /** + * @param uniqueId unique identifier for the task + * + * @return data associated with the given task id. + * Will return null if task id is blank or no data is associated with the given task id. + * This will not do any conversion of the data to the desired type. just the pure string value. + * Use {@link #getDataForTask(String, Class)} for type conversion + */ + public String getDataForTask(String uniqueId) { + if (StringUtils.isBlank(uniqueId)) return null; + return jedis.get(keys.DATA + uniqueId); + } + + /** + * Gets the data associated with the given task id. + * This will try to convert the data to the given type. + * If the task id is blank or no data is associated with the given task id or + * if the type conversion fails, the method will return throw an exception. + * + * @param uniqueId unique identifier for the task + * @param clazz the desired type of the data + * + * @return data associated with the given task id, converted to the given type. + * May return null. + */ + public T getDataForTask(String uniqueId, Class clazz) { + if (StringUtils.isBlank(uniqueId) || clazz == null) { + return null; + } + try { + var r = jedis.get(keys.DATA + uniqueId); + return mapper.fromJson(r, clazz); + } catch (Exception e) { + log.error("Error getting data for task: {}", uniqueId, e); + throw new TaskDataException("Error getting data for task: " + uniqueId); + } + } + + public T getDataForTask(Task task, Class clazz) { + return getDataForTask(task.uniqueId(), clazz); + } + + public void updateExecutionTimeForRecurringTasks(String key, Instant instant) { + jedis.hset(keys.RECURRING_TASK_SET, key, String.valueOf(instant.getEpochSecond())); + } + + public long queueSize(String queueName) { + return jedis.llen(queueName); + } + + public long getNumberOfTaskInProgress() { + return jedis.zcard(keys.IN_PROGRESS_TASKS); + } } diff --git a/src/main/java/today/bonfire/oss/bth4j/service/TaskRunnerWrapper.java b/src/main/java/today/bonfire/oss/bth4j/service/TaskRunnerWrapper.java index 6fea063..5018d88 100644 --- a/src/main/java/today/bonfire/oss/bth4j/service/TaskRunnerWrapper.java +++ b/src/main/java/today/bonfire/oss/bth4j/service/TaskRunnerWrapper.java @@ -27,6 +27,29 @@ public class TaskRunnerWrapper implements Runnable { this.taskOps = taskOps; } + private void rescheduleTask(Task task, long delay) { + var newTask = Task.Builder.newTask() + .event(task.event()) + .accountId(task.accountId()) + .queueName(task.queueName()) + .executeAfter(delay) + .build(); + taskOps.addTaskToQueue(newTask, taskOps.getDataForTask(task.uniqueId()), true); + log.info("Rescheduling old task {} to run after {} seconds, new task id {}", + task.uniqueId(), delay, newTask.uniqueId()); + deleteTask(task); + } + + private void deleteTask(Task task) { + // delete the task and data if present + taskOps.deleteTaskFromInProgressQueue(task); + } + + private void moveToDeadQueue(Task task) { + // move to dead list if task failed because of BGTaskUnrecoverableException + taskOps.moveToDeadQueue(task); + } + /** * if task fails it may be retired automatically by maintenance service * unless the exception is UnrecoverableException in which case the task is marked @@ -41,44 +64,21 @@ public void run() { callbacks.onSuccess().accept(task); } catch (Exception e) { callbacks.onError().accept(task, e); - log.error("Task {} failed", task.taskString(), e); if (e instanceof TaskUnrecoverableException || e instanceof TaskDataException) { + log.error("Task {} failed", task.taskString(), e); // task will not be retried moveToDeadQueue(task); } else if (e instanceof TaskErrorException) { // task can be retried - log.info("Task will be retried"); + log.warn("Task {} failed", task.taskString(), e); } else if (e instanceof TaskRescheduleException ex) { rescheduleTask(task, ex.delay()); } else { - // unknown exception task can be retried - log.info("Unhandled exception. Task will be retried"); + // unknown exception task may be retried + log.warn("Unhandled exception. Task will be retried", e); } } finally { callbacks.afterTask().accept(task); } } - - private void rescheduleTask(Task task, long delay) { - var newTask = Task.Builder.newTask() - .event(task.event()) - .accountId(task.accountId()) - .queueName(task.queueName()) - .executeAfter(delay) - .build(); - taskOps.addTaskToQueue(newTask, taskOps.getDataForTask(task.uniqueId()), true); - log.info("Rescheduling old task {} to run after {} seconds, new task id {}", - task.uniqueId(), delay, newTask.uniqueId()); - deleteTask(task); - } - - private void deleteTask(Task task) { - // delete the task and data if present - taskOps.deleteTaskFromInProgressQueue(task); - } - - private void moveToDeadQueue(Task task) { - // move to dead list if task failed because of BGTaskUnrecoverableException - taskOps.moveToDeadQueue(task); - } }