// TableDef is the table definition.
// The first group of fields is supplied by the user; the prefix fields are
// filled in by TableNew when the table is created.
type TableDef struct {
// user defined
Name string
Types []uint32 // column types, parallel to Cols (Types[i] is the type of Cols[i])
Cols []string // column names
PKeys int // the first `PKeys` columns are the primary key
// Secondary indexes, each one a list of column names.
// tableDefCheck rewrites every entry so that it is followed by the
// primary key columns it did not already contain.
Indexes [][]string
// auto-assigned B-tree key prefixes for different tables/indexes
Prefix uint32
IndexPrefixes []uint32 // IndexPrefixes[i] is the key prefix used for Indexes[i]
}
// checkIndexKeys validates one secondary index and returns the complete list
// of columns that make up its key: the columns given in index, followed by
// every primary key column that index did not already name.
//
// The returned slice may share its backing array with index.
// The error result is reserved for the column checks; the visible code
// always returns nil.
func checkIndexKeys(tdef *TableDef, index []string) ([]string, error) {
	icols := make(map[string]bool, len(index))
	for _, c := range index {
		// check the index columns
		// omitted...
		icols[c] = true
	}
	// add the primary key to the index
	for _, c := range tdef.Cols[:tdef.PKeys] {
		if !icols[c] {
			index = append(index, c)
		}
	}
	// An index may cover every column of the table once the primary key has
	// been appended (e.g. a two-column table indexed on its non-key column),
	// so the bound is inclusive.
	assert(len(index) <= len(tdef.Cols))
	return index, nil
}
// colIndex reports the position of the column named col in tdef.Cols.
// The first match wins; -1 is returned when no column has that name.
func colIndex(tdef *TableDef, col string) int {
	for pos := 0; pos < len(tdef.Cols); pos++ {
		if tdef.Cols[pos] == col {
			return pos
		}
	}
	return -1
}
// tableDefCheck verifies a table definition.
// Every entry of tdef.Indexes is passed through checkIndexKeys and replaced
// in place by the result; the first error encountered is returned.
func tableDefCheck(tdef *TableDef) error {
	// verify the table definition
	// omitted...
	// verify the indexes
	for i := range tdef.Indexes {
		full, err := checkIndexKeys(tdef, tdef.Indexes[i])
		if err != nil {
			return err
		}
		tdef.Indexes[i] = full
	}
	return nil
}
// TableNew creates a new table from tdef.
// It verifies the definition, assigns B-tree key prefixes to the table and to
// each of its indexes, and records the next unused prefix under TDEF_META.
// An error from tableDefCheck or from the metadata update is returned as is.
func (db *DB) TableNew(tdef *TableDef) error {
if err := tableDefCheck(tdef); err != nil {
return err
}
// check the existing table
// omitted...
// allocate new prefixes
tdef.Prefix = /* omitted... */
// index i gets the prefix Prefix+1+i, i.e. the ones right after the table's own
// NOTE(review): IndexPrefixes is appended to without being cleared first;
// presumably it is empty on entry — confirm against callers.
for i := range tdef.Indexes {
prefix := tdef.Prefix + 1 + uint32(i)
tdef.IndexPrefixes = append(tdef.IndexPrefixes, prefix)
}
// update the next prefix
// ntree counts the prefixes consumed: one for the table plus one per index
ntree := 1 + uint32(len(tdef.Indexes))
binary.LittleEndian.PutUint32(meta.Get("val").Str, tdef.Prefix+ntree)
_, err = dbUpdate(db, TDEF_META, *meta, 0)
if err != nil {
return err
}
// store the definition
// omitted...
}
// InsertReq bundles the inputs and outputs of a single KV insert/update.
// The caller fills in the "in" fields; the "out" fields report what happened.
type InsertReq struct {
tree *BTree
// out
Added bool // added a new key
Updated bool // added a new key or an old key was changed
Old []byte // the value before the update
// in
Key []byte
Val []byte
Mode int
}
// DeleteReq bundles the input and output of a single KV delete.
type DeleteReq struct {
tree *BTree
// in
Key []byte // the key to delete
// out
Old []byte // presumably the value that was stored under Key before the delete — confirm
}
// InsertEx is the B-tree insert entry point taking an InsertReq.
// Only the signature is shown here; presumably it reads the "in" fields of
// req and fills in the "out" fields — confirm against the implementation.
func (tree *BTree) InsertEx(req *InsertReq)
// DeleteEx is the B-tree delete entry point taking a DeleteReq.
// Only the signature is shown here; presumably it removes req.Key and
// reports the previous value in req.Old — confirm against the implementation.
func (tree *BTree) DeleteEx(req *DeleteReq)
// Operations accepted by indexOp.
const (
	INDEX_ADD = iota + 1 // 1: add the index entries of a record
	INDEX_DEL            // 2: remove the index entries of a record
)
// indexOp keeps the secondary indexes in step with a record that was just
// added to (INDEX_ADD) or removed from (INDEX_DEL) the table.
// For every index it encodes the index key from rec and inserts or deletes
// that key in the KV store. Any other op panics, and so does a failed or
// no-op KV operation (via assert).
func indexOp(db *DB, tdef *TableDef, rec Record, op int) {
	// Scratch buffers shared by all indexes; an index never has more
	// columns than the table, so len(tdef.Cols) values are enough.
	buf := make([]byte, 0, 256)
	vals := make([]Value, len(tdef.Cols))
	for idx, cols := range tdef.Indexes {
		// the indexed key
		n := len(cols)
		for pos := 0; pos < n; pos++ {
			vals[pos] = *rec.Get(cols[pos])
		}
		// update the KV store
		buf = encodeKey(buf[:0], tdef.IndexPrefixes[idx], vals[:n])
		var (
			done bool
			err  error
		)
		switch op {
		case INDEX_DEL:
			done, err = db.kv.Del(&DeleteReq{Key: buf})
		case INDEX_ADD:
			done, err = db.kv.Update(&InsertReq{Key: buf})
		default:
			panic("what?")
		}
		assert(err == nil) // XXX: will fix this in later chapters
		assert(done)
	}
}
// dbUpdate adds a row to the table (or changes an existing one) and then
// maintains the secondary indexes.
// mode is passed through to the KV update as InsertReq.Mode.
// It returns the `added` result of the KV update together with its error.
func dbUpdate(db *DB, tdef *TableDef, rec Record, mode int) (bool, error) {
// omitted...
req := InsertReq{Key: key, Val: val, Mode: mode}
added, err := db.kv.Update(&req)
// stop here on error, when nothing changed, or when there is no index to maintain
if err != nil || !req.Updated || len(tdef.Indexes) == 0 {
return added, err
}
// maintain indexes
// (req.Updated is always true from here on because of the early return above)
if req.Updated && !req.Added {
// an existing row was overwritten: drop the index entries of the old row first
decodeValues(req.Old, values[tdef.PKeys:]) // get the old row
indexOp(db, tdef, Record{tdef.Cols, values}, INDEX_DEL)
}
if req.Updated {
// add the index entries of the new row
indexOp(db, tdef, rec, INDEX_ADD)
}
return added, nil
}
// dbDelete deletes a record by its primary key and then maintains the
// secondary indexes.
// It returns whether a record was deleted, together with the error of the
// KV delete.
func dbDelete(db *DB, tdef *TableDef, rec Record) (bool, error) {
// omitted...
deleted, err := db.kv.Del(&req)
// stop here on error, when nothing was deleted, or when there is no index to maintain
if err != nil || !deleted || len(tdef.Indexes) == 0 {
return deleted, err
}
// maintain indexes
// (deleted is always true here because of the early return above)
if deleted {
// likewise...
}
return true, nil
}
// findIndex picks the key to use for a query over the columns in keys.
//
// It returns -1 when keys is a prefix of the primary key (which includes the
// empty key list used by full table scans), otherwise the position in
// tdef.Indexes of the shortest index that has keys as a prefix; on ties the
// earliest index wins. If nothing matches it returns -2 and an error.
func findIndex(tdef *TableDef, keys []string) (int, error) {
	const (
		usePrimaryKey = -1
		notFound      = -2
	)
	if isPrefix(tdef.Cols[:tdef.PKeys], keys) {
		return usePrimaryKey, nil
	}
	// find a suitable index
	best := notFound
	for i, candidate := range tdef.Indexes {
		if isPrefix(candidate, keys) &&
			(best == notFound || len(candidate) < len(tdef.Indexes[best])) {
			best = i
		}
	}
	if best == notFound {
		return notFound, fmt.Errorf("no index found")
	}
	return best, nil
}
// isPrefix reports whether short is a prefix of long, i.e. long starts with
// exactly the elements of short. An empty short is a prefix of anything.
func isPrefix(long []string, short []string) bool {
	if len(short) > len(long) {
		return false
	}
	for i := range short {
		if short[i] != long[i] {
			return false
		}
	}
	return true
}
// encodeKeyPartial encodes a range key that may cover only a prefix of the
// index columns listed in keys.
//
// values holds the columns that are present; they are encoded with encodeKey.
// The missing trailing columns are then filled in according to cmp:
//   - CMP_LT and CMP_GE need nothing, because the empty string sorts below
//     every possible value encoding;
//   - CMP_GT and CMP_LE need the maximum encoding, which is all 0xff bytes.
func encodeKeyPartial(
	out []byte, prefix uint32, values []Value,
	tdef *TableDef, keys []string, cmp int,
) []byte {
	out = encodeKey(out, prefix, values)
	if cmp != CMP_GT && cmp != CMP_LE {
		return out // the minimum is implied; nothing to append
	}
	for pos := len(values); pos < len(keys); pos++ {
		switch tdef.Types[colIndex(tdef, keys[pos])] {
		case TYPE_BYTES:
			// stops here since no string encoding starts with 0xff
			return append(out, 0xff)
		case TYPE_INT64:
			out = append(out, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff)
		default:
			panic("what?")
		}
	}
	return out
}
// escapeString escapes a string so that it can be embedded in an encoded key.
// 1. strings are encoded as null-terminated strings,
// escape the null byte so that strings contain no null byte.
// 2. "\xff" represents the highest order in key comparisons,
// so a leading 0xff must be escaped. The escape marker is 0xfe, which
// means a leading 0xfe has to be escaped as well: a first byte of 0xfe or
// 0xff is written as the two bytes 0xfe, <byte>.
func escapeString(in []byte) []byte {
// omitted...
pos := 0
// escape the first byte when it is 0xfe or 0xff
if len(in) > 0 && in[0] >= 0xfe {
out[0] = 0xfe
out[1] = in[0]
pos += 2 // two output bytes written
in = in[1:] // the first input byte is consumed
}
// omitted...
return out
}
// Scanner is the iterator for range queries.
type Scanner struct {
// omitted...
db *DB
tdef *TableDef // definition of the table being scanned
indexNo int // -1: use the primary key; >= 0: use an index
iter *BIter // the underlying B-tree iterator
keyEnd []byte // the encoded Key2
}
// Deref fetches the current row into rec.
// The scanner must be positioned on a valid item (asserted).
// When scanning a secondary index, the primary key is decoded from the index
// key and the row is then looked up with dbGet; a failed lookup trips an
// assertion.
func (sc *Scanner) Deref(rec *Record) {
assert(sc.Valid())
tdef := sc.tdef
rec.Cols = tdef.Cols
rec.Vals = rec.Vals[:0] // reuse the caller's slice
key, val := sc.iter.Deref()
if sc.indexNo < 0 {
// primary key, decode the KV pair
// omitted...
} else {
// secondary index
// The "value" part of the KV store is not used by indexes
assert(len(val) == 0)
// decode the primary key first
index := tdef.Indexes[sc.indexNo]
ival := make([]Value, len(index))
// decodeValues is given the expected type of every index column
for i, c := range index {
ival[i].Type = tdef.Types[colIndex(tdef, c)]
}
// skip the first 4 bytes of the key — presumably the uint32 key prefix; confirm against encodeKey
decodeValues(key[4:], ival)
icol := Record{index, ival}
// fetch the row by the primary key
// rec is narrowed to the primary key columns, taken from the index columns
rec.Cols = tdef.Cols[:tdef.PKeys]
for _, c := range rec.Cols {
rec.Vals = append(rec.Vals, *icol.Get(c))
}
// TODO: skip this if the index contains all the columns
// presumably dbGet fills in the remaining columns of rec — confirm
ok, err := dbGet(sc.db, tdef, rec)
assert(ok && err == nil)
}
}