行和列
在KV存储之上构建关系DB的第一步是添加表。表只是一堆行和列。列的子集被定义为“主键”;主键是唯一的,因此它们可以用于引用一行(在查询和辅助索引中)。
一张表是如何放进KV存储器的?我们可以把一行分成两部分:
- 主键列进入KV存储的“键”部分。
- 非主键列进入“值”部分
数据结构
下面是行和单元格的定义。目前,我们只支持两种数据类型(int64和bytes)。
const ( TYPE_ERROR = 0 TYPE_BYTES = 1 TYPE_INT64 = 2 ) // table cell type Value struct { Type uint32 I64 int64 Str []byte } // table row type Record struct { Cols []string Vals []Value } func (rec *Record) AddStr(key string, val []byte) *Record func (rec *Record) AddInt64(key string, val int64) *Record func (rec *Record) Get(key string) *Value
数据库和表的定义:
type DB struct { Path string // internals kv KV tables map[string]*TableDef // cached table definition } // table definition type TableDef struct { // user defined Name string Types []uint32 // column types Cols []string // column names PKeys int // the first `PKeys` columns are the primary key // auto-assigned B-tree key prefixes for different tables Prefix uint32 }
为了支持多个表,KV存储区中的键以一个唯一的32位数字作为前缀。
表定义必须存储在某个地方,我们将使用一个内部表来存储它们。我们还将添加一个内部表来存储DB本身使用的元数据。
// internal table: metadata var TDEF_META = &TableDef{ Prefix: 1, Name: "@meta", Types: []uint32{TYPE_BYTES, TYPE_BYTES}, Cols: []string{"key", "val"}, PKeys: 1, } // internal table: table schemas var TDEF_TABLE = &TableDef{ Prefix: 2, Name: "@table", Types: []uint32{TYPE_BYTES, TYPE_BYTES}, Cols: []string{"name", "def"}, PKeys: 1, }
点查询
让我们通过主键实现点查询,范围查询将在下一章中添加。
// get a single row by the primary key func dbGet(db *DB, tdef *TableDef, rec *Record) (bool, error) { values, err := checkRecord(tdef, *rec, tdef.PKeys) if err != nil { return false, err } key := encodeKey(nil, tdef.Prefix, values[:tdef.PKeys]) val, ok := db.kv.Get(key) if !ok { return false, nil } for i := tdef.PKeys; i < len(tdef.Cols); i++ { values[i].Type = tdef.Types[i] } decodeValues(val, values[tdef.PKeys:]) rec.Cols = append(rec.Cols, tdef.Cols[tdef.PKeys:]...) rec.Vals = append(rec.Vals, values[tdef.PKeys:]...) return true, nil }
程序是:
- 确认输入是一个完整的主键。(
checkRecord
)
- 对主键进行编码。(
encodeKey
)
- 查询KV存储。(
db.kv.Get
)
- 解码值。(
decodeValues
)
// reorder a record and check for missing columns. // n == tdef.PKeys: record is exactly a primary key // n == len(tdef.Cols): record contains all columns func checkRecord(tdef *TableDef, rec Record, n int) ([]Value, error) { // omitted... }
将数据编码成字节和从字节解码的方法将在下一章中解释。目前,任何序列化方案都适用于本章。
func encodeValues(out []byte, vals []Value) []byte func decodeValues(in []byte, out []Value) // for primary keys func encodeKey(out []byte, prefix uint32, vals []Value) []byte { var buf [4]byte binary.BigEndian.PutUint32(buf[:], prefix) out = append(out, buf[:]...) out = encodeValues(out, vals) return out }
要查询一个表,我们必须先得到它的定义。
// get a single row by the primary key func (db *DB) Get(table string, rec *Record) (bool, error) { tdef := getTableDef(db, table) if tdef == nil { return false, fmt.Errorf("table not found: %s", table) } return dbGet(db, tdef, rec) }
该定义以JSON形式存储在内部表TDEF_TABLE中。
// get the table definition by name func getTableDef(db *DB, name string) *TableDef { tdef, ok := db.tables[name] if !ok { if db.tables == nil { db.tables = map[string]*TableDef{} } tdef = getTableDefDB(db, name) if tdef != nil { db.tables[name] = tdef } } return tdef } func getTableDefDB(db *DB, name string) *TableDef { rec := (&Record{}).AddStr("name", []byte(name)) ok, err := dbGet(db, TDEF_TABLE, rec) assert(err == nil) if !ok { return nil } tdef := &TableDef{} err = json.Unmarshal(rec.Get("def").Str, tdef) assert(err == nil) return tdef }
更新
更新可以插入新行或替换现有行。B树接口进行了修改,以支持不同的更新模式。
// modes of the updates const ( MODE_UPSERT = 0 // insert or replace MODE_UPDATE_ONLY = 1 // update existing keys MODE_INSERT_ONLY = 2 // only add new keys ) type InsertReq struct { tree *BTree // out Added bool // added a new key // in Key []byte Val []byte Mode int } func (tree *BTree) InsertEx(req *InsertReq) func (db *KV) Update(key []byte, val []byte, mode int) (bool, error)
通过主键更新记录的功能:
// add a row to the table func dbUpdate(db *DB, tdef *TableDef, rec Record, mode int) (bool, error) { values, err := checkRecord(tdef, rec, len(tdef.Cols)) if err != nil { return false, err } key := encodeKey(nil, tdef.Prefix, values[:tdef.PKeys]) val := encodeValues(nil, values[tdef.PKeys:]) return db.kv.Update(key, val, mode) }
不同的更新模式:
// add a record func (db *DB) Set(table string, rec Record, mode int) (bool, error) { tdef := getTableDef(db, table) if tdef == nil { return false, fmt.Errorf("table not found: %s", table) } return dbUpdate(db, tdef, rec, mode) } func (db *DB) Insert(table string, rec Record) (bool, error) { return db.Set(table, rec, MODE_INSERT_ONLY) } func (db *DB) Update(table string, rec Record) (bool, error) { return db.Set(table, rec, MODE_UPDATE_ONLY) } func (db *DB) Upsert(table string, rec Record) (bool, error) { return db.Set(table, rec, MODE_UPSERT) }
删除行与此类似:
// delete a record by its primary key func dbDelete(db *DB, tdef *TableDef, rec Record) (bool, error) { values, err := checkRecord(tdef, rec, tdef.PKeys) if err != nil { return false, err } key := encodeKey(nil, tdef.Prefix, values[:tdef.PKeys]) return db.kv.Del(key) } func (db *DB) Delete(table string, rec Record) (bool, error) { tdef := getTableDef(db, table) if tdef == nil { return false, fmt.Errorf("table not found: %s", table) } return dbDelete(db, tdef, rec) }
创建新表
三个步骤:
- 检查表定义。
- 分配表键前缀。
- 存储下一个表前缀和表定义。
func (db *DB) TableNew(tdef *TableDef) error { if err := tableDefCheck(tdef); err != nil { return err } // check the existing table table := (&Record{}).AddStr("name", []byte(tdef.Name)) ok, err := dbGet(db, TDEF_TABLE, table) assert(err == nil) if ok { return fmt.Errorf("table exists: %s", tdef.Name) } // allocate a new prefix assert(tdef.Prefix == 0) tdef.Prefix = TABLE_PREFIX_MIN meta := (&Record{}).AddStr("key", []byte("next_prefix")) ok, err = dbGet(db, TDEF_META, meta) assert(err == nil) if ok { tdef.Prefix = binary.LittleEndian.Uint32(meta.Get("val").Str) assert(tdef.Prefix > TABLE_PREFIX_MIN) } else { meta.AddStr("val", make([]byte, 4)) } // update the next prefix binary.LittleEndian.PutUint32(meta.Get("val").Str, tdef.Prefix+1) _, err = dbUpdate(db, TDEF_META, *meta, 0) if err != nil { return err } // store the definition val, err := json.Marshal(tdef) assert(err == nil) table.AddStr("def", val) _, err = dbUpdate(db, TDEF_TABLE, *table, 0) return err }
前缀号是从TDEF META内部表的next prefix键开始递增分配的。表定义作为JSON存储在TDEF_TABLE表中。
虽然我们添加了表结构,但结果仍然是KV存储。缺少了一些重要的方面:
范围查询在下一章。
下一章中的二级索引。