Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.replication.MutationCellGrouper;
import org.apache.phoenix.replication.ReplicationLogGroup;
import org.apache.phoenix.replication.SystemCatalogWALEntryFilter;
import org.apache.phoenix.schema.CompiledConditionalTTLExpression;
Expand Down Expand Up @@ -163,7 +164,6 @@

import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.phoenix.thirdparty.com.google.common.collect.Iterables;
import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
Expand Down Expand Up @@ -861,7 +861,7 @@ private void replicateEditOnWALRestore(ReplicationLogGroup logGroup, WALKey logK
}
if (!regularCells.isEmpty()) {
String tableName = logKey.getTableName().getNameAsString();
for (Mutation split : splitCellsIntoMutations(regularCells)) {
for (Mutation split : MutationCellGrouper.splitCellsIntoMutations(regularCells)) {
if (!this.ignoreReplicationFilter.test(split)) {
logGroup.append(tableName, -1, split);
}
Expand Down Expand Up @@ -2873,52 +2873,6 @@ public static boolean isAtomicOperationComplete(OperationStatus status) {
return status.getOperationStatusCode() == SUCCESS && status.getResult() != null;
}

/**
* Splits cells into individual Put/Delete mutations grouped by (row key, put-vs-delete). HBase's
* checkAndMergeCPMutations merges coprocessor cells into the data mutation, so a single Put may
* contain Delete cells with different row keys (e.g., local index). This method recovers distinct
* mutations using the same grouping algorithm as HBase's ReplicationSink.
*/
private static boolean isNewRowOrType(Cell previousCell, Cell cell) {
return previousCell == null || previousCell.getType() != cell.getType()
|| !CellUtil.matchingRows(previousCell, cell);
}

static List<Mutation> splitCellsIntoMutations(Iterable<Cell> cells) throws IOException {
List<Mutation> result = new ArrayList<>();
Cell previousCell = null;
Mutation current = null;
for (Cell cell : cells) {
if (isNewRowOrType(previousCell, cell)) {
if (current != null) {
result.add(current);
}
if (CellUtil.isDelete(cell)) {
current = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
} else {
current = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
}
}
if (CellUtil.isDelete(cell)) {
((Delete) current).add(cell);
} else {
((Put) current).add(cell);
}
previousCell = cell;
}
if (current != null) {
result.add(current);
}
return result;
}

static List<Mutation> splitCellsIntoMutations(Mutation merged) throws IOException {
if (merged.isEmpty()) {
return Collections.singletonList(merged);
}
return splitCellsIntoMutations(Iterables.concat(merged.getFamilyCellMap().values()));
}

private void replicateMutations(RegionCoprocessorEnvironment env,
MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context)
throws IOException {
Expand All @@ -2939,44 +2893,53 @@ private void replicateMutations(RegionCoprocessorEnvironment env,
// Record ReplicationSyncTime only when we are actually doing work (not on early-return paths).
long start = EnvironmentEdgeManager.currentTimeMillis();
try {
List<Cell> dataTableCells = new ArrayList<>();
for (int i = 0; i < miniBatchOp.size(); i++) {
Mutation m = miniBatchOp.getOperation(i);
if (this.ignoreReplicationFilter.test(m)) {
continue;
}
// When coprocessors add cells (local index, conditional TTL, ON DUPLICATE KEY UPDATE),
// HBase merges them into the data mutation which can mix row keys and cell types.
// Split those back into individual Put/Delete mutations for correct serialization.
if (miniBatchOp.getOperationsFromCoprocessors(i) == null) {
group.append(this.dataTableName, -1, m);
} else {
for (Mutation split : splitCellsIntoMutations(m)) {
group.append(this.dataTableName, -1, split);
}
}
// We stream the cells through as-is — the consumer reconstructs Put/Delete mutations
// on the row+type boundary.
appendCells(dataTableCells, m);
}
if (context.preIndexUpdates != null) {
for (Map.Entry<HTableInterfaceReference, Mutation> entry : context.preIndexUpdates
.entries()) {
if (this.ignoreReplicationFilter.test(entry.getValue())) {
continue;
}
group.append(entry.getKey().getTableName(), -1, entry.getValue());
}
}
if (context.postIndexUpdates != null) {
for (Map.Entry<HTableInterfaceReference, Mutation> entry : context.postIndexUpdates
.entries()) {
if (this.ignoreReplicationFilter.test(entry.getValue())) {
continue;
}
group.append(entry.getKey().getTableName(), -1, entry.getValue());
}
if (!dataTableCells.isEmpty()) {
group.append(this.dataTableName, -1, dataTableCells);
}
appendIndexUpdates(group, context.preIndexUpdates);
appendIndexUpdates(group, context.postIndexUpdates);
group.sync();
} finally {
long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
metricSource.updateReplicationSyncTime(this.dataTableName, duration);
}
}

private void appendIndexUpdates(ReplicationLogGroup group,
ListMultimap<HTableInterfaceReference, Mutation> updates) throws IOException {
if (updates == null) {
return;
}
for (Map.Entry<HTableInterfaceReference, Collection<Mutation>> entry : updates.asMap()
.entrySet()) {
List<Cell> cells = new ArrayList<>();
for (Mutation m : entry.getValue()) {
if (this.ignoreReplicationFilter.test(m)) {
continue;
}
appendCells(cells, m);
}
if (!cells.isEmpty()) {
group.append(entry.getKey().getTableName(), -1, cells);
}
}
}

private static void appendCells(List<Cell> bucket, Mutation m) {
for (List<Cell> familyCells : m.getFamilyCellMap().values()) {
bucket.addAll(familyCells);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;

/**
* Groups a flat cell stream into Put/Delete mutations, mirroring the algorithm HBase's
* ReplicationSink uses to reconstruct mutations from a WALEdit. A new mutation is started whenever
* the row key or the put-vs-delete disposition differs from the immediately preceding cell;
* consecutive cells sharing both are collected into one mutation. There is no precondition on the
* input ordering: any cell stream produces valid mutations. Ordering only affects how the cells are
* partitioned into Mutation objects (a row that recurs non-consecutively yields a separate mutation
* per run), not correctness -- cell order is preserved, so replaying the resulting mutations in
* order reproduces the effect of applying the input cells in order.
*/
public final class MutationCellGrouper {

private MutationCellGrouper() {
}

private static boolean isNewRowOrType(Cell previousCell, Cell cell) {
return previousCell == null || previousCell.getType() != cell.getType()
|| !CellUtil.matchingRows(previousCell, cell);
}

/** Group a cell stream into Put/Delete mutations using the row+type boundary algorithm. */
public static List<Mutation> splitCellsIntoMutations(Iterable<Cell> cells) throws IOException {
List<Mutation> result = new ArrayList<>();
Cell previousCell = null;
Mutation current = null;
for (Cell cell : cells) {
if (isNewRowOrType(previousCell, cell)) {
if (current != null) {
result.add(current);
}
if (CellUtil.isDelete(cell)) {
current = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
} else {
current = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
}
}
if (CellUtil.isDelete(cell)) {
((Delete) current).add(cell);
} else {
((Put) current).add(cell);
}
previousCell = cell;
}
if (current != null) {
result.add(current);
}
return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.phoenix.replication.ReplicationLogGroup.Record;
import org.apache.phoenix.replication.log.LogFileWriter;
Expand Down Expand Up @@ -312,7 +311,7 @@ private void replayCurrentBatch() throws IOException {
LOG.info("Replaying {} unsynced records into new writer {}", currentBatch.size(),
currentWriter);
for (Record r : currentBatch) {
currentWriter.append(r.tableName, r.commitId, r.mutation);
currentWriter.append(r.tableName, r.commitId, r.cells);
}
}

Expand Down Expand Up @@ -352,7 +351,7 @@ private void apply(Action action) throws IOException {
protected void append(Record r) throws IOException {
final boolean[] blockSynced = { false };
apply(writer -> {
blockSynced[0] = writer.append(r.tableName, r.commitId, r.mutation);
blockSynced[0] = writer.append(r.tableName, r.commitId, r.cells);
});
// Add to current batch only after we succeed at appending
currentBatch.add(r);
Expand All @@ -367,10 +366,6 @@ protected void append(Record r) throws IOException {
}
}

protected void append(String tableName, long commitId, Mutation mutation) throws IOException {
apply(writer -> writer.append(tableName, commitId, mutation));
}

protected void sync() throws IOException {
apply(LogFileWriter::sync);
currentBatch.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.phoenix.jdbc.HAGroupStoreManager;
Expand Down Expand Up @@ -329,15 +330,19 @@ public void setValues(int type, Record record, CompletableFuture<Void> syncFutur
}
}

/**
* Append payload carried through the ring buffer. Always carries a flat {@link List} of
* {@link Cell}s; the per-mutation public API extracts the cells before publishing.
*/
protected static class Record {
public String tableName;
public long commitId;
public Mutation mutation;
public final String tableName;
public final long commitId;
public final List<Cell> cells;

public Record(String tableName, long commitId, Mutation mutation) {
public Record(String tableName, long commitId, List<Cell> cells) {
this.tableName = tableName;
this.commitId = commitId;
this.mutation = mutation;
this.cells = cells;
}
}

Expand Down Expand Up @@ -551,6 +556,31 @@ public void append(String tableName, long commitId, Mutation mutation) throws IO
if (LOG.isTraceEnabled()) {
LOG.trace("Append: table={}, commitId={}, mutation={}", tableName, commitId, mutation);
}
List<Cell> cells = new ArrayList<>();
for (List<Cell> familyCells : mutation.getFamilyCellMap().values()) {
cells.addAll(familyCells);
}
publishDataEvent(new Record(tableName, commitId, cells));
}

/**
* Append a coalesced batch of cells as a single record on the log. The cells must already be
* grouped by row+type so consumers can reconstruct Put/Delete mutations on the row+type boundary
* (see {@link MutationCellGrouper}). Behaves identically to
* {@link #append(String, long, Mutation)} with respect to backpressure and fail-stop.
* @param tableName The HBase table name shared by every cell in {@code cells}.
* @param commitId The commit identifier (e.g., SCN) for the batch.
* @param cells The flat ordered cell stream for one batch on one table.
* @throws IOException If the writer is closed or the ring buffer is full.
*/
public void append(String tableName, long commitId, List<Cell> cells) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Append: table={}, commitId={}, cells={}", tableName, commitId, cells.size());
}
publishDataEvent(new Record(tableName, commitId, cells));
}

private void publishDataEvent(Record record) throws IOException {
if (isClosed()) {
throw new IOException("Closed");
}
Expand All @@ -566,7 +596,7 @@ public void append(String tableName, long commitId, Mutation mutation) throws IO
long sequence = ringBuffer.next();
try {
LogEvent event = ringBuffer.get(sequence);
event.setValues(EVENT_TYPE_DATA, new Record(tableName, commitId, mutation), null);
event.setValues(EVENT_TYPE_DATA, record, null);
} finally {
// Update ring buffer events metric
ringBuffer.publish(sequence);
Expand Down
Loading