diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 5efbb7f7..452e6c40 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -187,6 +187,11 @@ public class OHTable implements Table { */ private int maxKeyValueSize; + /** + * whether to enable put optimization path + */ + private boolean enablePutOptimization; + // i.e., doPut checks the writebuffer every X Puts. /** @@ -459,6 +464,8 @@ private void finishSetUp() { DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK); this.writeBufferSize = this.configuration.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); + this.enablePutOptimization = this.configuration.getBoolean(HBASE_HTABLE_USE_PUT_OPTIMIZATION, + HBASE_HTABLE_USE_PUT_OPTIMIZATION_DEFAULT); } public static OHConnectionConfiguration setUserDefinedNamespace(String tableNameString, @@ -722,7 +729,7 @@ public void batch(final List actions, final Object[] results) thr } catch (Exception e) { throw new IOException(tableNameString + " table occurred unexpected error." , e); } - } else if (OHBaseFuncUtils.isAllPut(actions) && OHBaseFuncUtils.isHBasePutPefSupport(obTableClient)) { + } else if (OHBaseFuncUtils.isAllPut(actions) && OHBaseFuncUtils.isHBasePutPefSupport(obTableClient, enablePutOptimization)) { // only support Put now ObHbaseRequest request = buildHbaseRequest(actions); try { @@ -1253,7 +1260,7 @@ private void doPut(List puts) throws IOException { // we need to periodically see if the writebuffer is full instead of waiting until the end of the List n++; if (n % putWriteBufferCheck == 0 && currentWriteBufferSize.get() > writeBufferSize) { - if (OHBaseFuncUtils.isHBasePutPefSupport(obTableClient)) { + if (OHBaseFuncUtils.isHBasePutPefSupport(obTableClient, enablePutOptimization)) { flushCommitsV2(); } else { flushCommits(); @@ -1261,7 +1268,7 @@ private void doPut(List puts) throws IOException { } } if (autoFlush || currentWriteBufferSize.get() > writeBufferSize) { - if (OHBaseFuncUtils.isHBasePutPefSupport(obTableClient)) { + if (OHBaseFuncUtils.isHBasePutPefSupport(obTableClient, enablePutOptimization)) { flushCommitsV2(); } else { flushCommits(); diff --git a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java index deb94f75..d317a5a0 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java +++ b/src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java @@ -129,6 +129,12 @@ public final class OHConstants { */ public static final String HBASE_HTABLE_QUERY_HOT_ONLY = "hbase.htable.query.hot_only"; + /** + * use to specify whether to enable put optimization path. + * Default is true (enabled). + */ + public static final String HBASE_HTABLE_USE_PUT_OPTIMIZATION = "hbase.htable.use.put.optimization"; + /*-------------------------------------------------------------------------------------------------------------*/ /** @@ -160,4 +166,6 @@ public final class OHConstants { public static final int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds + public static final boolean HBASE_HTABLE_USE_PUT_OPTIMIZATION_DEFAULT = true; + } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java index 61b4ef65..ebbdf701 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java @@ -51,7 +51,12 @@ public static byte[][] extractFamilyFromQualifier(byte[] qualifier) throws Excep return new byte[][] { family, newQualifier }; } - public static boolean isHBasePutPefSupport(ObTableClient tableClient) { + public static boolean isHBasePutPefSupport(ObTableClient tableClient, boolean enablePutOptimization) { + // If client-side optimization is disabled, return false directly + if (!enablePutOptimization) { + return false; + } + if (tableClient.isOdpMode()) { // server version support and distributed capacity is enabled and odp version support return ObGlobal.isHBasePutPerfSupport()