前言
orderBy是velox中的排序算子,其源码位置为:
velox/exec/OrderBy.h
velox/exec/OrderBy.cpp
其主要逻辑基本转交给了SortBuffer来处理,SortBuffer的源码位置为:
velox/exec/SortBuffer.h
velox/exec/SortBuffer.cpp
本节我们将重点关注SortBuffer的实现。
按照先易后难的顺序,我们先关注无需考虑spill的情况,再在后续章节讨论包含spill的情况。
算子流程图
Ctor
调用方式:
10// in velox/exec/OrderBy.cpp
sortBuffer_ = std::make_unique<SortBuffer>(
outputType_,
sortColumnIndices,
sortCompareFlags,
pool(),
&nonReclaimableSection_,
driverCtx->prefixSortConfig(),
spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr,
&spillStats_);
实现如下:
58SortBuffer::SortBuffer(
const RowTypePtr& input,
const std::vector<column_index_t>& sortColumnIndices,
const std::vector<CompareFlags>& sortCompareFlags,
velox::memory::MemoryPool* pool,
tsan_atomic<bool>* nonReclaimableSection,
common::PrefixSortConfig prefixSortConfig,
const common::SpillConfig* spillConfig,
folly::Synchronized<velox::common::SpillStats>* spillStats)
: input_(input),
sortCompareFlags_(sortCompareFlags), // 各列排序规则
pool_(pool),
nonReclaimableSection_(nonReclaimableSection), // 当前是否处于nonReclaimableSection
prefixSortConfig_(prefixSortConfig),
spillConfig_(spillConfig),
spillStats_(spillStats),
sortedRows_(0, memory::StlAllocator<char*>(*pool)) { // sortbuffer排序时使用行数据,这里存储有序的行数据地址
VELOX_CHECK_GE(input_->size(), sortCompareFlags_.size());
VELOX_CHECK_GT(sortCompareFlags_.size(), 0);
VELOX_CHECK_EQ(sortColumnIndices.size(), sortCompareFlags_.size());
VELOX_CHECK_NOT_NULL(nonReclaimableSection_);
std::vector<TypePtr> sortedColumnTypes;
std::vector<TypePtr> nonSortedColumnTypes;
std::vector<std::string> sortedSpillColumnNames;
std::vector<TypePtr> sortedSpillColumnTypes;
sortedColumnTypes.reserve(sortColumnIndices.size());
nonSortedColumnTypes.reserve(input->size() - sortColumnIndices.size());
sortedSpillColumnNames.reserve(input->size());
sortedSpillColumnTypes.reserve(input->size());
std::unordered_set<column_index_t> sortedChannelSet;
// Sorted key columns.
for (column_index_t i = 0; i < sortColumnIndices.size(); ++i) {
columnMap_.emplace_back(IdentityProjection(i, sortColumnIndices.at(i)));
sortedColumnTypes.emplace_back(input_->childAt(sortColumnIndices.at(i)));
sortedSpillColumnTypes.emplace_back(
input_->childAt(sortColumnIndices.at(i)));
sortedSpillColumnNames.emplace_back(input->nameOf(sortColumnIndices.at(i)));
sortedChannelSet.emplace(sortColumnIndices.at(i));
}
// Non-sorted key columns.
for (column_index_t i = 0, nonSortedIndex = sortCompareFlags_.size();
i < input_->size();
++i) {
if (sortedChannelSet.count(i) != 0) {
continue;
}
columnMap_.emplace_back(nonSortedIndex++, i); // columnMap_存储了行存储->列存储的索引映射,在行存储中,key列将靠前存储
nonSortedColumnTypes.emplace_back(input_->childAt(i));
sortedSpillColumnTypes.emplace_back(input_->childAt(i));
sortedSpillColumnNames.emplace_back(input->nameOf(i));
}
data_ = std::make_unique<RowContainer>(
sortedColumnTypes, nonSortedColumnTypes, pool_); // 构造RowContainer,后续用于存储行数据
spillerStoreType_ =
ROW(std::move(sortedSpillColumnNames), std::move(sortedSpillColumnTypes));
}
AddInput
将输入的列数据转为行数据存储,在存储前会进行内存评估。
实现方式:
24void SortBuffer::addInput(const VectorPtr& input) {
velox::common::testutil::TestValue::adjust(
"facebook::velox::exec::SortBuffer::addInput", this);
VELOX_CHECK(!noMoreInput_);
ensureInputFits(input);
// 开辟内存,存储行数据
SelectivityVector allRows(input->size());
std::vector<char*> rows(input->size());
for (int row = 0; row < input->size(); ++row) {
rows[row] = data_->newRow();
}
auto* inputRow = input->as<RowVector>();
for (const auto& columnProjection : columnMap_) {
DecodedVector decoded(
*inputRow->childAt(columnProjection.outputChannel), allRows);
data_->store(
decoded,
folly::Range(rows.data(), input->size()),
columnProjection.inputChannel);
}
numInputRows_ += allRows.size(); // 更新输入行数
}
其中开辟内存、存储行数据的逻辑在RowContainer中有详细描述。
我们把目光投向ensureInputFits函数,该函数用于评估内存是否足够存储输入数据。
若评估存在内存不足的风险,则会向pool申请更多内存。
63void SortBuffer::ensureInputFits(const VectorPtr& input) {
// Check if spilling is enabled or not.
if (spillConfig_ == nullptr) {
return;
}
const int64_t numRows = data_->numRows();
if (numRows == 0) {
// 'data_' is empty. Nothing to spill.
return;
}
// 定长区还能存储多少行 + 变长区还能存储多少字节(在不扩容的前提下)
auto [freeRows, outOfLineFreeBytes] = data_->freeSpace();
const auto outOfLineBytes =
data_->stringAllocator().retainedSize() - outOfLineFreeBytes; // 当前变长区已使用的内存
const int64_t flatInputBytes = input->estimateFlatSize(); // LX TODO::看逻辑很像获取变长数据大小
// Test-only spill path.
if (numRows > 0 && testingTriggerSpill(pool_->name())) {
spill();
return;
}
const auto currentMemoryUsage = pool_->usedBytes(); // 当前内存池已使用大小
// 若当前已使用大小为U, minSpillableReservationPct记作P, 当前内存池预留内存记作R
// 经验认为,R >= U * P 时才无需spill,否则很可能内存不足
const auto minReservationBytes =
currentMemoryUsage * spillConfig_->minSpillableReservationPct / 100;
const auto availableReservationBytes = pool_->availableReservation();
const int64_t estimatedIncrementalBytes =
data_->sizeIncrement(input->size(), outOfLineBytes ? flatInputBytes : 0); // 评估所需要的内存大小
if (availableReservationBytes > minReservationBytes) { // 经过评估,总内存足够
// If we have enough free rows for input rows and enough variable length
// free space for the vector's flat size, no need for spilling.
if (freeRows > input->size() &&
(outOfLineBytes == 0 || outOfLineFreeBytes >= flatInputBytes)) { // 定长区和变长区都够用
return;
}
// If the current available reservation in memory pool is 2X the
// estimatedIncrementalBytes, no need to spill.
if (availableReservationBytes > 2 * estimatedIncrementalBytes) { // 若当前预留内存大于缩小内存的两倍,也不spill
return;
}
}
// Try reserving targetIncrementBytes more in memory pool, if succeeded, no
// need to spill.
const auto targetIncrementBytes = std::max<int64_t>(
estimatedIncrementalBytes * 2,
currentMemoryUsage * spillConfig_->spillableReservationGrowthPct / 100); // 向内存池申请max(2 * 预估值, 当前使用量 * H)大小的内存
{
memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
if (pool_->maybeReserve(targetIncrementBytes)) { // 能申请到,无需spill
return;
}
}
LOG(WARNING) << "Failed to reserve " << succinctBytes(targetIncrementBytes)
<< " for memory pool " << pool()->name()
<< ", usage: " << succinctBytes(pool()->usedBytes())
<< ", reservation: " << succinctBytes(pool()->reservedBytes());
}
noMoreInput
指示后续不再存在输入,此时将触发排序逻辑。
37void SortBuffer::noMoreInput() {
velox::common::testutil::TestValue::adjust(
"facebook::velox::exec::SortBuffer::noMoreInput", this);
VELOX_CHECK(!noMoreInput_); // 此前不存在重复调用
// It may trigger spill, make sure it's triggered before noMoreInput_ is set.
ensureSortFits(); // 评估进行排序时的内存是否足够,sortedRows_所需占用的内存 + prefix-sort encoding所需占用的内存
noMoreInput_ = true;
// No data.
if (numInputRows_ == 0) {
return;
}
if (spiller_ == nullptr) {
VELOX_CHECK_EQ(numInputRows_, data_->numRows());
updateEstimatedOutputRowSize(); // 评估输出时内存是否足够
// Sort the pointers to the rows in RowContainer (data_) instead of sorting
// the rows.
sortedRows_.resize(numInputRows_); // 存储有行数据的地址
RowContainerIterator iter;
data_->listRows(&iter, numInputRows_, sortedRows_.data()); // 提取当前输入的行地址至sortedRows_
PrefixSort::sort(
data_.get(), sortCompareFlags_, prefixSortConfig_, pool_, sortedRows_); // 执行prefix-sort排序或者std-sort, sortedRows_将存储排序完成后的行地址
} else {
// Spill the remaining in-memory state to disk if spilling has been
// triggered on this sort buffer. This is to simplify query OOM prevention
// when producing output as we don't support to spill during that stage as
// for now.
spill(); // 如果此前触发过spill,把当前内存里的数据也执行spill(简化流程)
finishSpill();
}
// Releases the unused memory reservation after procesing input.
pool_->release();
}