From 07e846dd907bc5060c05bb20a3c7082efee02163 Mon Sep 17 00:00:00 2001 From: Liu Xuxin Date: Mon, 13 Nov 2023 18:17:15 +0800 Subject: [PATCH 1/5] temp --- .../java/org/apache/iotdb/SessionExample.java | 26 +++++++++---------- .../apache/iotdb/rpc/RpcTransportFactory.java | 2 +- .../iotdb/session/SessionConnection.java | 7 +++++ .../apache/iotdb/db/service/RPCService.java | 2 +- .../service/AbstractThriftServiceThread.java | 8 ++++++ 5 files changed, 30 insertions(+), 15 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index d5f40452ea8c0..dc1d1f446bbd1 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -74,7 +74,7 @@ public static void main(String[] args) .password("root") .version(Version.V_1_0) .build(); - session.open(false); + session.open(true); // set session fetchSize session.setFetchSize(10000); @@ -110,18 +110,18 @@ public static void main(String[] args) // deleteTimeseries(); // setTimeout(); - sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); - sessionEnableRedirect.setEnableQueryRedirection(true); - sessionEnableRedirect.open(false); - - // set session fetchSize - sessionEnableRedirect.setFetchSize(10000); - - fastLastDataQueryForOneDevice(); - insertRecord4Redirect(); - query4Redirect(); - sessionEnableRedirect.close(); - session.close(); +// sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); +// sessionEnableRedirect.setEnableQueryRedirection(true); +// sessionEnableRedirect.open(false); +// +// // set session fetchSize +// sessionEnableRedirect.setFetchSize(10000); +// +// fastLastDataQueryForOneDevice(); +// insertRecord4Redirect(); +// query4Redirect(); +// sessionEnableRedirect.close(); +// session.close(); } private static void createAndDropContinuousQueries() diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java index 191f0a42f9531..c1d799f2aadd2 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java @@ -42,7 +42,7 @@ public class RpcTransportFactory extends TTransportFactory { private final TTransportFactory inner; - private RpcTransportFactory(TTransportFactory inner) { + public RpcTransportFactory(TTransportFactory inner) { this.inner = inner; } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index 6cb618e8fa935..68e221807a61f 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -162,6 +162,13 @@ private void init(TEndPoint endPoint, boolean useSSL, String trustStore, String session.connectionTimeoutInMs, trustStore, trustStorePwd); + } else if (session.enableRPCCompression) { + RpcTransportFactory.setUseSnappy(true); + RpcTransportFactory.reInit(); + transport = + RpcTransportFactory.INSTANCE.getTransport( + // as there is a try-catch already, we do not need to use TSocket.wrap + endPoint.getIp(), endPoint.getPort(), session.connectionTimeoutInMs); } else { transport = RpcTransportFactory.INSTANCE.getTransport( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java index af747cb1dd09c..b5c6e3c10e291 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java @@ -85,7 +85,7 @@ public void initThriftServiceThread() throws IllegalAccessException { config.getRpcMaxConcurrentClientNum(), config.getThriftServerAwaitTimeForStopService(), new RPCServiceThriftHandler(impl), - IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()); + true); } } catch (RPCServiceException e) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java index 12eea072f3224..eb89232608fa8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java @@ -24,6 +24,9 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.runtime.RPCServiceException; +import org.apache.iotdb.rpc.RpcTransportFactory; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TimeoutChangeableTSnappyFramedTransport; import org.apache.thrift.TBaseAsyncProcessor; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; @@ -208,6 +211,11 @@ protected AbstractThriftServiceThread( serverTransport = openTransport(bindAddress, port); TThreadPoolServer.Args poolArgs = initSyncedPoolArgs(processor, threadsName, maxWorkerThreads, timeoutSecond); + if (compress) { + poolArgs.transportFactory(new RpcTransportFactory( + new TimeoutChangeableTSnappyFramedTransport.Factory( + RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, RpcUtils.THRIFT_FRAME_MAX_SIZE))); + } poolServer = new TThreadPoolServer(poolArgs); poolServer.setServerEventHandler(serverEventHandler); } catch (TTransportException e) { From 8745ca06f862d6580c3b2fbd2abdfbee6439a48a Mon Sep 17 00:00:00 2001 From: Liu Xuxin Date: Wed, 15 Nov 2023 15:40:37 +0800 Subject: [PATCH 2/5] temp --- .../java/org/apache/iotdb/SessionExample.java | 24 +++++++++---------- .../resources/conf/iotdb-datanode.properties | 2 ++ .../org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++++++++ .../apache/iotdb/db/conf/IoTDBDescriptor.java | 8 +++++++ .../apache/iotdb/db/service/RPCService.java | 2 +- .../service/AbstractThriftServiceThread.java | 9 +++---- 6 files changed, 38 insertions(+), 17 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index dc1d1f446bbd1..768f276dbad36 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -110,18 +110,18 @@ public static void main(String[] args) // deleteTimeseries(); // setTimeout(); -// sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); -// sessionEnableRedirect.setEnableQueryRedirection(true); -// sessionEnableRedirect.open(false); -// -// // set session fetchSize -// sessionEnableRedirect.setFetchSize(10000); -// -// fastLastDataQueryForOneDevice(); -// insertRecord4Redirect(); -// query4Redirect(); -// sessionEnableRedirect.close(); -// session.close(); + // sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); + // sessionEnableRedirect.setEnableQueryRedirection(true); + // sessionEnableRedirect.open(false); + // + // // set session fetchSize + // sessionEnableRedirect.setFetchSize(10000); + // + // fastLastDataQueryForOneDevice(); + // insertRecord4Redirect(); + // query4Redirect(); + // sessionEnableRedirect.close(); + // session.close(); } private static void createAndDropContinuousQueries() diff --git a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties index e6ee742796e44..60206138c5ab9 100644 --- a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties +++ b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties @@ -107,6 +107,8 @@ dn_seed_config_node=127.0.0.1:10710 # this feature is under development, set this as false before it is done. # dn_rpc_advanced_compression_enable=false +# dn_data_transfer_compression_enable=false + # Datatype: int # dn_rpc_selector_thread_count=1 diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 854401d7b5e35..eb1c229a7b40a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -120,6 +120,8 @@ public class IoTDBConfig { /** whether to use Snappy compression before sending data through the network */ private boolean rpcAdvancedCompressionEnable = false; + private boolean dataTransportCompressionEnable = false; + /** Port which the JDBC server listens to. */ private int rpcPort = 6667; @@ -2589,6 +2591,14 @@ public void setRpcAdvancedCompressionEnable(boolean rpcAdvancedCompressionEnable RpcTransportFactory.setUseSnappy(this.rpcAdvancedCompressionEnable); } + public boolean isDataTransportCompressionEnable() { + return dataTransportCompressionEnable; + } + + public void setDataTransportCompressionEnable(boolean dataTransportCompressionEnable) { + this.dataTransportCompressionEnable = dataTransportCompressionEnable; + } + public int getMlogBufferSize() { return mlogBufferSize; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index e4c5c243598aa..76fac28fe618b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -234,6 +234,14 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO Boolean.toString(conf.isRpcAdvancedCompressionEnable())) .trim())); + conf.setDataTransportCompressionEnable( + Boolean.parseBoolean( + properties + .getProperty( + "dn_data_transfer_compression_enable", + Boolean.toString(conf.isDataTransportCompressionEnable())) + .trim())); + conf.setConnectionTimeoutInMS( Integer.parseInt( properties diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java index b5c6e3c10e291..0efc54c6eb57d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java @@ -85,7 +85,7 @@ public void initThriftServiceThread() throws IllegalAccessException { config.getRpcMaxConcurrentClientNum(), config.getThriftServerAwaitTimeForStopService(), new RPCServiceThriftHandler(impl), - true); + config.isDataTransportCompressionEnable()); } } catch (RPCServiceException e) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java index eb89232608fa8..794a7a2d72f46 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java @@ -23,10 +23,10 @@ import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.runtime.RPCServiceException; - import org.apache.iotdb.rpc.RpcTransportFactory; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TimeoutChangeableTSnappyFramedTransport; + import org.apache.thrift.TBaseAsyncProcessor; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; @@ -212,9 +212,10 @@ protected AbstractThriftServiceThread( TThreadPoolServer.Args poolArgs = initSyncedPoolArgs(processor, threadsName, maxWorkerThreads, timeoutSecond); if (compress) { - poolArgs.transportFactory(new RpcTransportFactory( - new TimeoutChangeableTSnappyFramedTransport.Factory( - RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, RpcUtils.THRIFT_FRAME_MAX_SIZE))); + poolArgs.transportFactory( + new RpcTransportFactory( + new TimeoutChangeableTSnappyFramedTransport.Factory( + RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, RpcUtils.THRIFT_FRAME_MAX_SIZE))); } poolServer = new TThreadPoolServer(poolArgs); poolServer.setServerEventHandler(serverEventHandler); From 8450fd772aa944505d455e0d270cbe2fc9540a4f Mon Sep 17 00:00:00 2001 From: Liu Xuxin Date: Mon, 8 Jan 2024 16:45:33 +0800 Subject: [PATCH 3/5] add log for compression --- .../rpc/TSnappyElasticFramedTransport.java | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java index a85ccf0c5b53d..817c994061a62 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java @@ -20,11 +20,19 @@ package org.apache.iotdb.rpc; import org.apache.thrift.transport.TTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.xerial.snappy.Snappy; import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; public class TSnappyElasticFramedTransport extends TCompressedElasticFramedTransport { + private static final Logger LOGGER = LoggerFactory.getLogger(TSnappyElasticFramedTransport.class); + private static final AtomicLong totalUncompressionTime = new AtomicLong(0); + private static final AtomicLong totalUncompressionCount = new AtomicLong(0); + private static final AtomicLong totalOriginalDataSize = new AtomicLong(0); + private static final AtomicLong totalCompressedDataSize = new AtomicLong(0); public static class Factory extends TElasticFramedTransport.Factory { @@ -74,6 +82,23 @@ protected int compress(byte[] input, int inOff, int len, byte[] output, int outO @Override protected void uncompress(byte[] input, int inOff, int size, byte[] output, int outOff) throws IOException { - Snappy.uncompress(input, inOff, size, output, outOff); + + long startTime = System.nanoTime(); + int uncompressedSize = Snappy.uncompress(input, inOff, size, output, outOff); + long endTime = System.nanoTime(); + totalUncompressionTime.addAndGet(endTime - startTime); + totalOriginalDataSize.addAndGet(uncompressedSize); + totalCompressedDataSize.addAndGet(size); + long count = totalUncompressionCount.incrementAndGet(); + if (count % 1000 == 0) { + LOGGER.info( + "Average uncompression time: {} ms, average compression rate: {}", + totalUncompressionTime.doubleValue() / count / 1000000, + totalOriginalDataSize.doubleValue() / totalCompressedDataSize.doubleValue()); + totalUncompressionCount.set(0); + totalUncompressionTime.set(0); + totalOriginalDataSize.set(0); + totalCompressedDataSize.set(0); + } } } From 4def5b021a2bc79cd7ed7da6ef5e17cbbefdc59f Mon Sep 17 00:00:00 2001 From: Liu Xuxin Date: Wed, 10 Jan 2024 16:42:30 +0800 Subject: [PATCH 4/5] add a new way for records to check path valid --- .../thrift/impl/ClientRPCServiceImpl.java | 2 +- .../apache/iotdb/commons/utils/PathUtils.java | 112 ++++++++++++++++++ 2 files changed, 113 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 27bda2168f809..0ae4f5091a7ee 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -1639,7 +1639,7 @@ public TSStatus insertRecords(TSInsertRecordsReq req) { // check whether measurement is legal according to syntax convention req.setMeasurementsList( - PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); + PathUtils.checkIsLegalSingleMeasurementsListsAndUpdateForRows(req.getMeasurementsList())); // Step 1: transfer from TSInsertRecordsReq to Statement InsertRowsStatement statement = StatementGenerator.createStatement(req); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java index 8f1a14c22a45d..0763fe4b858f2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java @@ -25,13 +25,24 @@ import org.apache.iotdb.tsfile.exception.PathParseException; import org.apache.iotdb.tsfile.read.common.parser.PathNodesGenerator; import org.apache.iotdb.tsfile.read.common.parser.PathVisitor; +import org.apache.iotdb.tsfile.utils.Pair; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; public class PathUtils { + private static final Logger log = LoggerFactory.getLogger(PathUtils.class); + private static AtomicLong totalTimeCost = new AtomicLong(0); + private static AtomicLong totalCount = new AtomicLong(0); /** * @param path the path will split. ex, root.ln. @@ -63,6 +74,7 @@ public static String[] isLegalPath(String path) throws IllegalPathException { */ public static List> checkIsLegalSingleMeasurementListsAndUpdate( List> measurementLists) throws MetadataException { + long startTime = System.nanoTime(); if (measurementLists == null) { return null; } @@ -72,9 +84,62 @@ public static List> checkIsLegalSingleMeasurementListsAndUpdate( for (List measurements : measurementLists) { res.add(checkLegalSingleMeasurementsAndSkipDuplicate(measurements, checkedMeasurements)); } + totalTimeCost.addAndGet(System.nanoTime() - startTime); + long count = totalCount.incrementAndGet(); + if (count % 1000 == 0) { + log.info( + "checkIsLegalSingleMeasurementListsAndUpdate average time cost: {} ms", + totalTimeCost.get() * 1.0 / count / 1e6); + } return res; } + public static List> checkIsLegalSingleMeasurementsListsAndUpdateForRows( + List> measurementLists) throws MetadataException { + long startTime = System.nanoTime(); + if (Objects.isNull(measurementLists)) { + return null; + } + Set measurementSet = new HashSet<>(); + for (List measurements : measurementLists) { + if (Objects.isNull(measurements)) { + continue; + } + for (String measurement : measurements) { + if (Objects.isNull(measurement)) { + continue; + } + measurementSet.add(measurement); + } + } + + Map needToUpdate = new HashMap<>(); + checkLegalSingleMeasurementsAndSkipDuplicateForRows(measurementSet, needToUpdate); + for (Map.Entry entry : needToUpdate.entrySet()) { + for (List measurements : measurementLists) { + if (Objects.isNull(measurements)) { + continue; + } + for (int i = 0; i < measurements.size(); i++) { + if (Objects.isNull(measurements.get(i))) { + continue; + } + if (measurements.get(i).equals(entry.getKey())) { + measurements.set(i, entry.getValue()); + } + } + } + } + totalTimeCost.addAndGet(System.nanoTime() - startTime); + long count = totalCount.incrementAndGet(); + if (count % 1000 == 0) { + log.info( + "checkIsLegalSingleMeasurementListsAndUpdateForRows average time cost: {} ms", + totalTimeCost.get() * 1.0 / count / 1e6); + } + return measurementLists; + } + /** * check whether measurement is legal according to syntax convention. Measurement can only be a * single node name, use set to skip checking duplicated measurements @@ -100,6 +165,22 @@ public static List checkLegalSingleMeasurementsAndSkipDuplicate( return res; } + public static void checkLegalSingleMeasurementsAndSkipDuplicateForRows( + Set measurements, Map needToUpdate) throws MetadataException { + if (measurements == null) { + return; + } + for (String measurement : measurements) { + if (measurement == null) { + continue; + } + Pair checked = checkAndReturnSingleMeasurementForRows(measurement); + if (checked.left) { + needToUpdate.put(measurement, checked.right); + } + } + } + /** * check whether measurement is legal according to syntax convention. Measurement can only be a * single node name. @@ -175,6 +256,27 @@ public static String checkAndReturnSingleMeasurement(String measurement) return measurement; } + public static Pair checkAndReturnSingleMeasurementForRows(String measurement) + throws IllegalPathException { + if (measurement == null) { + return null; + } + if (measurement.startsWith(TsFileConstant.BACK_QUOTE_STRING) + && measurement.endsWith(TsFileConstant.BACK_QUOTE_STRING)) { + if (checkBackQuotes(measurement.substring(1, measurement.length() - 1))) { + return removeBackQuotesIfNecessaryForRows(measurement); + } else { + throw new IllegalPathException(measurement); + } + } + if (IoTDBConstant.reservedWords.contains(measurement.toUpperCase()) + || isRealNumber(measurement) + || !TsFileConstant.NODE_NAME_PATTERN.matcher(measurement).matches()) { + throw new IllegalPathException(measurement); + } + return new Pair<>(false, measurement); + } + /** Return true if the str is a real number. Examples: 1.0; +1.0; -1.0; 0011; 011e3; +23e-3 */ public static boolean isRealNumber(String str) { return PathVisitor.isRealNumber(str); @@ -195,6 +297,16 @@ public static String removeBackQuotesIfNecessary(String measurement) { } } + public static Pair removeBackQuotesIfNecessaryForRows(String measurement) { + String unWrapped = measurement.substring(1, measurement.length() - 1); + if (PathUtils.isRealNumber(unWrapped) + || !TsFileConstant.IDENTIFIER_PATTERN.matcher(unWrapped).matches()) { + return new Pair<>(false, measurement); + } else { + return new Pair<>(true, unWrapped); + } + } + private static boolean checkBackQuotes(String src) { int num = src.length() - src.replace("`", "").length(); if (num % 2 == 1) { From c25c445fc2a512a2e84cf7ba18c6e0a99e420d02 Mon Sep 17 00:00:00 2001 From: Liu Xuxin Date: Thu, 11 Jan 2024 21:40:03 +0800 Subject: [PATCH 5/5] add measurement set in insertRecordsReq --- .../thrift/impl/ClientRPCServiceImpl.java | 49 +++++++++---------- .../apache/iotdb/commons/utils/PathUtils.java | 48 ++++-------------- .../src/main/thrift/client.thrift | 1 + 3 files changed, 34 insertions(+), 64 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 0ae4f5091a7ee..5c9519040aa4f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -19,6 +19,28 @@ package org.apache.iotdb.db.protocol.thrift.impl; +import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED; +import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode; +import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; +import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize; + +import io.airlift.units.Duration; +import io.jsonwebtoken.lang.Strings; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.iotdb.common.rpc.thrift.TAggregationType; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; @@ -176,34 +198,10 @@ import org.apache.iotdb.tsfile.utils.TimeDuration; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - -import io.airlift.units.Duration; -import io.jsonwebtoken.lang.Strings; -import org.apache.commons.lang3.StringUtils; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.TimeUnit; - -import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED; -import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode; -import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; -import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; -import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException; -import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException; -import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; -import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize; - public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ClientRPCServiceImpl.class); @@ -1639,7 +1637,8 @@ public TSStatus insertRecords(TSInsertRecordsReq req) { // check whether measurement is legal according to syntax convention req.setMeasurementsList( - PathUtils.checkIsLegalSingleMeasurementsListsAndUpdateForRows(req.getMeasurementsList())); + PathUtils.checkIsLegalSingleMeasurementsListsAndUpdateForRows( + req.getMeasurementsSet(), req.getMeasurementsList())); // Step 1: transfer from TSInsertRecordsReq to Statement InsertRowsStatement statement = StatementGenerator.createStatement(req); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java index 0763fe4b858f2..313bb6a2a1781 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java @@ -18,6 +18,13 @@ */ package org.apache.iotdb.commons.utils; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; @@ -26,19 +33,9 @@ import org.apache.iotdb.tsfile.read.common.parser.PathNodesGenerator; import org.apache.iotdb.tsfile.read.common.parser.PathVisitor; import org.apache.iotdb.tsfile.utils.Pair; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - public class PathUtils { private static final Logger log = LoggerFactory.getLogger(PathUtils.class); private static AtomicLong totalTimeCost = new AtomicLong(0); @@ -95,41 +92,14 @@ public static List> checkIsLegalSingleMeasurementListsAndUpdate( } public static List> checkIsLegalSingleMeasurementsListsAndUpdateForRows( - List> measurementLists) throws MetadataException { + Set measurements, List> measurementLists) throws MetadataException { long startTime = System.nanoTime(); if (Objects.isNull(measurementLists)) { return null; } - Set measurementSet = new HashSet<>(); - for (List measurements : measurementLists) { - if (Objects.isNull(measurements)) { - continue; - } - for (String measurement : measurements) { - if (Objects.isNull(measurement)) { - continue; - } - measurementSet.add(measurement); - } - } Map needToUpdate = new HashMap<>(); - checkLegalSingleMeasurementsAndSkipDuplicateForRows(measurementSet, needToUpdate); - for (Map.Entry entry : needToUpdate.entrySet()) { - for (List measurements : measurementLists) { - if (Objects.isNull(measurements)) { - continue; - } - for (int i = 0; i < measurements.size(); i++) { - if (Objects.isNull(measurements.get(i))) { - continue; - } - if (measurements.get(i).equals(entry.getKey())) { - measurements.set(i, entry.getValue()); - } - } - } - } + checkLegalSingleMeasurementsAndSkipDuplicateForRows(measurements, needToUpdate); totalTimeCost.addAndGet(System.nanoTime() - startTime); long count = totalCount.incrementAndGet(); if (count % 1000 == 0) { diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift index 2b2f558572670..0e08ea0fffdb3 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift @@ -253,6 +253,7 @@ struct TSInsertRecordsReq { 4: required list valuesList 5: required list timestamps 6: optional bool isAligned + 7: optional set measurementsSet } struct TSInsertRecordsOfOneDeviceReq {