diff --git a/fineract-core/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java b/fineract-core/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java index 51a501599e9..fcd4ec7b935 100644 --- a/fineract-core/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java +++ b/fineract-core/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java @@ -61,6 +61,7 @@ public enum JobName { ADD_PERIODIC_ACCRUAL_ENTRIES_FOR_SAVINGS_WITH_INCOME_POSTED_AS_TRANSACTIONS("Add Accrual Transactions For Savings"), // JOURNAL_ENTRY_AGGREGATION("Journal Entry Aggregation"), // WORKING_CAPITAL_LOAN_COB_JOB("Working Capital Loan COB"), // + SAVINGS_COB("Savings COB"), // ; // private final String name; diff --git a/fineract-doc/src/docs/en/chapters/features/index.adoc b/fineract-doc/src/docs/en/chapters/features/index.adoc index 2a1dab1fc63..311e532876b 100644 --- a/fineract-doc/src/docs/en/chapters/features/index.adoc +++ b/fineract-doc/src/docs/en/chapters/features/index.adoc @@ -31,3 +31,4 @@ include::working-capital-planned-projected-balances-eir.adoc[leveloffset=+1] include::working-capital-discount.adoc[leveloffset=+1] include::savings-interest-posting.adoc[leveloffset=+1] include::working-capital-breach-management.adoc[leveloffset=+1] +include::savings-cob.adoc[leveloffset=+1] diff --git a/fineract-doc/src/docs/en/chapters/features/savings-cob.adoc b/fineract-doc/src/docs/en/chapters/features/savings-cob.adoc new file mode 100644 index 00000000000..567b0806243 --- /dev/null +++ b/fineract-doc/src/docs/en/chapters/features/savings-cob.adoc @@ -0,0 +1,361 @@ += Savings COB (Close of Business) + +== Overview + +The Savings Close of Business (COB) feature is a Spring Batch–based, partitioned pipeline that executes daily end-of-day processing on every non-closed savings account. It mirrors the manager/worker architecture used by Loan COB but operates on `SavingsAccount` aggregates: a manager node partitions the account population by id ranges, dispatches partitions through a message channel, and one or more worker nodes pull partitions, lock the accounts, run an ordered list of business steps against each account, persist the result, and release the lock. + +=== Purpose + +A modern core banking platform needs a deterministic, fault-tolerant, horizontally scalable mechanism to advance the state of every savings account once per business day — posting interest, applying scheduled charges, marking dormancy, and so on. The legacy `POST_INTEREST_FOR_SAVINGS` job processed accounts sequentially in a single tasklet without distributed locking, fault tolerance, or extensibility. Savings COB replaces that with the same partitioned framework already used by Loan COB so that: + +* the work can be split across worker nodes and threads, +* each account is processed in its own transaction, +* per-account failures are isolated (skipped and recorded on the lock) rather than aborting the batch, +* the list of operations performed at COB is configurable per tenant through `m_batch_business_steps`. + +=== Scope + +The scope of this document includes: + +* The `SAVINGS_COB` Spring Batch job and its manager/worker step topology +* The `SavingsCOBBusinessStep` extension point and the bundled `POST_INTEREST_FOR_SAVINGS` step +* Distributed locking via `m_savings_account_locks` and the `SavingsLockOwner` enum +* The `last_closed_business_date` tracking column on `m_savings_account` +* Partitioning, catch-up mode, retry/skip semantics, and the stayed-locked business event +* Tenant-level configuration through `application.properties` + +=== Applicability + +* All savings accounts in a non-closed status: `SUBMITTED_AND_PENDING_APPROVAL`, `APPROVED`, `ACTIVE`, `TRANSFER_IN_PROGRESS`, `TRANSFER_ON_HOLD`. +* Tenants that have at least one row in `m_batch_business_steps` with `job_name = 'SAVINGS_CLOSE_OF_BUSINESS'`. If no business steps are configured, the partitioner stops the job execution. + +=== Definitions and Key Concepts + +*COB business date:* The business date being closed by this run, obtained via `BusinessDateType.COB_DATE` from the tenant's `ThreadLocalContextUtil`. Stored in the job's execution context as `BusinessDate`. + +*Catch-up mode:* When `IS_CATCH_UP=true`, the partitioner and reader only consider accounts whose `last_closed_business_date` equals the target date (i.e. accounts already partially processed for that date), as opposed to advancing accounts that have never been processed. + +*Partition:* A contiguous range of savings account ids (`minId`–`maxId`) of size `partition-size`. Each partition is dispatched as a separate worker step via the `outboundRequests` channel and pulled by a worker through `inboundRequests`. + +*Business step:* A `SavingsCOBBusinessStep` bean executed inside the worker's item processor. The ordered list of steps per tenant is stored in `m_batch_business_steps` and resolved by `COBBusinessStepService.getCOBBusinessSteps(SavingsCOBBusinessStep.class, "SAVINGS_CLOSE_OF_BUSINESS")`. + +*Lock owner:* The role under which a lock row in `m_savings_account_locks` was placed — `SAVINGS_COB_CHUNK_PROCESSING` for batch processing or `SAVINGS_INLINE_COB_PROCESSING` for inline catch-up triggered by a write API. + +*Stayed-locked account:* An account whose lock row was not removed by the end of the run because chunk processing failed for it. Surfaced via the `SavingsAccountsStayedLockedBusinessEvent` event. + +== Design Decisions and Considerations + +=== Reusing the COB framework instead of forking + +The job reuses `COBBusinessStepService`, `CustomJobParameterResolver`, `InitialisationTasklet`, `ResetContextTasklet`, `CobWorkerStepListener`, `RemotePartitioningManagerStepBuilderFactory`, and `RemotePartitioningWorkerStepBuilderFactory` from `fineract-cob`. Only the savings-specific pieces (entity, lock, partitioner, reader/processor/writer, business step interface) are new. This keeps the operational behavior — partition size, poll interval, chunk size, retry/skip — identical to Loan COB and configurable through the same `fineract.partitioned-job.partitioned-job-properties[*]` keys. + +=== Per-account lock with version column + +`m_savings_account_locks` uses `savings_id` as the primary key plus an optimistic-lock `version` column. The lock is inserted by `ApplySavingsLockTasklet` *before* chunk processing starts and is removed by `AbstractSavingsItemWriter` after the account is successfully saved. This prevents two workers (or worker + inline COB) from updating the same savings account concurrently, and lets per-chunk listeners record errors against the lock row without losing the lock. + +=== Skip-and-record over fail-fast + +The worker step is `faultTolerant()` with `retry(Exception.class)`, `retryLimit`, `skip(Exception.class)` and `skipLimit = chunkSize + 1`. A failure on a single account is caught by `ChunkProcessingSavingsItemListener`, which writes the error message and serialized stacktrace to that account's lock row in a new transaction and lets the rest of the chunk continue. The lock is intentionally *not* released on failure — the row remains so an operator can investigate, and the `StayedLockedSavingsTasklet` at the end of the job publishes a `SavingsAccountsStayedLockedBusinessEvent` listing those accounts. + +=== Garbage-collecting orphaned locks + +Because the lock-release happens inside the chunk-writer transaction, a crash between business-step success and writer commit can leave a row in `m_savings_account_locks` whose `error` is `null` but whose account was actually advanced. `UnlockProcessedSavingsTasklet`, the last step of the job, runs the `removeOrphanedLocksForProcessedAccounts` JPQL delete to clear those rows. + +== Database Design + +=== Overview + +Savings COB adds one new table (`m_savings_account_locks`) and one new column on the existing `m_savings_account` table for date tracking, plus rows in the standard `job` and `m_batch_business_steps` tables. + +=== Existing Tables + +*m_savings_account*: Read by `RetrieveSavingsIdServiceImpl` for partitioning and chunk loading, then updated by the writer to advance `last_closed_business_date`. + +*m_batch_business_steps*: Holds the per-tenant ordered list of `SavingsCOBBusinessStep` step names under `job_name = 'SAVINGS_CLOSE_OF_BUSINESS'`. + +*job*: Standard scheduled-job table — a `Savings COB` row is inserted so the scheduler can trigger the batch job. + +*m_external_event_configuration*: Receives an entry for `SavingsAccountsStayedLockedBusinessEvent` so the event can be externalized. + +=== Table: m_savings_account_locks + +The `m_savings_account_locks` table stores one row per savings account that is currently held by a COB run. Created by changelog `2006_add_last_cob_business_date_to_savings_account.xml`. + +[cols="1,2,1,3",options="header"] +|=== +| Column Name | Type | Constraints | Description +| savings_id | BIGINT | PK, FK to `m_savings_account.id`, not null | The locked savings account +| lock_owner | VARCHAR(100) | not null | `SavingsLockOwner` enum value: `SAVINGS_COB_CHUNK_PROCESSING` or `SAVINGS_INLINE_COB_PROCESSING` +| lock_placed_on_cob_business_date | DATE | | The COB business date the lock belongs to; used by stayed-locked and orphaned-lock cleanup queries +| error | VARCHAR(255) | | Per-account failure message; populated by `ChunkProcessingSavingsItemListener` when the chunk listener fires +| stacktrace | TEXT | | Serialized throwable that produced `error` +| version | BIGINT | not null | JPA `@Version` optimistic-lock counter, incremented by `upgradeLock` +| lock_placed_on | DATETIME / TIMESTAMP WITH TIME ZONE | not null | Audit timestamp when the lock was placed (DB-specific column type) +|=== + +=== Changes to Existing Tables + +==== m_savings_account + +The `last_closed_business_date` column is added by changelog `2004_add_savings_account_cob_infrastructure.xml`. An index `FK_savings_account_last_closed_business_date` is added on the same column to keep the partitioner query selective on large account populations. + +[cols="1,2,1,3",options="header"] +|=== +| Column Name | Type | Constraints | Description +| last_closed_business_date | DATE | | The most recent COB business date for which all configured business steps completed for this account. Set by `AbstractSavingsItemProcessor.setLastRun(...)` after the business-step chain returns. +|=== + +=== Reference rows inserted + +`2007_add_savings_cob_job_data.xml` inserts the schedulable job: + +[source,sql] +---- +INSERT INTO job (name, display_name, cron_expression, job_key, short_name, is_active, ...) +VALUES ('Savings COB', 'Savings COB', '0 0 1 * * ?', + 'Savings COB dayJobDetail1 _ DEFAULT', 'SAV_COB', false, ...); +---- + +`2008_add_savings_cob_business_steps.xml` seeds the default step list: + +[source,sql] +---- +INSERT INTO m_batch_business_steps (job_name, step_name, step_order) +VALUES ('SAVINGS_CLOSE_OF_BUSINESS', 'POST_INTEREST_FOR_SAVINGS', 1); +---- + +== Configuration + +=== Job properties + +The job is registered as a partitioned job in `application.properties` and inherits all standard partitioned-job knobs. The defaults below can be overridden via environment variables. + +[source,properties] +---- +fineract.partitioned-job.partitioned-job-properties[1].job-name=SAVINGS_COB +fineract.partitioned-job.partitioned-job-properties[1].chunk-size=${SAVINGS_COB_CHUNK_SIZE:100} +fineract.partitioned-job.partitioned-job-properties[1].partition-size=${SAVINGS_COB_PARTITION_SIZE:100} +fineract.partitioned-job.partitioned-job-properties[1].thread-pool-core-pool-size=${SAVINGS_COB_THREAD_POOL_CORE_POOL_SIZE:5} +fineract.partitioned-job.partitioned-job-properties[1].thread-pool-max-pool-size=${SAVINGS_COB_THREAD_POOL_MAX_POOL_SIZE:5} +fineract.partitioned-job.partitioned-job-properties[1].thread-pool-queue-capacity=${SAVINGS_COB_THREAD_POOL_QUEUE_CAPACITY:20} +fineract.partitioned-job.partitioned-job-properties[1].retry-limit=${SAVINGS_COB_RETRY_LIMIT:5} +fineract.partitioned-job.partitioned-job-properties[1].poll-interval=${SAVINGS_COB_POLL_INTERVAL:500} +---- + +[NOTE] +==== +When `thread-pool-max-pool-size = 1` the worker step uses a `SyncTaskExecutor` and runs accounts sequentially. With a larger pool, work is dispatched through a `ThreadPoolTaskExecutor` decorated by `SavingsContextAwareTaskDecorator`, which propagates the `FineractContext` (tenant, business date, authenticated user) into each worker thread. +==== + +=== Business step configuration + +The ordered list of steps executed per account is read from `m_batch_business_steps` filtered by `job_name = 'SAVINGS_CLOSE_OF_BUSINESS'` and ordered by `step_order`. Operators add or remove steps per tenant by editing this table. A step is only resolvable if its `step_name` matches a `SavingsCOBBusinessStep` bean's `getEnumStyledName()`. + +By default the seed migration installs only `POST_INTEREST_FOR_SAVINGS` at order `1`. + +=== Manager vs Worker mode + +`SavingsCOBManagerConfiguration` is only active when `BatchManagerCondition` evaluates to true and `SavingsCOBWorkerConfiguration` only when `BatchWorkerCondition` does. In a single-node deployment both conditions are true and the same JVM does both roles; in a manager/worker deployment they activate on different nodes. + +== Job Topology + +The `savingsCOBJob` is composed of four sequential steps on the manager node: + +[source] +---- +savingsCOBJob (JobName.SAVINGS_COB) + ├── Resolve custom job parameters - Savings Step + ├── Savings COB partition - Step (partitioner -> outboundRequests) + ├── Stayed locked savings accounts - Step + └── Unlock processed savings accounts - Step +---- + +Each worker partition runs the following flow (driven by `CobWorkerStepListener`): + +[source] +---- +savingsCOBWorkerStep + ├── savingsInitialiseContext (InitialisationTasklet — sets up FineractContext) + ├── savingsApplyLock (ApplySavingsLockTasklet — inserts locks) + ├── chunk(SavingsAccount) (reader -> processor -> writer) + │ reader : SavingsItemReader + │ processor: SavingsItemProcessor (runs the SavingsCOBBusinessStep chain) + │ writer : SavingsItemWriter (saveAll + delete locks) + └── savingsResetContext (ResetContextTasklet) +---- + +The chunk step is fault tolerant: it retries up to `retryLimit` on any `Exception`, then skips with `skipLimit = chunkSize + 1`. `ChunkProcessingSavingsItemListener` intercepts `@OnReadError`, `@OnProcessError`, and `@OnWriteError` and records the failure against the per-account lock row in a `REQUIRES_NEW` transaction. + +=== Partitioning logic + +`SavingsCOBPartitioner.partition(int)` computes its partitions by paging through eligible account ids. The SQL is dialect-portable through `NamedParameterJdbcTemplate` and uses `row_number()` window-paging: + +* Filter: `status_enum IN (SUBMITTED_AND_PENDING_APPROVAL, APPROVED, ACTIVE, TRANSFER_IN_PROGRESS, TRANSFER_ON_HOLD)`. +* When `isCatchUp = true`: only accounts where `last_closed_business_date = businessDate - 1`. +* When `isCatchUp = false`: accounts where `last_closed_business_date = businessDate - 1 OR last_closed_business_date IS NULL`. +* Output: a list of `COBPartition(min, max, page, count)` rows whose ids fall into the same `floor((rowNumber - 1) / partitionSize)` bucket. + +If the configured business steps list is empty the partitioner calls `jobOperator.stop(jobId)` and returns no partitions. If there are no eligible accounts, a synthetic `(0, 0, 1, 0)` partition is emitted so the downstream worker step still runs cleanly. + +=== Reader filter + +`SavingsItemReader.beforeStep` re-runs `retrieveAllNonClosedSavingsByLastClosedBusinessDateAndMinAndMaxSavingsId` to get the savings ids within the partition's `[min, max]` range, then *retains only* those that actually have a `SAVINGS_COB_CHUNK_PROCESSING` lock placed for them by the `ApplySavingsLockTasklet`. This guarantees the chunk processor never touches an account that was not successfully locked first. + +== Business Steps + +Business steps are implementations of `SavingsCOBBusinessStep extends COBBusinessStep`. They are Spring beans, discovered by `COBBusinessStepService`, and invoked in `step_order` for every `SavingsAccount` flowing through the chunk processor. + +[source,java] +---- +public interface SavingsCOBBusinessStep extends COBBusinessStep { + // String getEnumStyledName(); // matches m_batch_business_steps.step_name + // String getHumanReadableName(); + // SavingsAccount execute(SavingsAccount input); +} +---- + +=== POST_INTEREST_FOR_SAVINGS + +`PostInterestForSavingsBusinessStep` is the only step shipped by default. It delegates to `SavingsAccountWritePlatformService.postInterest(account, false, businessDate, backdatedTxnsAllowedTill)`, where `backdatedTxnsAllowedTill` is read from `ConfigurationDomainService.retrievePivotDateConfig()`. + +[cols="1,3",options="header"] +|=== +| Property | Value +| `getEnumStyledName()` | `POST_INTEREST_FOR_SAVINGS` +| `getHumanReadableName()` | `Post interest for savings` +| Default step order | 1 +|=== + +== Locking + +=== Lock ownership + +`SavingsLockOwner` enumerates who placed the lock: + +[cols="1,3",options="header"] +|=== +| Value | Description +| `SAVINGS_COB_CHUNK_PROCESSING` | Held by the batch worker while a chunk is being processed +| `SAVINGS_INLINE_COB_PROCESSING` | Held by an inline catch-up COB triggered by a write API +|=== + +=== Lock lifecycle + +. `ApplySavingsLockTasklet` selects all eligible ids in the partition, removes those already locked (so it never re-locks), and bulk-inserts rows into `m_savings_account_locks` via `SavingsLockingServiceImpl.applyLock(...)`. The insert runs in a `PROPAGATION_REQUIRES_NEW` transaction. It retries up to 3 commit-count cycles before raising `LockCannotBeAppliedException`. +. The chunk reader filters its working set down to ids that actually have a `SAVINGS_COB_CHUNK_PROCESSING` lock. +. The processor runs the business-step chain and sets `lastClosedBusinessDate = businessDate` on the account. +. The writer persists the chunk via `RepositoryItemWriter` and then calls `deleteBySavingsIdInAndLockOwner(ids, SAVINGS_COB_CHUNK_PROCESSING)` in the same transaction. +. If any read/process/write error occurs, `ChunkProcessingSavingsItemListener` writes the error message and stacktrace to the lock row and the lock is *kept*. +. The `StayedLockedSavingsTasklet` at the end of the job calls `findAllStayedLockedByCobBusinessDate(cobBusinessDate)` and, if any rows come back, publishes a `SavingsAccountsStayedLockedBusinessEvent`. +. The `UnlockProcessedSavingsTasklet` runs `removeOrphanedLocksForProcessedAccounts`, which deletes any `SAVINGS_COB_CHUNK_PROCESSING` lock row whose `error` is `null` (i.e. processing actually finished but the writer's lock delete didn't land). + +=== Locking SQL + +`SavingsLockingServiceImpl` uses two JDBC batch statements: + +[source,sql] +---- +-- applyLock +INSERT INTO m_savings_account_locks + (savings_id, version, lock_owner, lock_placed_on, lock_placed_on_cob_business_date) +VALUES (?, ?, ?, ?, ?); + +-- upgradeLock +UPDATE m_savings_account_locks +SET version = version + 1, lock_owner = ?, lock_placed_on = ? +WHERE savings_id = ?; +---- + +Both are batched in slices of `fineract.query.in-clause-parameter-size-limit` rows. + +== Business Events + +=== Overview + +The COB job emits one business event when accounts could not be unlocked at the end of the run. + +=== SavingsAccountsStayedLockedBusinessEvent + +*Category:* `Savings COB` + +*Type:* `SavingsAccountsStayedLockedBusinessEvent` + +Emitted by `StayedLockedSavingsTasklet` at the end of `savingsCOBJob` whenever +`findAllStayedLockedByCobBusinessDate(cobBusinessDate)` returns a non-empty list. The payload `SavingsAccountsStayedLockedData` contains a list of `COBIdAndExternalIdAndAccountNo(id, externalId, accountNo)` for every savings account whose lock row remained associated with the current COB business date. + +The event is registered in `m_external_event_configuration` (migration `2004_add_savings_account_cob_infrastructure.xml`) with `enabled = false` by default — operators turn it on per tenant to receive externalized notifications. + +== Validation Rules + +The job has no user-facing input and therefore no API-level validation. Internal invariants enforced by the pipeline: + +* The partitioner refuses to dispatch any partition when no `SavingsCOBBusinessStep` is configured for `SAVINGS_CLOSE_OF_BUSINESS` in `m_batch_business_steps`; it stops the job execution. +* The reader filters out any partition member that does not currently hold a `SAVINGS_COB_CHUNK_PROCESSING` lock. +* `ApplySavingsLockTasklet` only retries lock acquisition while `stepExecution.commitCount <= 3`; beyond that it raises `LockCannotBeAppliedException`. + +== Business Rules + +=== Account selection + +* Only savings accounts in `SUBMITTED_AND_PENDING_APPROVAL`, `APPROVED`, `ACTIVE`, `TRANSFER_IN_PROGRESS`, or `TRANSFER_ON_HOLD` are eligible. +* Accounts whose `last_closed_business_date` is already at or beyond `cobBusinessDate - NUMBER_OF_DAYS_BEHIND` (i.e. already processed for that date) are skipped. +* `NUMBER_OF_DAYS_BEHIND` is inherited from `COBConstant.NUMBER_OF_DAYS_BEHIND = 1L`. + +=== Catch-up mode + +* When the job parameter `IS_CATCH_UP = true`, the partitioner and reader only consider accounts where `last_closed_business_date = cobBusinessDate - 1`. This is the entry point used to re-process accounts whose previous COB run left them in a stayed-locked state, after manual intervention has cleared the lock. + +=== Idempotence + +* The processor sets `lastClosedBusinessDate` to the current business date as the last action of `process(...)`. A re-run for the same business date will therefore skip already-advanced accounts at partition selection time. +* Business steps are expected to be idempotent within a single business date; a chunk-level retry will re-invoke `execute(...)` for the same account. + +== Example Scenarios + +=== Scenario #1: Nightly COB advances a healthy tenant + +*Setup:* + +* `m_batch_business_steps` for `SAVINGS_CLOSE_OF_BUSINESS` contains only `POST_INTEREST_FOR_SAVINGS` at order 1. +* `BusinessDate` is 2026-06-09; all eligible savings accounts have `last_closed_business_date = 2026-06-08` or `NULL`. +* `partition-size = 100`, `chunk-size = 100`, `thread-pool-max-pool-size = 5`. + +*Action:* + +The `Savings COB` scheduled job fires. The manager partitions accounts into id-range buckets of 100 and pushes them onto `outboundRequests`. Workers pull each partition, place locks, run `PostInterestForSavingsBusinessStep` against each account, persist, release locks. + +*Expected Behavior:* + +* Every eligible account has `last_closed_business_date` advanced to 2026-06-09. +* `m_savings_account_locks` is empty at the end of the run. +* `StayedLockedSavingsTasklet` finds no rows and emits no event. +* `UnlockProcessedSavingsTasklet` deletes 0 rows. + +=== Scenario #2: A single account fails interest posting + +*Setup:* + +* Same configuration as Scenario #1. +* Account `12345` is in an inconsistent state that makes `postInterest` throw a `RuntimeException` every time, including on retry. + +*Action:* + +The chunk-step retries `12345` up to `retryLimit` times, then skips it. `ChunkProcessingSavingsItemListener.onProcessError` writes `"SavingsAccount (id: 12345) processing is failed"` and the serialized stacktrace to the row in `m_savings_account_locks` for that account. + +*Expected Behavior:* + +* All accounts other than `12345` advance and have their locks released. +* The lock row for `12345` remains, with `error` and `stacktrace` populated. +* `StayedLockedSavingsTasklet` finds `12345` and publishes a `SavingsAccountsStayedLockedBusinessEvent` containing `(12345, externalId, accountNo)`. +* `UnlockProcessedSavingsTasklet` does *not* delete `12345` because its `error` column is non-null. +* On the next day's run, `12345` is excluded from partitioning until the lock is manually cleared (e.g. via `removeLockByOwner` after operator review). + +== Summary + +Savings COB ports the proven LoanCOB partitioned-batch architecture to savings accounts, providing a configurable, fault-tolerant, horizontally scalable end-of-day pipeline. Key aspects: + +* Manager/worker Spring Batch topology with `RemotePartitioningManagerStepBuilderFactory` and `RemotePartitioningWorkerStepBuilderFactory`, activated through `BatchManagerCondition` / `BatchWorkerCondition`. +* Per-account distributed lock in `m_savings_account_locks` with two ownership flavours (chunk processing and inline) and explicit stayed-lock + orphaned-lock cleanup steps. +* Extensible pipeline through the `SavingsCOBBusinessStep` interface and the tenant-scoped `m_batch_business_steps` table, shipping with `POST_INTEREST_FOR_SAVINGS` enabled by default. +* Catch-up mode that targets only accounts whose previous COB run did not complete, enabling safe re-processing. +* Business-event `SavingsAccountsStayedLockedBusinessEvent` that surfaces every account left locked at end-of-run, so downstream tooling can alert or auto-remediate. diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/listener/ChunkProcessingSavingsItemListener.java b/fineract-provider/src/main/java/org/apache/fineract/cob/listener/ChunkProcessingSavingsItemListener.java new file mode 100644 index 00000000000..832571a78a2 --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/listener/ChunkProcessingSavingsItemListener.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.listener; + +import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRES_NEW; + +import java.util.List; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.fineract.cob.exceptions.LockedReadException; +import org.apache.fineract.cob.savings.SavingsLockOwner; +import org.apache.fineract.cob.savings.SavingsLockingService; +import org.apache.fineract.infrastructure.core.domain.AbstractPersistableCustom; +import org.apache.fineract.infrastructure.core.serialization.ThrowableSerialization; +import org.apache.fineract.portfolio.savings.domain.SavingsAccount; +import org.springframework.batch.core.annotation.OnProcessError; +import org.springframework.batch.core.annotation.OnReadError; +import org.springframework.batch.core.annotation.OnSkipInProcess; +import org.springframework.batch.core.annotation.OnSkipInRead; +import org.springframework.batch.core.annotation.OnSkipInWrite; +import org.springframework.batch.core.annotation.OnWriteError; +import org.springframework.batch.item.Chunk; +import org.springframework.lang.NonNull; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; + +@Slf4j +@RequiredArgsConstructor +public class ChunkProcessingSavingsItemListener { + + private final SavingsLockingService savingsLockingService; + private final TransactionTemplate batchJdbcTransactionTemplate; + + private void updateLockWithError(List savingsIds, String msgTemplate, Throwable e) { + batchJdbcTransactionTemplate.setPropagationBehavior(PROPAGATION_REQUIRES_NEW); + batchJdbcTransactionTemplate.execute(new TransactionCallbackWithoutResult() { + + @Override + protected void doInTransactionWithoutResult(@NonNull TransactionStatus status) { + String stacktrace = ThrowableSerialization.serialize(e); + for (Long savingsId : savingsIds) { + savingsLockingService.updateLockError(savingsId, SavingsLockOwner.SAVINGS_COB_CHUNK_PROCESSING, + String.format(msgTemplate, savingsId), stacktrace); + } + } + }); + } + + @OnReadError + public void onReadError(Exception e) { + if (e instanceof LockedReadException ee) { + log.warn("Read error for SavingsAccount (id={}) due to: {}", ee.getId(), ThrowableSerialization.serialize(e)); + updateLockWithError(List.of(ee.getId()), "SavingsAccount (id: %d) reading is failed", e); + } else { + log.error("Could not handle read error", e); + } + } + + @OnProcessError + public void onProcessError(@NonNull SavingsAccount item, Exception e) { + log.warn("Process error for SavingsAccount (id={}) due to: {}", item.getId(), ThrowableSerialization.serialize(e)); + updateLockWithError(List.of(item.getId()), "SavingsAccount (id: %d) processing is failed", e); + } + + @OnWriteError + public void onWriteError(Exception e, @NonNull Chunk items) { + List ids = items.getItems().stream().map(AbstractPersistableCustom::getId).toList(); + log.warn("Write error for SavingsAccounts (ids={}) due to: {}", ids, ThrowableSerialization.serialize(e)); + updateLockWithError(ids, "SavingsAccount (id: %d) writing is failed", e); + } + + @OnSkipInRead + public void onSkipInRead(@NonNull Throwable e) { + log.warn("Skipping triggered during savings COB read!"); + } + + @OnSkipInProcess + public void onSkipInProcess(@NonNull SavingsAccount item, @NonNull Throwable e) { + log.warn("Skipping triggered during processing of SavingsAccount (id={})", item.getId()); + } + + @OnSkipInWrite + public void onSkipInWrite(@NonNull SavingsAccount item, @NonNull Throwable e) { + log.warn("Skipping triggered during writing of SavingsAccount (id={})", item.getId()); + } +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/AbstractSavingsItemProcessor.java b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/AbstractSavingsItemProcessor.java new file mode 100644 index 00000000000..bfdf854885b --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/AbstractSavingsItemProcessor.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +import org.apache.fineract.cob.COBBusinessStepService; +import org.apache.fineract.cob.processor.AbstractItemProcessor; +import org.apache.fineract.portfolio.savings.domain.SavingsAccount; + +public abstract class AbstractSavingsItemProcessor extends AbstractItemProcessor { + + public AbstractSavingsItemProcessor(COBBusinessStepService cobBusinessStepService) { + super(cobBusinessStepService); + } + + @Override + public void setLastRun(SavingsAccount savingsAccount) { + savingsAccount.setLastClosedBusinessDate(getBusinessDate()); + } +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/AbstractSavingsItemReader.java b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/AbstractSavingsItemReader.java new file mode 100644 index 00000000000..bdcb2558d20 --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/AbstractSavingsItemReader.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +import java.util.concurrent.LinkedBlockingQueue; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.apache.fineract.cob.exceptions.LockedReadException; +import org.apache.fineract.portfolio.savings.domain.SavingsAccount; +import org.apache.fineract.portfolio.savings.domain.SavingsAccountRepository; +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.annotation.AfterStep; +import org.springframework.batch.item.ItemReader; +import org.springframework.lang.NonNull; + +@Slf4j +@RequiredArgsConstructor +public abstract class AbstractSavingsItemReader implements ItemReader { + + protected final SavingsAccountRepository savingsAccountRepository; + + @Setter(AccessLevel.PROTECTED) + private LinkedBlockingQueue remainingData; + + @Override + public SavingsAccount read() throws Exception { + final Long savingsId = remainingData.poll(); + if (savingsId != null) { + try { + return savingsAccountRepository.findById(savingsId) + .orElseThrow(() -> new RuntimeException("SavingsAccount not found: " + savingsId)); + } catch (Exception e) { + throw new LockedReadException(savingsId, e); + } + } + return null; + } + + @AfterStep + public ExitStatus afterStep(@NonNull StepExecution stepExecution) { + return ExitStatus.COMPLETED; + } +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/AbstractSavingsItemWriter.java b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/AbstractSavingsItemWriter.java new file mode 100644 index 00000000000..fe3fc802c8d --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/AbstractSavingsItemWriter.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +import java.util.List; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.fineract.infrastructure.core.domain.AbstractPersistableCustom; +import org.apache.fineract.portfolio.savings.domain.SavingsAccount; +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.data.RepositoryItemWriter; +import org.springframework.lang.NonNull; + +@Slf4j +@RequiredArgsConstructor +public abstract class AbstractSavingsItemWriter extends RepositoryItemWriter { + + private final SavingsLockingService savingsLockingService; + + @Override + public void write(@NonNull Chunk items) throws Exception { + if (!items.isEmpty()) { + super.write(items); + List savingsIds = items.getItems().stream().map(AbstractPersistableCustom::getId).toList(); + savingsLockingService.deleteBySavingsIdInAndLockOwner(savingsIds, getLockOwner()); + } + } + + protected abstract SavingsLockOwner getLockOwner(); +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/ApplySavingsLockTasklet.java b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/ApplySavingsLockTasklet.java new file mode 100644 index 00000000000..0911dc49785 --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/ApplySavingsLockTasklet.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRES_NEW; + +import com.google.common.collect.Lists; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.fineract.cob.converter.COBParameterConverter; +import org.apache.fineract.cob.data.COBParameter; +import org.apache.fineract.cob.exceptions.LockCannotBeAppliedException; +import org.apache.fineract.cob.resolver.CatchUpFlagResolver; +import org.apache.fineract.infrastructure.core.config.FineractProperties; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.lang.NonNull; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; + +@Slf4j +@RequiredArgsConstructor +public class ApplySavingsLockTasklet implements Tasklet { + + private static final long NUMBER_OF_RETRIES = 3; + private final FineractProperties fineractProperties; + private final SavingsLockingService savingsLockingService; + private final RetrieveSavingsIdService retrieveSavingsIdService; + private final TransactionTemplate batchJdbcTransactionTemplate; + + @Override + @SuppressFBWarnings("SLF4J_SIGN_ONLY_FORMAT") + public RepeatStatus execute(@NonNull StepContribution contribution, @NonNull ChunkContext chunkContext) + throws LockCannotBeAppliedException { + ExecutionContext executionContext = contribution.getStepExecution().getExecutionContext(); + long numberOfExecutions = contribution.getStepExecution().getCommitCount(); + COBParameter savingsCOBParameter = COBParameterConverter.convert(executionContext.get(SavingsCOBConstant.SAVINGS_COB_PARAMETER)); + boolean isCatchUp = CatchUpFlagResolver.resolve(contribution.getStepExecution()); + + List savingsIds; + if (Objects.isNull(savingsCOBParameter) + || (Objects.isNull(savingsCOBParameter.getMinAccountId()) && Objects.isNull(savingsCOBParameter.getMaxAccountId())) + || (savingsCOBParameter.getMinAccountId().equals(0L) && savingsCOBParameter.getMaxAccountId().equals(0L))) { + savingsIds = Collections.emptyList(); + } else { + savingsIds = new ArrayList<>(retrieveSavingsIdService + .retrieveAllNonClosedSavingsByLastClosedBusinessDateAndMinAndMaxSavingsId(savingsCOBParameter, isCatchUp)); + } + + int limit = fineractProperties.getQuery().getInClauseParameterSizeLimit(); + List> partitions = Lists.partition(savingsIds, limit); + List alreadyLocked = new ArrayList<>(); + partitions.forEach(part -> alreadyLocked.addAll(savingsLockingService.findLockIdsBySavingsIdIn(part))); + + List toBeProcessed = new ArrayList<>(savingsIds); + toBeProcessed.removeAll(alreadyLocked); + + try { + applyLocks(toBeProcessed); + } catch (Exception e) { + if (numberOfExecutions > NUMBER_OF_RETRIES) { + String message = "There was an error applying lock to savings accounts."; + log.error("{}", message, e); + throw new LockCannotBeAppliedException(message, e); + } else { + return RepeatStatus.CONTINUABLE; + } + } + return RepeatStatus.FINISHED; + } + + private void applyLocks(List savingsIds) { + batchJdbcTransactionTemplate.setPropagationBehavior(PROPAGATION_REQUIRES_NEW); + batchJdbcTransactionTemplate.execute(new TransactionCallbackWithoutResult() { + + @Override + protected void doInTransactionWithoutResult(@NonNull TransactionStatus status) { + log.info("Applying savings COB locks for {} accounts", savingsIds.size()); + savingsLockingService.applyLock(savingsIds, SavingsLockOwner.SAVINGS_COB_CHUNK_PROCESSING); + } + }); + } +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/ResolveSavingsCOBCustomJobParametersTasklet.java b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/ResolveSavingsCOBCustomJobParametersTasklet.java new file mode 100644 index 00000000000..fec1ac2e009 --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/ResolveSavingsCOBCustomJobParametersTasklet.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.fineract.cob.COBConstant; +import org.apache.fineract.cob.common.CustomJobParameterResolver; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.repeat.RepeatStatus; + +@Slf4j +@RequiredArgsConstructor +public class ResolveSavingsCOBCustomJobParametersTasklet implements Tasklet { + + private final CustomJobParameterResolver customJobParameterResolver; + + @Override + public RepeatStatus execute(final StepContribution contribution, final ChunkContext chunkContext) throws Exception { + customJobParameterResolver.resolveToJobExecutionContext(contribution, chunkContext, + new String[] { COBConstant.BUSINESS_DATE_PARAMETER_NAME }, new String[] { COBConstant.IS_CATCH_UP_PARAMETER_NAME }); + return RepeatStatus.FINISHED; + } +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/RetrieveSavingsIdServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/RetrieveSavingsIdServiceImpl.java index ba81701380a..5a8a75ce425 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/RetrieveSavingsIdServiceImpl.java +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/RetrieveSavingsIdServiceImpl.java @@ -110,8 +110,6 @@ public List retrieveAllNonClosedSavingsByLastClosedBusinessDateAndMinAndMa @Override public List findAllStayedLockedByCobBusinessDate(LocalDate cobBusinessDate) { - // This will be implemented when we add the query to join with SavingsAccountLock - // For now, return empty list as the lock table doesn't exist yet - return List.of(); + return savingsAccountRepository.findAllStayedLockedByCobBusinessDate(cobBusinessDate); } } diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsCOBManagerConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsCOBManagerConfiguration.java new file mode 100644 index 00000000000..3e8789c34b8 --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsCOBManagerConfiguration.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +import org.apache.fineract.cob.COBBusinessStepService; +import org.apache.fineract.cob.common.CustomJobParameterResolver; +import org.apache.fineract.cob.conditions.BatchManagerCondition; +import org.apache.fineract.cob.domain.SavingsAccountLockRepository; +import org.apache.fineract.cob.listener.COBExecutionListenerRunner; +import org.apache.fineract.infrastructure.event.business.service.BusinessEventNotifierService; +import org.apache.fineract.infrastructure.jobs.service.JobName; +import org.apache.fineract.infrastructure.springbatch.PropertyService; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.launch.JobOperator; +import org.springframework.batch.core.launch.support.RunIdIncrementer; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.integration.config.annotation.EnableBatchIntegration; +import org.springframework.batch.integration.partition.RemotePartitioningManagerStepBuilderFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.transaction.PlatformTransactionManager; + +@Configuration +@EnableBatchIntegration +@Conditional(BatchManagerCondition.class) +public class SavingsCOBManagerConfiguration { + + @Autowired + private JobRepository jobRepository; + @Autowired + private PlatformTransactionManager transactionManager; + @Autowired + private RemotePartitioningManagerStepBuilderFactory stepBuilderFactory; + @Autowired + private PropertyService propertyService; + @Autowired + private DirectChannel outboundRequests; + @Autowired + private COBBusinessStepService cobBusinessStepService; + @Autowired + private JobOperator jobOperator; + @Autowired + private ApplicationContext applicationContext; + @Autowired + private RetrieveSavingsIdService retrieveSavingsIdService; + @Autowired + private BusinessEventNotifierService businessEventNotifierService; + @Autowired + private CustomJobParameterResolver customJobParameterResolver; + @Autowired + private SavingsLockingService savingsLockingService; + @Autowired + private SavingsAccountLockRepository savingsAccountLockRepository; + + @Bean + @StepScope + public SavingsCOBPartitioner savingsPartitioner(@Value("#{stepExecution}") StepExecution stepExecution) { + return new SavingsCOBPartitioner(propertyService, cobBusinessStepService, retrieveSavingsIdService, jobOperator, stepExecution); + } + + @Bean("savingsCOBStep") + public Step savingsCOBStep(SavingsCOBPartitioner savingsPartitioner) { + return stepBuilderFactory.get(SavingsCOBConstant.SAVINGS_COB_PARTITIONER_STEP) + .partitioner(SavingsCOBConstant.SAVINGS_COB_WORKER_STEP, savingsPartitioner) + .pollInterval(propertyService.getPollInterval(SavingsCOBConstant.JOB_NAME)).outputChannel(outboundRequests).build(); + } + + @Bean("savingsResolveCustomJobParametersStep") + public Step savingsResolveCustomJobParametersStep() { + return new StepBuilder("Resolve custom job parameters - Savings Step", jobRepository) + .tasklet(savingsResolveCustomJobParametersTasklet(), transactionManager).build(); + } + + @Bean("savingsStayedLockedStep") + public Step savingsStayedLockedStep() { + return new StepBuilder("Stayed locked savings accounts - Step", jobRepository) + .tasklet(savingsStayedLockedTasklet(), transactionManager).build(); + } + + @Bean("savingsUnlockProcessedStep") + public Step savingsUnlockProcessedStep() { + return new StepBuilder("Unlock processed savings accounts - Step", jobRepository) + .tasklet(savingsUnlockProcessedTasklet(), transactionManager).build(); + } + + @Bean("savingsResolveCustomJobParametersTasklet") + public ResolveSavingsCOBCustomJobParametersTasklet savingsResolveCustomJobParametersTasklet() { + return new ResolveSavingsCOBCustomJobParametersTasklet(customJobParameterResolver); + } + + @Bean("savingsStayedLockedTasklet") + public StayedLockedSavingsTasklet savingsStayedLockedTasklet() { + return new StayedLockedSavingsTasklet(businessEventNotifierService, retrieveSavingsIdService); + } + + @Bean("savingsUnlockProcessedTasklet") + public UnlockProcessedSavingsTasklet savingsUnlockProcessedTasklet() { + return new UnlockProcessedSavingsTasklet(savingsAccountLockRepository); + } + + @Bean(name = "savingsCOBJob") + public Job savingsCOBJob(SavingsCOBPartitioner savingsPartitioner) { + return new JobBuilder(JobName.SAVINGS_COB.name(), jobRepository) // + .listener(new COBExecutionListenerRunner(applicationContext, JobName.SAVINGS_COB.name())) // + .start(savingsResolveCustomJobParametersStep()) // + .next(savingsCOBStep(savingsPartitioner)) // + .next(savingsStayedLockedStep()) // + .next(savingsUnlockProcessedStep()) // + .incrementer(new RunIdIncrementer()) // + .build(); + } +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsCOBPartitioner.java b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsCOBPartitioner.java new file mode 100644 index 00000000000..6525cd8c86d --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsCOBPartitioner.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.fineract.cob.COBBusinessStepService; +import org.apache.fineract.cob.COBConstant; +import org.apache.fineract.cob.data.BusinessStepNameAndOrder; +import org.apache.fineract.cob.data.COBParameter; +import org.apache.fineract.cob.data.COBPartition; +import org.apache.fineract.cob.resolver.BusinessDateResolver; +import org.apache.fineract.cob.resolver.CatchUpFlagResolver; +import org.apache.fineract.infrastructure.springbatch.PropertyService; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.launch.JobExecutionNotRunningException; +import org.springframework.batch.core.launch.JobOperator; +import org.springframework.batch.core.launch.NoSuchJobExecutionException; +import org.springframework.batch.core.partition.support.Partitioner; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.lang.NonNull; +import org.springframework.util.StopWatch; + +@Slf4j +@RequiredArgsConstructor +public class SavingsCOBPartitioner implements Partitioner { + + private final PropertyService propertyService; + private final COBBusinessStepService cobBusinessStepService; + private final RetrieveSavingsIdService retrieveSavingsIdService; + private final JobOperator jobOperator; + private final StepExecution stepExecution; + + @NonNull + @Override + public Map partition(int gridSize) { + Set cobBusinessSteps = cobBusinessStepService.getCOBBusinessSteps(SavingsCOBBusinessStep.class, + SavingsCOBConstant.SAVINGS_COB_JOB_NAME); + if (cobBusinessSteps.isEmpty()) { + stopJobExecution(); + return Map.of(); + } + + int partitionSize = propertyService.getPartitionSize(SavingsCOBConstant.JOB_NAME); + LocalDate businessDate = BusinessDateResolver.resolve(stepExecution); + boolean isCatchUp = CatchUpFlagResolver.resolve(stepExecution); + + StopWatch sw = new StopWatch(); + sw.start(); + List partitions = new ArrayList<>(retrieveSavingsIdService + .retrieveSavingsCOBPartitions(SavingsCOBConstant.NUMBER_OF_DAYS_BEHIND, businessDate, isCatchUp, partitionSize)); + sw.stop(); + + if (partitions.isEmpty()) { + partitions.add(new COBPartition(0L, 0L, 1L, 0L)); + } + long totalCount = partitions.stream().map(COBPartition::getCount).reduce(0L, Long::sum); + log.info("SavingsCOBPartitioner found {} savings accounts to process. {} partitions created (size={}). Took {} ms.", totalCount, + partitions.size(), partitionSize, sw.getTotalTimeMillis()); + + return partitions.stream().collect(Collectors.toMap(p -> COBConstant.PARTITION_PREFIX + p.getPageNo(), + p -> createExecutionContext(cobBusinessSteps, p, businessDate, isCatchUp))); + } + + private ExecutionContext createExecutionContext(Set cobBusinessSteps, COBPartition partition, + LocalDate businessDate, boolean isCatchUp) { + ExecutionContext ctx = new ExecutionContext(); + ctx.put(COBConstant.BUSINESS_STEPS, cobBusinessSteps); + ctx.put(SavingsCOBConstant.SAVINGS_COB_PARAMETER, new COBParameter(partition.getMinId(), partition.getMaxId())); + ctx.put(COBConstant.PARTITION_KEY, COBConstant.PARTITION_PREFIX + partition.getPageNo()); + ctx.put(COBConstant.BUSINESS_DATE_PARAMETER_NAME, businessDate.toString()); + ctx.put(COBConstant.IS_CATCH_UP_PARAMETER_NAME, Boolean.toString(isCatchUp)); + return ctx; + } + + private void stopJobExecution() { + Long jobId = stepExecution.getJobExecution().getId(); + try { + jobOperator.stop(jobId); + } catch (NoSuchJobExecutionException | JobExecutionNotRunningException e) { + log.error("No running execution for savings COB job ID: {}", jobId); + throw new RuntimeException(e); + } + } +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsCOBWorkerConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsCOBWorkerConfiguration.java new file mode 100644 index 00000000000..3e78ab339dc --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsCOBWorkerConfiguration.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +import org.apache.fineract.cob.COBBusinessStepService; +import org.apache.fineract.cob.common.InitialisationTasklet; +import org.apache.fineract.cob.common.ResetContextTasklet; +import org.apache.fineract.cob.conditions.BatchWorkerCondition; +import org.apache.fineract.cob.domain.SavingsAccountLockRepository; +import org.apache.fineract.cob.listener.ChunkProcessingSavingsItemListener; +import org.apache.fineract.cob.listener.CobWorkerStepListener; +import org.apache.fineract.infrastructure.core.config.FineractProperties; +import org.apache.fineract.infrastructure.jobs.service.JobName; +import org.apache.fineract.infrastructure.springbatch.PropertyService; +import org.apache.fineract.portfolio.savings.domain.SavingsAccount; +import org.apache.fineract.portfolio.savings.domain.SavingsAccountRepository; +import org.apache.fineract.useradministration.domain.AppUserRepositoryWrapper; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.SimpleStepBuilder; +import org.springframework.batch.integration.partition.RemotePartitioningWorkerStepBuilderFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.SyncTaskExecutor; +import org.springframework.core.task.TaskExecutor; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.support.TransactionTemplate; + +@Configuration +@Conditional(BatchWorkerCondition.class) +public class SavingsCOBWorkerConfiguration { + + @Autowired + private JobRepository jobRepository; + @Autowired + private PlatformTransactionManager transactionManager; + @Autowired + @Qualifier("batchJdbcTransactionTemplate") + private TransactionTemplate batchJdbcTransactionTemplate; + @Autowired + private RemotePartitioningWorkerStepBuilderFactory stepBuilderFactory; + @Autowired + private PropertyService propertyService; + @Autowired + private SavingsAccountRepository savingsAccountRepository; + @Autowired + private QueueChannel inboundRequests; + @Autowired + private COBBusinessStepService cobBusinessStepService; + @Autowired + private AppUserRepositoryWrapper userRepository; + @Autowired + private RetrieveSavingsIdService retrieveSavingsIdService; + @Autowired + private FineractProperties fineractProperties; + @Autowired + private SavingsLockingService savingsLockingService; + @Autowired + private SavingsAccountLockRepository savingsAccountLockRepository; + + @Bean(name = SavingsCOBConstant.SAVINGS_COB_WORKER_STEP) + public Step savingsCOBWorkerStep() { + final SimpleStepBuilder stepBuilder = stepBuilderFactory.get("Savings COB worker - Step") + .inputChannel(inboundRequests) + .chunk(propertyService.getChunkSize(JobName.SAVINGS_COB.name()), transactionManager) // + .reader(savingsCobWorkerItemReader()) // + .processor(savingsCobWorkerItemProcessor()) // + .writer(savingsCobWorkerItemWriter()) // + .faultTolerant() // + .retry(Exception.class) // + .retryLimit(propertyService.getRetryLimit(SavingsCOBConstant.JOB_NAME)) // + .skip(Exception.class) // + .skipLimit(propertyService.getChunkSize(SavingsCOBConstant.JOB_NAME) + 1) // + .listener(savingsItemListener()) // + .listener(savingsCobWorkerStepListener()) // + .transactionManager(transactionManager); + + if (propertyService.getThreadPoolMaxPoolSize(SavingsCOBConstant.JOB_NAME) > 1) { + stepBuilder.taskExecutor(savingsCobTaskExecutor()); + } + return stepBuilder.build(); + } + + @Bean("savingsCobTaskExecutor") + public TaskExecutor savingsCobTaskExecutor() { + if (propertyService.getThreadPoolMaxPoolSize(SavingsCOBConstant.JOB_NAME) == 1) { + return new SyncTaskExecutor(); + } + final ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); + taskExecutor.setThreadNamePrefix("SavingsCOB-Thread-"); + taskExecutor.setThreadGroupName("SavingsCOB-Thread"); + taskExecutor.setCorePoolSize(propertyService.getThreadPoolCorePoolSize(JobName.SAVINGS_COB.name())); + taskExecutor.setMaxPoolSize(propertyService.getThreadPoolMaxPoolSize(JobName.SAVINGS_COB.name())); + taskExecutor.setQueueCapacity(propertyService.getThreadPoolQueueCapacity(JobName.SAVINGS_COB.name())); + taskExecutor.setAllowCoreThreadTimeOut(true); + taskExecutor.setTaskDecorator(new SavingsContextAwareTaskDecorator()); + return taskExecutor; + } + + @Bean("savingsCobWorkerStepListener") + public CobWorkerStepListener savingsCobWorkerStepListener() { + return new CobWorkerStepListener(savingsInitialiseContext(), savingsApplyLock(), savingsResetContext()); + } + + @Bean("savingsInitialiseContext") + public InitialisationTasklet savingsInitialiseContext() { + return new InitialisationTasklet(userRepository); + } + + @Bean("savingsItemListener") + public ChunkProcessingSavingsItemListener savingsItemListener() { + return new ChunkProcessingSavingsItemListener(savingsLockingService, batchJdbcTransactionTemplate); + } + + @Bean("savingsApplyLock") + public ApplySavingsLockTasklet savingsApplyLock() { + return new ApplySavingsLockTasklet(fineractProperties, savingsLockingService, retrieveSavingsIdService, + batchJdbcTransactionTemplate); + } + + @Bean("savingsResetContext") + public ResetContextTasklet savingsResetContext() { + return new ResetContextTasklet(); + } + + @Bean + @StepScope + public SavingsItemReader savingsCobWorkerItemReader() { + return new SavingsItemReader(savingsAccountRepository, retrieveSavingsIdService, savingsLockingService); + } + + @Bean + @StepScope + public SavingsItemProcessor savingsCobWorkerItemProcessor() { + return new SavingsItemProcessor(cobBusinessStepService); + } + + @Bean + @StepScope + public SavingsItemWriter savingsCobWorkerItemWriter() { + SavingsItemWriter writer = new SavingsItemWriter(savingsLockingService); + writer.setRepository(savingsAccountRepository); + return writer; + } +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsContextAwareTaskDecorator.java b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsContextAwareTaskDecorator.java new file mode 100644 index 00000000000..439ffd6d38f --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsContextAwareTaskDecorator.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +import lombok.extern.slf4j.Slf4j; +import org.apache.fineract.infrastructure.core.domain.FineractContext; +import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil; +import org.springframework.core.task.TaskDecorator; +import org.springframework.lang.NonNull; + +@Slf4j +public class SavingsContextAwareTaskDecorator implements TaskDecorator { + + @NonNull + @Override + public Runnable decorate(@NonNull final Runnable runnable) { + final FineractContext context = ThreadLocalContextUtil.getContext(); + return () -> { + try { + ThreadLocalContextUtil.init(context); + runnable.run(); + } finally { + ThreadLocalContextUtil.reset(); + } + }; + } +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsItemProcessor.java b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsItemProcessor.java new file mode 100644 index 00000000000..ba4abe17218 --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsItemProcessor.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +import org.apache.fineract.cob.COBBusinessStepService; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.annotation.BeforeStep; + +public class SavingsItemProcessor extends AbstractSavingsItemProcessor { + + public SavingsItemProcessor(COBBusinessStepService cobBusinessStepService) { + super(cobBusinessStepService); + } + + @BeforeStep + public void beforeStep(StepExecution stepExecution) { + setExecutionContext(stepExecution.getExecutionContext()); + setBusinessDate(stepExecution); + } +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsItemReader.java b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsItemReader.java new file mode 100644 index 00000000000..c9515436b43 --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsItemReader.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.LinkedBlockingQueue; +import lombok.extern.slf4j.Slf4j; +import org.apache.fineract.cob.converter.COBParameterConverter; +import org.apache.fineract.cob.data.COBParameter; +import org.apache.fineract.cob.resolver.CatchUpFlagResolver; +import org.apache.fineract.portfolio.savings.domain.SavingsAccountRepository; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.annotation.BeforeStep; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.lang.NonNull; + +@Slf4j +public class SavingsItemReader extends AbstractSavingsItemReader { + + private final RetrieveSavingsIdService retrieveSavingsIdService; + private final SavingsLockingService savingsLockingService; + + public SavingsItemReader(SavingsAccountRepository savingsAccountRepository, RetrieveSavingsIdService retrieveSavingsIdService, + SavingsLockingService savingsLockingService) { + super(savingsAccountRepository); + this.retrieveSavingsIdService = retrieveSavingsIdService; + this.savingsLockingService = savingsLockingService; + } + + @BeforeStep + public void beforeStep(@NonNull StepExecution stepExecution) { + setRemainingData(filterRemainingData(stepExecution)); + } + + private LinkedBlockingQueue filterRemainingData(StepExecution stepExecution) { + ExecutionContext executionContext = stepExecution.getExecutionContext(); + COBParameter savingsCOBParameter = COBParameterConverter.convert(executionContext.get(SavingsCOBConstant.SAVINGS_COB_PARAMETER)); + boolean isCatchUp = CatchUpFlagResolver.resolve(stepExecution); + + List savingsIds; + if (Objects.isNull(savingsCOBParameter) + || (Objects.isNull(savingsCOBParameter.getMinAccountId()) && Objects.isNull(savingsCOBParameter.getMaxAccountId())) + || (savingsCOBParameter.getMinAccountId().equals(0L) && savingsCOBParameter.getMaxAccountId().equals(0L))) { + savingsIds = Collections.emptyList(); + } else { + savingsIds = new ArrayList<>(retrieveSavingsIdService + .retrieveAllNonClosedSavingsByLastClosedBusinessDateAndMinAndMaxSavingsId(savingsCOBParameter, isCatchUp)); + if (!savingsIds.isEmpty()) { + List lockedIds = savingsLockingService.findLockIdsBySavingsIdInAndLockOwner(savingsIds, + SavingsLockOwner.SAVINGS_COB_CHUNK_PROCESSING); + savingsIds.retainAll(lockedIds); + } + } + return new LinkedBlockingQueue<>(savingsIds); + } +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsItemWriter.java b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsItemWriter.java new file mode 100644 index 00000000000..d36bf2fb69c --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsItemWriter.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +public class SavingsItemWriter extends AbstractSavingsItemWriter { + + public SavingsItemWriter(SavingsLockingService savingsLockingService) { + super(savingsLockingService); + } + + @Override + protected SavingsLockOwner getLockOwner() { + return SavingsLockOwner.SAVINGS_COB_CHUNK_PROCESSING; + } +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsLockingConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsLockingConfiguration.java new file mode 100644 index 00000000000..f25f0c4ef31 --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsLockingConfiguration.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +import org.apache.fineract.infrastructure.core.config.FineractProperties; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; + +@Configuration +public class SavingsLockingConfiguration { + + @Autowired + private JdbcTemplate jdbcTemplate; + @Autowired + private FineractProperties fineractProperties; + + @Bean + @ConditionalOnMissingBean(name = "savingsLockingService") + public SavingsLockingService savingsLockingService() { + return new SavingsLockingServiceImpl(jdbcTemplate, fineractProperties); + } +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsLockingServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsLockingServiceImpl.java new file mode 100644 index 00000000000..64f0d735100 --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/SavingsLockingServiceImpl.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +import java.sql.PreparedStatement; +import java.time.LocalDate; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType; +import org.apache.fineract.infrastructure.core.config.FineractProperties; +import org.apache.fineract.infrastructure.core.service.DateUtils; +import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; + +@Slf4j +public class SavingsLockingServiceImpl implements SavingsLockingService { + + private static final String TABLE_NAME = "m_savings_account_locks"; + + private static final String BATCH_SAVINGS_LOCK_INSERT = """ + INSERT INTO m_savings_account_locks (savings_id, version, lock_owner, lock_placed_on, lock_placed_on_cob_business_date) VALUES (?,?,?,?,?) + """; + + private static final String BATCH_SAVINGS_LOCK_UPGRADE = """ + UPDATE m_savings_account_locks SET version = version + 1, lock_owner = ?, lock_placed_on = ? WHERE savings_id = ? + """; + + private final JdbcTemplate jdbcTemplate; + private final NamedParameterJdbcTemplate namedParameterJdbcTemplate; + private final FineractProperties fineractProperties; + + public SavingsLockingServiceImpl(JdbcTemplate jdbcTemplate, FineractProperties fineractProperties) { + this.jdbcTemplate = jdbcTemplate; + this.namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(jdbcTemplate); + this.fineractProperties = fineractProperties; + } + + @Override + public void applyLock(List savingsIds, SavingsLockOwner lockOwner) { + LocalDate cobBusinessDate = ThreadLocalContextUtil.getBusinessDateByType(BusinessDateType.COB_DATE); + jdbcTemplate.batchUpdate(BATCH_SAVINGS_LOCK_INSERT, savingsIds, savingsIds.size(), (PreparedStatement ps, Long savingsId) -> { + ps.setLong(1, savingsId); + ps.setLong(2, 1); + ps.setString(3, lockOwner.name()); + ps.setObject(4, DateUtils.getAuditOffsetDateTime()); + ps.setObject(5, cobBusinessDate); + }); + } + + @Override + public void upgradeLock(List savingsIds, SavingsLockOwner lockOwner) { + jdbcTemplate.batchUpdate(BATCH_SAVINGS_LOCK_UPGRADE, savingsIds, getInClauseParameterSizeLimit(), (ps, id) -> { + ps.setString(1, lockOwner.name()); + ps.setObject(2, DateUtils.getAuditOffsetDateTime()); + ps.setLong(3, id); + }); + } + + @Override + public void deleteBySavingsIdInAndLockOwner(List savingsIds, SavingsLockOwner lockOwner) { + if (savingsIds.isEmpty()) { + return; + } + String sql = "DELETE FROM " + TABLE_NAME + " WHERE savings_id IN (:ids) AND lock_owner = :owner"; + namedParameterJdbcTemplate.update(sql, Map.of("ids", savingsIds, "owner", lockOwner.name())); + } + + @Override + public List findLockIdsBySavingsIdIn(List savingsIds) { + if (savingsIds.isEmpty()) { + return Collections.emptyList(); + } + String sql = "SELECT savings_id FROM " + TABLE_NAME + " WHERE savings_id IN (:ids)"; + return namedParameterJdbcTemplate.queryForList(sql, Map.of("ids", savingsIds), Long.class); + } + + @Override + public List findLockIdsBySavingsIdInAndLockOwner(List savingsIds, SavingsLockOwner lockOwner) { + if (savingsIds.isEmpty()) { + return Collections.emptyList(); + } + String sql = "SELECT savings_id FROM " + TABLE_NAME + " WHERE savings_id IN (:ids) AND lock_owner = :owner"; + return namedParameterJdbcTemplate.queryForList(sql, Map.of("ids", savingsIds, "owner", lockOwner.name()), Long.class); + } + + @Override + public void updateLockError(Long savingsId, SavingsLockOwner lockOwner, String error, String stacktrace) { + String sql = "UPDATE " + TABLE_NAME + " SET error = ?, stacktrace = ? WHERE savings_id = ? AND lock_owner = ?"; + int updated = jdbcTemplate.update(sql, error, stacktrace, savingsId, lockOwner.name()); + if (updated == 0) { + log.warn("No lock found to update error for savings id: {} with owner: {}", savingsId, lockOwner); + } + } + + private int getInClauseParameterSizeLimit() { + return fineractProperties.getQuery().getInClauseParameterSizeLimit(); + } +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/StayedLockedSavingsTasklet.java b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/StayedLockedSavingsTasklet.java new file mode 100644 index 00000000000..18f07a2561c --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/StayedLockedSavingsTasklet.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +import java.time.LocalDate; +import java.util.List; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.fineract.cob.data.COBIdAndExternalIdAndAccountNo; +import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType; +import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil; +import org.apache.fineract.infrastructure.event.business.service.BusinessEventNotifierService; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.repeat.RepeatStatus; + +@Slf4j +@RequiredArgsConstructor +public class StayedLockedSavingsTasklet implements Tasklet { + + private final BusinessEventNotifierService businessEventNotifierService; + private final RetrieveSavingsIdService retrieveSavingsIdService; + + @Override + public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { + LocalDate cobBusinessDate = ThreadLocalContextUtil.getBusinessDateByType(BusinessDateType.COB_DATE); + List stayedLocked = retrieveSavingsIdService.findAllStayedLockedByCobBusinessDate(cobBusinessDate); + if (!stayedLocked.isEmpty()) { + businessEventNotifierService.notifyPostBusinessEvent( + new SavingsAccountsStayedLockedBusinessEvent(new SavingsAccountsStayedLockedData(stayedLocked))); + } + return RepeatStatus.FINISHED; + } +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/savings/UnlockProcessedSavingsTasklet.java b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/UnlockProcessedSavingsTasklet.java new file mode 100644 index 00000000000..dff2ad67923 --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/savings/UnlockProcessedSavingsTasklet.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.fineract.cob.domain.SavingsAccountLockRepository; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.lang.NonNull; + +@Slf4j +@RequiredArgsConstructor +public class UnlockProcessedSavingsTasklet implements Tasklet { + + private final SavingsAccountLockRepository savingsAccountLockRepository; + + @Override + public RepeatStatus execute(@NonNull StepContribution contribution, @NonNull ChunkContext chunkContext) { + int removedCount = savingsAccountLockRepository.removeOrphanedLocksForProcessedAccounts(); + if (removedCount > 0) { + log.debug("Unlocked {} savings account(s) that completed COB processing but remained locked", removedCount); + } + return RepeatStatus.FINISHED; + } +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/jobparameterprovider/SavingsCOBJobParameterProvider.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/jobparameterprovider/SavingsCOBJobParameterProvider.java new file mode 100644 index 00000000000..42b9c328cbc --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/jobparameterprovider/SavingsCOBJobParameterProvider.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.infrastructure.jobs.service.jobparameterprovider; + +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import lombok.RequiredArgsConstructor; +import org.apache.fineract.cob.savings.SavingsCOBConstant; +import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType; +import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil; +import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO; +import org.apache.fineract.infrastructure.jobs.domain.CustomJobParameterRepository; +import org.apache.fineract.infrastructure.jobs.service.JobName; +import org.apache.fineract.infrastructure.springbatch.SpringBatchJobConstants; +import org.springframework.batch.core.JobParameter; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +@Component +@RequiredArgsConstructor +public class SavingsCOBJobParameterProvider extends AbstractJobParameterProvider { + + private final CustomJobParameterRepository customJobParameterRepository; + + @Override + @Transactional + public Map> provide(Set jobParameterDTOSet) { + Map> jobParameterMap = new HashMap<>(); + Long customJobParameterId = customJobParameterRepository.save(getJobParameterDTOListWithCorrectBusinessDate(jobParameterDTOSet)); + jobParameterMap.put(SpringBatchJobConstants.CUSTOM_JOB_PARAMETER_ID_KEY, new JobParameter<>(customJobParameterId, Long.class)); + return jobParameterMap; + } + + @Override + protected List getJobNames() { + return List.of(JobName.SAVINGS_COB.name()); + } + + private Set getJobParameterDTOListWithCorrectBusinessDate(Set jobParameterDTOset) { + Set jobParameterDTOListWithCorrectBusinessDate = jobParameterDTOset.isEmpty() ? new HashSet<>() + : new HashSet<>(jobParameterDTOset); + Optional optionalBusinessDateJobParameter = jobParameterDTOListWithCorrectBusinessDate.stream() + .filter(jobParameterDTO -> SavingsCOBConstant.BUSINESS_DATE_PARAMETER_NAME.equals(jobParameterDTO.getParameterName())) + .findFirst(); + if (optionalBusinessDateJobParameter.isEmpty()) { + jobParameterDTOListWithCorrectBusinessDate.add(new JobParameterDTO(SavingsCOBConstant.BUSINESS_DATE_PARAMETER_NAME, + ThreadLocalContextUtil.getBusinessDateByType(BusinessDateType.COB_DATE).format(DateTimeFormatter.ISO_DATE))); + } + return jobParameterDTOListWithCorrectBusinessDate; + } +} diff --git a/fineract-provider/src/main/resources/application.properties b/fineract-provider/src/main/resources/application.properties index c43cede1517..37e71d4015e 100644 --- a/fineract-provider/src/main/resources/application.properties +++ b/fineract-provider/src/main/resources/application.properties @@ -95,6 +95,15 @@ fineract.partitioned-job.partitioned-job-properties[0].thread-pool-queue-capacit fineract.partitioned-job.partitioned-job-properties[0].retry-limit=${LOAN_COB_RETRY_LIMIT:5} fineract.partitioned-job.partitioned-job-properties[0].poll-interval=${LOAN_COB_POLL_INTERVAL:500} +fineract.partitioned-job.partitioned-job-properties[1].job-name=SAVINGS_COB +fineract.partitioned-job.partitioned-job-properties[1].chunk-size=${SAVINGS_COB_CHUNK_SIZE:100} +fineract.partitioned-job.partitioned-job-properties[1].partition-size=${SAVINGS_COB_PARTITION_SIZE:100} +fineract.partitioned-job.partitioned-job-properties[1].thread-pool-core-pool-size=${SAVINGS_COB_THREAD_POOL_CORE_POOL_SIZE:5} +fineract.partitioned-job.partitioned-job-properties[1].thread-pool-max-pool-size=${SAVINGS_COB_THREAD_POOL_MAX_POOL_SIZE:5} +fineract.partitioned-job.partitioned-job-properties[1].thread-pool-queue-capacity=${SAVINGS_COB_THREAD_POOL_QUEUE_CAPACITY:20} +fineract.partitioned-job.partitioned-job-properties[1].retry-limit=${SAVINGS_COB_RETRY_LIMIT:5} +fineract.partitioned-job.partitioned-job-properties[1].poll-interval=${SAVINGS_COB_POLL_INTERVAL:500} + fineract.remote-job-message-handler.spring-events.enabled=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_SPRING_EVENTS_ENABLED:true} fineract.remote-job-message-handler.jms.enabled=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_JMS_ENABLED:false} fineract.remote-job-message-handler.jms.request-queue-name=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_JMS_QUEUE_NAME:JMS-request-queue} diff --git a/fineract-provider/src/test/java/org/apache/fineract/cob/savings/SavingsLockingServiceImplTest.java b/fineract-provider/src/test/java/org/apache/fineract/cob/savings/SavingsLockingServiceImplTest.java new file mode 100644 index 00000000000..f04f35c9720 --- /dev/null +++ b/fineract-provider/src/test/java/org/apache/fineract/cob/savings/SavingsLockingServiceImplTest.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +import java.sql.PreparedStatement; +import java.time.LocalDate; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType; +import org.apache.fineract.infrastructure.core.config.FineractProperties; +import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.ParameterizedPreparedStatementSetter; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; + +@ExtendWith(MockitoExtension.class) +class SavingsLockingServiceImplTest { + + private static final LocalDate COB_BUSINESS_DATE = LocalDate.parse("2026-06-09"); + + @Mock + private JdbcTemplate jdbcTemplate; + @Mock + private FineractProperties fineractProperties; + @Mock + private FineractProperties.FineractQueryProperties queryProperties; + + private MockedConstruction namedJdbcMock; + private NamedParameterJdbcTemplate namedJdbc; + private SavingsLockingServiceImpl service; + + @BeforeEach + void setUp() { + lenient().when(fineractProperties.getQuery()).thenReturn(queryProperties); + lenient().when(queryProperties.getInClauseParameterSizeLimit()).thenReturn(1000); + namedJdbcMock = mockConstruction(NamedParameterJdbcTemplate.class); + service = new SavingsLockingServiceImpl(jdbcTemplate, fineractProperties); + namedJdbc = namedJdbcMock.constructed().get(0); + + HashMap dates = new HashMap<>(); + dates.put(BusinessDateType.COB_DATE, COB_BUSINESS_DATE); + ThreadLocalContextUtil.setBusinessDates(dates); + } + + @AfterEach + void tearDown() { + namedJdbcMock.close(); + ThreadLocalContextUtil.reset(); + } + + // ---------- applyLock ---------- + + @Test + void applyLockInsertsRowPerSavingsIdWithBusinessDate() throws Exception { + List savingsIds = List.of(10L, 20L); + + service.applyLock(savingsIds, SavingsLockOwner.SAVINGS_COB_CHUNK_PROCESSING); + + @SuppressWarnings("unchecked") + ArgumentCaptor> setterCaptor = ArgumentCaptor + .forClass(ParameterizedPreparedStatementSetter.class); + ArgumentCaptor sqlCaptor = ArgumentCaptor.forClass(String.class); + verify(jdbcTemplate).batchUpdate(sqlCaptor.capture(), eq(savingsIds), eq(savingsIds.size()), setterCaptor.capture()); + + assertThat(normalize(sqlCaptor.getValue())) + .isEqualTo("INSERT INTO m_savings_account_locks (savings_id, version, lock_owner, lock_placed_on, " + + "lock_placed_on_cob_business_date) VALUES (?,?,?,?,?)"); + + PreparedStatement ps = mock(PreparedStatement.class); + setterCaptor.getValue().setValues(ps, 42L); + verify(ps).setLong(1, 42L); + verify(ps).setLong(2, 1L); + verify(ps).setString(3, "SAVINGS_COB_CHUNK_PROCESSING"); + verify(ps).setObject(eq(4), any()); + verify(ps).setObject(5, COB_BUSINESS_DATE); + } + + // ---------- upgradeLock ---------- + + @Test + void upgradeLockBatchUpdatesWithConfiguredChunkSize() throws Exception { + List savingsIds = List.of(10L, 20L); + when(queryProperties.getInClauseParameterSizeLimit()).thenReturn(500); + + service.upgradeLock(savingsIds, SavingsLockOwner.SAVINGS_COB_CHUNK_PROCESSING); + + @SuppressWarnings("unchecked") + ArgumentCaptor> setterCaptor = ArgumentCaptor + .forClass(ParameterizedPreparedStatementSetter.class); + ArgumentCaptor sqlCaptor = ArgumentCaptor.forClass(String.class); + verify(jdbcTemplate).batchUpdate(sqlCaptor.capture(), eq(savingsIds), eq(500), setterCaptor.capture()); + + assertThat(normalize(sqlCaptor.getValue())).isEqualTo( + "UPDATE m_savings_account_locks SET version = version + 1, lock_owner = ?, lock_placed_on = ? WHERE savings_id = ?"); + + PreparedStatement ps = mock(PreparedStatement.class); + setterCaptor.getValue().setValues(ps, 99L); + verify(ps).setString(1, "SAVINGS_COB_CHUNK_PROCESSING"); + verify(ps).setObject(eq(2), any()); + verify(ps).setLong(3, 99L); + } + + // ---------- updateLockError ---------- + + @Test + void updateLockErrorRunsParameterisedUpdate() { + when(jdbcTemplate.update(anyString(), any(), any(), any(), any())).thenReturn(1); + + service.updateLockError(77L, SavingsLockOwner.SAVINGS_COB_CHUNK_PROCESSING, "boom", "stack"); + + verify(jdbcTemplate).update( + "UPDATE m_savings_account_locks SET error = ?, stacktrace = ? WHERE savings_id = ? AND lock_owner = ?", "boom", "stack", + 77L, "SAVINGS_COB_CHUNK_PROCESSING"); + } + + @Test + void updateLockErrorSucceedsSilentlyWhenNoRowMatches() { + when(jdbcTemplate.update(anyString(), any(), any(), any(), any())).thenReturn(0); + + service.updateLockError(77L, SavingsLockOwner.SAVINGS_COB_CHUNK_PROCESSING, "boom", "stack"); + + verify(jdbcTemplate).update(anyString(), any(), any(), any(), any()); + } + + // ---------- findLockIdsBySavingsIdIn ---------- + + @Test + void findLockIdsBySavingsIdInReturnsEmptyWithoutQueryingWhenInputEmpty() { + List result = service.findLockIdsBySavingsIdIn(List.of()); + + assertThat(result).isEmpty(); + verifyNoInteractions(namedJdbc); + } + + @Test + void findLockIdsBySavingsIdInQueriesByIdList() { + when(namedJdbc.queryForList(anyString(), anyMap(), eq(Long.class))).thenReturn(List.of(10L, 30L)); + + List result = service.findLockIdsBySavingsIdIn(List.of(10L, 20L, 30L)); + + assertThat(result).containsExactly(10L, 30L); + verify(namedJdbc).queryForList("SELECT savings_id FROM m_savings_account_locks WHERE savings_id IN (:ids)", + Map.of("ids", List.of(10L, 20L, 30L)), Long.class); + } + + // ---------- findLockIdsBySavingsIdInAndLockOwner ---------- + + @Test + void findLockIdsBySavingsIdInAndLockOwnerReturnsEmptyWithoutQueryingWhenInputEmpty() { + List result = service.findLockIdsBySavingsIdInAndLockOwner(List.of(), SavingsLockOwner.SAVINGS_COB_CHUNK_PROCESSING); + + assertThat(result).isEmpty(); + verifyNoInteractions(namedJdbc); + } + + @Test + void findLockIdsBySavingsIdInAndLockOwnerQueriesByIdAndOwner() { + when(namedJdbc.queryForList(anyString(), anyMap(), eq(Long.class))).thenReturn(List.of(20L)); + + List result = service.findLockIdsBySavingsIdInAndLockOwner(List.of(10L, 20L), + SavingsLockOwner.SAVINGS_COB_CHUNK_PROCESSING); + + assertThat(result).containsExactly(20L); + verify(namedJdbc).queryForList( + "SELECT savings_id FROM m_savings_account_locks WHERE savings_id IN (:ids) AND lock_owner = :owner", + Map.of("ids", List.of(10L, 20L), "owner", "SAVINGS_COB_CHUNK_PROCESSING"), Long.class); + } + + // ---------- deleteBySavingsIdInAndLockOwner ---------- + + @Test + void deleteBySavingsIdInAndLockOwnerSkipsQueryWhenInputEmpty() { + service.deleteBySavingsIdInAndLockOwner(List.of(), SavingsLockOwner.SAVINGS_COB_CHUNK_PROCESSING); + + verifyNoInteractions(namedJdbc); + } + + @Test + void deleteBySavingsIdInAndLockOwnerIssuesParameterisedDelete() { + service.deleteBySavingsIdInAndLockOwner(List.of(10L, 20L), SavingsLockOwner.SAVINGS_COB_CHUNK_PROCESSING); + + verify(namedJdbc).update("DELETE FROM m_savings_account_locks WHERE savings_id IN (:ids) AND lock_owner = :owner", + Map.of("ids", List.of(10L, 20L), "owner", "SAVINGS_COB_CHUNK_PROCESSING")); + } + + private String normalize(String sql) { + return sql.replaceAll("\\s+", " ").trim(); + } +} diff --git a/fineract-savings/src/main/java/org/apache/fineract/cob/savings/domain/SavingsAccountLockRepository.java b/fineract-savings/src/main/java/org/apache/fineract/cob/domain/SavingsAccountLockRepository.java similarity index 85% rename from fineract-savings/src/main/java/org/apache/fineract/cob/savings/domain/SavingsAccountLockRepository.java rename to fineract-savings/src/main/java/org/apache/fineract/cob/domain/SavingsAccountLockRepository.java index 052edbfe2fa..38f5148368a 100644 --- a/fineract-savings/src/main/java/org/apache/fineract/cob/savings/domain/SavingsAccountLockRepository.java +++ b/fineract-savings/src/main/java/org/apache/fineract/cob/domain/SavingsAccountLockRepository.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.fineract.cob.savings.domain; +package org.apache.fineract.cob.domain; import java.util.List; import java.util.Optional; @@ -49,4 +49,11 @@ lck.lockOwner in (org.apache.fineract.cob.savings.SavingsLockOwner.SAVINGS_COB_C void removeLockByOwner(); List findAllBySavingsIdInAndLockOwner(List savingsIds, SavingsLockOwner lockOwner); + + @Query(""" + delete from SavingsAccountLock lck where lck.lockPlacedOnCobBusinessDate is not null and lck.error is null and + lck.lockOwner = org.apache.fineract.cob.savings.SavingsLockOwner.SAVINGS_COB_CHUNK_PROCESSING + """) + @Modifying(flushAutomatically = true) + int removeOrphanedLocksForProcessedAccounts(); } diff --git a/fineract-savings/src/main/java/org/apache/fineract/cob/savings/PostInterestForSavingsBusinessStep.java b/fineract-savings/src/main/java/org/apache/fineract/cob/savings/PostInterestForSavingsBusinessStep.java new file mode 100644 index 00000000000..0e4aa40013f --- /dev/null +++ b/fineract-savings/src/main/java/org/apache/fineract/cob/savings/PostInterestForSavingsBusinessStep.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.savings; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.fineract.infrastructure.configuration.domain.ConfigurationDomainService; +import org.apache.fineract.infrastructure.core.service.DateUtils; +import org.apache.fineract.portfolio.savings.domain.SavingsAccount; +import org.apache.fineract.portfolio.savings.service.SavingsAccountWritePlatformService; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +@Slf4j +public class PostInterestForSavingsBusinessStep implements SavingsCOBBusinessStep { + + private final SavingsAccountWritePlatformService savingsAccountWritePlatformService; + private final ConfigurationDomainService configurationDomainService; + + @Override + public SavingsAccount execute(SavingsAccount savingsAccount) { + log.debug("Start PostInterestForSavingsBusinessStep for savings account id [{}]", savingsAccount.getId()); + final boolean backdatedTxnsAllowedTill = configurationDomainService.retrievePivotDateConfig(); + savingsAccountWritePlatformService.postInterest(savingsAccount, false, DateUtils.getBusinessLocalDate(), backdatedTxnsAllowedTill); + log.debug("End PostInterestForSavingsBusinessStep for savings account id [{}]", savingsAccount.getId()); + return savingsAccount; + } + + @Override + public String getEnumStyledName() { + return "POST_INTEREST_FOR_SAVINGS"; + } + + @Override + public String getHumanReadableName() { + return "Post interest for savings"; + } +} diff --git a/fineract-savings/src/main/java/org/apache/fineract/cob/savings/SavingsLockingService.java b/fineract-savings/src/main/java/org/apache/fineract/cob/savings/SavingsLockingService.java index 2ec25e701fd..c4b50224fee 100644 --- a/fineract-savings/src/main/java/org/apache/fineract/cob/savings/SavingsLockingService.java +++ b/fineract-savings/src/main/java/org/apache/fineract/cob/savings/SavingsLockingService.java @@ -22,15 +22,15 @@ public interface SavingsLockingService { + void applyLock(List savingsIds, SavingsLockOwner lockOwner); + void upgradeLock(List accountsToLock, SavingsLockOwner lockOwner); void deleteBySavingsIdInAndLockOwner(List savingsIds, SavingsLockOwner lockOwner); - List findAllBySavingsIdIn(List savingsIds); + List findLockIdsBySavingsIdIn(List savingsIds); - SavingsAccountLock findBySavingsIdAndLockOwner(Long savingsId, SavingsLockOwner lockOwner); + List findLockIdsBySavingsIdInAndLockOwner(List savingsIds, SavingsLockOwner lockOwner); - List findAllBySavingsIdInAndLockOwner(List savingsIds, SavingsLockOwner lockOwner); - - void applyLock(List savingsIds, SavingsLockOwner lockOwner); + void updateLockError(Long savingsId, SavingsLockOwner lockOwner, String error, String stacktrace); } diff --git a/fineract-savings/src/main/java/org/apache/fineract/portfolio/savings/domain/SavingsAccountRepository.java b/fineract-savings/src/main/java/org/apache/fineract/portfolio/savings/domain/SavingsAccountRepository.java index 07ca0d4c63f..c46622c8686 100644 --- a/fineract-savings/src/main/java/org/apache/fineract/portfolio/savings/domain/SavingsAccountRepository.java +++ b/fineract-savings/src/main/java/org/apache/fineract/portfolio/savings/domain/SavingsAccountRepository.java @@ -22,6 +22,7 @@ import java.time.LocalDate; import java.util.Collection; import java.util.List; +import org.apache.fineract.cob.data.COBIdAndExternalIdAndAccountNo; import org.apache.fineract.cob.data.COBIdAndLastClosedBusinessDate; import org.apache.fineract.infrastructure.core.domain.ExternalId; import org.apache.fineract.portfolio.savings.data.SavingsAccrualData; @@ -147,4 +148,7 @@ WHERE sa.status IN (100, 200, 300, 303, 304) ORDER BY sa.lastClosedBusinessDate ASC """) List findAllSavingsIdsOldestCobProcessed(); + + @Query("select sa.id, sa.externalId, sa.accountNumber from SavingsAccountLock lock left join SavingsAccount sa on lock.savingsId = sa.id where lock.lockPlacedOnCobBusinessDate = :cobBusinessDate") + List findAllStayedLockedByCobBusinessDate(@Param("cobBusinessDate") LocalDate cobBusinessDate); } diff --git a/fineract-savings/src/main/resources/db/changelog/tenant/module/savings/parts/module-changelog-master.xml b/fineract-savings/src/main/resources/db/changelog/tenant/module/savings/parts/module-changelog-master.xml index 887b310e3d3..3ac73d0abea 100644 --- a/fineract-savings/src/main/resources/db/changelog/tenant/module/savings/parts/module-changelog-master.xml +++ b/fineract-savings/src/main/resources/db/changelog/tenant/module/savings/parts/module-changelog-master.xml @@ -28,4 +28,7 @@ + + + diff --git a/fineract-savings/src/main/resources/db/changelog/tenant/module/savings/parts/parts/2006_add_last_cob_business_date_to_savings_account.xml b/fineract-savings/src/main/resources/db/changelog/tenant/module/savings/parts/parts/2006_add_last_cob_business_date_to_savings_account.xml new file mode 100644 index 00000000000..4ce526e97cf --- /dev/null +++ b/fineract-savings/src/main/resources/db/changelog/tenant/module/savings/parts/parts/2006_add_last_cob_business_date_to_savings_account.xml @@ -0,0 +1,67 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/fineract-savings/src/main/resources/db/changelog/tenant/module/savings/parts/parts/2007_add_savings_cob_job_data.xml b/fineract-savings/src/main/resources/db/changelog/tenant/module/savings/parts/parts/2007_add_savings_cob_job_data.xml new file mode 100644 index 00000000000..4d16df26f2e --- /dev/null +++ b/fineract-savings/src/main/resources/db/changelog/tenant/module/savings/parts/parts/2007_add_savings_cob_job_data.xml @@ -0,0 +1,46 @@ + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/fineract-savings/src/main/resources/db/changelog/tenant/module/savings/parts/parts/2008_add_savings_cob_business_steps.xml b/fineract-savings/src/main/resources/db/changelog/tenant/module/savings/parts/parts/2008_add_savings_cob_business_steps.xml new file mode 100644 index 00000000000..ddafb3822cf --- /dev/null +++ b/fineract-savings/src/main/resources/db/changelog/tenant/module/savings/parts/parts/2008_add_savings_cob_business_steps.xml @@ -0,0 +1,32 @@ + + + + + + + + + + +