Skip to content

Commit

Permalink
Add unit tests for operators.
Browse files Browse the repository at this point in the history
  • Loading branch information
Sh-Zh-7 committed Dec 23, 2024
1 parent aa55f6d commit 0f1912d
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ public TsBlock next() throws Exception {
if (inputOperator.hasNextWithTimer()) {
// This TsBlock is pre-sorted with PARTITION BY and ORDER BY channels
TsBlock preSortedBlock = inputOperator.next();
// StreamSort Operator sometimes returns null
if (preSortedBlock == null || preSortedBlock.isEmpty()) {
return null;
}

cachedPartitionExecutors = partition(preSortedBlock);
if (cachedPartitionExecutors.isEmpty()) {
Expand Down Expand Up @@ -210,22 +214,24 @@ private LinkedList<PartitionExecutor> partition(TsBlock tsBlock) {
windowFunctions,
frameInfoList,
sortChannels);

partitionExecutors.addLast(partitionExecutor);
cachedTsBlocks.clear();
startIndexInFirstBlock = -1;
}

cachedTsBlocks.clear();
}

// Try to find all partitions
while (partitionEndInCurrentBlock < tsBlock.getPositionCount()) {
int count = tsBlock.getPositionCount();
while (count == 1 || partitionEndInCurrentBlock < count) {
// Try to find one partition
while (partitionEndInCurrentBlock < tsBlock.getPositionCount()
while (partitionEndInCurrentBlock < count
&& partitionComparator.equalColumns(
partitionColumns, partitionStartInCurrentBlock, partitionEndInCurrentBlock)) {
partitionEndInCurrentBlock++;
}

if (partitionEndInCurrentBlock != tsBlock.getPositionCount()) {
if (partitionEndInCurrentBlock != count) {
// Find partition
PartitionExecutor partitionExecutor;
if (partitionStartInCurrentBlock != 0 || startIndexInFirstBlock == -1) {
Expand Down Expand Up @@ -261,8 +267,12 @@ private LinkedList<PartitionExecutor> partition(TsBlock tsBlock) {
} else {
// Last partition of TsBlock
// The beginning of next TsBlock may have rows in this partition
startIndexInFirstBlock = partitionStartInCurrentBlock;
if (startIndexInFirstBlock == -1) {
startIndexInFirstBlock = partitionStartInCurrentBlock;
}
cachedTsBlocks.add(tsBlock);
// For count == 1
break;
}
}

Expand Down
Loading

0 comments on commit 0f1912d

Please sign in to comment.