diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java index cb8b7744769..b791badf8b0 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java @@ -86,7 +86,13 @@ public static > T getNextId(String name, ServerContext c } } - private static KeyExtent findContaining(Ample ample, TableId tableId, Text row) { + /** + * Finds the single tablet extent that contains the provided row. + * + * @throws NullPointerException if row is null + * @throws java.util.NoSuchElementException if no tablet contains the row + */ + public static KeyExtent findContaining(Ample ample, TableId tableId, Text row) { Objects.requireNonNull(row); try (var tablets = ample.readTablets().forTable(tableId).overlapping(row, true, row) .fetch(TabletMetadata.ColumnType.PREV_ROW).build()) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/LockTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/LockTable.java index b8ab0aac5af..7b5abfa3ada 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/LockTable.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/LockTable.java @@ -21,17 +21,25 @@ import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.RowRange; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.thrift.TRange; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock; +import org.apache.accumulo.core.fate.zookeeper.LockRange; +import org.apache.accumulo.core.util.RowRangeUtil; import org.apache.accumulo.manager.tableOps.AbstractFateOperation; import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.manager.tableOps.Utils; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LockTable extends AbstractFateOperation { private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(LockTable.class); private final TableId tableId; private final NamespaceId namespaceId; @@ -48,10 +56,12 @@ public LockTable(TableId tableId, NamespaceId namespaceId, TRange range, @Override public long isReady(FateId fateId, FateEnv env) throws Exception { + LockRange lockRange = getLockRange(env); return Utils.reserveNamespace(env.getContext(), namespaceId, fateId, DistributedReadWriteLock.LockType.READ, true, TableOperation.SET_TABLET_AVAILABILITY) + Utils.reserveTable(env.getContext(), tableId, fateId, - DistributedReadWriteLock.LockType.WRITE, true, TableOperation.SET_TABLET_AVAILABILITY); + DistributedReadWriteLock.LockType.WRITE, true, TableOperation.SET_TABLET_AVAILABILITY, + lockRange); } @Override @@ -66,4 +76,38 @@ public void undo(FateId fateId, FateEnv env) throws Exception { Utils.unreserveTable(env.getContext(), tableId, fateId, DistributedReadWriteLock.LockType.WRITE); } + + /** + * Get the LockRange for {@code this} object using its tRange field. Converts the key-range to a + * row-range which is needed for the LockRange. We do a safe conversion meaning potentially + * locking slightly more than is needed so we have at least what we need. If the key-range can't + * be converted to a RowRange, an infinite LockRange is returned. + */ + private LockRange getLockRange(FateEnv env) { + if (tRange.infiniteStartKey && tRange.infiniteStopKey) { + return LockRange.infinite(); + } + Range range = new Range(tRange); + + try { + RowRange rowRange = RowRangeUtil.toRowRange(range); + Text startRow = rowRange.getLowerBound(); + Text endRow = rowRange.getUpperBound(); + + Text lockStartRow = null; + if (startRow != null) { + if (rowRange.isLowerBoundInclusive()) { + lockStartRow = + Utils.findContaining(env.getContext().getAmple(), tableId, startRow).prevEndRow(); + } else { + lockStartRow = startRow; + } + } + + return LockRange.of(lockStartRow, endRow); + } catch (IllegalArgumentException | java.util.NoSuchElementException e) { + LOG.debug("Unable to convert {} to a RowRange, defaulting to infinite lock range", range, e); + return LockRange.infinite(); + } + } }