diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java index e21bcc7df22e..3285d48115b7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java @@ -55,13 +55,7 @@ public class CloseContainerEventHandler implements EventHandler { private static final Logger LOG = LoggerFactory.getLogger(CloseContainerEventHandler.class); - - private final PipelineManager pipelineManager; - private final ContainerManager containerManager; - private final SCMContext scmContext; - - private final LeaseManager leaseManager; - private final long timeout; + private final CloseContainerContext context; public CloseContainerEventHandler( final PipelineManager pipelineManager, @@ -69,34 +63,35 @@ public CloseContainerEventHandler( final SCMContext scmContext, @Nullable LeaseManager leaseManager, final long timeout) { - this.pipelineManager = pipelineManager; - this.containerManager = containerManager; - this.scmContext = scmContext; - this.leaseManager = leaseManager; - this.timeout = timeout; + context = new CloseContainerContext(pipelineManager, containerManager, scmContext, leaseManager, timeout); } @Override public void onMessage(ContainerID containerID, EventPublisher publisher) { - if (!scmContext.isLeader()) { + if (!context.getScmContext().isLeader()) { LOG.info("Skip close container {} since current SCM is not leader.", containerID); return; } + closeContainer(containerID, publisher, context); + } + + public static void closeContainer( + ContainerID containerID, EventPublisher publisher, CloseContainerContext context) { try { LOG.info("Close container Event triggered for container : {}, " + "current state: {}", containerID, - 
containerManager.getContainer(containerID).getState()); + context.getContainerManager().getContainer(containerID).getState()); // If the container is in OPEN state, FINALIZE it. - if (containerManager.getContainer(containerID).getState() + if (context.getContainerManager().getContainer(containerID).getState() == LifeCycleState.OPEN) { - containerManager.updateContainerState( + context.getContainerManager().updateContainerState( containerID, LifeCycleEvent.FINALIZE); } // ContainerInfo has to read again after the above state change. - final ContainerInfo container = containerManager + final ContainerInfo container = context.getContainerManager() .getContainer(containerID); // Send close command to datanodes, if the container is in CLOSING state if (container.getState() == LifeCycleState.CLOSING) { @@ -111,13 +106,13 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) { } SCMCommand command = new CloseContainerCommand( containerID.getId(), container.getPipelineID(), force); - command.setTerm(scmContext.getTermOfLeader()); - command.setEncodedToken(getContainerToken(containerID)); + command.setTerm(context.getScmContext().getTermOfLeader()); + command.setEncodedToken(getContainerToken(containerID, context.getScmContext())); - if (null != leaseManager) { + if (null != context.getLeaseManager()) { try { - leaseManager.acquire(command, timeout, () -> triggerCloseCallback( - publisher, container, command)); + context.getLeaseManager().acquire(command, context.getTimeout(), () -> triggerCloseCallback( + publisher, container, command, context)); } catch (LeaseAlreadyExistException ex) { LOG.debug("Close container {} in {} state already in queue.", containerID, container.getState()); @@ -126,7 +121,7 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) { } } else { // case of recon, lease manager will be null, trigger event directly - triggerCloseCallback(publisher, container, command); + triggerCloseCallback(publisher, 
container, command, context); } } else { LOG.debug("Cannot close container {}, which is in {} state.", @@ -153,16 +148,16 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) { * @return Void * @throws ContainerNotFoundException */ - private Void triggerCloseCallback( - EventPublisher publisher, ContainerInfo container, SCMCommand command) + private static Void triggerCloseCallback( + EventPublisher publisher, ContainerInfo container, SCMCommand command, CloseContainerContext context) throws ContainerNotFoundException { - getNodes(container).forEach(node -> + getNodes(container, context).forEach(node -> publisher.fireEvent(DATANODE_COMMAND, new CommandForDatanode<>(node, command))); return null; } - private String getContainerToken(ContainerID containerID) { + private static String getContainerToken(ContainerID containerID, SCMContext scmContext) { if (scmContext.getScm() instanceof StorageContainerManager) { StorageContainerManager scm = (StorageContainerManager) scmContext.getScm(); @@ -178,16 +173,56 @@ private String getContainerToken(ContainerID containerID) { * @return list of DatanodeDetails * @throws ContainerNotFoundException */ - private List getNodes(final ContainerInfo container) + private static List getNodes(final ContainerInfo container, CloseContainerContext context) throws ContainerNotFoundException { try { - return pipelineManager.getPipeline(container.getPipelineID()).getNodes(); + return context.getPipelineManager().getPipeline(container.getPipelineID()).getNodes(); } catch (PipelineNotFoundException ex) { // Use container replica if the pipeline is not available. 
- return containerManager.getContainerReplicas(container.containerID()) + return context.getContainerManager().getContainerReplicas(container.containerID()) .stream() .map(ContainerReplica::getDatanodeDetails) .collect(Collectors.toList()); } } + + public static class CloseContainerContext { + private final PipelineManager pipelineManager; + private final ContainerManager containerManager; + private final SCMContext scmContext; + + private final LeaseManager leaseManager; + private final long timeout; + + + public CloseContainerContext( + PipelineManager pipelineManager, ContainerManager containerManager, SCMContext scmContext, + LeaseManager leaseManager, long timeout) { + this.pipelineManager = pipelineManager; + this.containerManager = containerManager; + this.scmContext = scmContext; + this.leaseManager = leaseManager; + this.timeout = timeout; + } + + public PipelineManager getPipelineManager() { + return pipelineManager; + } + + public ContainerManager getContainerManager() { + return containerManager; + } + + public SCMContext getScmContext() { + return scmContext; + } + + public LeaseManager getLeaseManager() { + return leaseManager; + } + + public long getTimeout() { + return timeout; + } + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java index 9d2ced1bea30..9ba722f180e6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java @@ -89,7 +89,7 @@ private void processPipelineAction(final DatanodeDetails datanode, LOG.info(logMsg); try { if (action == PipelineAction.Action.CLOSE) { - pipelineManager.closePipeline(pid); + pipelineManager.closePipelineAsError(pid, info); } else { LOG.error("Received unknown pipeline action {}, for 
pipeline {} ", action, pid); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 6a448d6c88df..4bf87d9e23e5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -25,6 +25,7 @@ import java.util.Set; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.utils.db.CodecException; @@ -113,6 +114,9 @@ void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID) void closePipeline(PipelineID pipelineID) throws IOException; + void closePipelineAsError(PipelineID pipelineID, StorageContainerDatanodeProtocolProtos.ClosePipelineInfo info) + throws IOException; + void deletePipeline(PipelineID pipelineID) throws IOException; void closeStalePipelines(DatanodeDetails datanodeDetails); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java index 9c529e22e7e1..c1db78cf026f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java @@ -44,8 +44,10 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerReplica; @@ -58,12 +60,14 @@ import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.hdds.utils.db.CodecException; import org.apache.hadoop.hdds.utils.db.RocksDatabaseException; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.ozone.ClientVersion; import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException; +import org.apache.hadoop.ozone.lease.LeaseAlreadyExistException; import org.apache.hadoop.util.Time; import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.slf4j.Logger; @@ -470,7 +474,7 @@ protected void removePipeline(Pipeline pipeline) * @param pipelineId - ID of the pipeline. 
* @throws IOException */ - private void closeContainersForPipeline(final PipelineID pipelineId) + private void closeContainersForPipeline(final PipelineID pipelineId, boolean async) throws IOException { Set containerIDs = stateManager.getContainers(pipelineId); ContainerManager containerManager = scmContext.getScm() @@ -485,8 +489,17 @@ private void closeContainersForPipeline(final PipelineID pipelineId) throw new IOException(ex); } } - eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID); - LOG.info("Container {} closed for pipeline={}", containerID, pipelineId); + + if (async) { + eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID); + LOG.info("Trigger Container {} closed for pipeline={}", containerID, pipelineId); + } + else { + CloseContainerEventHandler.CloseContainerContext closeContainerContext + = new CloseContainerEventHandler.CloseContainerContext(this, containerManager, scmContext, null, 0); + CloseContainerEventHandler.closeContainer(containerID, eventPublisher, closeContainerContext); + LOG.info("Container {} closed for pipeline={}", containerID, pipelineId); + } } } @@ -499,7 +512,7 @@ private void closeContainersForPipeline(final PipelineID pipelineId) public void closePipeline(PipelineID pipelineID) throws IOException { HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf(); // close containers. - closeContainersForPipeline(pipelineID); + closeContainersForPipeline(pipelineID, true); if (!getPipeline(pipelineID).isClosed()) { acquireWriteLock(); try { @@ -515,6 +528,51 @@ public void closePipeline(PipelineID pipelineID) throws IOException { } + /** + * Move the Pipeline to CLOSED state. 
+   * @param pipelineID ID of the Pipeline to be closed
+   * @param info ClosePipelineInfo containing the reason for closing the pipeline
+   * @throws IOException In case of exception while closing the Pipeline
+   */
+  @Override
+  public void closePipelineAsError(
+      PipelineID pipelineID, StorageContainerDatanodeProtocolProtos.ClosePipelineInfo info) throws IOException {
+    HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
+    // close containers.
+    closeContainersForPipeline(pipelineID, false);
+    if (!getPipeline(pipelineID).isClosed()) {
+      acquireWriteLock();
+      try {
+        stateManager.updatePipelineState(pipelineIDProtobuf,
+            HddsProtos.PipelineState.PIPELINE_CLOSED);
+      } finally {
+        releaseWriteLock();
+      }
+      if (null != scmContext.getScm().getLeaseManager()) {
+        try {
+          // wait for 2HB interval after CLOSE Container for closing pipeline
+          long pipelineCloseWait = HddsServerUtil.getScmHeartbeatInterval(conf) * 2;
+          LOG.info("Pipeline {} with status {} moved to CLOSED state, removing in {}ms", pipelineID, info.getReason(),
+              pipelineCloseWait);
+          scmContext.getScm().getLeaseManager().acquire(pipelineID, pipelineCloseWait, () -> {
+            LOG.info("Scrubbing pipeline: id: {} in CLOSED stage.", pipelineID.getId());
+            deletePipeline(pipelineID);
+            return null;
+          });
+        } catch (LeaseAlreadyExistException ex) {
+          LOG.debug("Close Pipeline {} delete already in queue.", pipelineID);
+        } catch (Exception ex) {
+          LOG.error("Error while scheduling pipeline removal", ex);
+        }
+      } else {
+        LOG.info("Pipeline {} with status {} moved to CLOSED state", pipelineID, info.getReason());
+      }
+    }
+
+    metrics.removePipelineMetrics(pipelineID);
+
+  }
+
   /**
    * Deletes the Pipeline for the given PipelineID.
* @param pipelineID ID of the Pipeline to be deleted diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java index 31230f071d59..6f255e7bfa03 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.ozone.lease.LeaseManager; /** * Interface for the SCM Facade class that can be used by a passive SCM like @@ -68,4 +69,8 @@ public interface OzoneStorageContainerManager { SCMHAManager getScmHAManager(); SequenceIdGenerator getSequenceIdGen(); + + default LeaseManager getLeaseManager() { + return null; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 696816b85a24..cdad93b92979 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -2033,6 +2033,9 @@ public SCMServiceManager getSCMServiceManager() { return serviceManager; } + public LeaseManager getLeaseManager() { + return leaseManager; + } /** * Force SCM out of safe mode. 
*/ diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java index d6a3fc546352..2785e0135f36 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.ha.SCMHAManager; @@ -238,6 +239,12 @@ public void closePipeline(final PipelineID pipelineId) HddsProtos.PipelineState.PIPELINE_CLOSED); } + @Override + public void closePipelineAsError( + final PipelineID pipelineId, StorageContainerDatanodeProtocolProtos.ClosePipelineInfo info) throws IOException { + closePipeline(pipelineId); + } + @Override public void deletePipeline(PipelineID pipelineID) { }