Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import org.apache.thrift.TException;
Expand Down Expand Up @@ -332,4 +333,10 @@ public List<RegionReplicaSetInfo> getDataDistributionInfo() {
});
return new ArrayList<>(distributionMap.values());
}

public Map<TRegionReplicaSet, TimeRange> getDataPartitionByTimeRange(
String device, TimeRange timeRange) {
// TODO: zhy
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
Expand Down Expand Up @@ -1334,5 +1335,18 @@ public Analysis visitShowChildNodes(
analysis.setRespDatasetHeader(HeaderConstant.showChildNodesHeader);
return analysis;
}

@Override
public Analysis visitDeleteData(
DeleteDataStatement deleteDataStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
Analysis analysis = new Analysis();
analysis.setStatement(deleteDataStatement);
SchemaPartition schemaPartition;
schemaPartition =
partitionFetcher.getSchemaPartition(new PathPatternTree(deleteDataStatement.getPaths()));
analysis.setSchemaPartitionInfo(schemaPartition);
return analysis;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
import org.apache.iotdb.db.mpp.plan.analyze.ExpressionAnalyzer;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
import org.apache.iotdb.db.mpp.plan.expression.binary.AdditionExpression;
import org.apache.iotdb.db.mpp.plan.expression.binary.CompareBinaryExpression;
import org.apache.iotdb.db.mpp.plan.expression.binary.DivisionExpression;
import org.apache.iotdb.db.mpp.plan.expression.binary.EqualToExpression;
import org.apache.iotdb.db.mpp.plan.expression.binary.GreaterEqualExpression;
Expand Down Expand Up @@ -64,6 +66,7 @@
import org.apache.iotdb.db.mpp.plan.statement.component.ResultSetFormat;
import org.apache.iotdb.db.mpp.plan.statement.component.SelectComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.literal.BooleanLiteral;
Expand Down Expand Up @@ -110,6 +113,7 @@
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.utils.Pair;

import org.apache.commons.lang.StringEscapeUtils;
Expand Down Expand Up @@ -158,7 +162,6 @@ public Statement visitSingleStatement(IoTDBSqlParser.SingleStatementContext ctx)
/** Data Definition Language (DDL) */

// Create Timeseries ========================================================================

@Override
public Statement visitCreateNonAlignedTimeseries(
IoTDBSqlParser.CreateNonAlignedTimeseriesContext ctx) {
Expand Down Expand Up @@ -1197,7 +1200,6 @@ private void parseInsertValuesSpec(
/** Common Parsers */

// IoTDB Objects ========================================================================

private PartialPath parseFullPath(IoTDBSqlParser.FullPathContext ctx) {
List<IoTDBSqlParser.NodeNameWithoutWildcardContext> nodeNamesWithoutStar =
ctx.nodeNameWithoutWildcard();
Expand Down Expand Up @@ -1394,7 +1396,6 @@ private String parseAlias(IoTDBSqlParser.AliasContext ctx) {
/** Data Control Language (DCL) */

// Create User

@Override
public Statement visitCreateUser(IoTDBSqlParser.CreateUserContext ctx) {
AuthorStatement authorStatement = new AuthorStatement(AuthorOperator.AuthorType.CREATE_USER);
Expand Down Expand Up @@ -1664,6 +1665,77 @@ public Statement visitDeleteStorageGroup(IoTDBSqlParser.DeleteStorageGroupContex
return deleteStorageGroupStatement;
}

@Override
public Statement visitDeleteStatement(IoTDBSqlParser.DeleteStatementContext ctx) {
DeleteDataStatement statement = new DeleteDataStatement();
List<IoTDBSqlParser.PrefixPathContext> prefixPaths = ctx.prefixPath();
for (IoTDBSqlParser.PrefixPathContext prefixPath : prefixPaths) {
statement.addPath(parsePrefixPath(prefixPath));
}
if (ctx.whereClause() != null) {
WhereCondition whereCondition = parseWhereClause(ctx.whereClause());
TimeRange timeRange = parseDeleteTimeRange(whereCondition.getPredicate());
statement.setTimeRange(timeRange);
} else {
statement.setTimeRange(new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE));
}
return statement;
}

private TimeRange parseDeleteTimeRange(Expression predicate) {
if (predicate instanceof LogicAndExpression) {
TimeRange leftTimeRange =
parseDeleteTimeRange(((LogicAndExpression) predicate).getLeftExpression());
TimeRange rightTimeRange =
parseDeleteTimeRange(((LogicAndExpression) predicate).getRightExpression());
return new TimeRange(
Math.max(leftTimeRange.getMin(), rightTimeRange.getMin()),
Math.min(leftTimeRange.getMax(), rightTimeRange.getMax()));
} else if (predicate instanceof CompareBinaryExpression) {
if (((CompareBinaryExpression) predicate).getLeftExpression() instanceof TimestampOperand) {
return parseTimeRange(
predicate.getExpressionType(),
((CompareBinaryExpression) predicate).getLeftExpression(),
((CompareBinaryExpression) predicate).getRightExpression());
} else {
return parseTimeRange(
predicate.getExpressionType(),
((CompareBinaryExpression) predicate).getRightExpression(),
((CompareBinaryExpression) predicate).getLeftExpression());
}
} else {
throw new SemanticException(DELETE_RANGE_ERROR_MSG);
}
}

private TimeRange parseTimeRange(
ExpressionType expressionType, Expression timeExpression, Expression valueExpression) {
if (!(timeExpression instanceof TimestampOperand)
|| !(valueExpression instanceof ConstantOperand)) {
throw new SemanticException(DELETE_ONLY_SUPPORT_TIME_EXP_ERROR_MSG);
}

if (((ConstantOperand) valueExpression).getDataType() != TSDataType.INT64) {
throw new SemanticException("The datatype of timestamp should be LONG.");
}

long time = Long.parseLong(((ConstantOperand) valueExpression).getValueString());
switch (expressionType) {
case LESS_THAN:
return new TimeRange(Long.MIN_VALUE, time - 1);
case LESS_EQUAL:
return new TimeRange(Long.MIN_VALUE, time);
case GREATER_THAN:
return new TimeRange(time + 1, Long.MAX_VALUE);
case GREATER_EQUAL:
return new TimeRange(time, Long.MAX_VALUE);
case EQUAL_TO:
return new TimeRange(time, time);
default:
throw new SemanticException(DELETE_RANGE_ERROR_MSG);
}
}

/** function for parsing file path used by LOAD statement. */
public String parseFilePath(String src) {
return src.substring(1, src.length() - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
Expand Down Expand Up @@ -606,5 +607,15 @@ public PlanNode visitDeleteTimeseries(
new ArrayList<>(analysis.getSchemaPartitionInfo().getSchemaPartitionMap().keySet());
return planBuilder.planDeleteData(paths, storageGroups).planDeleteTimeseries(paths).getRoot();
}

@Override
public PlanNode visitDeleteData(
DeleteDataStatement deleteDataStatement, MPPQueryContext context) {
LogicalPlanBuilder builder = new LogicalPlanBuilder(context);
List<PartialPath> paths = deleteDataStatement.getPaths();
List<String> storageGroups =
new ArrayList<>(analysis.getSchemaPartitionInfo().getSchemaPartitionMap().keySet());
return builder.planDeleteData(paths, storageGroups).getRoot();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,33 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;

import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
public class DeleteDataNode extends WritePlanNode implements IPartitionRelatedNode {

private final QueryId queryId;
private final List<PartialPath> pathList;
private final List<String> storageGroups;

private TRegionReplicaSet regionReplicaSet;
private Map<String, List<TimeRange>> deviceTimeRangeMap;

public DeleteDataNode(
PlanNodeId id, QueryId queryId, List<PartialPath> pathList, List<String> storageGroups) {
Expand Down Expand Up @@ -137,4 +143,15 @@ public String toString() {
storageGroups,
regionReplicaSet == null ? "Not Assigned" : regionReplicaSet.getRegionId());
}

@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
DataPartition partition = analysis.getDataPartitionInfo();
for (PartialPath path : pathList) {
// Map<TRegionReplicaSet, TimeRange> replicaTimeRangeMap =
// partition.getDataPartitionByTimeRange(path.getDevice(), timeRange);

}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.mpp.plan.statement;

import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
Expand Down Expand Up @@ -216,4 +217,8 @@ public R visitShowChildPaths(ShowChildPathsStatement showChildPathsStatement, C
public R visitShowChildNodes(ShowChildNodesStatement showChildNodesStatement, C context) {
return visitStatement(showChildNodesStatement, context);
}

public R visitDeleteData(DeleteDataStatement deleteDataStatement, C context) {
return visitStatement(deleteDataStatement, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.mpp.plan.statement.crud;

import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.tsfile.read.common.TimeRange;

import java.util.ArrayList;
import java.util.List;

public class DeleteDataStatement extends Statement {
private List<PartialPath> paths;
private TimeRange timeRange;

public DeleteDataStatement() {
super();
this.paths = new ArrayList<>();
}

@Override
public List<PartialPath> getPaths() {
return paths;
}

public void addPath(PartialPath path) {
paths.add(path);
}

public TimeRange getTimeRange() {
return timeRange;
}

public void setTimeRange(TimeRange timeRange) {
this.timeRange = timeRange;
}

@Override
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitDeleteData(this, context);
}
}