从头实现数据库-6

Oct 31, 2023
B树数据结构可以很容易地转储到磁盘上,由于之前的B树实现是不可变的,这里 将以只追加的方式分配磁盘空间,磁盘空间的重用将推迟到下一章。

保存数据的方法

将数据持久化到磁盘不仅仅是将数据转储到文件中。有几点考虑:
  • 崩溃恢复:这包括数据库进程崩溃、操作系统崩溃和电源故障。重新启动后,数据库必须处于可用状态。
  • 持久性:在数据库成功响应之后,所涉及的数据保证会持久存在,即使在崩溃之后也是如此。换句话说,持久性发生在响应客户端之前。
有许多资料使用ACID术语(原子性、一致性、隔离性、持久性)来描述数据库,但是这些概念并不是正交的,而且很难解释,所以让我们把重点放在我们的实际示例上。
  • B树不可变的一面:更新B树不会触及B树的前一个版本,这使得崩溃恢复变得容易——如果更新出错,我们可以简单地恢复到前一个版本。
  • 持久性是通过fsync Linux系统调用实现的。通过write或mmap的普通文件IO首先进入页面缓存,系统必须稍后将页面缓存刷新到磁盘。fsync系统调用阻塞,直到所有脏页被刷新。
如果更新出错,我们如何恢复到以前的版本?我们可以将更新分为两个阶段:
  • 更新将创建新节点;将它们写入磁盘。
  • 每次更新都会创建一个新的根结点,我们需要将指向该根结点的指针存储
第一阶段可能涉及到向磁盘写入多个页面,这通常不是原子操作。但是第二阶段只涉及单个指针,并且可以在原子单页写入中完成。这使得整个操作具有原子性——如果数据库崩溃,更新就不会发生。
第一阶段必须在第二阶段之前持久化,否则,根指针可能会在崩溃后指向损坏(部分持久化)的树版本。两个阶段之间应该有一个fsync(作为屏障)。
并且在响应客户端之前也应该对第二阶段进行fsync。

基于mMAP的IO

 
// create the initial mmap that covers the whole file. func mmapInit(fp *os.File) (int, []byte, error) { fi, err := fp.Stat() if err != nil { return 0, nil, fmt.Errorf("stat: %w", err) } if fi.Size()%BTREE_PAGE_SIZE != 0 { return 0, nil, errors.New("File size is not a multiple of page size.") } mmapSize := 64 << 20 assert(mmapSize%BTREE_PAGE_SIZE == 0) for mmapSize < int(fi.Size()) { mmapSize *= 2 } // mmapSize can be larger than the file chunk, err := syscall.Mmap( int(fp.Fd()), 0, mmapSize, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, ) if err != nil { return 0, nil, fmt.Errorf("mmap: %w", err) } return int(fi.Size()), chunk, nil }
上面的函数创建的初始映射至少是文件的大小。映射的大小可以大于文件大小,并且文件结束后的范围是不可访问的(SIGBUS),但是可以在以后扩展该文件。
随着文件的增长,可能不得不扩展映射的范围。扩展mmap范围的系统调用是mremap。不幸的是,当通过重新映射扩展一个范围时,我们可能无法保留起始地址。扩展映射的方法是使用多个映射—为溢出文件范围创建一个新的映射。
type KV struct { Path string // internals fp *os.File tree BTree mmap struct { file int // file size, can be larger than the database size total int // mmap size, can be larger than the file size chunks [][]byte // multiple mmaps, can be non-continuous } page struct { flushed uint64 // database size in number of pages temp [][]byte // newly allocated pages } }
// extend the mmap by adding new mappings. func extendMmap(db *KV, npages int) error { if db.mmap.total >= npages*BTREE_PAGE_SIZE { return nil } // double the address space chunk, err := syscall.Mmap( int(db.fp.Fd()), int64(db.mmap.total), db.mmap.total, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, ) if err != nil { return fmt.Errorf("mmap: %w", err) } db.mmap.total += db.mmap.total db.mmap.chunks = append(db.mmap.chunks, chunk) return nil }
新映射的大小呈指数增长,因此我们不必频繁调用mmap。
下面是如何从映射地址访问一个页面。
// callback for BTree, dereference a pointer. func (db *KV) pageGet(ptr uint64) BNode { start := uint64(0) for _, chunk := range db.mmap.chunks { end := start + uint64(len(chunk))/BTREE_PAGE_SIZE if ptr < end { offset := BTREE_PAGE_SIZE * (ptr - start) return BNode{chunk[offset : offset+BTREE_PAGE_SIZE]} } start = end } panic("bad ptr") }

master page

文件的第一页是用来存储指向根目录的指针的,称之为“master-page”。分配新节点所需的页面总数,因此它也存储在那里。
| the_master_page | pages... | tree_root | pages... | | btree_root | page_used | ^ ^ | | | | +------------+----------------------+ | | | +---------------------------------------+
下面的函数在初始化数据库时读取母版页:
const DB_SIG = "BuildYourOwnDB05" // the master page format. // it contains the pointer to the root and other important bits. // | sig | btree_root | page_used | // | 16B | 8B | 8B | func masterLoad(db *KV) error { if db.mmap.file == 0 { // empty file, the master page will be created on the first write. db.page.flushed = 1 // reserved for the master page return nil } data := db.mmap.chunks[0] root := binary.LittleEndian.Uint64(data[16:]) used := binary.LittleEndian.Uint64(data[24:]) // verify the page if !bytes.Equal([]byte(DB_SIG), data[:16]) { return errors.New("Bad signature.") } bad := !(1 <= used && used <= uint64(db.mmap.file/BTREE_PAGE_SIZE)) bad = bad || !(0 <= root && root < used) if bad { return errors.New("Bad master page.") } db.tree.root = root db.page.flushed = used return nil }
下面是更新master page的功能。与读取的代码不同,它不使用映射地址进行写入。这是因为通过mmap修改页面不是原子操作。内核可能会在中途刷新页面并损坏磁盘文件,而不跨越页面边界的小写操作肯定是原子的。
// update the master page. it must be atomic. func masterStore(db *KV) error { var data [32]byte copy(data[:16], []byte(DB_SIG)) binary.LittleEndian.PutUint64(data[16:], db.tree.root) binary.LittleEndian.PutUint64(data[24:], db.page.flushed) // NOTE: Updating the page via mmap is not atomic. // Use the `pwrite()` syscall instead. _, err := db.fp.WriteAt(data[:], 0) if err != nil { return fmt.Errorf("write master page: %w", err) } return nil }

分配磁盘页面

简单地将新页面追加到数据库的末尾,直到我们在下一章中添加一个空闲列表。
新页面暂时保存在内存中,直到稍后(可能扩展文件之后)复制到文件中。
type KV struct { // omitted... page struct { flushed uint64 // database size in number of pages temp [][]byte // newly allocated pages } }
// callback for BTree, allocate a new page. func (db *KV) pageNew(node BNode) uint64 { // TODO: reuse deallocated pages assert(len(node.data) <= BTREE_PAGE_SIZE) ptr := db.page.flushed + uint64(len(db.page.temp)) db.page.temp = append(db.page.temp, node.data) return ptr } // callback for BTree, deallocate a page. func (db *KV) pageDel(uint64) { // TODO: implement this }
在编写挂起的页面之前,可能需要先扩展文件。对应的系统调用是fallocate。
// extend the file to at least `npages`. func extendFile(db *KV, npages int) error { filePages := db.mmap.file / BTREE_PAGE_SIZE if filePages >= npages { return nil } for filePages < npages { // 文件大小呈指数级增长,因此我们不必为每次更新都扩展文件。 inc := filePages / 8 if inc < 1 { inc = 1 } filePages += inc } fileSize := filePages * BTREE_PAGE_SIZE err := syscall.Fallocate(int(db.fp.Fd()), 0, 0, int64(fileSize)) if err != nil { return fmt.Errorf("fallocate: %w", err) } db.mmap.file = fileSize return nil }

初始化数据库

把我们所做的事放在一起。
func (db *KV) Open() error { // open or create the DB file fp, err := os.OpenFile(db.Path, os.O_RDWR|os.O_CREATE, 0644) if err != nil { return fmt.Errorf("OpenFile: %w", err) } db.fp = fp // create the initial mmap sz, chunk, err := mmapInit(db.fp) if err != nil { goto fail } db.mmap.file = sz db.mmap.total = len(chunk) db.mmap.chunks = [][]byte{chunk} // btree callbacks db.tree.get = db.pageGet db.tree.new = db.pageNew db.tree.del = db.pageDel // read the master page err = masterLoad(db) if err != nil { goto fail } // done return nil fail: db.Close() return fmt.Errorf("KV.Open: %w", err) }
// cleanups func (db *KV) Close() { for _, chunk := range db.mmap.chunks { err := syscall.Munmap(chunk) assert(err == nil) } _ = db.fp.Close() }

更新操作

与查询不同,更新操作必须在返回之前持久化数据。
// read the db func (db *KV) Get(key []byte) ([]byte, bool) { return db.tree.Get(key) } // update the db func (db *KV) Set(key []byte, val []byte) error { db.tree.Insert(key, val) return flushPages(db) } func (db *KV) Del(key []byte) (bool, error) { deleted := db.tree.Delete(key) return deleted, flushPages(db) }
flushPages是持久化新页面的函数。
// persist the newly allocated pages after updates func flushPages(db *KV) error { if err := writePages(db); err != nil { return err } return syncPages(db) }
如前所述,它分为两个阶段。
func writePages(db *KV) error { // extend the file & mmap if needed npages := int(db.page.flushed) + len(db.page.temp) if err := extendFile(db, npages); err != nil { return err } if err := extendMmap(db, npages); err != nil { return err } // copy data to the file for i, page := range db.page.temp { ptr := db.page.flushed + uint64(i) copy(db.pageGet(ptr).data, page) } return nil }
超同步信号在它们之间和之后。
func syncPages(db *KV) error { // flush data to the disk. must be done before updating the master page. if err := db.fp.Sync(); err != nil { return fmt.Errorf("fsync: %w", err) } db.page.flushed += uint64(len(db.page.temp)) db.page.temp = db.page.temp[:0] // update & flush the master page if err := masterStore(db); err != nil { return err } if err := db.fp.Sync(); err != nil { return fmt.Errorf("fsync: %w", err) } return nil }
KV存储是可以使用的,但是当更新数据库时,文件不可能永远增长,将在下一章中通过重用磁盘页面来完成KV存储。

备注

Copyright © 2025 later

logo