LevelDB 源码阅读三:工具函数一览

前面两篇文章,我们分别介绍了 LevelDB 的部分基础数据结构和 LevelDB 对操作系统功能的封装,本文我们继续按照自底向上的原则,来看 util 目录下的我们尚未阅读的源代码。涉及的源文件包括:

  • util/arena.hutil/arena.cc
  • util/coding.hutil/coding.cc
  • util/crc32c.hutil/crc32c.cc
  • util/hash.hutil/hash.cc
  • util/histogram.hutil/histogram.cc
  • util/logging.hutil/logging.cc
  • util/no_destructor.h
  • util/random.h
  • include/leveldb/comparator.hutil/comparator.cc

下面我们按顺序来看相关代码。

1. 内存分配管理

arena.h 源文件中定义了用于管理内存分配的类 Arena,其 public 接口和数据成员如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Arena {
public:
// Return a pointer to a newly allocated memory block of "bytes" bytes.
char* Allocate(size_t bytes);

// Allocate memory with the normal alignment guarantees provided by malloc.
char* AllocateAligned(size_t bytes);

// Returns an estimate of the total memory usage of data allocated
// by the arena.
size_t MemoryUsage() const {
return memory_usage_.load(std::memory_order_relaxed);
}

private:
// Allocation state
char* alloc_ptr_;
size_t alloc_bytes_remaining_;

// Array of new[] allocated memory blocks
std::vector<char*> blocks_;

// Total memory usage of the arena.
//
// TODO(costan): This member is accessed via atomics, but the others are
// accessed without any locking. Is this OK?
std::atomic<size_t> memory_usage_;
};

从源码可以看出,Arena 类主要暴露了 3 个接口,功能分别为:

  • char* Allocate(size_t bytes):分配指定大小的内存
  • char* AllocateAligned(size_t bytes):分配对齐的指定大小内存
  • size_t MemoryUsage() const:返回 Arena 类分配的内存总量

Arena 类自行进行内存的回收,不需要申请内存的调用者进行内存的回收操作。这一点可以从 Arena 类析构函数中看出:

1
2
3
4
5
Arena::~Arena() {
for (size_t i = 0; i < blocks_.size(); i++) {
delete[] blocks_[i];
}
}

Arena 类在析构时自动进行内存的释放。

另外,在源码注释中可以看到:

1
2
// TODO(costan): This member is accessed via atomics, but the others are
// accessed without any locking. Is this OK?

只有数据成员 memory_usage_ 是原子类型,其他成员是普通类型且没有锁保护,LevelDB 作者对于这种实现能否保证并发安全并不确定。查看相关代码,可以发现,多线程并发访问 Allocate 接口时,是有可能发生并发访问错误的:

1
2
3
4
5
6
7
8
9
10
11
12
13
inline char* Arena::Allocate(size_t bytes) {
// The semantics of what to return are a bit messy if we allow
// 0-byte allocations, so we disallow them here (we don't need
// them for our internal use).
assert(bytes > 0);
if (bytes <= alloc_bytes_remaining_) {
char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
}
return AllocateFallback(bytes);
}

例如,线程 1 和线程 2 同时调用 Allocate 函数,申请相同 bytes 大小的内存,并发进入 if (bytes <= alloc_bytes_remaining_) 分支,并同时返回了 result 指针。这样,两个线程就会持有执行同一个内存区域的指针,后续如果两个线程都对内存进行写入操作,就会导致部分写失效,发生非预期行为。

我们在 arena_test.cc 中添加以下测试用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
TEST(ArenaTest, ConcurrencyAllocate) {
Arena arena;
std::vector<char*> thread1_mem;
std::vector<char*> thread2_mem;
constexpr int allocate_times = 1000000;
size_t bytes = 100;
std::thread t1([&] {
for (int i = 0; i < allocate_times; ++i) {
thread1_mem.emplace_back(arena.Allocate(bytes));
}
});

std::thread t2([&] {
for (int i = 0; i < allocate_times; ++i) {
thread2_mem.emplace_back(arena.Allocate(bytes));
}
});

t1.join();
t2.join();

for (const char* ptr : thread1_mem) {
auto iter = std::find(thread2_mem.begin(), thread2_mem.end(), ptr);
ASSERT_EQ(iter, thread2_mem.end());
}
}

重新编译并使用 ./leveldb_tests --gtest_filter=Arena* 命令运行,多运行几次就可能因为并发申请内存出错:

当然,也可能因为并发访问的原因(double free 等)导致测试进程异常退出。

这一问题的修复在 LevelDB 的官方仓库中已有相关 PR,不过似乎由于 LevelDB 缺乏维护,相关 PR 并没有被 merge。在笔者的 GitHub 仓库(UnpureRationalist/leveldb))中,利用 LevelDB 已有的接口(port/port.hutil/mutexlock.h)实现了 Arena 类的安全并发操作。

2. 编解码

负责编解码的代码为 util/coding.hutil/coding.cc。根据源码注释,LevelDB 采用了小端编码,支持对固定长度的数据和可变长度的字符串进行编码。对于字符串,先编码字符串的长度,然后存储字符串内容。其提供的部分接口包括:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Standard Put... routines append to a string
void PutFixed32(std::string* dst, uint32_t value);
void PutFixed64(std::string* dst, uint64_t value);
void PutVarint32(std::string* dst, uint32_t value);
void PutVarint64(std::string* dst, uint64_t value);
void PutLengthPrefixedSlice(std::string* dst, const Slice& value);

// Standard Get... routines parse a value from the beginning of a Slice
// and advance the slice past the parsed value.
bool GetVarint32(Slice* input, uint32_t* value);
bool GetVarint64(Slice* input, uint64_t* value);
bool GetLengthPrefixedSlice(Slice* input, Slice* result);

// Pointer-based variants of GetVarint... These either store a value
// in *v and return a pointer just past the parsed value, or return
// nullptr on error. These routines only look at bytes in the range
// [p..limit-1]
const char* GetVarint32Ptr(const char* p, const char* limit, uint32_t* v);
const char* GetVarint64Ptr(const char* p, const char* limit, uint64_t* v);

// Returns the length of the varint32 or varint64 encoding of "v"
int VarintLength(uint64_t v);

这些函数实现了对不同长度数据的编解码。需要注意的是,LevelDB 实现了对 uint32_tuint64_t 类型的变长编码以节省存储空间,具体的编码方式为:每个字节的低 7 位表示具体的数位,最高位表示是否有后续。具体的实现细节涉及一些位运算操作,本文不详细解释,感兴趣的读者请自行阅读相关代码。

3. 循环冗余校验

util/crc32c.hutil/crc32c.cc 源文件实现了 CRC32C 循环冗余校验算法,用于数据完整性校验。这部分最重要的就是用于计算一段数据 CRC32C 校验值的函数:

1
2
// Return the crc32c of data[0,n-1]
inline uint32_t Value(const char* data, size_t n) { return Extend(0, data, n); }

CRC32C 的具体实现代码本文不介绍。

4. 哈希函数

util/hash.hutil/hash.cc 源文件定义并实现了对变长数据的哈希值计算。接口如下:

1
uint32_t Hash(const char* data, size_t n, uint32_t seed);

哈希函数对长度为 n 的输入数据的每个字节进行运算,得到一个哈希值。

5. 直方图统计数据

实现这一功能的是 util/histogram.hutil/histogram.cc 源文件。接口定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Histogram {
public:
Histogram() {}
~Histogram() {}

void Clear();
void Add(double value);
void Merge(const Histogram& other);

std::string ToString() const;

private:
enum { kNumBuckets = 154 };

double Median() const;
double Percentile(double p) const;
double Average() const;
double StandardDeviation() const;

static const double kBucketLimit[kNumBuckets];

double min_;
double max_;
double num_;
double sum_;
double sum_squares_;

double buckets_[kNumBuckets];
};

这一类用于维护中位数等一些统计信息,实现代码只涉及到一些数学知识。

6. 日志

负责日志的源文件为 util/logging.hutil/logging.cclogging.h 头文件中声明的接口有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Append a human-readable printout of "num" to *str
void AppendNumberTo(std::string* str, uint64_t num);

// Append a human-readable printout of "value" to *str.
// Escapes any non-printable characters found in "value".
void AppendEscapedStringTo(std::string* str, const Slice& value);

// Return a human-readable printout of "num"
std::string NumberToString(uint64_t num);

// Return a human-readable version of "value".
// Escapes any non-printable characters found in "value".
std::string EscapeString(const Slice& value);

// Parse a human-readable number from "*in" into *value. On success,
// advances "*in" past the consumed number and sets "*val" to the
// numeric value. Otherwise, returns false and leaves *in in an
// unspecified state.
bool ConsumeDecimalNumber(Slice* in, uint64_t* val);

这些函数用于将数组、字符串等转换为人类可读的形式,并写入结果字符串。

7. 不析构类模板

这一类模板(util/no_destructor.h)与之前介绍过的 SingletonEnv 单例模板类 有点类似,也是实现了对象在预先分配的内存上构造,并不允许调用析构函数。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Wraps an instance whose destructor is never called.
//
// This is intended for use with function-level static variables.
template <typename InstanceType>
class NoDestructor {
public:
template <typename... ConstructorArgTypes>
explicit NoDestructor(ConstructorArgTypes&&... constructor_args) {
static_assert(sizeof(instance_storage_) >= sizeof(InstanceType),
"instance_storage_ is not large enough to hold the instance");
static_assert(
alignof(decltype(instance_storage_)) >= alignof(InstanceType),
"instance_storage_ does not meet the instance's alignment requirement");
new (&instance_storage_)
InstanceType(std::forward<ConstructorArgTypes>(constructor_args)...);
}

~NoDestructor() = default;

NoDestructor(const NoDestructor&) = delete;
NoDestructor& operator=(const NoDestructor&) = delete;

InstanceType* get() {
return reinterpret_cast<InstanceType*>(&instance_storage_);
}

private:
typename std::aligned_storage<sizeof(InstanceType),
alignof(InstanceType)>::type instance_storage_;
};

8. 随机数生成

util/random.h 源文件中实现了一个生成随机数的类 Random,部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// A very simple random number generator.  Not especially good at
// generating truly random bits, but good enough for our needs in this
// package.
class Random {
private:
uint32_t seed_;

public:
explicit Random(uint32_t s) : seed_(s & 0x7fffffffu) {
// Avoid bad seeds.
if (seed_ == 0 || seed_ == 2147483647L) {
seed_ = 1;
}
}
uint32_t Next() {
// 具体实现省略
}
// Returns a uniformly distributed value in the range [0..n-1]
// REQUIRES: n > 0
uint32_t Uniform(int n) { return Next() % n; }

// Randomly returns true ~"1/n" of the time, and false otherwise.
// REQUIRES: n > 0
bool OneIn(int n) { return (Next() % n) == 0; }

// Skewed: pick "base" uniformly from range [0,max_log] and then
// return "base" random bits. The effect is to pick a number in the
// range [0,2^max_log-1] with exponential bias towards smaller numbers.
uint32_t Skewed(int max_log) { return Uniform(1 << Uniform(max_log + 1)); }
};

其中,最重要的就是 uint32_t Next() 函数,用于生成下一个随机数。

9. 比较接口

include/leveldb/comparator.hutil/comparator.cc 源文件实现 LevelDB 中排序操作所需的比较操作的接口抽象和实现。由于 Slice 是 LevelDB 中最关键的数据结构,KeyValue 都以 Slice 类型表示,因此 Compare 接口针对的类型就是 Slice,部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// A Comparator object provides a total order across slices that are
// used as keys in an sstable or a database. A Comparator implementation
// must be thread-safe since leveldb may invoke its methods concurrently
// from multiple threads.
class LEVELDB_EXPORT Comparator {
public:
virtual ~Comparator();

// 三路比较运算,大等小分别返回 1,0,-1
virtual int Compare(const Slice& a, const Slice& b) const = 0;

// 返回比较器名称
virtual const char* Name() const = 0;

// Advanced functions: these are used to reduce the space requirements
// for internal data structures like index blocks.

// If *start < limit, changes *start to a short string in [start,limit).
// Simple comparator implementations may return with *start unchanged,
// i.e., an implementation of this method that does nothing is correct.
virtual void FindShortestSeparator(std::string* start,
const Slice& limit) const = 0;

// Changes *key to a short string >= *key.
// Simple comparator implementations may return with *key unchanged,
// i.e., an implementation of this method that does nothing is correct.
virtual void FindShortSuccessor(std::string* key) const = 0;
};

util/comparator.cc 源文件中,BytewiseComparatorImpl 类实现了上述接口,进行 byte wise 的比较。其内部实现调用了 Slice 类型的 compare 函数,相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class BytewiseComparatorImpl : public Comparator {
public:
BytewiseComparatorImpl() = default;

const char* Name() const override { return "leveldb.BytewiseComparator"; }

int Compare(const Slice& a, const Slice& b) const override {
return a.compare(b); // 调用 Slice 类成员函数,进行逐字节比较
}

void FindShortestSeparator(std::string* start,
const Slice& limit) const override {
// 省略
}

void FindShortSuccessor(std::string* key) const override {
// 省略
}
};

此外,该源文件中还用到了上文介绍的 NoDestructor 类模板:

1
2
3
4
const Comparator* BytewiseComparator() {
static NoDestructor<BytewiseComparatorImpl> singleton;
return singleton.get();
}

用于返回全局唯一的逐字节比较器。

10. 总结

以上就是对 LevelDB 中 util 目录下部分源代码的介绍,涉及到的代码和功能较为繁杂。除了本文罗列的相关源文件,util 目录下实际上还有一些我们之前已经介绍过的代码,如:EnvStatus 等;此外,还有少量源文件,因为篇幅限制,且其代码稍长或者比较重要,会在本系列的后续文章中进行介绍。


LevelDB 源码阅读三:工具函数一览
https://arcsin2.cloud/2024/06/14/LevelDB-源码阅读三:工具函数一览/
作者
arcsin2
发布于
2024年6月14日
许可协议