Skip to content

Commit

Permalink
Add unit byte for no enough query memory
Browse files Browse the repository at this point in the history
  • Loading branch information
Beyyes authored Jan 25, 2024
1 parent 82d0459 commit 59fe3fe
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public void aggregationWithHavingTest() {

@Test
public void fillTest() {
// linear fill can not use TopKNode
String[] expectedHeader = new String[] {"Time,Device,s1,s2"};
String[] retArray =
new String[] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@
public class TopKOperator implements ProcessOperator {
private final OperatorContext operatorContext;

private final List<Operator> deviceOperators;
private int deviceIndex;
private final List<Operator> childrenOperators;
private int childIndex;
// read step operators each invoking
private int deviceBatchStep;
private boolean[] canCallNext;
private final int childBatchStep;
private final boolean[] canCallNext;

private final List<TSDataType> dataTypes;
private final TsBlockBuilder tsBlockBuilder;
Expand All @@ -84,13 +84,13 @@ public class TopKOperator implements ProcessOperator {

public TopKOperator(
OperatorContext operatorContext,
List<Operator> deviceOperators,
List<Operator> childrenOperators,
List<TSDataType> dataTypes,
Comparator<SortKey> comparator,
int topValue,
boolean childrenDataInOrder) {
this.operatorContext = operatorContext;
this.deviceOperators = deviceOperators;
this.childrenOperators = childrenOperators;
this.dataTypes = dataTypes;
this.mergeSortHeap = new MergeSortHeap(topValue, comparator.reversed());
this.comparator = comparator;
Expand All @@ -100,11 +100,11 @@ public TopKOperator(

initResultTsBlock();

deviceBatchStep =
childBatchStep =
OPERATOR_BATCH_UPPER_BOUND % topValue == 0
? OPERATOR_BATCH_UPPER_BOUND / topValue
: OPERATOR_BATCH_UPPER_BOUND / topValue + 1;
canCallNext = new boolean[deviceOperators.size()];
canCallNext = new boolean[childrenOperators.size()];
}

@Override
Expand All @@ -116,8 +116,8 @@ public OperatorContext getOperatorContext() {
public ListenableFuture<?> isBlocked() {
boolean hasReadyChild = false;
List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
for (int i = deviceIndex;
i < Math.min(deviceIndex + deviceBatchStep, deviceOperators.size());
for (int i = childIndex;
i < Math.min(childIndex + childBatchStep, childrenOperators.size());
i++) {
if (getOperator(i) == null) {
continue;
Expand All @@ -142,7 +142,7 @@ public boolean isFinished() throws Exception {

@Override
public boolean hasNext() throws Exception {
if (deviceIndex >= deviceOperators.size()) {
if (childIndex >= childrenOperators.size()) {
if (topKResult == null) {
return false;
}
Expand All @@ -154,16 +154,16 @@ public boolean hasNext() throws Exception {

@Override
public TsBlock next() throws Exception {
if (deviceIndex >= deviceOperators.size() && resultReturnSize < topKResult.length) {
if (childIndex >= childrenOperators.size() && resultReturnSize < topKResult.length) {
return getResultFromCachedTopKResult();
}

long startTime = System.nanoTime();
long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);

boolean batchFinished = true;
int operatorBatchEnd = Math.min(deviceIndex + deviceBatchStep, deviceOperators.size());
for (int i = deviceIndex; i < operatorBatchEnd; i++) {
int operatorBatchEnd = Math.min(childIndex + childBatchStep, childrenOperators.size());
for (int i = childIndex; i < operatorBatchEnd; i++) {
if (getOperator(i) == null) {
continue;
}
Expand Down Expand Up @@ -212,8 +212,8 @@ public TsBlock next() throws Exception {
}

if (batchFinished) {
deviceIndex = deviceIndex + deviceBatchStep;
if (deviceIndex >= deviceOperators.size()) {
childIndex = childIndex + childBatchStep;
if (childIndex >= childrenOperators.size()) {
return getResultFromCachedTopKResult();
}
}
Expand All @@ -223,8 +223,8 @@ public TsBlock next() throws Exception {

@Override
public void close() throws Exception {
for (int i = deviceIndex; i < deviceOperators.size(); i++) {
final Operator operator = deviceOperators.get(i);
for (int i = childIndex; i < childrenOperators.size(); i++) {
final Operator operator = childrenOperators.get(i);
if (operator != null) {
operator.close();
}
Expand All @@ -236,7 +236,7 @@ public long calculateMaxPeekMemory() {
// traverse each child serial,
// so no need to accumulate the returnSize and retainedSize of each child
long maxPeekMemory = calculateMaxReturnSize();
for (Operator operator : deviceOperators) {
for (Operator operator : childrenOperators) {
maxPeekMemory = Math.max(maxPeekMemory, operator.calculateMaxPeekMemory());
}
return Math.max(maxPeekMemory, topValue * getMemoryUsageOfOneMergeSortKey() * 2);
Expand Down Expand Up @@ -379,11 +379,11 @@ private void updateTsBlockValue(TsBlock sourceTsBlock, int sourceIndex, int peek
}

private Operator getOperator(int i) {
return deviceOperators.get(i);
return childrenOperators.get(i);
}

private void closeOperator(int i) throws Exception {
getOperator(i).close();
deviceOperators.set(i, null);
childrenOperators.set(i, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public void submitDrivers(
.getThrottleQuotaLimit()
.checkMemory(sessionInfo.getUserName(), estimatedMemory.get())) {
throw new MemoryNotEnoughException(
"There is not enough memory to execute current fragment instance");
"There is no enough memory to execute current fragment instance");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ private long checkMemory(Operator root, FragmentInstanceStateMachine stateMachin
throw new MemoryNotEnoughException(
String.format(
"There is not enough memory to execute current fragment instance, "
+ "current remaining free memory is %d, "
+ "estimated memory usage for current fragment instance is %d",
+ "current remaining free memory is %dB, "
+ "estimated memory usage for current fragment instance is %dB",
freeMemoryForOperators, estimatedMemorySize));
} else {
freeMemoryForOperators -= estimatedMemorySize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ public LogicalPlanBuilder planDeviceView(
orderByParameter,
outputColumnNames);

// if value filter exists, need add a LIMIT-NODE as the child node of TopKNode
// if value filter exists, need add a LimitNode as the child node of TopKNode
long valueFilterLimit = queryStatement.hasWhere() ? limitValue : -1;

// only order by based on time, use TopKNode + SingleDeviceViewNode
Expand All @@ -796,7 +796,7 @@ public LogicalPlanBuilder planDeviceView(
analysis.setUseTopKNode();
this.root = topKNode;
} else if (canUseMergeSortNode(queryStatement, deviceNameToSourceNodesMap.size())) {
// otherwise use MergeSortNode + SingleDeviceViewNode
// use MergeSortNode + SingleDeviceViewNode
MergeSortNode mergeSortNode =
new MergeSortNode(
context.getQueryId().genPlanNodeId(), orderByParameter, outputColumnNames);
Expand Down Expand Up @@ -855,7 +855,7 @@ private boolean canUseTopKNode(QueryStatement queryStatement, long limitValue) {
private boolean canUseMergeSortNode(QueryStatement queryStatement, int deviceSize) {
// 1. `order by based on time` + `no order by expression`.
// 2. deviceSize is larger than 1.
// when satisfy all above cases use MergeSortNode.
// when satisfy all above cases use MergeSortNode + SingleDeviceViewNode.
return queryStatement.isOrderByBasedOnTime()
&& !queryStatement.hasOrderByExpression()
&& deviceSize > 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public class AlignByTimeOrderByLimitOffsetTest {
DistributedQueryPlan plan;
PlanNode firstFiRoot;
PlanNode firstFiTopNode;
PlanNode mergeSortNode;

/*
* IdentitySinkNode-63
Expand Down Expand Up @@ -154,6 +153,7 @@ public void orderByExpressionTest1() {
@Test
public void orderByExpressionTest2() {
// select s1 order by s2 + limit N
// use TopKNode to replace SortNode + LimitNode
sql =
String.format(
"select s1 from root.sg.d1 ORDER BY root.sg.d22.s2 DESC LIMIT %s", LIMIT_VALUE);
Expand All @@ -170,6 +170,7 @@ public void orderByExpressionTest2() {
firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof FullOuterTimeJoinNode);

// select s1 order by s2 + offset M + limit N
// use TopKNode to replace SortNode
sql =
String.format(
"select s1 from root.sg.d1 ORDER BY root.sg.d22.s2 DESC OFFSET 5 LIMIT %s",
Expand Down Expand Up @@ -257,6 +258,7 @@ public void orderByExpressionTest3() {
*/
@Test
public void orderByFillTest() {
// previous and constant fill can use TopKNode
sql =
String.format(
"select * from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY root.sg.d1.s1 DESC fill(previous) LIMIT %s",
Expand Down Expand Up @@ -290,6 +292,7 @@ public void orderByFillTest() {
* ├──ExchangeNode-58: [SourceAddress:192.0.2.1/test.7.0/61]
* └──ExchangeNode-59: [SourceAddress:192.0.4.1/test.8.0/62]
*/
// linear fill can not use TopKNode
sql =
String.format(
"select * from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY root.sg.d1.s1 DESC fill(linear) LIMIT %s",
Expand Down

0 comments on commit 59fe3fe

Please sign in to comment.