diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index d5f40452ea8c0..768f276dbad36 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -74,7 +74,7 @@ public static void main(String[] args) .password("root") .version(Version.V_1_0) .build(); - session.open(false); + session.open(true); // set session fetchSize session.setFetchSize(10000); @@ -110,18 +110,18 @@ public static void main(String[] args) // deleteTimeseries(); // setTimeout(); - sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); - sessionEnableRedirect.setEnableQueryRedirection(true); - sessionEnableRedirect.open(false); - - // set session fetchSize - sessionEnableRedirect.setFetchSize(10000); - - fastLastDataQueryForOneDevice(); - insertRecord4Redirect(); - query4Redirect(); - sessionEnableRedirect.close(); - session.close(); + // sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); + // sessionEnableRedirect.setEnableQueryRedirection(true); + // sessionEnableRedirect.open(false); + // + // // set session fetchSize + // sessionEnableRedirect.setFetchSize(10000); + // + // fastLastDataQueryForOneDevice(); + // insertRecord4Redirect(); + // query4Redirect(); + // sessionEnableRedirect.close(); + // session.close(); } private static void createAndDropContinuousQueries() diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java index 191f0a42f9531..c1d799f2aadd2 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java @@ -42,7 +42,7 @@ public class RpcTransportFactory extends TTransportFactory { private final TTransportFactory inner; - private RpcTransportFactory(TTransportFactory inner) { + public RpcTransportFactory(TTransportFactory inner) { this.inner = inner; } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java index a85ccf0c5b53d..817c994061a62 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java @@ -20,11 +20,19 @@ package org.apache.iotdb.rpc; import org.apache.thrift.transport.TTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.xerial.snappy.Snappy; import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; public class TSnappyElasticFramedTransport extends TCompressedElasticFramedTransport { + private static final Logger LOGGER = LoggerFactory.getLogger(TSnappyElasticFramedTransport.class); + private static final AtomicLong totalUncompressionTime = new AtomicLong(0); + private static final AtomicLong totalUncompressionCount = new AtomicLong(0); + private static final AtomicLong totalOriginalDataSize = new AtomicLong(0); + private static final AtomicLong totalCompressedDataSize = new AtomicLong(0); public static class Factory extends TElasticFramedTransport.Factory { @@ -74,6 +82,23 @@ protected int compress(byte[] input, int inOff, int len, byte[] output, int outO @Override protected void uncompress(byte[] input, int inOff, int size, byte[] output, int outOff) throws IOException { - Snappy.uncompress(input, inOff, size, output, outOff); + + long startTime = System.nanoTime(); + int uncompressedSize = Snappy.uncompress(input, inOff, size, output, outOff); + long endTime = System.nanoTime(); + totalUncompressionTime.addAndGet(endTime - startTime); + totalOriginalDataSize.addAndGet(uncompressedSize); + totalCompressedDataSize.addAndGet(size); + long count = totalUncompressionCount.incrementAndGet(); + if (count % 1000 == 0) { + LOGGER.info( + "Average uncompression time: {} ms, average compression rate: {}", + totalUncompressionTime.doubleValue() / count / 1000000, + totalOriginalDataSize.doubleValue() / totalCompressedDataSize.doubleValue()); + totalUncompressionCount.set(0); + totalUncompressionTime.set(0); + totalOriginalDataSize.set(0); + totalCompressedDataSize.set(0); + } } } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index 6cb618e8fa935..68e221807a61f 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -162,6 +162,13 @@ private void init(TEndPoint endPoint, boolean useSSL, String trustStore, String session.connectionTimeoutInMs, trustStore, trustStorePwd); + } else if (session.enableRPCCompression) { + RpcTransportFactory.setUseSnappy(true); + RpcTransportFactory.reInit(); + transport = + RpcTransportFactory.INSTANCE.getTransport( + // as there is a try-catch already, we do not need to use TSocket.wrap + endPoint.getIp(), endPoint.getPort(), session.connectionTimeoutInMs); } else { transport = RpcTransportFactory.INSTANCE.getTransport( diff --git a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties index e6ee742796e44..60206138c5ab9 100644 --- a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties +++ b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties @@ -107,6 +107,8 @@ dn_seed_config_node=127.0.0.1:10710 # this feature is under development, set this as false before it is done. # dn_rpc_advanced_compression_enable=false +# dn_data_transfer_compression_enable=false + # Datatype: int # dn_rpc_selector_thread_count=1 diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 854401d7b5e35..eb1c229a7b40a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -120,6 +120,8 @@ public class IoTDBConfig { /** whether to use Snappy compression before sending data through the network */ private boolean rpcAdvancedCompressionEnable = false; + private boolean dataTransportCompressionEnable = false; + /** Port which the JDBC server listens to. */ private int rpcPort = 6667; @@ -2589,6 +2591,14 @@ public void setRpcAdvancedCompressionEnable(boolean rpcAdvancedCompressionEnable RpcTransportFactory.setUseSnappy(this.rpcAdvancedCompressionEnable); } + public boolean isDataTransportCompressionEnable() { + return dataTransportCompressionEnable; + } + + public void setDataTransportCompressionEnable(boolean dataTransportCompressionEnable) { + this.dataTransportCompressionEnable = dataTransportCompressionEnable; + } + public int getMlogBufferSize() { return mlogBufferSize; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index e4c5c243598aa..76fac28fe618b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -234,6 +234,14 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO Boolean.toString(conf.isRpcAdvancedCompressionEnable())) .trim())); + conf.setDataTransportCompressionEnable( + Boolean.parseBoolean( + properties + .getProperty( + "dn_data_transfer_compression_enable", + Boolean.toString(conf.isDataTransportCompressionEnable())) + .trim())); + conf.setConnectionTimeoutInMS( Integer.parseInt( properties diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 27bda2168f809..5c9519040aa4f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -19,6 +19,28 @@ package org.apache.iotdb.db.protocol.thrift.impl; +import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED; +import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode; +import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; +import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize; + +import io.airlift.units.Duration; +import io.jsonwebtoken.lang.Strings; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.iotdb.common.rpc.thrift.TAggregationType; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; @@ -176,34 +198,10 @@ import org.apache.iotdb.tsfile.utils.TimeDuration; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - -import io.airlift.units.Duration; -import io.jsonwebtoken.lang.Strings; -import org.apache.commons.lang3.StringUtils; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.TimeUnit; - -import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED; -import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode; -import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; -import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; -import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException; -import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException; -import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; -import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize; - public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ClientRPCServiceImpl.class); @@ -1639,7 +1637,8 @@ public TSStatus insertRecords(TSInsertRecordsReq req) { // check whether measurement is legal according to syntax convention req.setMeasurementsList( - PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); + PathUtils.checkIsLegalSingleMeasurementsListsAndUpdateForRows( + req.getMeasurementsSet(), req.getMeasurementsList())); // Step 1: transfer from TSInsertRecordsReq to Statement InsertRowsStatement statement = StatementGenerator.createStatement(req); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java index af747cb1dd09c..0efc54c6eb57d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java @@ -85,7 +85,7 @@ public void initThriftServiceThread() throws IllegalAccessException { config.getRpcMaxConcurrentClientNum(), config.getThriftServerAwaitTimeForStopService(), new RPCServiceThriftHandler(impl), - IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()); + config.isDataTransportCompressionEnable()); } } catch (RPCServiceException e) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java index 12eea072f3224..794a7a2d72f46 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java @@ -23,6 +23,9 @@ import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.runtime.RPCServiceException; +import org.apache.iotdb.rpc.RpcTransportFactory; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TimeoutChangeableTSnappyFramedTransport; import org.apache.thrift.TBaseAsyncProcessor; import org.apache.thrift.TProcessor; @@ -208,6 +211,12 @@ protected AbstractThriftServiceThread( serverTransport = openTransport(bindAddress, port); TThreadPoolServer.Args poolArgs = initSyncedPoolArgs(processor, threadsName, maxWorkerThreads, timeoutSecond); + if (compress) { + poolArgs.transportFactory( + new RpcTransportFactory( + new TimeoutChangeableTSnappyFramedTransport.Factory( + RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, RpcUtils.THRIFT_FRAME_MAX_SIZE))); + } poolServer = new TThreadPoolServer(poolArgs); poolServer.setServerEventHandler(serverEventHandler); } catch (TTransportException e) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java index 8f1a14c22a45d..313bb6a2a1781 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java @@ -18,6 +18,13 @@ */ package org.apache.iotdb.commons.utils; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; @@ -25,13 +32,14 @@ import org.apache.iotdb.tsfile.exception.PathParseException; import org.apache.iotdb.tsfile.read.common.parser.PathNodesGenerator; import org.apache.iotdb.tsfile.read.common.parser.PathVisitor; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import org.apache.iotdb.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PathUtils { + private static final Logger log = LoggerFactory.getLogger(PathUtils.class); + private static AtomicLong totalTimeCost = new AtomicLong(0); + private static AtomicLong totalCount = new AtomicLong(0); /** * @param path the path will split. ex, root.ln. @@ -63,6 +71,7 @@ public static String[] isLegalPath(String path) throws IllegalPathException { */ public static List> checkIsLegalSingleMeasurementListsAndUpdate( List> measurementLists) throws MetadataException { + long startTime = System.nanoTime(); if (measurementLists == null) { return null; } @@ -72,9 +81,35 @@ public static List> checkIsLegalSingleMeasurementListsAndUpdate( for (List measurements : measurementLists) { res.add(checkLegalSingleMeasurementsAndSkipDuplicate(measurements, checkedMeasurements)); } + totalTimeCost.addAndGet(System.nanoTime() - startTime); + long count = totalCount.incrementAndGet(); + if (count % 1000 == 0) { + log.info( + "checkIsLegalSingleMeasurementListsAndUpdate average time cost: {} ms", + totalTimeCost.get() * 1.0 / count / 1e6); + } return res; } + public static List> checkIsLegalSingleMeasurementsListsAndUpdateForRows( + Set measurements, List> measurementLists) throws MetadataException { + long startTime = System.nanoTime(); + if (Objects.isNull(measurementLists)) { + return null; + } + + Map needToUpdate = new HashMap<>(); + checkLegalSingleMeasurementsAndSkipDuplicateForRows(measurements, needToUpdate); + totalTimeCost.addAndGet(System.nanoTime() - startTime); + long count = totalCount.incrementAndGet(); + if (count % 1000 == 0) { + log.info( + "checkIsLegalSingleMeasurementListsAndUpdateForRows average time cost: {} ms", + totalTimeCost.get() * 1.0 / count / 1e6); + } + return measurementLists; + } + /** * check whether measurement is legal according to syntax convention. Measurement can only be a * single node name, use set to skip checking duplicated measurements @@ -100,6 +135,22 @@ public static List checkLegalSingleMeasurementsAndSkipDuplicate( return res; } + public static void checkLegalSingleMeasurementsAndSkipDuplicateForRows( + Set measurements, Map needToUpdate) throws MetadataException { + if (measurements == null) { + return; + } + for (String measurement : measurements) { + if (measurement == null) { + continue; + } + Pair checked = checkAndReturnSingleMeasurementForRows(measurement); + if (checked.left) { + needToUpdate.put(measurement, checked.right); + } + } + } + /** * check whether measurement is legal according to syntax convention. Measurement can only be a * single node name. @@ -175,6 +226,27 @@ public static String checkAndReturnSingleMeasurement(String measurement) return measurement; } + public static Pair checkAndReturnSingleMeasurementForRows(String measurement) + throws IllegalPathException { + if (measurement == null) { + return null; + } + if (measurement.startsWith(TsFileConstant.BACK_QUOTE_STRING) + && measurement.endsWith(TsFileConstant.BACK_QUOTE_STRING)) { + if (checkBackQuotes(measurement.substring(1, measurement.length() - 1))) { + return removeBackQuotesIfNecessaryForRows(measurement); + } else { + throw new IllegalPathException(measurement); + } + } + if (IoTDBConstant.reservedWords.contains(measurement.toUpperCase()) + || isRealNumber(measurement) + || !TsFileConstant.NODE_NAME_PATTERN.matcher(measurement).matches()) { + throw new IllegalPathException(measurement); + } + return new Pair<>(false, measurement); + } + /** Return true if the str is a real number. Examples: 1.0; +1.0; -1.0; 0011; 011e3; +23e-3 */ public static boolean isRealNumber(String str) { return PathVisitor.isRealNumber(str); @@ -195,6 +267,16 @@ public static String removeBackQuotesIfNecessary(String measurement) { } } + public static Pair removeBackQuotesIfNecessaryForRows(String measurement) { + String unWrapped = measurement.substring(1, measurement.length() - 1); + if (PathUtils.isRealNumber(unWrapped) + || !TsFileConstant.IDENTIFIER_PATTERN.matcher(unWrapped).matches()) { + return new Pair<>(false, measurement); + } else { + return new Pair<>(true, unWrapped); + } + } + private static boolean checkBackQuotes(String src) { int num = src.length() - src.replace("`", "").length(); if (num % 2 == 1) { diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift index 2b2f558572670..0e08ea0fffdb3 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift @@ -253,6 +253,7 @@ struct TSInsertRecordsReq { 4: required list valuesList 5: required list timestamps 6: optional bool isAligned + 7: optional set measurementsSet } struct TSInsertRecordsOfOneDeviceReq {