摘要
BadgerDB 是一个用纯 Go 编写的可嵌入、持久且快速的键值 (KV) 数据库。它是Dgraph的底层数据库, 一个快速的分布式图数据库。它旨在成为 RocksDB 等非基于 Go 的键值存储的高性能替代品。
因为在项目中使用到了Badger的skiplist来构建LSM的memtable,所以本文就来鉴赏一下Badger的源码,向大佬们学习。
Arena内存池
Arena 是一个无锁内存池,使用原子操作避免锁竞争,他的实现主要用于 SkipList(跳表)数据结构,通过高效的内存管理来提升跳表的整体性能。它通过预分配大块内存并通过偏移量来管理内存分配,避免了频繁的系统内存分配调用
先来看Arena的常量定义
offsetSize : 获取 uint32 类型的大小(通常是4字节)
nodeAlign : 用于64位内存对齐,通常等于7(8-1)。这确保了所有节点都在64位边界上对齐,这对于原子操作是必需的,1B的7的二进制为0000 0111,取反^7为1111 1000,当任何数加上7然后再和^7按位与,其低4位都会变为1000,即保证结果一定是8的倍数(64位对齐)
const (
offsetSize = int(unsafe.Sizeof(uint32(0)))
// 即使在32位架构上,也始终将节点对齐到64位边界,
// 以确保node.value字段是64位对齐的。这是必需的,因为
// node.getValueOffset使用atomic.LoadUint64,它要求输入指针是64位对齐的。
nodeAlign = int(unsafe.Sizeof(uint64(0))) - 1
)
在Arena结构中,n 是使用原子操作的计数器,用于追踪下一个可用的内存位置,buf是实际存储数据的字节切片
在newArena创建时参数 n 指定了内存池的总大小,同时创建指定大小的字节切片,将计数器初始化为1(保留0作为特殊值)
// Arena应该是无锁的。
type Arena struct {
n atomic.Uint32
buf []byte
}
// newArena返回一个新的arena。
func newArena(n int64) *Arena {
// 不在位置0存储数据,以保留offset=0作为一种
// nil指针。
out := &Arena{buf: make([]byte, n)}
out.n.Store(1)
return out
}
func (s *Arena) size() int64 {
return int64(s.n.Load())
}
Arena 中的节点就是跳表(Skip List)的节点,关于跳表的节点是什么,可以参考我的跳表入门文章跳表及其简单实现 - knociのblog,在节点中我们存储的“塔”,就是入门文章中的zskiplistLevel结构,他存储了节点每一个高度(level)的前进指针,因此要对他的空间进行计算和对齐
// putNode在arena中分配一个节点。该节点在指针大小的边界上对齐。返回节点在arena中的偏移量。
func (s *Arena) putNode(height int) uint32 {
// 计算永远不会被使用的"塔"(tower)大小,因为高度小于maxHeight。
unusedSize := (maxHeight - height) * offsetSize
// 用足够的字节填充分配以确保指针对齐。
// MaxNodeSize - unusedSize :先计算实际需要的节点大小
// + nodeAlign :这里加 nodeAlign 是为了预留足够的空间进行对齐
l := uint32(MaxNodeSize - unusedSize + nodeAlign)
n := s.n.Add(l)
// y.AssertTruef 是一个断言函数,用于运行时检查条件是否满足
// 在这里确保新分配的内存位置 n 不会超过 arena 缓冲区 buf 的总大小
y.AssertTruef(int(n) <= len(s.buf),
"Arena too small, toWrite:%d newTotal:%d limit:%d",
l, n, len(s.buf))
// 返回对齐后的偏移量,n - l 得到基础偏移量
// + uint32(nodeAlign) 确保有足够空间进行向上对齐
// & ^uint32(nodeAlign) 执行实际的对齐操作
m := (n - l + uint32(nodeAlign)) & ^uint32(nodeAlign)
return m
}
// getNode返回位于偏移量处的节点指针。如果偏移量为零,则返回nil节点指针。
func (s *Arena) getNode(offset uint32) *node {
if offset == 0 {
return nil
}
return (*node)(unsafe.Pointer(&s.buf[offset]))
}
// getNodeOffset返回节点在arena中的偏移量。如果节点指针为nil,则返回零偏移量。
func (s *Arena) getNodeOffset(nd *node) uint32 {
if nd == nil {
return 0
}
// 得到节点相对于缓冲区起始位置的字节偏移量
return uint32(uintptr(unsafe.Pointer(nd)) - uintptr(unsafe.Pointer(&s.buf[0])))
}
在讲解键值储存之前,Arena对于值(Value)使用了自定义的 y.ValueStruct 结构,结构的定义如下,同时有 EncodedSize() 方法,用于获取编码后的大小、有 Encode() 方法,用于将值编码到字节切片中、有 Decode() 方法,用于从字节切片中解码值
type ValueStruct struct { // 大小为18+x字节,x取决于值大小
// (1字节): 元数据标记
Meta byte
// (1字节): 用户自定义元数据
UserMeta byte
// (8字节): 过期时间戳
ExpiresAt uint64
// (可变长度): 实际存储的值
Value []byte
// (8字节)内部使用的版本号,不会被序列化
Version uint64
}
// EncodedSize返回编码后的大小
func (v *ValueStruct) EncodedSize() uint16 {
sz := len(v.Value) + 2 // meta, usermeta.
if v.ExpiresAt == 0 {
return uint16(sz + 1)
}
enc := sizeVarint(v.ExpiresAt)
return uint16(sz + enc)
}
// Encode expects a slice of length at least v.EncodedSize().
func (v *ValueStruct) Encode(b []byte) {
b[0] = v.Meta
b[1] = v.UserMeta
sz := binary.PutUvarint(b[2:], v.ExpiresAt)
copy(b[2+sz:], v.Value)
}
// Decode uses the length of the slice to infer the length of the Value field.
func (v *ValueStruct) Decode(b []byte) {
v.Meta = b[0]
v.UserMeta = b[1]
var sz int
v.ExpiresAt, sz = binary.Uvarint(b[2:])
v.Value = b[2+sz:]
}
Put会将val 复制 到arena中,这点在上面的Encode可以看出,key同样也是复制,为了更好地利用这个功能,最好复用输入缓冲区。
// 返回buf中的偏移量。用户负责记住val的大小。
// 我们也可以在arena内存储这个大小,但编码和解码会产生一些开销。
func (s *Arena) putVal(v y.ValueStruct) uint32 {
l := v.EncodedSize()
n := s.n.Add(l)
y.AssertTruef(int(n) <= len(s.buf),
"Arena too small, toWrite:%d newTotal:%d limit:%d",
l, n, len(s.buf))
m := n - l
v.Encode(s.buf[m:])
return m
}
func (s *Arena) putKey(key []byte) uint32 {
l := uint32(len(key))
// 先分配空间
n := s.n.Add(l)
// 使用 AssertTruef表明空间不足被视为一个严重错误,程序会直接 panic
y.AssertTruef(int(n) <= len(s.buf),
"Arena too small, toWrite:%d newTotal:%d limit:%d",
l, n, len(s.buf))
// m是你应该写入的偏移量。
// n = 新长度 - 键长度 给你应该写入的偏移量。
m := n - l
// 从m:n复制到缓冲区
y.AssertTrue(len(key) == copy(s.buf[m:n], key))
return m
}
// getKey返回偏移量处的字节切片。
func (s *Arena) getKey(offset uint32, size uint16) []byte {
return s.buf[offset : offset+uint32(size)]
}
// getVal返回偏移量处的字节切片。给定的大小应该只是值的
// 大小,不应包含元数据字节。
func (s *Arena) getVal(offset uint32, size uint32) (ret y.ValueStruct) {
ret.Decode(s.buf[offset : offset+size])
return
}
看完代码大家是否注意到了一个问题,那就是putNode、putVal 和 putKey 这三个方法都采用了相同的模式:先增加计数器,再检查空间是否足够。一般情况下,这种设计可能存在潜在问题:如果空间检查失败,但是计数器已经被修改,在并发环境下可能导致内存分配状态不一致。
但是我们要知道Arena是一个预分配固定大小的内存池,首先在正常情况下(空间足够),只需要一次原子操作;其次使用 AssertTruef 而不是返回错误这表明空间不足被视为一个严重错误,程序会直接 panic,在这种情况下,内存计数器的状态是否一致已经不重要;最后Arena 通常会预分配足够大的空间空间不足是一个极端情况,不是常规操作路径。
其实,这是一个典型的性能优化案例:牺牲了一些极端情况下的优雅性,换取了常规路径的最佳性能,这是Arena设计的巧妙所在。
Skiplist跳表
跳表的结构定义,用一个计数器来判断释放逻辑
const (
maxHeight = 20
heightIncrease = math.MaxUint32 / 3
)
// MaxNodeSize 是最大高度节点的内存占用。
const MaxNodeSize = int(unsafe.Sizeof(node{}))
type node struct {
// 值的多个部分被编码为单个uint64,以便可以
// 原子地加载和存储:
// 值偏移量: uint32 (位 0-31)
// 值大小 : uint16 (位 32-63)
value atomic.Uint64
// 字节切片是24字节。我们在这里试图节省空间。
keyOffset uint32 // 不可变。访问键不需要锁。
keySize uint16 // 不可变。访问键不需要锁。
// 塔的高度。
height uint16
// 大多数节点不需要使用塔的全部高度,因为每个连续层级的概率呈指数递减。
// 大于塔高度的空间永远不会被访问,所以不需要分配。
// 因此,当在arena中分配节点时,其内存占用截断以不包括不需要的塔元素。
// 所有对元素的访问都应使用CAS操作,无需锁定。
// 存储的是下一个node的32位offset
tower [maxHeight]atomic.Uint32
}
type Skiplist struct {
height atomic.Int32 // 当前高度。1 <= height <= kMaxHeight。CAS。
head *node
ref atomic.Int32 // 一个引用计数器,用于跟踪跳表的使用情况
arena *Arena
OnClose func()
}
// IncrRef 增加引用计数
func (s *Skiplist) IncrRef() {
s.ref.Add(1)
}
// DecrRef 减少引用计数,当完成使用时释放跳表
func (s *Skiplist) DecrRef() {
newRef := s.ref.Add(-1)
if newRef > 0 {
return
}
if s.OnClose != nil {
s.OnClose()
}
// 表示我们已关闭。对测试有用。同时让GC回收内存。这里的竞争条件
// 表明我们在应该没有引用时访问跳表!
s.arena = nil
// 由于head引用了arena的buf,只要保留head,GC就无法释放buf。
s.head = nil
}
// NewSkiplist 创建一个新的空跳表,具有给定的arena大小
func NewSkiplist(arenaSize int64) *Skiplist {
arena := newArena(arenaSize)
// 空头
head := newNode(arena, nil, y.ValueStruct{}, maxHeight)
s := &Skiplist{head: head, arena: arena}
s.height.Store(1)
s.ref.Store(1)
return s
}
下面是节点有关的方法,encode和decode将两个 32 位值打包成一个 64 位值的编码。其他的方法都很直观,主要是用了arena的方法。其中node 结构体和 arena 是解耦的设计,也就是说node 结构体是独立的,不依赖于特定的 Skiplist 实例,同一个 node 可以被不同的 Skiplist 使用(如果需要的话)
func newNode(arena *Arena, key []byte, v y.ValueStruct, height int) *node {
// 基本层级已在节点结构中分配。
offset := arena.putNode(height)
node := arena.getNode(offset)
node.keyOffset = arena.putKey(key)
node.keySize = uint16(len(key))
node.height = uint16(height)
node.value.Store(encodeValue(arena.putVal(v), v.EncodedSize()))
return node
}
// 将 valSize 左移 32 位(占据高 32 位)
// 将 valOffset 保持在低 32 位
// 用位运算符 | 将它们组合成一个 64 位整数
func encodeValue(valOffset uint32, valSize uint32) uint64 {
return uint64(valSize)<<32 | uint64(valOffset)
}
// 从 64 位值中提取出原来的两个 32 位值
// valOffset 直接取低 32 位
// valSize 右移 32 位后取值
func decodeValue(value uint64) (valOffset uint32, valSize uint32) {
valOffset = uint32(value)
valSize = uint32(value >> 32)
return
}
// getValueOffset 获取节点存储的值的偏移量和大小
func (s *node) getValueOffset() (uint32, uint32) {
value := s.value.Load()
return decodeValue(value)
}
// key 返回节点的键内容
func (s *node) key(arena *Arena) []byte {
return arena.getKey(s.keyOffset, s.keySize)
}
// setValue 原子地更新节点的值
func (s *node) setValue(arena *Arena, v y.ValueStruct) {
valOffset := arena.putVal(v)
value := encodeValue(valOffset, v.EncodedSize())
s.value.Store(value)
}
// getNextOffset 获取指定层级的下一个节点的偏移量
func (s *node) getNextOffset(h int) uint32 {
return s.tower[h].Load()
}
// casNextOffset 使用CAS原子地更新指定层级的下一个节点
func (s *node) casNextOffset(h int, old, val uint32) bool {
return s.tower[h].CompareAndSwap(old, val)
}
然后我们来看一些跳表的方法,在介绍主要功能Put和Get实现之前,先来看一下他们依赖的一些基础方法。
findNear是跳表中查找的逻辑,具体可以看跳表及其简单实现 - knociのblog(梅开二度)。
findSpliceForLevel 则主要用于在插入操作时找到合适的插入位置,它在指定层级寻找可以插入新节点的位置返回两个节点:插入位置的前驱节点(outBefore)和后继节点(outAfter)。
// 获取随机高度
func (s *Skiplist) randomHeight() int {
h := 1
// heightIncrease使用 MaxUint32/3 作为阈值,意味着:
// 生成的随机数 <= MaxUint32/3 的概率约为 1/3
// 这使得每个节点有约 1/3 的概率增加一层高度
for h < maxHeight && z.FastRand() <= heightIncrease {
h++
}
return h
}
// 获取下一个节点
func (s *Skiplist) getNext(nd *node, height int) *node {
return s.arena.getNode(nd.getNextOffset(height))
}
// findNear 查找靠近键的节点。
// 如果less=true,它查找最左边的节点,使得node.key < key(如果allowEqual=false)或
// node.key <= key(如果allowEqual=true)。
// 如果less=false,它查找最右边的节点,使得node.key > key(如果allowEqual=false)或
// node.key >= key(如果allowEqual=true)。
// 返回找到的节点。返回的bool值如果为true表示节点的键等于给定的键。
func (s *Skiplist) findNear(key []byte, less bool, allowEqual bool) (*node, bool) {
// 从头和最高尾开始找
x := s.head
level := int(s.getHeight() - 1)
for {
// 假设x.key < key。
next := s.getNext(x, level)
if next == nil { // x.key < key < 列表末尾
if level > 0 { // 可以进一步下降以更接近末尾。
level--
continue
}
// 层级=0。不能进一步下降。
if !less {
return nil, false
}
// 确保它不是头节点。
if x == s.head {
return nil, false
}
return x, false
}
nextKey := next.key(s.arena)
cmp := y.CompareKeys(key, nextKey)
if cmp > 0 {
// x.key < next.key < key。我们可以继续向右移动。
x = next
continue
}
if cmp == 0 {
// x.key < key == next.key。
if allowEqual {
return next, true
}
if !less {
// 我们想要>,所以去基本层级获取下一个更大的节点。
return s.getNext(next, 0), false
}
// 我们想要<。如果不是基本层级,我们应该在下一层级更接近。
if level > 0 {
level--
continue
}
// 在基本层级。返回x。
if x == s.head {
return nil, false
}
return x, false
}
// cmp < 0。换句话说,x.key < key < next。
if level > 0 {
level--
continue
}
// 在基本层级。需要返回一些内容。
if !less {
return next, false
}
// 尝试返回x。确保它不是头节点。
if x == s.head {
return nil, false
}
return x, false
}
}
// findSpliceForLevel 返回(outBefore, outAfter),
// 其中outBefore.key <= key <= outAfter.key。
// 输入的"before"告诉我们从哪里开始查找。
// 如果我们找到一个具有相同键的节点,那么我们返回outBefore = outAfter。
// 否则,outBefore.key < key < outAfter.key。
func (s *Skiplist) findSpliceForLevel(key []byte, before *node, level int) (*node, *node) {
for {
// 假设before.key < key。
next := s.getNext(before, level)
if next == nil {
return before, next
}
nextKey := next.key(s.arena)
cmp := y.CompareKeys(key, nextKey)
if cmp == 0 {
// 相等的情况。
return next, next
}
if cmp < 0 {
// before.key < key < next.key。我们完成了这一层级。
return before, next
}
before = next // 在这一层级继续向右移动。
}
}
// findLast 返回最后一个元素。如果是head(空列表),我们返回nil。
func (s *Skiplist) findLast() *node {
n := s.head
level := int(s.getHeight()) - 1
for {
next := s.getNext(n, level)
if next != nil {
n = next
continue
}
if level == 0 {
if n == s.head {
return nil
}
return n
}
level--
}
}
// 获取跳表高度
func (s *Skiplist) getHeight() int32 {
return s.height.Load()
}
// Empty 返回跳表是否为空。
func (s *Skiplist) Empty() bool {
return s.findLast() == nil
}
// MemSize 返回跳表在其内部arena中使用的内存大小。
func (s *Skiplist) MemSize() int64 { return s.arena.size() }
跳表主要功能Put和Get的实现,Put插入从Level0开始往上,因为值是存储在节点的 value 字段中的,而不是在每一层都存储一次,所有Level0检测到相同键就会覆盖
// Put 插入键值对。
func (s *Skiplist) Put(key []byte, v y.ValueStruct) {
// 由于我们允许覆写,我们可能不需要创建新节点。
// 我们甚至可能不需要增加高度。让我们推迟这些操作。
listHeight := s.getHeight()
var prev [maxHeight + 1]*node
var next [maxHeight + 1]*node
prev[listHeight] = s.head
next[listHeight] = nil
for i := int(listHeight) - 1; i >= 0; i-- {
// 获取所有层级的前驱后继
prev[i], next[i] = s.findSpliceForLevel(key, prev[i+1], i)
if prev[i] == next[i] { // 刚好可以插入
prev[i].setValue(s.arena, v)
return
}
}
// 创建一个新节点。
height := s.randomHeight()
x := newNode(s.arena, key, v, height)
// 如果需要增高,就尝试通过CAS增加s.height。
listHeight = s.getHeight()
for height > int(listHeight) {
if s.height.CompareAndSwap(listHeight, int32(height)) {
// 成功增加skiplist.height。
break
}
listHeight = s.getHeight()
}
// 我们总是从基本层级向上插入。在基本层级添加节点后,
// 我们不能在上面的层级创建节点,因为它会发现基本层级中的节点。
for i := 0; i < height; i++ {
for {
if prev[i] == nil {
y.AssertTrue(i > 1) // 这在基本层级不可能发生。
// 我们还没有计算这个层级的prev、next,因为高度超过了旧的listHeight。
// 对于这些层级,我们期望列表是稀疏的,所以我们可以直接从头开始搜索。
prev[i], next[i] = s.findSpliceForLevel(key, s.head, i)
// 在我们能够这样做之前,有人添加了完全相同的键。
// 这只能发生在基本层级。但我们知道我们不在基本层级。
y.AssertTrue(prev[i] != next[i])
}
nextOffset := s.arena.getNodeOffset(next[i])
x.tower[i].Store(nextOffset)
if prev[i].casNextOffset(i, nextOffset, s.arena.getNodeOffset(x)) {
// 成功在prev[i]和next[i]之间插入x。转到下一层级。
break
}
// CAS失败。我们需要重新计算prev和next。
// 在重做搜索时尝试使用不同的层级不太可能有帮助,
// 因为在prev[i]和next[i]之间插入大量节点的可能性很小。
prev[i], next[i] = s.findSpliceForLevel(key, prev[i], i)
if prev[i] == next[i] {
y.AssertTruef(i == 0, "equal only happend in level0:%d", i)
prev[i].setValue(s.arena, v)
return
}
}
}
}
// Get 获取与键关联的值。如果找到相同或更早版本的相同键,它返回一个有效值。
func (s *Skiplist) Get(key []byte) y.ValueStruct {
n, _ := s.findNear(key, false, true) // findGreaterOrEqual。
if n == nil {
return y.ValueStruct{}
}
nextKey := s.arena.getKey(n.keyOffset, n.keySize)
if !y.SameKey(key, nextKey) {
return y.ValueStruct{}
}
// 获取值的偏移量和大小,从 arena 中读取实际值
// 设置版本号(从键中解析),返回完整的值结构
valOffset, valSize := n.getValueOffset()
vs := s.arena.getVal(valOffset, valSize)
vs.Version = y.ParseTs(nextKey)
return vs
}
下面是迭代器的源码,它用于遍历跳表中的节点,它是在Level0迭代的,提供了一系列操作来访问和遍历数据,是不可或缺的存在,在经典LSM中,memtable的迭代器就基于skiplist的迭代器实现。
// NewIterator 返回一个跳表迭代器。你必须Close()这个迭代器。
func (s *Skiplist) NewIterator() *Iterator {
s.IncrRef()
return &Iterator{list: s}
}
// Iterator 是跳表对象的迭代器。
type Iterator struct {
list *Skiplist
n *node
}
// Close 释放迭代器持有的资源
func (s *Iterator) Close() error {
s.list.DecrRef()
return nil
}
// Valid 当迭代器位于有效节点时返回true。
func (s *Iterator) Valid() bool { return s.n != nil }
// Key 返回当前位置的键。
func (s *Iterator) Key() []byte {
return s.list.arena.getKey(s.n.keyOffset, s.n.keySize)
}
// Value 返回值。
func (s *Iterator) Value() y.ValueStruct {
valOffset, valSize := s.n.getValueOffset()
return s.list.arena.getVal(valOffset, valSize)
}
// ValueUint64 返回当前节点的uint64值。
func (s *Iterator) ValueUint64() uint64 {
return s.n.value.Load()
}
// Next 前进到下一个位置。
func (s *Iterator) Next() {
y.AssertTrue(s.Valid())
s.n = s.list.getNext(s.n, 0)
}
// Prev 前进到前一个位置。
func (s *Iterator) Prev() {
y.AssertTrue(s.Valid())
s.n, _ = s.list.findNear(s.Key(), true, false) // 查找<。不允许相等。
}
// Seek 前进到第一个键 >= target的条目。
func (s *Iterator) Seek(target []byte) {
s.n, _ = s.list.findNear(target, false, true) // 查找>=。
}
// SeekForPrev 查找键 <= target的条目。
func (s *Iterator) SeekForPrev(target []byte) {
s.n, _ = s.list.findNear(target, true, true) // 查找<=。
}
// SeekToFirst 寻找列表中的第一个条目位置。
// 如果列表不为空,迭代器的最终状态是Valid()。
func (s *Iterator) SeekToFirst() {
s.n = s.list.getNext(s.list.head, 0)
}
// SeekToLast 寻找列表中的最后一个条目位置。
// 如果列表不为空,迭代器的最终状态是Valid()。
func (s *Iterator) SeekToLast() {
s.n = s.list.findLast()
}
最后还有一个UniIterator,它是是unilateral(adj. 反方面的,单边的),其实就是给Iterator包装了一下,为了和之后的双向内存迭代表区分,毕竟人家说了我们将来可能支持双向迭代器,但目前并没有,似乎也没必要
// UniIterator 是一个单向内存表迭代器。它是Iterator的一个简单包装。
// 我们喜欢保持Iterator原样,因为它更强大,而且
// 我们将来可能支持双向迭代器。
type UniIterator struct {
iter *Iterator
reversed bool
}
// NewUniIterator 返回一个UniIterator。
func (s *Skiplist) NewUniIterator(reversed bool) *UniIterator {
return &UniIterator{
iter: s.NewIterator(),
reversed: reversed,
}
}
// Next 实现y.Interface
func (s *UniIterator) Next() {
if !s.reversed {
s.iter.Next()
} else {
s.iter.Prev()
}
}
// Rewind 实现y.Interface
func (s *UniIterator) Rewind() {
if !s.reversed {
s.iter.SeekToFirst()
} else {
s.iter.SeekToLast()
}
}
// Seek 实现y.Interface
func (s *UniIterator) Seek(key []byte) {
if !s.reversed {
s.iter.Seek(key)
} else {
s.iter.SeekForPrev(key)
}
}
// Key 实现y.Interface
func (s *UniIterator) Key() []byte { return s.iter.Key() }
// Value 实现y.Interface
func (s *UniIterator) Value() y.ValueStruct { return s.iter.Value() }
// Valid 实现y.Interface
func (s *UniIterator) Valid() bool { return s.iter.Valid() }
// Close 实现y.Interface(并释放迭代器的资源)
func (s *UniIterator) Close() error { return s.iter.Close() }
总结
Such Elegant and excellent code~
站在巨人的肩膀上,才能看的更高~
想要进步,就要多学习大佬们的好代码!
Comments NOTHING