Skip to content

Commit a798b73

Browse files
committed
add
Signed-off-by: Weihao Li <18110526956@163.com>
1 parent d19de74 commit a798b73

2 files changed

Lines changed: 147 additions & 7 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import java.util.Arrays;
5555
import java.util.List;
5656
import java.util.Map;
57+
import java.util.concurrent.TimeUnit;
5758

5859
public class TransformOperator implements ProcessOperator {
5960

@@ -225,6 +226,7 @@ public final boolean hasNext() throws Exception {
225226
@Override
226227
public TsBlock next() throws Exception {
227228

229+
long start = System.nanoTime();
228230
try {
229231
YieldableState yieldableState = iterateAllColumnsToNextValid();
230232
if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
@@ -236,9 +238,10 @@ public TsBlock next() throws Exception {
236238
final TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
237239
final ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
238240
final int columnCount = columnBuilders.length;
239-
240-
int rowCount = 0;
241-
while (!timeHeap.isEmpty()) {
241+
long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
242+
while (!timeHeap.isEmpty()
243+
&& !tsBlockBuilder.isFull()
244+
&& System.nanoTime() - start < maxRuntime) {
242245
final long currentTime = timeHeap.pollFirst();
243246

244247
// time
@@ -253,25 +256,26 @@ public TsBlock next() throws Exception {
253256
}
254257
timeHeap.add(currentTime);
255258

256-
tsBlockBuilder.declarePositions(rowCount);
257259
return tsBlockBuilder.build();
258260
}
259261
}
260262

261263
prepareEachColumn(columnCount);
262264

263-
++rowCount;
265+
tsBlockBuilder.declarePosition();
264266

265267
yieldableState = iterateAllColumnsToNextValid();
266268
if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
267-
tsBlockBuilder.declarePositions(rowCount);
268269
return tsBlockBuilder.build();
269270
}
270271

271272
inputLayer.updateRowRecordListEvictionUpperBound();
272273
}
273274

274-
tsBlockBuilder.declarePositions(rowCount);
275+
if (tsBlockBuilder.isEmpty()) {
276+
return null;
277+
}
278+
275279
return tsBlockBuilder.build();
276280
} catch (Exception e) {
277281
throw new RuntimeException(e);

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,39 @@
2020
package org.apache.iotdb.db.queryengine.execution.operator;
2121

2222
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
23+
import org.apache.iotdb.commons.path.PartialPath;
24+
import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
2325
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
26+
import org.apache.iotdb.db.queryengine.common.NodeRef;
2427
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
2528
import org.apache.iotdb.db.queryengine.common.QueryId;
2629
import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
2730
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
2831
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
2932
import org.apache.iotdb.db.queryengine.execution.operator.process.TransformOperator;
3033
import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
34+
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
35+
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
3136
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
37+
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
3238
import org.apache.iotdb.db.queryengine.transformation.api.LayerReader;
3339
import org.apache.iotdb.db.queryengine.transformation.dag.input.QueryDataSetInputLayer;
3440
import org.apache.iotdb.db.queryengine.transformation.dag.input.TsBlockInputDataSet;
3541

3642
import com.google.common.collect.ImmutableList;
43+
import com.google.common.collect.ImmutableMap;
44+
import org.apache.tsfile.common.conf.TSFileDescriptor;
3745
import org.apache.tsfile.enums.TSDataType;
3846
import org.apache.tsfile.read.common.block.TsBlock;
3947
import org.apache.tsfile.read.common.block.column.LongColumn;
4048
import org.apache.tsfile.read.common.block.column.TimeColumn;
49+
import org.junit.Assert;
4150
import org.junit.Test;
4251

52+
import java.time.ZoneId;
53+
import java.util.HashMap;
54+
import java.util.List;
55+
import java.util.Map;
4356
import java.util.Optional;
4457

4558
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
@@ -139,4 +152,127 @@ public long ramBytesUsed() {
139152
reader.yield();
140153
reader.consumedAll();
141154
}
155+
156+
@Test
157+
public void testTransformResultLimit() throws Exception {
158+
UDFClassLoaderManager.setupAndGetInstance();
159+
int savedMaxLine = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
160+
try {
161+
int rowCount = 2001;
162+
int maxLine = 200;
163+
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(200);
164+
QueryId queryId = new QueryId("stub_query_chunk");
165+
FragmentInstanceId instanceId =
166+
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
167+
FragmentInstanceStateMachine stateMachine =
168+
new FragmentInstanceStateMachine(
169+
instanceId,
170+
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"));
171+
FragmentInstanceContext fragmentInstanceContext =
172+
createFragmentInstanceContext(instanceId, stateMachine);
173+
DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
174+
PlanNodeId scanNodeId = new PlanNodeId("scan");
175+
driverContext.addOperatorContext(1, scanNodeId, SeriesScanOperator.class.getSimpleName());
176+
PlanNodeId transformNodeId = new PlanNodeId("transform");
177+
driverContext.addOperatorContext(2, transformNodeId, TransformOperator.class.getSimpleName());
178+
179+
long[] times = new long[rowCount];
180+
long[] values = new long[rowCount];
181+
for (int i = 0; i < rowCount; i++) {
182+
times[i] = i;
183+
values[i] = i * 10L;
184+
}
185+
TsBlock oneBatch =
186+
new TsBlock(
187+
new TimeColumn(rowCount, times), new LongColumn(rowCount, Optional.empty(), values));
188+
189+
Operator childOperator =
190+
new Operator() {
191+
boolean consumed = false;
192+
193+
@Override
194+
public OperatorContext getOperatorContext() {
195+
return driverContext.getOperatorContexts().get(0);
196+
}
197+
198+
@Override
199+
public TsBlock next() {
200+
if (!consumed) {
201+
consumed = true;
202+
return oneBatch;
203+
}
204+
return null;
205+
}
206+
207+
@Override
208+
public boolean hasNext() {
209+
return !consumed;
210+
}
211+
212+
@Override
213+
public void close() {}
214+
215+
@Override
216+
public boolean isFinished() {
217+
return consumed;
218+
}
219+
220+
@Override
221+
public long calculateMaxPeekMemory() {
222+
return oneBatch.getSizeInBytes();
223+
}
224+
225+
@Override
226+
public long calculateMaxReturnSize() {
227+
return oneBatch.getSizeInBytes();
228+
}
229+
230+
@Override
231+
public long calculateRetainedSizeAfterCallingNext() {
232+
return 0;
233+
}
234+
235+
@Override
236+
public long ramBytesUsed() {
237+
return 0;
238+
}
239+
};
240+
241+
TimeSeriesOperand s1 =
242+
new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"), TSDataType.INT64);
243+
Map<String, List<InputLocation>> inputLocations =
244+
ImmutableMap.of(s1.getExpressionString(), ImmutableList.of(new InputLocation(0, 0)));
245+
Map<NodeRef<Expression>, TSDataType> expressionTypes = new HashMap<>();
246+
expressionTypes.put(NodeRef.of(s1), TSDataType.INT64);
247+
248+
TransformOperator transform =
249+
new TransformOperator(
250+
driverContext.getOperatorContexts().get(1),
251+
childOperator,
252+
ImmutableList.of(TSDataType.INT64),
253+
inputLocations,
254+
new Expression[] {s1},
255+
true,
256+
ZoneId.systemDefault(),
257+
expressionTypes,
258+
true);
259+
260+
int totalOutRows = 0;
261+
int nonNullNextCount = 0;
262+
while (transform.hasNext()) {
263+
TsBlock out = transform.next();
264+
if (out != null) {
265+
nonNullNextCount++;
266+
Assert.assertTrue(
267+
"Each batch must be at most " + maxLine + " rows", out.getPositionCount() <= maxLine);
268+
totalOutRows += out.getPositionCount();
269+
}
270+
}
271+
Assert.assertEquals(rowCount, totalOutRows);
272+
System.out.println(nonNullNextCount);
273+
Assert.assertTrue(nonNullNextCount >= 11);
274+
} finally {
275+
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(savedMaxLine);
276+
}
277+
}
142278
}

0 commit comments

Comments
 (0)