这里用Golang实现了一个不可变的B+树,实现是最小的,因此很容易遵循。
节点格式
B-tree最终将持久化到磁盘中,因此我们需要首先为B-tree节点设计连线格式。如果没有格式,我们将不知道节点的大小和何时拆分节点。
一个节点包含:
- 一个固定大小的标头,包含节点的类型(叶节点或内部节点)和键的数量
- 指向子节点的指针列表。(由内部节点使用)
- 指向每个键值对的偏移量列表
- 填充KV对
| type | nkeys | pointers | offsets | key-values | 2B | 2B | nkeys * 8B | nkeys * 2B | ...
这是KV对的格式。长度后面跟着数据
| klen | vlen | key | val | | 2B | 2B | ... | ... |
为了简单起见,叶节点和内部节点都使用相同的格式
数据类型
既然我们最终要将B树存储到磁盘,为什么不同时使用字节数组作为内存中的数据结构呢?
type BNode struct { data []byte // 可以存储到磁盘 } const ( BNODE_NODE = 1 // 内部节点 只有key 没有 value BNODE_LEAF = 2 // 叶子节点 key-value )
不能使用内存中的指针,指针是引用磁盘页而不是内存节点的64位整数。我们将添加一些回调来抽象这个方面,以便我们的数据结构代码仍然是纯数据结构代码
type BTree struct { // 指针 非零页码 root uint64 // 用于管理磁盘页面的回调 get func(uint64) BNode // 取消引用指针 new func(BNode) uint64 // 分配新的一页 del func(uint64) // 取消分配页 }
页面大小被定义为4K字节。更大的页面大小(如8K或16K)也可以工作
我们还添加了一些对键和值大小的限制。因此具有单个KV对的节点总是适合于单个页面。如果需要支持更大的键或值,则必须为它们分配额外的页面,这会增加复杂性。
// 头部大小 const HEADER = 4 // 单页大小 const BTREE_PAGE_SIZE = 4096 // 最大键值 const BTREE_MAX_KEY_SIZE = 1000 // 最大值 const BTREE_MAX_VAL_SIZE = 3000
解码B树节点
由于节点只是一个字节数组,这里添加一些辅助函数来访问它的内容
// 获取节点 类型 func (node BNode) btype() uint16 { return binary.LittleEndian.Uint16(node.data) } // 获取节点上key的数量 func (node BNode) nkeys() uint16 { return binary.LittleEndian.Uint16(node.data[2:4]) } // 设置节点的头部 类型 + key数量 func (node BNode) setHeader(btype uint16, nkeys uint16) { binary.LittleEndian.PutUint16(node.data[0:2], btype) binary.LittleEndian.PutUint16(node.data[2:4], nkeys) }
// 获取对应索引的指针 func (node BNode) getPtr(idx uint16) uint64 { assert(idx < node.nkeys()) pos := HEADER + 8*idx return binary.LittleEndian.Uint64(node.data[pos:]) } // 设置对应索引的指针 func (node BNode) setPtr(idx uint16, val uint64) { assert(idx < node.nkeys()) pos := HEADER + 8*idx binary.LittleEndian.PutUint64(node.data[pos:], val) }
关于偏移量列表的一些细节:
- 偏移量相对于第一KV对的位置。
- 第一个KV对的偏移量总是零,所以它不存储在列表中。
- 在偏移量列表中存储到最后一个KV对末尾的偏移量,该偏移量用于确定节点的大小。
// 获取指定索引的位置 func offsetPos(node BNode, idx uint16) uint16 { assert(1 <= idx && idx <= node.nkeys()) return HEADER + 8*node.nkeys() + 2*(idx-1) } // 获取指定索引偏移量 func (node BNode) getOffset(idx uint16) uint16 { if idx == 0 { return 0 } return binary.LittleEndian.Uint16(node.data[offsetPos(node, idx):]) } // 设置指定索引偏移量 func (node BNode) setOffset(idx uint16, offset uint16) { binary.LittleEndian.PutUint16(node.data[offsetPos(node, idx):], offset) }
偏移列表用于快速定位第n个KV对。
// 返回指定索引的k-v位置 func (node BNode) kvPos(idx uint16) uint16 { assert(idx <= node.nkeys()) return HEADER + 8*node.nkeys() + 2*node.nkeys() + node.getOffset(idx) } // 获取指定索引的key func (node BNode) getKey(idx uint16) []byte { assert(idx < node.nkeys()) pos := node.kvPos(idx) klen := binary.LittleEndian.Uint16(node.data[pos:]) return node.data[pos+4:][:klen] } // 获取指定索引的value func (node BNode) getVal(idx uint16) []byte { assert(idx < node.nkeys()) pos := node.kvPos(idx) klen := binary.LittleEndian.Uint16(node.data[pos+0:]) vlen := binary.LittleEndian.Uint16(node.data[pos+2:]) return node.data[pos+4+klen:][:vlen] }
并确定节点的大小。
// 获取节点大小 func (node BNode) nbytes() uint16 { return node.kvPos(node.nkeys()) }
B-树插入
代码被分解成几个小步骤。
- 查找key
要在叶节点中插入一个键,需要在排序的KV列表中查找它的位置
// 返回与key相交的第一个子节点的索引. (kid[i] <= key) func nodeLookupLE(node BNode, key []byte) uint16 { nkeys := node.nkeys() found := uint16(0) // 第一个key是父节点的副本,因此第一个key总是小于或者等于key. for i := uint16(1); i < nkeys; i++ { cmp := bytes.Compare(node.getKey(i), key) if cmp <= 0 { found = i } if cmp >= 0 { break } } return found }
查找对叶节点和内部节点都有效。注意,为了进行比较,跳过了第一个键,因为已经与父结点进行了比较
- 更新叶节点
在查找要插入的位置后,需要创建一个包含新键的节点副本
// 给叶子节点插入一个新的key func leafInsert( new BNode, old BNode, idx uint16, key []byte, val []byte, ) { // 设置新的node节点的头 new.setHeader(BNODE_LEAF, old.nkeys()+1) // 复制老的节点插入位置的前半段到新的节点 nodeAppendRange(new, old, 0, 0, idx) // 给新的节点插入k/v nodeAppendKV(new, idx, 0, key, val) // 复制老的节点插入位置的后半段到新的节点 nodeAppendRange(new, old, idx+1, idx, old.nkeys()-idx) } func leafUpdate( new BNode, old BNode, idx uint16, key []byte, val []byte, ) { new.setHeader(BNODE_LEAF, old.nkeys()) nodeAppendRange(new, old, 0, 0, idx) nodeAppendKV(new, idx, 0, key, val) nodeAppendRange(new, old, idx+1, idx+1, old.nkeys()-(idx+1)) }
nodeAppendRange函数将键从旧节点复制到新节点。
// copy multiple KVs into the position func nodeAppendRange( new BNode, old BNode, dstNew uint16, srcOld uint16, n uint16, ) { assert(srcOld+n <= old.nkeys()) assert(dstNew+n <= new.nkeys()) if n == 0 { return } // 设置指针 只有内部节点有指针 理论上叶子节点的指针都是0 for i := uint16(0); i < n; i++ { new.setPtr(dstNew+i, old.getPtr(srcOld+i)) } // 偏移量 偏移量比较特殊 要算的是后一位的偏移量 dstBegin := new.getOffset(dstNew) srcBegin := old.getOffset(srcOld) for i := uint16(1); i <= n; i++ { // NOTE: the range is [1, n] offset := dstBegin + old.getOffset(srcOld+i) - srcBegin new.setOffset(dstNew+i, offset) } // KVs begin := old.kvPos(srcOld) end := old.kvPos(srcOld + n) copy(new.data[new.kvPos(dstNew):], old.data[begin:end]) }
nodeAppendKV函数将一个KV对复制到新节点。
// copy a KV into the position func nodeAppendKV(new BNode, idx uint16, ptr uint64, key []byte, val []byte) { // ptrs new.setPtr(idx, ptr) // KVs pos := new.kvPos(idx) binary.LittleEndian.PutUint16(new.data[pos+0:], uint16(len(key))) binary.LittleEndian.PutUint16(new.data[pos+2:], uint16(len(val))) copy(new.data[pos+4:], key) copy(new.data[pos+4+uint16(len(key)):], val) // the offset of the next key new.setOffset(idx+1, new.getOffset(idx)+4+uint16((len(key)+len(val)))) }
- 递归插入
插入一个键的主要功能。
// 向一个节点插入一个KV,结果可能会被分成2个节点 // the caller 负责取消分配输入节点并拆分和分配结果节点. func treeInsert(tree *BTree, node BNode, key []byte, val []byte) BNode { // 插入的节点允许大于 1 页,如果超过则将被拆分 new := BNode{data: make([]byte, 2*BTREE_PAGE_SIZE)} // 要插入的key的位置 idx := nodeLookupLE(node, key) // 节点的类型 switch node.btype() { case BNODE_LEAF: // 叶子节点 节点的key小于等于要插入的key的位置 if bytes.Equal(key, node.getKey(idx)) { // 等于 直接更新 leafUpdate(new, node, idx, key, val) } else { // 小于 在当前key后边插入 leafInsert(new, node, idx+1, key, val) } case BNODE_NODE: // 内部节点 将其插入到子节点中 nodeInsert(tree, new, node, idx, key, val) default: panic("bad node!") } return new }
leafUpdate函数类似于leafInsert函数。
- 处理内部节点
现在是处理内部节点的代码。
// part of the treeInsert(): KV insertion to an internal node func nodeInsert( tree *BTree, new BNode, node BNode, idx uint16, key []byte, val []byte, ) { // 获取子节点的指针 kptr := node.getPtr(idx) // 根据指针获取子节点 knode := tree.get(kptr) // 删除子节点 tree.del(kptr) // 递归插入到子节点 knode = treeInsert(tree, knode, key, val) // 分裂节点 nsplit, splited := nodeSplit3(knode) // 更新子节点的指针 nodeReplaceKidN(tree, new, node, idx, splited[:nsplit]...) }
- 拆分大节点
将键插入节点会增加节点的大小,导致其超出页面大小。在这种情况下,节点被分割成多个较小的节点。
允许的最大键大小和值大小只能保证单个KV对始终适合于一个页面。在最坏的情况下,较大的节点被分成3个节点(中间有一个大的KV对)。
// 将大于允许的节点分成两个。 // 第二个节点始终适合页面。 func nodeSplit2(left BNode, right BNode, old BNode) { assert(old.nkeys() >= 2) // 最初的值 nleft := old.nkeys() / 2 // 尝试分配左半部分 left_bytes := func() uint16 { return HEADER + 8*nleft + 2*nleft + old.getOffset(nleft) } for left_bytes() > BTREE_PAGE_SIZE { nleft-- } assert(nleft >= 1) // 尝试分配右半部分 right_bytes := func() uint16 { return old.nbytes() - left_bytes() + HEADER } for right_bytes() > BTREE_PAGE_SIZE { nleft++ } assert(nleft < old.nkeys()) nright := old.nkeys() - nleft left.setHeader(old.btype(), nleft) right.setHeader(old.btype(), nright) nodeAppendRange(left, old, 0, 0, nleft) nodeAppendRange(right, old, 0, nleft, nright) // 左半部分可能依旧比较 assert(right.nbytes() <= BTREE_PAGE_SIZE) } // 如果节点太大,则拆分节点。结果是1~3个节点。 func nodeSplit3(old BNode) (uint16, [3]BNode) { // 不用分裂的情况 直接返回指定的格式 if old.nbytes() <= BTREE_PAGE_SIZE { old.data = old.data[:BTREE_PAGE_SIZE] return 1, [3]BNode{old} } left := BNode{make([]byte, 2*BTREE_PAGE_SIZE)} // 或许之后再分裂 right := BNode{make([]byte, BTREE_PAGE_SIZE)} nodeSplit2(left, right, old) if left.nbytes() <= BTREE_PAGE_SIZE { left.data = left.data[:BTREE_PAGE_SIZE] return 2, [3]BNode{left, right} } // 左边的节点依旧太大了 继续分割 leftleft := BNode{make([]byte, BTREE_PAGE_SIZE)} middle := BNode{make([]byte, BTREE_PAGE_SIZE)} nodeSplit2(leftleft, middle, left) assert(leftleft.nbytes() <= BTREE_PAGE_SIZE) return 3, [3]BNode{leftleft, middle, right} }
- 更新内部节点
在节点中插入一个键可能会产生1个、2个或3个节点。父结点必须相应地更新自身。更新内部节点的代码与更新叶节点的代码类似。
// replace a link with multiple links func nodeReplaceKidN( tree *BTree, new BNode, old BNode, idx uint16, kids ...BNode, ) { inc := uint16(len(kids)) // 更新一个节点 if inc == 1 && bytes.Equal(kids[0].getKey(0), old.getKey(idx)) { // 普通情况 只替换一个指针 nodeReplaceKid1ptr(new, old, idx, tree.new(kids[0])) return } // 这里的new是内部节点 先设置内部节点的头 然后把旧的内部节点信息拷贝到新的内部节点上 new.setHeader(BNODE_NODE, old.nkeys()+inc-1) nodeAppendRange(new, old, 0, 0, idx) for i, node := range kids { nodeAppendKV(new, idx+uint16(i), tree.new(node), node.getKey(0), nil) } nodeAppendRange(new, old, idx+inc, idx+1, old.nkeys()-(idx+1)) } func nodeReplaceKid1ptr(new BNode, old BNode, idx uint16, ptr uint64) { copy(new.data, old.data[:old.nbytes()]) new.setPtr(idx, ptr) // only the pointer is changed }
已经完成了B树插入。删除和代码的其余部分将在下一章介绍