HDDS-14913. Implement Scalable CSV Export for Unhealthy Containers in Recon UI. #10162
ArafatKhan2198 wants to merge 8 commits into apache:master
Conversation
@ArafatKhan2198 as discussed, please design the solution to be server-based for a single Recon user. We don't have user-based logins in Recon. We should not localize the job-progress logic in the browser. All browser windows opened on multiple machines showing the Recon page should see the same job and its progress. Only one job should be allowed to run at a time, and the remaining two should go into the queue.
sumitagrawl left a comment:

@ArafatKhan2198 Thanks for working on this; I have given a few comments.
    private long estimatedTotal;

    @JsonProperty("filePath")
    private String filePath;
Exposing the internal file path may be a security risk; we should not return the file path.
    */
    public static final String OZONE_RECON_EXPORT_DIRECTORY =
        "ozone.recon.export.directory";
    public static final String OZONE_RECON_EXPORT_DIRECTORY_DEFAULT = "/tmp/recon/exports";
We should avoid a /tmp path and keep the export directory under the same Recon metadata path.
    LOG.error("Failed to create export directory: {}", exportDirectory, e);
    }

    LOG.info("ExportJobManager initialized with single-threaded queue (max {} jobs)", MAX_QUEUE_SIZE);
A restart of Recon will leave these files orphaned; IMO, we should be able to get those jobs again via the UI.
    LOG.error("Failed to create export directory: {}", exportDirectory, e);
    }

    LOG.info("ExportJobManager initialized with single-threaded queue (max {} jobs)", MAX_QUEUE_SIZE);
Files from failed jobs remain on disk; we should remove them and clean up failed jobs.
    LOG.info("ExportJobManager initialized with single-threaded queue (max {} jobs)", MAX_QUEUE_SIZE);
    }

    public synchronized String submitJob(String userId, String state, int limit, long prevKey) {

    ExportJob job = new ExportJob(jobId, userId, state, limit, prevKey);
    // Filename format: export_{state}_{userId}_{shortJobId}.tar
    String shortJobId = jobId.substring(0, 8);
    String filePath = exportDirectory + "/export_" + state.toLowerCase() + "_" + userId + "_" + shortJobId + ".tar";
How are you ensuring the files are unique? Why do you need uniqueness? Maybe a timestamp can be added.
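The reviewer's timestamp suggestion could look like the following sketch. This is a hypothetical helper, not code from the PR; the class and method names (`ExportFileNamer`, `exportFileName`) are illustrative, and the filename pattern extends the `export_{state}_{userId}_{shortJobId}.tar` format quoted above with an epoch-millisecond suffix:

```java
import java.time.Instant;

// Hypothetical sketch: add a timestamp to the export filename so repeated
// jobs for the same state/user never collide, even if a short job-id
// prefix were ever to repeat.
public final class ExportFileNamer {
  private ExportFileNamer() { }

  public static String exportFileName(String state, String userId, String jobId) {
    String shortJobId = jobId.substring(0, 8);          // same 8-char prefix as the PR
    long ts = Instant.now().toEpochMilli();             // uniqueness suffix
    return "export_" + state.toLowerCase() + "_" + userId
        + "_" + shortJobId + "_" + ts + ".tar";
  }
}
```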
    int fileIndex = 1;
    long totalRecords = 0;
    long recordsInCurrentFile = 0;
    final int CHUNK_SIZE = 500_000;
    }
    }

    private void deleteDirectory(Path directory) {
Reuse the existing recursive directory-delete utility.
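For reference, a recursive delete can also be done with only the JDK, along the lines the reviewer suggests reusing. This is an illustrative sketch, not the PR's utility or the existing Ozone helper:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative recursive delete using java.nio only.
public final class DirDeleter {
  private DirDeleter() { }

  public static void deleteDirectory(Path directory) throws IOException {
    if (!Files.exists(directory)) {
      return;
    }
    // Files.walk yields parents before children; reversing the order
    // deletes files first, then the directories that contained them.
    try (Stream<Path> paths = Files.walk(directory)) {
      for (Path p : paths.sorted(Comparator.reverseOrder())
                         .collect(Collectors.toList())) {
        Files.delete(p);
      }
    }
  }
}
```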
    } finally {
      // 3-second cooldown before the next queued job is picked up by the single worker thread.
      try {
        Thread.sleep(3000);
The sleep may not provide any advantage, as the tar logic already provides some delay during which a waiting task can get the lock and proceed. A maximum download-file count will also help avoid this.
    replicaMismatchCount: number;
    }

    export type ExportJobStatus = 'QUEUED' | 'RUNNING' | 'COMPLETED' | 'FAILED';
UI related:
- The Download button should not show the filename; it can simply be downloaded.
- Active Exports / Completed Exports can be combined, and fields for submitted time and started time added.
- DELETE: should it act as cancel and/or delete completed jobs?
devmadhuu left a comment:

Thanks @ArafatKhan2198 for improving the patch. However, a few comments, pls check.
Also, I am not sure of any cleanup or TTL for completed jobs. How will these exported files be cleaned up, and what is their lifecycle? Can they continue to accumulate indefinitely?
    }

    // Check global queue size limit
    synchronized (jobQueue) {
This is very confusing. This method acquires the object's own lock, then a lock on jobQueue. Any other thread that acquires jobQueue first and then tries to call a synchronized method creates a deadlock condition.
Pls check; this is still not solved.
Fixed the deadlock from nested synchronized(this) + synchronized(jobQueue). Removed synchronized from submitJob and moved all queue checks and mutations into a single synchronized(jobQueue) block: one lock everywhere, no nesting, no possible lock-order deadlock.
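A minimal sketch of the single-lock pattern this reply describes. The class and method bodies here are illustrative, not the PR's actual code; only the lock discipline (all queue checks and mutations inside one `synchronized (jobQueue)` block, no synchronized method wrapper) is the point, and the full-queue return value stands in for the HTTP 429 mapping:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.UUID;

// Illustrative single-lock job queue: no synchronized method, so no
// nested this + jobQueue acquisition and no lock-order deadlock.
public class SingleLockQueue {
  private final Deque<String> jobQueue = new ArrayDeque<>();
  private final int maxQueueSize;

  public SingleLockQueue(int maxQueueSize) {
    this.maxQueueSize = maxQueueSize;
  }

  /** Returns the new job id, or null when the queue is full (caller maps to 429). */
  public String submitJob(String state) {
    String jobId = UUID.randomUUID().toString();
    synchronized (jobQueue) {          // the only lock taken on this path
      if (jobQueue.size() >= maxQueueSize) {
        return null;
      }
      jobQueue.addLast(jobId);
    }
    return jobId;
  }

  public int queueSize() {
    synchronized (jobQueue) {
      return jobQueue.size();
    }
  }
}
```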
    @Singleton
    public class ExportJobManager {
      private static final Logger LOG = LoggerFactory.getLogger(ExportJobManager.class);
      private static final int MAX_QUEUE_SIZE = 4;
Better to put a comment here: why is this hardcoded as 4?
Replaced with a maxQueueSize field read from the ozone.recon.export.max.jobs.total config (default 4). The Javadoc explains the choice: single-threaded worker, ~5 unhealthy states, and the one-TAR-per-state rule.
    */
    public static final String OZONE_RECON_EXPORT_MAX_JOBS_TOTAL =
        "ozone.recon.export.max.jobs.total";
    public static final int OZONE_RECON_EXPORT_MAX_JOBS_TOTAL_DEFAULT = 10;
Are these used anywhere? They also contradict your thread-queue size.
The constant is now actively read in the ExportJobManager constructor and wired to maxQueueSize. The default is updated to 4 to match the design.
    * Default: 1
    */
    public static final String OZONE_RECON_EXPORT_WORKER_THREADS =
        "ozone.recon.export.worker.threads";
Removed the constant entirely. Export is intentionally single-threaded to avoid concurrent Derby access; a worker-threads config would be misleading.
    * Manages asynchronous CSV export jobs.
    */
    @Singleton
    public class ExportJobManager {
Add some unit tests for this class
Added TestExportJobManager covering: submit success, empty result, duplicate state (running and completed), failed-state retry, queue full, cancel running, cancel completed, unknown job, queue position, startup cleanup, and the filename pattern. Also added TestExportJob for the download counter and path derivation.
    // Controls how many rows Derby returns per JDBC round-trip.
    // Default is 10,000 rows.
    query.fetchSize(10000);
This is hardcoded again. In the old PR it was fixed.
Fixed the hardcoded fetchSize(10000). It now reads from ozone.recon.unhealthy.container.fetch.size (default 10,000), wired into the ContainerHealthSchemaManager constructor via OzoneConfiguration.
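A library-agnostic sketch of the wiring this reply describes: the fetch size comes from a named config key with a default instead of being hardcoded at the query site. The key name and default are taken from the reply; `java.util.Properties` stands in for `OzoneConfiguration`, and the `FetchSizeConfig` class is hypothetical:

```java
import java.util.Properties;

// Illustrative config wiring: one named key, one default constant,
// read once where the query is built instead of hardcoding 10000.
public final class FetchSizeConfig {
  public static final String KEY = "ozone.recon.unhealthy.container.fetch.size";
  public static final int DEFAULT = 10_000;

  private FetchSizeConfig() { }

  public static int fetchSize(Properties conf) {
    String v = conf.getProperty(KEY);
    return v == null ? DEFAULT : Integer.parseInt(v);
  }
}
```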
    * @param prevKey Container ID offset for cursor-based pagination
    * @return Total count of matching containers
    */
    public long getUnhealthyContainersCount(
Check the Javadoc above this method; something seems wrong.
    this.totalRecords = totalRecords;
    }

    public void incrementTotalRecords() {
Not sure what the purpose of this is. The current code passes a local long totalRecords counter and calls setTotalRecords on every row; using incrementTotalRecords() would remove the local counter.
|
@devmadhuu @sumitagrawl please take another look.
devmadhuu left a comment:

@ArafatKhan2198 A few comments are still unresolved; pls check.
    private int maxDownloads;

    @JsonProperty("downloadCount")
    private int downloadCount;
We should make this an AtomicInteger to avoid any race conditions.
Replaced int downloadCount with AtomicInteger and introduced a single tryReserveDownload() method that atomically checks and increments in one CAS loop, so concurrent download requests can't race past the limit.
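The check-and-increment this reply describes can be sketched as follows. The `DownloadLimiter` class is illustrative, not the PR's `ExportJob`; it shows only the CAS loop that stops concurrent downloads racing past the limit:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative atomic download limiter: check and increment happen in one
// compareAndSet loop, so two threads cannot both pass a nearly-full limit.
public class DownloadLimiter {
  private final int maxDownloads;
  private final AtomicInteger downloadCount = new AtomicInteger();

  public DownloadLimiter(int maxDownloads) {
    this.maxDownloads = maxDownloads;
  }

  /** Atomically reserves one download slot; false once the limit is reached. */
  public boolean tryReserveDownload() {
    while (true) {
      int current = downloadCount.get();
      if (current >= maxDownloads) {
        return false;                              // limit reached, no increment
      }
      if (downloadCount.compareAndSet(current, current + 1)) {
        return true;                               // slot reserved atomically
      }
      // CAS lost to a concurrent caller: re-read and retry.
    }
  }

  public int getDownloadCount() {
    return downloadCount.get();
  }
}
```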
    final int RECORDS_PER_FILE = 500_000;

    BufferedWriter writer = null;
    FileOutputStream fos = null;
Better to use fos in a try/finally block to avoid any resource leak.
Fixed: fos is now in a try/finally. Added an inner try/finally around the BufferedWriter construction; if wrapping fails, fos.close() is called immediately so no file descriptor leaks.
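The leak-safe wrapping this reply describes, as a standalone sketch. The `SafeWriterFactory` class is hypothetical; the pattern (close the raw stream in `finally` only when wrapping failed) is the point:

```java
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

// Illustrative leak-safe writer construction: if wrapping the stream in a
// BufferedWriter throws, the already-open FileOutputStream is closed in the
// finally block, so no file descriptor leaks. On success, closing the
// returned writer also closes the underlying stream.
public final class SafeWriterFactory {
  private SafeWriterFactory() { }

  public static BufferedWriter open(String path) throws IOException {
    FileOutputStream fos = new FileOutputStream(path);
    boolean wrapped = false;
    try {
      BufferedWriter writer = new BufferedWriter(
          new OutputStreamWriter(fos, StandardCharsets.UTF_8));
      wrapped = true;
      return writer;
    } finally {
      if (!wrapped) {
        fos.close();   // wrapping failed: release the descriptor immediately
      }
    }
  }
}
```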
## What changes were proposed in this pull request?

The Recon UI had no way for administrators to export unhealthy container data (Missing, Under-Replicated, Over-Replicated, etc.) at scale. For clusters with millions of containers, any streaming export over a long-running HTTP connection would be killed by network infrastructure (firewalls, load balancers, proxies) before completion.

**Solution: Asynchronous Background Export with Queue**

Instead of streaming data directly to the browser, this PR implements a server-side background job system.

### Backend Changes

**New: `ExportJob` model (`ExportJob.java`)**
A data class representing one export job with fields:
- `jobId` (UUID), `userId`, `state` (container state), `status` (QUEUED → RUNNING → COMPLETED/FAILED)
- `queuePosition`, `totalRecords`, `estimatedTotal`, `progressPercent`
- `filePath` (path to the TAR on disk), `submittedAt`, `startedAt`, `completedAt`, `errorMessage`

**New: `ExportJobManager.java`, the core engine**
A Guice Singleton that runs for the lifetime of the Recon server:
- Writes CSV part files in chunks (`part001.csv`, `part002.csv`, ...)
- Archives them via `Archiver.create()` into `export_{state}_{userId}_{shortJobId}.tar`
- Runs `COUNT(*)` before the cursor opens to calculate `estimatedTotal`; `totalRecords` increments live
- `submitJob()` prevents race conditions when multiple users submit simultaneously
- `getQueuePosition()` walks a `LinkedHashMap` (insertion order) to return a 1-indexed position

**`ContainerEndpoint.java`: new REST endpoints**
Queue-full (429) errors return JSON instead of Jetty's HTML error page.
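The chunking rule described above (fixed-size CSV part files, later archived into one TAR) can be illustrated with a small planner sketch. `ChunkPlanner` is a hypothetical class; the 500,000-rows-per-file figure matches the `RECORDS_PER_FILE` / `CHUNK_SIZE` constants quoted in the review diffs:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative chunk planner: rows stream into part files of at most
// recordsPerFile rows each, named part001.csv, part002.csv, ...
public class ChunkPlanner {
  private final long recordsPerFile;

  public ChunkPlanner(long recordsPerFile) {
    this.recordsPerFile = recordsPerFile;
  }

  /** Returns the part-file names needed to hold totalRecords rows. */
  public List<String> partFiles(long totalRecords) {
    List<String> parts = new ArrayList<>();
    // Ceiling division: one extra file for any partial final chunk.
    long files = (totalRecords + recordsPerFile - 1) / recordsPerFile;
    for (long i = 1; i <= files; i++) {
      parts.add(String.format("part%03d.csv", i));
    }
    return parts;
  }
}
```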
**`ContainerHealthSchemaManager.java`**
- `getUnhealthyContainersCursor()`: a jOOQ lazy cursor for streaming DB records without holding them all in the JVM heap
- `getUnhealthyContainersCount()`: a fast `COUNT(*)` used before the cursor opens for progress estimation

**`ReconServerConfigKeys.java`**
New config keys:
- `ozone.recon.export.worker.threads` (default: 1)
- `ozone.recon.export.directory` (default: `/tmp/recon/exports`)
- `ozone.recon.export.max.jobs.total` (default: 10)

### Frontend Changes (`containers.tsx`, `container.types.ts`)

**New: Export Tab (tab key `'6'`)**
A dedicated Export tab is added to the Containers page alongside Missing, Under-Replicated, etc. It contains:
- Submit Controls
- Active Exports table (hidden when empty): queue position (`#1`, `#2`, ...), progress bar + record count
- Completed Exports table (always visible, paginated): timestamps formatted as `MMM D, HH:mm:ss`
- Polling: `setInterval` + `useRef`, started when the Export tab is opened or a job is submitted
- Error handling:
  - A 429 queue-full error shows a 6-second toast with the specific message
  - All errors show clean messages (no raw HTML from Jetty)
  - A guard in `fetchTabData` prevents `undefined` API calls when the Export tab is active

## What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/HDDS-14913
## How was this patch tested?
Log Changes -
CSV_Export_Feature.mp4