简介
一直想深入阅读tidb源码,由于本人水平有限,而且现在tidb源码已经超过11万行(申砾在Tech Day上说过他们公司内部也很少有人掌握全部代码),虽然贡献过3次代码,但都不涉及核心部分,所以还是只看了非常小的一部分源码;所以这次决定利用空闲时间从github上能查到的第一次提交开始,跟随提交记录去慢慢理解学习tidb并全部会记录到本博客,希望能坚持下去吧,而且作为一个弱渣,写的有什么错误的地方希望各位大大给我指出。
https://github.com/pingcap/tidb/tree/0d6f270068e8ff2aedc1c314e907771b6a508ebd 这是tidb在github的第一次提交(2015-9-6),据申砾说他们是从15年6月份开始开发的tidb,这应该是第一个能运行的版本;这个版本只是把tidb作为一个golang内置sql的驱动嵌入的,没有实现mysql-server的协议,只是实现了部分mysql的语法,当时也还没有tikv和pd了,只有单机kv引擎(当然就没有分布式计算框架,分布式事务,在线DDL等),所以相对来说代码量还是不大的,读起来相对容易点。
阅读本系列文章前建议先阅读PingCap官方对tidb原理的描述(虽然这些文章是基于后面版本的描述,但还是会有帮助理解这个版本),然后最好把代码运行起来一步一步跟着代码走:
https://zhuanlan.zhihu.com/p/24564238(TiDB 源码初探)
https://pingcap.com/bloglist-zh(三篇文章了解 TiDB 技术内幕 —— 说存储)
https://pingcap.com/blog-tidb-internal-2-zh(三篇文章了解 TiDB 技术内幕 —— 说计算)
分析
先分析下大致的包功能,然后跟着几个sql语句来具体分析:
编译/运行环境:
ubuntu 16.04LTS golang 1.8.3 Gogland 1.0EAP
入口
首先,启动入口在包/interpreter的main.go.
func main() { printer.PrintTiDBInfo() flag.Parse() log.SetLevelByString(*logLevel) // support for signal notify runtime.GOMAXPROCS(runtime.NumCPU()) line = liner.NewLiner() defer line.Close() line.SetCtrlCAborts(true) openHistory() mdb, err := sql.Open(tidb.DriverName, *store+"://"+*dbPath) if err != nil { log.Fatal(errors.ErrorStack(err)) } for { l, err := readStatement("tidb> ") mayExit(err, l) line.AppendHistory(l) // if we're in transaction if strings.HasPrefix(l, "BEGIN") || strings.HasPrefix(l, "begin") { tx, err := mdb.Begin() if err != nil { log.Error(errors.ErrorStack(err)) continue } for { txnLine, err := readStatement(">> ") mayExit(err, txnLine) line.AppendHistory(txnLine) if !strings.HasSuffix(txnLine, ";") { txnLine += ";" } if strings.HasPrefix(txnLine, "COMMIT") || strings.HasPrefix(txnLine, "commit") { err := tx.Commit() if err != nil { log.Error(errors.ErrorStack(err)) tx.Rollback() } break } // normal sql statement err = executeLine(tx, txnLine) if err != nil { log.Error(errors.ErrorStack(err)) tx.Rollback() break } } } else { tx, err := mdb.Begin() if err != nil { log.Error(errors.ErrorStack(err)) continue } err = executeLine(tx, l) if err != nil { log.Error(errors.ErrorStack(err)) tx.Rollback() } else { tx.Commit() } } } }
其中
mdb, err := sql.Open(tidb.DriverName, *store+"://"+*dbPath)
在driver.go打开了一个tidb的Driver,这个Driver事先已经注册到了sql包
func init() { RegisterDriver() }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L512
func RegisterDriver() { driverOnce.Do(func() { sql.Register(DriverName, tidbDriver) }) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L107
然后回到main.go:
打开db后来到了这个循环
for { l, err := readStatement("tidb> ") mayExit(err, l) line.AppendHistory(l) ...
它模拟了mysql的命令行输入,每次读取一条’;’结尾的sql语句,然后使用executeLine()来执行这条语句;当为BEGIN开始事务时,则多条sql组合成一个事务,否则一条语句就是一个事务。
接下来就来到了事务的begin():
tx, err := mdb.Begin()
在go的sql包中首先会调用到sqlDriver.Open()新建了Store的kv引擎(这里是goleveldb)
func (d *sqlDriver) Open(name string) (driver.Conn, error) { store, err := NewStore(name) if err != nil { return nil, errors.Trace(err) } ...
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L129
s, err := d.Open(schema)
来打开本地kv:
func (d Driver) Open(schema string) (kv.Storage, error) { mc.mu.Lock() defer mc.mu.Unlock() if store, ok := mc.cache[schema]; ok { // TODO: check the cache store has the same engine with this Driver. log.Info("cache store", schema) return store, nil } db, err := d.Driver.Open(schema) if err != nil { return nil, errors.Trace(err) } log.Info("New store", schema) s := &dbStore{ txns: make(map[int64]*dbTxn), keysLocked: make(map[string]int64), uuid: uuid.NewV4().String(), path: schema, db: db, } mc.cache[schema] = s return s, nil }
由于之前默认的schema(dbPath)是test,所以默认打开了test数据库;
然后继续回到:
sess, err := CreateSession(store) if err != nil { return nil, errors.Trace(err) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L146
这里开始新建一个session(后来tidb的session是对应一个客户端的连接)
// CreateSession creates a new session environment. func CreateSession(store kv.Storage) (Session, error) { s := &session{ values: make(map[fmt.Stringer]interface{}), store: store, sid: atomic.AddInt64(&sessionID, 1), } domain, err := domap.Get(store) ...
可以看到domap.Get()来获取对应的Domain(domain 上会绑定 information schema 信息,可以获取schema 的信息以及进一步来获取表的基础信息:索引,列等。)
func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) { key := store.UUID() dm.mu.Lock() defer dm.mu.Unlock() d = dm.domains[key] if d != nil { return } d, err = domain.NewDomain(store) if err != nil { return nil, errors.Trace(err) } dm.domains[key] = d return }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L57
这里再进一步看一下domain.NewDomain()
// NewDomain creates a new domain. func NewDomain(store kv.Storage) (d *Domain, err error) { infoHandle := infoschema.NewHandle(store) ddl := ddl.NewDDL(store, infoHandle) d = &Domain{ store: store, infoHandle: infoHandle, ddl: ddl, } err = kv.RunInNewTxn(d.store, false, d.loadInfoSchema) if err != nil { return nil, errors.Trace(err) } return d, nil }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/domain/domain.go#L71
infoschema.NewHandle新建了一个infoschema.Handle{}(用来在内存获取和设置schema的相关信息)
// Handle handles information schema, including getting and setting. type Handle struct { value atomic.Value store kv.Storage } ...
回到:
// NewDomain creates a new domain. func NewDomain(store kv.Storage) (d *Domain, err error) { infoHandle := infoschema.NewHandle(store) ddl := ddl.NewDDL(store, infoHandle) d = &Domain{ store: store, infoHandle: infoHandle, ddl: ddl, } ...
接下来会执行ddl.NewDDL():
// NewDDL create new DDL func NewDDL(store kv.Storage, infoHandle *infoschema.Handle) DDL { d := &ddl{ store: store, infoHandle: infoHandle, } return d }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L66
它包含了之前新建的infoHandle,为其扩展了DDL相关的操作。
回到:
... d = &Domain{ store: store, infoHandle: infoHandle, ddl: ddl, } err = kv.RunInNewTxn(d.store, false, d.loadInfoSchema) if err != nil { return nil, errors.Trace(err) } ...
到这里一个初始的Domain创建完了,但是还没有实际上拿到这个schema真实的信息了吧,所以接下来就要从KV中取出对应的信息了。
这就要看err = kv.RunInNewTxn(d.store, false, d.loadInfoSchema)了,其中d.loadInfoSchema是获取Schema信息的方法。
// RunInNewTxn will run the f in a new transaction evnironment. func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) error { for { txn, err := store.Begin() if err != nil { log.Error(err) return errors.Trace(err) } err = f(txn) if retryable && IsRetryableError(err) { log.Warnf("Retry txn %v", txn) txn.Rollback() continue } if err != nil { return errors.Trace(err) } err = txn.Commit() if retryable && IsRetryableError(err) { log.Warnf("Retry txn %v", txn) txn.Rollback() continue } if err != nil { return errors.Trace(err) } break } return nil }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/kv/txn.go#L36
其中RunInNewTxn是用来独立执行一个事务的,所以直接进入loadInfoSchema()观察吧:
func (do *Domain) loadInfoSchema(txn kv.Transaction) (err error) { var schemas []*model.DBInfo err = util.ScanMetaWithPrefix(txn, meta.SchemaMetaPrefix, func(key []byte, value []byte) bool { di := &model.DBInfo{} err := json.Unmarshal(value, di) if err != nil { log.Fatal(err) } schemas = append(schemas, di) return true }) if err != nil { return errors.Trace(err) } do.infoHandle.Set(schemas) return }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/domain/domain.go#L37
然后调用ScanMetaWithPrefix()来获取库(Schema)的元信息:
// ScanMetaWithPrefix scans metadata with the prefix. func ScanMetaWithPrefix(txn kv.Transaction, prefix string, filter func([]byte, []byte) bool) error { iter, err := txn.Seek([]byte(prefix), hasPrefix([]byte(prefix))) if err != nil { return err } defer iter.Close() for { if err != nil { return err } if iter.Valid() && strings.HasPrefix(iter.Key(), prefix) { if !filter([]byte(iter.Key()), iter.Value()) { break } iter, err = iter.Next(hasPrefix([]byte(prefix))) } else { break } } return nil }
其中txn.Seek()通过数据种类前缀来从KV中获取元数据(所有的索引,行数据以及各种元数据都是以kv形式存储在kv中的,所以需要前缀来区分,大致思想查看,可能和这个版本不完全一样)
func (txn *dbTxn) Seek(k []byte, fnKeyCmp func([]byte) bool) (kv.Iterator, error) { log.Debugf("seek %s txn:%d", k, txn.tID) k = kv.EncodeKey(k) iter, err := txn.UnionStore.Seek(k, txn) if err != nil { return nil, err } if !iter.Valid() { return &kv.UnionIter{}, nil } if fnKeyCmp != nil { if fnKeyCmp([]byte(iter.Key())[:1]) { return &kv.UnionIter{}, nil } } return iter, nil }
k = kv.EncodeKey(k)获取对应元数据前缀的编码,然后通过txn.UnionStore.Seek从kv中获取对应的快照(us.Snapshot)和内存更新缓存(us.Dirty)的迭代过滤器。
// UnionStore is a implement of Store which contains a buffer for update. type UnionStore struct { Dirty *memdb.DB // updates are buffered in memory Snapshot Snapshot // for read }
以goleveldb为例:
us.Dirty是个内存缓存,没有提交的事务都会先写入在内存,在事务提交的时候批量写入到磁盘。
us.Snapshot是个只读的快照,从磁盘读取当前固定版本的数据,并缓存到内存。
// Seek implements the Snapshot Seek interface. func (us *UnionStore) Seek(key []byte, txn Transaction) (Iterator, error) { snapshotIt := us.Snapshot.NewIterator(key) dirtyIt := us.Dirty.NewIterator(&util.Range{Start: key}) it := newUnionIter(dirtyIt, snapshotIt) return it, nil }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/kv/union_store.go#L126
然后回到ScanMetaWithPrefix();
// ScanMetaWithPrefix scans metadata with the prefix. func ScanMetaWithPrefix(txn kv.Transaction, prefix string, filter func([]byte, []byte) bool) error { iter, err := txn.Seek([]byte(prefix), hasPrefix([]byte(prefix))) if err != nil { return err } defer iter.Close() for { if err != nil { return err } if iter.Valid() && strings.HasPrefix(iter.Key(), prefix) { if !filter([]byte(iter.Key()), iter.Value()) { break } iter, err = iter.Next(hasPrefix([]byte(prefix))) } else { break } } return nil }
由于kv的key都是有序存储的(btree,跳表等增删查改的平均时间复杂度都是log(n)的数据结构),所以之前Seek返回的是>=StartKey的第一个数据value,所以接下来只要筛选到不存在这个前缀的数据即可退出循环。然后对取出来的数据json解码,解码实现在之前定义的filter().如果是第一次运行还没有创建test这个Schema了,这里是取不到任何值的。
接下来就回到了
// NewDomain creates a new domain. func NewDomain(store kv.Storage) (d *Domain, err error) { infoHandle := infoschema.NewHandle(store) ddl := ddl.NewDDL(store, infoHandle) d = &Domain{ store: store, infoHandle: infoHandle, ddl: ddl, } err = kv.RunInNewTxn(d.store, false, d.loadInfoSchema) if err != nil { return nil, errors.Trace(err) } return d, nil }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/domain/domain.go#L79
然后绑定一些参数到ctx和session后一直回到:
... sess, err := CreateSession(store) if err != nil { return nil, errors.Trace(err) } s := sess.(*session) defer d.lock()() DBName := model.NewCIStr(name[strings.LastIndex(name, "/")+1:]) domain := sessionctx.GetDomain(s) if !domain.InfoSchema().SchemaExists(DBName) { err = domain.DDL().CreateSchema(s, DBName) if err != nil { return nil, errors.Trace(err) } } return newDriverConn(s, driver, DBName.O) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L147
接下来取出Schema名字和之前创建的domain后就需要检查当前Schema是否存在了,这里就以默认的test为例,由于是第一次运行,所以肯定还没有创建Schema了,然后就会进入domain.DDL().CreateSchema(s, DBName):
func (d *ddl) CreateSchema(ctx context.Context, schema model.CIStr) (err error) { is := d.GetInformationSchema() _, ok := is.SchemaByName(schema) if ok { return ErrExists } info := &model.DBInfo{Name: schema} info.ID, err = meta.GenGlobalID(d.store) if err != nil { return errors.Trace(err) } err = d.writeSchemaInfo(info) if err != nil { return errors.Trace(err) } newInfo := append(is.Clone(), info) d.infoHandle.Set(newInfo) return nil }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L78
这里又通过schema_name查了一次是否存在这个schema,然后进入meta.GenGlobalID(d.store)获取这个schema的全局唯一ID.
具体获取过程:
// GenGlobalID generates the next id in the store scope. func GenGlobalID(store kv.Storage) (ID int64, err error) { err = kv.RunInNewTxn(store, true, func(txn kv.Transaction) error { ID, err = GenID(txn, []byte(nextGlobalIDPrefix), 1) if err != nil { return errors.Trace(err) } log.Info("Generate global id", ID) return nil }) return }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/meta/meta.go#L66
这里独立运行一个事务来获取ID,其中nextGlobalIDPrefix是
var ( nextGlobalIDPrefix = []byte("mNextGlobalID") )
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/meta/meta.go#L32
这个KEY是固定的存放累加计数。
然后进入GenID(txn, []byte(nextGlobalIDPrefix), 1):
// GenID adds step to the value for key and returns the sum. func GenID(txn kv.Transaction, key []byte, step int) (int64, error) { if len(key) == 0 { return 0, errors.New("Invalid key") } err := txn.LockKeys(key) if err != nil { return 0, err } id, err := txn.Inc(key, int64(step)) if err != nil { return 0, errors.Trace(err) } return id, errors.Trace(err) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/meta/meta.go#L36
txn.LockKeys(key):加锁:从kv的Snapshot(快照)中取出当前key的值缓存到txn.snapshotVals,以后本次txn读取这个key都从这个缓存读取了。具体查看:
然后进入txn.LockKeys():
func (txn *dbTxn) Inc(k []byte, step int64) (int64, error) { log.Debugf("Inc %s, step %d txn:%d", k, step, txn.tID) k = kv.EncodeKey(k) if err := txn.markOrigin(k); err != nil { return 0, err } val, err := txn.UnionStore.Get(k) if kv.IsErrNotFound(err) { err = txn.UnionStore.Set(k, []byte(strconv.FormatInt(step, 10))) if err != nil { return 0, err } return step, nil } if err != nil { return 0, err } intVal, err := strconv.ParseInt(string(val), 10, 0) if err != nil { return intVal, err } intVal += step err = txn.UnionStore.Set(k, []byte(strconv.FormatInt(intVal, 10))) if err != nil { return 0, err } return intVal, nil }
txn.markOrigin(k)先确保这个k缓存过了,接下来txn.UnionStore.Get(k)则从快照获取这个值,如果不存在就把当前step值赋给这个k,然后写入到UnionStore:txn.UnionStore.Set(k, []byte(strconv.FormatInt(step, 10)));(这里写入的是内存,成功commit才会写入磁盘嘛);当然如果这个k已经写入过,那么取出这个值加上step后再写回去就可以了。然后就得到ID啦。
这个时候分配DBInfo的Id完成了(由于单独起了个事务,所以这时已经提交,ID已经写入磁盘全局可见了,注意并不是把schema信息写入),一路往上回,直到:
... info.ID, err = meta.GenGlobalID(d.store) if err != nil { return errors.Trace(err) } err = d.writeSchemaInfo(info) if err != nil { return errors.Trace(err) } newInfo := append(is.Clone(), info) d.infoHandle.Set(newInfo) return nil }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L86
然后这里的d.writeSchemaInfo(info)就是启动一个独立事务把Schema信息用json编码后写入到磁盘。
func (d *ddl) writeSchemaInfo(info *model.DBInfo) error { var b []byte b, err := json.Marshal(info) if err != nil { return errors.Trace(err) } err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { key := []byte(meta.DBMetaKey(info.ID)) if err := txn.LockKeys(key); err != nil { return errors.Trace(err) } return txn.Set(key, b) }) log.Warn("save schema", string(b)) return errors.Trace(err) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L576
然后回到:
... err = d.writeSchemaInfo(info) if err != nil { return errors.Trace(err) } newInfo := append(is.Clone(), info) d.infoHandle.Set(newInfo) return nil }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L90
这里把Schemainfo赋值到ddl的infoHandle。
接下来回到:
... if !domain.InfoSchema().SchemaExists(DBName) { err = domain.DDL().CreateSchema(s, DBName) if err != nil { return nil, errors.Trace(err) } } return newDriverConn(s, driver, DBName.O) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L156
再进入newDriverConn(s, driver, DBName.O):
func newDriverConn(sess *session, d *sqlDriver, schema string) (driver.Conn, error) { r := &driverConn{ driver: d, stmts: map[string]driver.Stmt{}, s: sess, } _, err := r.s.Execute("use " + schema) if err != nil { return nil, errors.Trace(err) } return r, nil }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L174
创建driverConn并赋值后,这里默认运行了一个r.s.Execute(“use ” + schema)
func (s *session) Execute(sql string) ([]rset.Recordset, error) { stmts, err := Compile(sql) if err != nil { log.Errorf("Compile sql error: %s - %s", sql, err) return nil, errors.Trace(err) } var rs []rset.Recordset for _, si := range stmts { r, err := runStmt(s, si) if err != nil { log.Warnf("session:%v, err:%v", s, err) return nil, errors.Trace(err) } if r != nil { rs = append(rs, r) } } return rs, nil }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/session.go#L120
Compile(sql)是把sql语句变成stmts结构(通过parser包的语法解析,goyacc相关的,这块暂时没有详细看)
然后进入runStmt():
func runStmt(ctx context.Context, s stmt.Statement, args ...interface{}) (rset.Recordset, error) { var err error var rs rset.Recordset // before every execution, we must clear affectedrows. variable.GetSessionVars(ctx).SetAffectedRows(0) switch s.(type) { case *stmts.PreparedStmt: ps := s.(*stmts.PreparedStmt) return runPreparedStmt(ctx, ps) case *stmts.ExecuteStmt: es := s.(*stmts.ExecuteStmt) rs, err = runExecute(ctx, es, args...) if err != nil { return nil, errors.Trace(err) } default: if s.IsDDL() { err = ctx.FinishTxn(false) if err != nil { return nil, errors.Trace(err) } } stmt.BindExecArgs(ctx, args) rs, err = s.Exec(ctx) stmt.ClearExecArgs(ctx) } // MySQL DDL should be auto-commit if err == nil && (s.IsDDL() || variable.IsAutocommit(ctx)) { err = ctx.FinishTxn(false) } return rs, errors.Trace(err) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L130
根据stmt的类型运行:
default: if s.IsDDL() { err = ctx.FinishTxn(false) if err != nil { return nil, errors.Trace(err) } } stmt.BindExecArgs(ctx, args) rs, err = s.Exec(ctx) stmt.ClearExecArgs(ctx) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L153
进入到:
// Exec implements the stmt.Statement Exec interface. func (s *UseStmt) Exec(ctx context.Context) (_ rset.Recordset, err error) { dbname := model.NewCIStr(s.DBName) if !sessionctx.GetDomain(ctx).InfoSchema().SchemaExists(dbname) { return nil, errors.ErrDatabaseNotExist } db.BindCurrentSchema(ctx, dbname.O) return nil, nil }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/use.go#L58
然后获取,验证,最后BindCurrentSchema(),设置下了session的当前Schema:
// BindCurrentSchema saves parameter schema as current schema name value into context func BindCurrentSchema(ctx context.Context, schema string) { ctx.SetValue(currentDBKey, schema) }
然后一路回到:
... return newDriverConn(s, driver, DBName.O) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L161
到此sqlDriver.Open()结束了.
接下来根据sql包,会再执行到Begin():
// Begin starts and returns a new transaction. func (c *driverConn) Begin() (driver.Tx, error) { if c.s == nil { return nil, errors.Errorf("Need init first") } if _, err := c.s.Execute(txBeginSQL); err != nil { return nil, errors.Trace(err) } return c, nil }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L221
和之前的use schema一样,会进入到自己的Exec():
// Exec implements the stmt.Statement Exec interface. func (s *BeginStmt) Exec(ctx context.Context) (_ rset.Recordset, err error) { _, err = ctx.GetTxn(true) // With START TRANSACTION, autocommit remains disabled until you end // the transaction with COMMIT or ROLLBACK. The autocommit mode then // reverts to its previous state. variable.GetSessionVars(ctx).DisableAutocommit = true return }
至此,整个Begin结束了。
小结
1.事务提交(goleveldb为例,其他类似):事务开始时会新建一个UnionStore,包含一个内存写入修改缓存和磁盘读取快照,在事务内的修改等操作先从快照读取数据,然后把修改缓存在内存,最后在事务提交的时候批量提交操作刷新到磁盘,事务回滚则清除内存缓存即可。
2.对于KV引擎来说,每个键值KEY是可以保证有序的,而且增删查改操作都是平均log(n)的时间复杂度,这样就可以用来查找>=某个KEY值的第一个数据,然后就可以把索引按按照键值编码规则写入到KV,value保存数据的行号(且TIDB的key编码规则保证编码前后大小排序不变),这样就可以直接利用对应的startkey和endkey在kv中找到startkey,然后遍历到endkey终止,找出的所有行数据都是按照索引筛选条件的。对点查和范围查询都很友好。
通过具体SQL来分析
在接下来的文章,我会跟随下列这些SQL的执行,一条条分析:
create table t (c varchar(30), r varchar(30)); CREATE INDEX index_name ON t(c); insert into t values('c1','ret1'); SELECT r FROM t WHERE c = 'c1'; SELECT r FROM t WHERE c >= 'c1' and c <= 'c5'; DROP TABLE t;
今天就先写到这了。。。
支持!