Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ private Response getContainerEndpointResponse(long containerId) {
null, // ContainerHealthSchemaManager - not needed for this test
recon.getReconServer().getReconNamespaceSummaryManager(),
recon.getReconServer().getReconContainerMetadataManager(),
omMetadataManagerInstance);
omMetadataManagerInstance, null);
return containerEndpoint.getKeysForContainer(containerId, 10, "");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.recon.api.ExportJobManager;
import org.apache.hadoop.ozone.recon.heatmap.HeatMapServiceImpl;
import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
import org.apache.hadoop.ozone.recon.persistence.DataSourceConfiguration;
Expand Down Expand Up @@ -110,6 +111,7 @@ protected void configure() {
bind(OMMetadataManager.class).to(ReconOmMetadataManagerImpl.class);

bind(ContainerHealthSchemaManager.class).in(Singleton.class);
bind(ExportJobManager.class).in(Singleton.class);
bind(ReconContainerMetadataManager.class)
.to(ReconContainerMetadataManagerImpl.class).in(Singleton.class);
bind(ReconFileMetadataManager.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,45 @@ public final class ReconServerConfigKeys {
"ozone.recon.scm.container.id.batch.size";
public static final long OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT = 1_000_000;

/**
* JDBC fetch size for CSV exports.
* Default: 10,000 rows per fetch
*/
public static final String OZONE_RECON_UNHEALTHY_CONTAINER_FETCH_SIZE =
"ozone.recon.unhealthy.container.fetch.size";
public static final int OZONE_RECON_UNHEALTHY_CONTAINER_FETCH_SIZE_DEFAULT = 10_000;

/**
* Max export jobs that can sit in the queue (waiting + executing) at once.
* Submissions beyond this limit are rejected with HTTP 429.
* Kept small because export is single-threaded and the unhealthy-container
* states it can be invoked for are bounded (~5).
* Default: 4
*/
public static final String OZONE_RECON_EXPORT_MAX_JOBS_TOTAL =
"ozone.recon.export.max.jobs.total";
public static final int OZONE_RECON_EXPORT_MAX_JOBS_TOTAL_DEFAULT = 4;

/**
* Directory to store export CSV files.
* Default: /tmp/recon/exports
*/
public static final String OZONE_RECON_EXPORT_DIRECTORY =
"ozone.recon.export.directory";

// Default is resolved at runtime as {ozone.recon.db.dir}/exports.
// Empty string signals ExportJobManager to compute the path dynamically.
public static final String OZONE_RECON_EXPORT_DIRECTORY_DEFAULT = "";

/**
* Maximum number of times a completed export TAR file can be downloaded.
* Prevents repeated downloads from consuming excessive network bandwidth or being misused.
* Default: 3
*/
public static final String OZONE_RECON_EXPORT_MAX_DOWNLOADS =
"ozone.recon.export.max.downloads";
public static final int OZONE_RECON_EXPORT_MAX_DOWNLOADS_DEFAULT = 3;

/**
* Private constructor for utility class.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_MIN_CONTAINER_ID;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_PREVKEY;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
Expand All @@ -39,15 +42,18 @@
import java.util.UUID;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
Expand All @@ -67,6 +73,7 @@
import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata;
import org.apache.hadoop.ozone.recon.api.types.ContainersResponse;
import org.apache.hadoop.ozone.recon.api.types.DeletedContainerInfo;
import org.apache.hadoop.ozone.recon.api.types.ExportJob;
import org.apache.hadoop.ozone.recon.api.types.KeyMetadata;
import org.apache.hadoop.ozone.recon.api.types.KeyMetadata.ContainerBlockMetadata;
import org.apache.hadoop.ozone.recon.api.types.KeysResponse;
Expand All @@ -75,6 +82,7 @@
import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainerMetadata;
import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainersResponse;
import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainersSummary;
import org.apache.hadoop.ozone.recon.api.ExportJobManager;
import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
import org.apache.hadoop.ozone.recon.persistence.ContainerHistory;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
Expand Down Expand Up @@ -104,6 +112,7 @@ public class ContainerEndpoint {
private final ContainerHealthSchemaManager containerHealthSchemaManager;
private final ReconNamespaceSummaryManager reconNamespaceSummaryManager;
private final OzoneStorageContainerManager reconSCM;
private final ExportJobManager exportJobManager;
private static final Logger LOG =
LoggerFactory.getLogger(ContainerEndpoint.class);
private BucketLayout layout = BucketLayout.DEFAULT;
Expand Down Expand Up @@ -145,7 +154,8 @@ public ContainerEndpoint(OzoneStorageContainerManager reconSCM,
ContainerHealthSchemaManager containerHealthSchemaManager,
ReconNamespaceSummaryManager reconNamespaceSummaryManager,
ReconContainerMetadataManager reconContainerMetadataManager,
ReconOMMetadataManager omMetadataManager) {
ReconOMMetadataManager omMetadataManager,
ExportJobManager exportJobManager) {
this.containerManager =
(ReconContainerManager) reconSCM.getContainerManager();
this.pipelineManager = reconSCM.getPipelineManager();
Expand All @@ -154,6 +164,7 @@ public ContainerEndpoint(OzoneStorageContainerManager reconSCM,
this.reconSCM = reconSCM;
this.reconContainerMetadataManager = reconContainerMetadataManager;
this.omMetadataManager = omMetadataManager;
this.exportJobManager = exportJobManager;
}

/**
Expand Down Expand Up @@ -502,6 +513,159 @@ public Response getUnhealthyContainers(
minContainerId);
}

/**
 * Lists every export job known to the server, regardless of status.
 * Queue positions are filled in for jobs that are still waiting to run.
 *
 * @return Response whose entity is the list of ExportJob objects
 */
@GET
@Path("/unhealthy/export")
@Produces(MediaType.APPLICATION_JSON)
public Response listExportJobs() {
  List<ExportJob> allJobs = exportJobManager.getAllJobs();
  // Only queued jobs have a meaningful queue position.
  for (ExportJob queued : allJobs) {
    if (queued.getStatus() != ExportJob.JobStatus.QUEUED) {
      continue;
    }
    int position = exportJobManager.getQueuePosition(queued.getJobId());
    queued.setQueuePosition(position);
  }
  return Response.ok(allJobs).build();
}

/**
 * Kicks off an asynchronous CSV export of unhealthy containers and returns
 * immediately with a job ID that the client can poll for progress.
 *
 * @param state the unhealthy-container state to export (required:
 *              MISSING, UNDER_REPLICATED, etc.)
 * @return Response carrying the newly created ExportJob (includes jobId);
 *         400 for a missing/invalid state, 429 when the job queue is full
 */
@POST
@Path("/unhealthy/export")
@Produces(MediaType.APPLICATION_JSON)
public Response startExport(@QueryParam("state") String state) {
  if (StringUtils.isEmpty(state)) {
    throw new WebApplicationException("state query parameter is required",
        Response.Status.BAD_REQUEST);
  }

  // Reject anything that is not a member of the UnHealthyContainerStates enum.
  try {
    ContainerSchemaDefinition.UnHealthyContainerStates.valueOf(state);
  } catch (IllegalArgumentException e) {
    throw new WebApplicationException("Invalid state: " + state,
        Response.Status.BAD_REQUEST);
  }

  try {
    String jobId = exportJobManager.submitJob(state);
    return Response.ok(exportJobManager.getJob(jobId)).build();
  } catch (IllegalStateException e) {
    // Queue is full: reply 429 with a JSON body instead of the default HTML.
    Map<String, String> errorResponse = new HashMap<>();
    errorResponse.put("error", "Too Many Requests");
    errorResponse.put("message", e.getMessage());
    return Response.status(Response.Status.TOO_MANY_REQUESTS)
        .entity(errorResponse)
        .type(MediaType.APPLICATION_JSON)
        .build();
  }
}

/**
 * Fetches the current status and progress of a previously submitted
 * export job.
 *
 * @param jobId the job ID returned by startExport
 * @return Response carrying the ExportJob; 404 if no such job exists
 */
@GET
@Path("/unhealthy/export/{jobId}")
@Produces(MediaType.APPLICATION_JSON)
public Response getExportStatus(@PathParam("jobId") String jobId) {
  ExportJob job = exportJobManager.getJob(jobId);
  if (job == null) {
    throw new WebApplicationException("Job not found", Response.Status.NOT_FOUND);
  }
  // A job that is still waiting also reports where it sits in the queue.
  if (job.getStatus() == ExportJob.JobStatus.QUEUED) {
    job.setQueuePosition(exportJobManager.getQueuePosition(jobId));
  }
  return Response.ok(job).build();
}

/**
 * Streams a completed export TAR file to the client.
 * Each successful call consumes one download credit; once the job's
 * download limit is exhausted the endpoint replies 429.
 *
 * @param jobId the job ID returned by startExport
 * @return Response streaming the TAR file; 404 if the job or its file is
 *         missing, 409 if the job has not completed, 429 if the download
 *         limit has been reached
 */
@GET
@Path("/unhealthy/export/{jobId}/download")
@Produces("application/x-tar")
public Response downloadExport(@PathParam("jobId") String jobId) {
  ExportJob job = exportJobManager.getJob(jobId);
  if (job == null) {
    throw new WebApplicationException("Job not found", Response.Status.NOT_FOUND);
  }
  if (job.getStatus() != ExportJob.JobStatus.COMPLETED) {
    throw new WebApplicationException("Job not completed yet", Response.Status.CONFLICT);
  }

  File file = new File(job.getFilePath());
  if (!file.exists()) {
    throw new WebApplicationException("Export file not found", Response.Status.NOT_FOUND);
  }

  // Reserve a download slot before streaming starts.
  // NOTE(review): the credit is consumed even if the client aborts the
  // transfer mid-stream — confirm this accounting is intended.
  if (!job.tryReserveDownload()) {
    // Use the imported HashMap (consistent with startExport) rather than
    // the fully-qualified java.util.HashMap the original used here.
    Map<String, String> errorResponse = new HashMap<>();
    errorResponse.put("error", "Download limit reached");
    errorResponse.put("message", "This export has reached its maximum download limit of "
        + job.getMaxDownloads() + ".");
    return Response.status(Response.Status.TOO_MANY_REQUESTS)
        .entity(errorResponse)
        .type(MediaType.APPLICATION_JSON)
        .build();
  }

  LOG.info("Download {} of {} for job {}", job.getDownloadCount(), job.getMaxDownloads(), jobId);

  // Copy the file in 8 KB reads batched into 256 KB writes to cut down on
  // syscalls for large archives. try-with-resources closes both streams
  // even if the client disconnects mid-transfer.
  StreamingOutput stream = outputStream -> {
    try (FileInputStream fis = new FileInputStream(file);
         BufferedOutputStream bos = new BufferedOutputStream(outputStream, 256 * 1024)) {
      byte[] buffer = new byte[8192];
      int bytesRead;
      while ((bytesRead = fis.read(buffer)) != -1) {
        bos.write(buffer, 0, bytesRead);
      }
      bos.flush();
    }
  };

  return Response.ok(stream)
      .header("Content-Disposition", "attachment; filename=\"" + job.getFileName() + "\"")
      .header("Content-Type", "application/x-tar")
      .build();
}

/**
 * Cancels a running (or queued) export job.
 *
 * @param jobId the job ID
 * @return 200 on success; 404 when the manager reports the job cannot
 *         be cancelled
 */
@DELETE
@Path("/unhealthy/export/{jobId}")
public Response cancelExport(@PathParam("jobId") String jobId) {
  try {
    exportJobManager.cancelJob(jobId);
  } catch (IllegalStateException e) {
    throw new WebApplicationException(e.getMessage(), Response.Status.NOT_FOUND);
  }
  return Response.ok().build();
}

/**
* This API will return all DELETED containers in SCM in below JSON format.
* {
Expand Down
Loading
Loading