Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,48 +55,43 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {

private static final Logger LOG =
LoggerFactory.getLogger(CloseContainerEventHandler.class);

private final PipelineManager pipelineManager;
private final ContainerManager containerManager;
private final SCMContext scmContext;

private final LeaseManager<Object> leaseManager;
private final long timeout;
private final CloseContainerContext context;

public CloseContainerEventHandler(
final PipelineManager pipelineManager,
final ContainerManager containerManager,
final SCMContext scmContext,
@Nullable LeaseManager<Object> leaseManager,
final long timeout) {
this.pipelineManager = pipelineManager;
this.containerManager = containerManager;
this.scmContext = scmContext;
this.leaseManager = leaseManager;
this.timeout = timeout;
context = new CloseContainerContext(pipelineManager, containerManager, scmContext, leaseManager, timeout);
}

@Override
public void onMessage(ContainerID containerID, EventPublisher publisher) {
if (!scmContext.isLeader()) {
if (!context.getScmContext().isLeader()) {
LOG.info("Skip close container {} since current SCM is not leader.",
containerID);
return;
}

closeContainer(containerID, publisher, context);
}

public static void closeContainer(
ContainerID containerID, EventPublisher publisher, CloseContainerContext context) {
try {
LOG.info("Close container Event triggered for container : {}, " +
"current state: {}", containerID,
containerManager.getContainer(containerID).getState());
context.getContainerManager().getContainer(containerID).getState());
// If the container is in OPEN state, FINALIZE it.
if (containerManager.getContainer(containerID).getState()
if (context.getContainerManager().getContainer(containerID).getState()
== LifeCycleState.OPEN) {
containerManager.updateContainerState(
context.getContainerManager().updateContainerState(
containerID, LifeCycleEvent.FINALIZE);
}

// ContainerInfo has to read again after the above state change.
final ContainerInfo container = containerManager
final ContainerInfo container = context.getContainerManager()
.getContainer(containerID);
// Send close command to datanodes, if the container is in CLOSING state
if (container.getState() == LifeCycleState.CLOSING) {
Expand All @@ -111,13 +106,13 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
}
SCMCommand<?> command = new CloseContainerCommand(
containerID.getId(), container.getPipelineID(), force);
command.setTerm(scmContext.getTermOfLeader());
command.setEncodedToken(getContainerToken(containerID));
command.setTerm(context.getScmContext().getTermOfLeader());
command.setEncodedToken(getContainerToken(containerID, context.getScmContext()));

if (null != leaseManager) {
if (null != context.getLeaseManager()) {
try {
leaseManager.acquire(command, timeout, () -> triggerCloseCallback(
publisher, container, command));
context.getLeaseManager().acquire(command, context.getTimeout(), () -> triggerCloseCallback(
publisher, container, command, context));
} catch (LeaseAlreadyExistException ex) {
LOG.debug("Close container {} in {} state already in queue.",
containerID, container.getState());
Expand All @@ -126,7 +121,7 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
}
} else {
// case of recon, lease manager will be null, trigger event directly
triggerCloseCallback(publisher, container, command);
triggerCloseCallback(publisher, container, command, context);
}
} else {
LOG.debug("Cannot close container {}, which is in {} state.",
Expand All @@ -153,16 +148,16 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
* @return Void
* @throws ContainerNotFoundException
*/
private Void triggerCloseCallback(
EventPublisher publisher, ContainerInfo container, SCMCommand<?> command)
private static Void triggerCloseCallback(
EventPublisher publisher, ContainerInfo container, SCMCommand<?> command, CloseContainerContext context)
throws ContainerNotFoundException {
getNodes(container).forEach(node ->
getNodes(container, context).forEach(node ->
publisher.fireEvent(DATANODE_COMMAND,
new CommandForDatanode<>(node, command)));
return null;
}

private String getContainerToken(ContainerID containerID) {
private static String getContainerToken(ContainerID containerID, SCMContext scmContext) {
if (scmContext.getScm() instanceof StorageContainerManager) {
StorageContainerManager scm =
(StorageContainerManager) scmContext.getScm();
Expand All @@ -178,16 +173,56 @@ private String getContainerToken(ContainerID containerID) {
* @return list of DatanodeDetails
* @throws ContainerNotFoundException
*/
private List<DatanodeDetails> getNodes(final ContainerInfo container)
private static List<DatanodeDetails> getNodes(final ContainerInfo container, CloseContainerContext context)
throws ContainerNotFoundException {
try {
return pipelineManager.getPipeline(container.getPipelineID()).getNodes();
return context.getPipelineManager().getPipeline(container.getPipelineID()).getNodes();
} catch (PipelineNotFoundException ex) {
// Use container replica if the pipeline is not available.
return containerManager.getContainerReplicas(container.containerID())
return context.getContainerManager().getContainerReplicas(container.containerID())
.stream()
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
}
}

public static class CloseContainerContext {
private final PipelineManager pipelineManager;
private final ContainerManager containerManager;
private final SCMContext scmContext;

private final LeaseManager<Object> leaseManager;
private final long timeout;


public CloseContainerContext(
PipelineManager pipelineManager, ContainerManager containerManager, SCMContext scmContext,
LeaseManager<Object> leaseManager, long timeout) {
this.pipelineManager = pipelineManager;
this.containerManager = containerManager;
this.scmContext = scmContext;
this.leaseManager = leaseManager;
this.timeout = timeout;
}

public PipelineManager getPipelineManager() {
return pipelineManager;
}

public ContainerManager getContainerManager() {
return containerManager;
}

public SCMContext getScmContext() {
return scmContext;
}

public LeaseManager<Object> getLeaseManager() {
return leaseManager;
}

public long getTimeout() {
return timeout;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private void processPipelineAction(final DatanodeDetails datanode,
LOG.info(logMsg);
try {
if (action == PipelineAction.Action.CLOSE) {
pipelineManager.closePipeline(pid);
pipelineManager.closePipelineAsError(pid, info);
} else {
LOG.error("Received unknown pipeline action {}, for pipeline {} ",
action, pid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Set;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.utils.db.CodecException;
Expand Down Expand Up @@ -113,6 +114,9 @@ void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID)

void closePipeline(PipelineID pipelineID) throws IOException;

void closePipelineAsError(PipelineID pipelineID, StorageContainerDatanodeProtocolProtos.ClosePipelineInfo info)
throws IOException;

void deletePipeline(PipelineID pipelineID) throws IOException;

void closeStalePipelines(DatanodeDetails datanodeDetails);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
Expand All @@ -58,12 +60,14 @@
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.db.CodecException;
import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.lease.LeaseAlreadyExistException;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -470,7 +474,7 @@ protected void removePipeline(Pipeline pipeline)
* @param pipelineId - ID of the pipeline.
* @throws IOException
*/
private void closeContainersForPipeline(final PipelineID pipelineId)
private void closeContainersForPipeline(final PipelineID pipelineId, boolean async)
throws IOException {
Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
ContainerManager containerManager = scmContext.getScm()
Expand All @@ -485,8 +489,17 @@ private void closeContainersForPipeline(final PipelineID pipelineId)
throw new IOException(ex);
}
}
eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
LOG.info("Container {} closed for pipeline={}", containerID, pipelineId);

if (async) {
eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
LOG.info("Trigger Container {} closed for pipeline={}", containerID, pipelineId);
}
else {
CloseContainerEventHandler.CloseContainerContext closeContainerContext
= new CloseContainerEventHandler.CloseContainerContext(this, containerManager, scmContext, null, 0);
CloseContainerEventHandler.closeContainer(containerID, eventPublisher, closeContainerContext);
LOG.info("Container {} closed for pipeline={}", containerID, pipelineId);
}
}
}

Expand All @@ -499,7 +512,7 @@ private void closeContainersForPipeline(final PipelineID pipelineId)
public void closePipeline(PipelineID pipelineID) throws IOException {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
// close containers.
closeContainersForPipeline(pipelineID);
closeContainersForPipeline(pipelineID, true);
if (!getPipeline(pipelineID).isClosed()) {
acquireWriteLock();
try {
Expand All @@ -515,6 +528,51 @@ public void closePipeline(PipelineID pipelineID) throws IOException {

}

/**
* Move the Pipeline to CLOSED state.
* @param pipelineID ID of the Pipeline to be closed
* @param info ClosePipelineInfo containing the reason for closing the pipeline
* @throws IOException In case of exception while closing the Pipeline
*/
@Override
public void closePipelineAsError(
PipelineID pipelineID, StorageContainerDatanodeProtocolProtos.ClosePipelineInfo info) throws IOException {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
// close containers.
closeContainersForPipeline(pipelineID, false);
if (!getPipeline(pipelineID).isClosed()) {
acquireWriteLock();
try {
stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_CLOSED);
} finally {
releaseWriteLock();
}
if (null != scmContext.getScm().getLeaseManager()) {
try {
// wait for 2HB interval after CLOSE Container for closing pipeline
long pipelineCloseWait = HddsServerUtil.getScmHeartbeatInterval(conf) * 2;
LOG.info("Pipeline {} with status {} moved to CLOSED state, removing in {}ms", pipelineID, info.getReason(),
pipelineCloseWait);
scmContext.getScm().getLeaseManager().acquire(pipelineID, pipelineCloseWait, () -> {
LOG.info("Scrubbing pipeline: id: {} in CLOSED stage.", pipelineID.getId());
deletePipeline(pipelineID);
return null;
});
} catch (LeaseAlreadyExistException ex) {
LOG.debug("Close Pipeline {} delete already in queue.", pipelineID);
} catch (Exception ex) {
LOG.error("Error while scheduling pipeline removal", ex);
}
} else {
LOG.info("Pipeline {} with status moved to CLOSED state", pipelineID, info.getReason());
}
}

metrics.removePipelineMetrics(pipelineID);

}

/**
* Deletes the Pipeline for the given PipelineID.
* @param pipelineID ID of the Pipeline to be deleted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.ozone.lease.LeaseManager;

/**
* Interface for the SCM Facade class that can be used by a passive SCM like
Expand Down Expand Up @@ -68,4 +69,8 @@ public interface OzoneStorageContainerManager {
SCMHAManager getScmHAManager();

SequenceIdGenerator getSequenceIdGen();

default LeaseManager<Object> getLeaseManager() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2033,6 +2033,9 @@ public SCMServiceManager getSCMServiceManager() {
return serviceManager;
}

public LeaseManager<Object> getLeaseManager() {
return leaseManager;
}
/**
* Force SCM out of safe mode.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
Expand Down Expand Up @@ -238,6 +239,12 @@ public void closePipeline(final PipelineID pipelineId)
HddsProtos.PipelineState.PIPELINE_CLOSED);
}

@Override
public void closePipelineAsError(
final PipelineID pipelineId, StorageContainerDatanodeProtocolProtos.ClosePipelineInfo info) throws IOException {
closePipeline(pipelineId);
}

@Override
public void deletePipeline(PipelineID pipelineID) {
}
Expand Down