简介
一直想深入阅读tidb源码,由于本人水平有限,而且现在tidb源码已经超过11万行(申砾在Tech Day上说过他们公司内部也很少有人掌握全部代码),虽然贡献过3次代码,但都不涉及核心部分,所以还是只看了非常小的一部分源码;所以这次决定利用空闲时间从github上能查到的第一次提交开始,跟随提交记录去慢慢理解学习tidb并全部会记录到本博客,希望能坚持下去吧,而且作为一个弱渣,写的有什么错误的地方希望各位大大给我指出。
https://github.com/pingcap/tidb/tree/0d6f270068e8ff2aedc1c314e907771b6a508ebd 这是tidb在github的第一次提交(2015-9-6),据申砾说他们是从15年6月份开始开发的tidb,这应该是第一个能运行的版本;这个版本只是把tidb作为一个golang内置sql的驱动嵌入的,没有实现mysql-server的协议,只是实现了部分mysql的语法,当时也还没有tikv和pd了,只有单机kv引擎(当然就没有分布式计算框架,分布式事务,在线DDL等),所以相对来说代码量还是不大的,读起来相对容易点。
阅读本系列文章前建议先阅读PingCap官方对tidb原理的描述(虽然这些文章是基于后面版本的描述,但还是会有帮助理解这个版本),然后最好把代码运行起来一步一步跟着代码走:
https://zhuanlan.zhihu.com/p/24564238(TiDB 源码初探)
https://pingcap.com/bloglist-zh(三篇文章了解 TiDB 技术内幕 —— 说存储)
https://pingcap.com/blog-tidb-internal-2-zh(三篇文章了解 TiDB 技术内幕 —— 说计算)
分析
先分析下大致的包功能,然后跟着几个sql语句来具体分析:
编译/运行环境:
ubuntu 16.04LTS golang 1.8.3 Gogland 1.0EAP
入口
首先,启动入口在包/interpreter的main.go.
func main() {
printer.PrintTiDBInfo()
flag.Parse()
log.SetLevelByString(*logLevel)
// support for signal notify
runtime.GOMAXPROCS(runtime.NumCPU())
line = liner.NewLiner()
defer line.Close()
line.SetCtrlCAborts(true)
openHistory()
mdb, err := sql.Open(tidb.DriverName, *store+"://"+*dbPath)
if err != nil {
log.Fatal(errors.ErrorStack(err))
}
for {
l, err := readStatement("tidb> ")
mayExit(err, l)
line.AppendHistory(l)
// if we're in transaction
if strings.HasPrefix(l, "BEGIN") || strings.HasPrefix(l, "begin") {
tx, err := mdb.Begin()
if err != nil {
log.Error(errors.ErrorStack(err))
continue
}
for {
txnLine, err := readStatement(">> ")
mayExit(err, txnLine)
line.AppendHistory(txnLine)
if !strings.HasSuffix(txnLine, ";") {
txnLine += ";"
}
if strings.HasPrefix(txnLine, "COMMIT") || strings.HasPrefix(txnLine, "commit") {
err := tx.Commit()
if err != nil {
log.Error(errors.ErrorStack(err))
tx.Rollback()
}
break
}
// normal sql statement
err = executeLine(tx, txnLine)
if err != nil {
log.Error(errors.ErrorStack(err))
tx.Rollback()
break
}
}
} else {
tx, err := mdb.Begin()
if err != nil {
log.Error(errors.ErrorStack(err))
continue
}
err = executeLine(tx, l)
if err != nil {
log.Error(errors.ErrorStack(err))
tx.Rollback()
} else {
tx.Commit()
}
}
}
}
其中
mdb, err := sql.Open(tidb.DriverName, *store+"://"+*dbPath)
在driver.go打开了一个tidb的Driver,这个Driver事先已经注册到了sql包
func init() {
RegisterDriver()
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L512
func RegisterDriver() {
driverOnce.Do(func() { sql.Register(DriverName, tidbDriver) })
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L107
然后回到main.go:
打开db后来到了这个循环
for {
l, err := readStatement("tidb> ")
mayExit(err, l)
line.AppendHistory(l)
...
它模拟了mysql的命令行输入,每次读取一条’;’结尾的sql语句,然后使用executeLine()来执行这条语句;当为BEGIN开始事务时,则多条sql组合成一个事务,否则一条语句就是一个事务。
接下来就来到了事务的begin():
tx, err := mdb.Begin()
在go的sql包中首先会调用到sqlDriver.Open()新建了Store的kv引擎(这里是goleveldb)
func (d *sqlDriver) Open(name string) (driver.Conn, error) {
store, err := NewStore(name)
if err != nil {
return nil, errors.Trace(err)
}
...
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L129
s, err := d.Open(schema)
来打开本地kv:
func (d Driver) Open(schema string) (kv.Storage, error) {
mc.mu.Lock()
defer mc.mu.Unlock()
if store, ok := mc.cache[schema]; ok {
// TODO: check the cache store has the same engine with this Driver.
log.Info("cache store", schema)
return store, nil
}
db, err := d.Driver.Open(schema)
if err != nil {
return nil, errors.Trace(err)
}
log.Info("New store", schema)
s := &dbStore{
txns: make(map[int64]*dbTxn),
keysLocked: make(map[string]int64),
uuid: uuid.NewV4().String(),
path: schema,
db: db,
}
mc.cache[schema] = s
return s, nil
}
由于之前默认的schema(dbPath)是test,所以默认打开了test数据库;
然后继续回到:
sess, err := CreateSession(store)
if err != nil {
return nil, errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L146
这里开始新建一个session(后来tidb的session是对应一个客户端的连接)
// CreateSession creates a new session environment.
func CreateSession(store kv.Storage) (Session, error) {
s := &session{
values: make(map[fmt.Stringer]interface{}),
store: store,
sid: atomic.AddInt64(&sessionID, 1),
}
domain, err := domap.Get(store)
...
可以看到domap.Get()来获取对应的Domain(domain 上会绑定 information schema 信息,可以获取schema 的信息以及进一步来获取表的基础信息:索引,列等。)
func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
key := store.UUID()
dm.mu.Lock()
defer dm.mu.Unlock()
d = dm.domains[key]
if d != nil {
return
}
d, err = domain.NewDomain(store)
if err != nil {
return nil, errors.Trace(err)
}
dm.domains[key] = d
return
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L57
这里再进一步看一下domain.NewDomain()
// NewDomain creates a new domain.
func NewDomain(store kv.Storage) (d *Domain, err error) {
infoHandle := infoschema.NewHandle(store)
ddl := ddl.NewDDL(store, infoHandle)
d = &Domain{
store: store,
infoHandle: infoHandle,
ddl: ddl,
}
err = kv.RunInNewTxn(d.store, false, d.loadInfoSchema)
if err != nil {
return nil, errors.Trace(err)
}
return d, nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/domain/domain.go#L71
infoschema.NewHandle新建了一个infoschema.Handle{}(用来在内存获取和设置schema的相关信息)
// Handle handles information schema, including getting and setting.
type Handle struct {
value atomic.Value
store kv.Storage
}
...
回到:
// NewDomain creates a new domain.
func NewDomain(store kv.Storage) (d *Domain, err error) {
infoHandle := infoschema.NewHandle(store)
ddl := ddl.NewDDL(store, infoHandle)
d = &Domain{
store: store,
infoHandle: infoHandle,
ddl: ddl,
}
...
接下来会执行ddl.NewDDL():
// NewDDL create new DDL
func NewDDL(store kv.Storage, infoHandle *infoschema.Handle) DDL {
d := &ddl{
store: store,
infoHandle: infoHandle,
}
return d
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L66
它包含了之前新建的infoHandle,为其扩展了DDL相关的操作。
回到:
...
d = &Domain{
store: store,
infoHandle: infoHandle,
ddl: ddl,
}
err = kv.RunInNewTxn(d.store, false, d.loadInfoSchema)
if err != nil {
return nil, errors.Trace(err)
}
...
到这里一个初始的Domain创建完了,但是还没有实际上拿到这个schema真实的信息了吧,所以接下来就要从KV中取出对应的信息了。
这就要看err = kv.RunInNewTxn(d.store, false, d.loadInfoSchema)了,其中d.loadInfoSchema是获取Schema信息的方法。
// RunInNewTxn will run the f in a new transaction evnironment.
func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) error {
for {
txn, err := store.Begin()
if err != nil {
log.Error(err)
return errors.Trace(err)
}
err = f(txn)
if retryable && IsRetryableError(err) {
log.Warnf("Retry txn %v", txn)
txn.Rollback()
continue
}
if err != nil {
return errors.Trace(err)
}
err = txn.Commit()
if retryable && IsRetryableError(err) {
log.Warnf("Retry txn %v", txn)
txn.Rollback()
continue
}
if err != nil {
return errors.Trace(err)
}
break
}
return nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/kv/txn.go#L36
其中RunInNewTxn是用来独立执行一个事务的,所以直接进入loadInfoSchema()观察吧:
func (do *Domain) loadInfoSchema(txn kv.Transaction) (err error) {
var schemas []*model.DBInfo
err = util.ScanMetaWithPrefix(txn, meta.SchemaMetaPrefix, func(key []byte, value []byte) bool {
di := &model.DBInfo{}
err := json.Unmarshal(value, di)
if err != nil {
log.Fatal(err)
}
schemas = append(schemas, di)
return true
})
if err != nil {
return errors.Trace(err)
}
do.infoHandle.Set(schemas)
return
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/domain/domain.go#L37
然后调用ScanMetaWithPrefix()来获取库(Schema)的元信息:
// ScanMetaWithPrefix scans metadata with the prefix.
func ScanMetaWithPrefix(txn kv.Transaction, prefix string, filter func([]byte, []byte) bool) error {
iter, err := txn.Seek([]byte(prefix), hasPrefix([]byte(prefix)))
if err != nil {
return err
}
defer iter.Close()
for {
if err != nil {
return err
}
if iter.Valid() && strings.HasPrefix(iter.Key(), prefix) {
if !filter([]byte(iter.Key()), iter.Value()) {
break
}
iter, err = iter.Next(hasPrefix([]byte(prefix)))
} else {
break
}
}
return nil
}
其中txn.Seek()通过数据种类前缀来从KV中获取元数据(所有的索引,行数据以及各种元数据都是以kv形式存储在kv中的,所以需要前缀来区分,大致思想查看,可能和这个版本不完全一样)
func (txn *dbTxn) Seek(k []byte, fnKeyCmp func([]byte) bool) (kv.Iterator, error) {
log.Debugf("seek %s txn:%d", k, txn.tID)
k = kv.EncodeKey(k)
iter, err := txn.UnionStore.Seek(k, txn)
if err != nil {
return nil, err
}
if !iter.Valid() {
return &kv.UnionIter{}, nil
}
if fnKeyCmp != nil {
if fnKeyCmp([]byte(iter.Key())[:1]) {
return &kv.UnionIter{}, nil
}
}
return iter, nil
}
k = kv.EncodeKey(k)获取对应元数据前缀的编码,然后通过txn.UnionStore.Seek从kv中获取对应的快照(us.Snapshot)和内存更新缓存(us.Dirty)的迭代过滤器。
// UnionStore is a implement of Store which contains a buffer for update.
type UnionStore struct {
Dirty *memdb.DB // updates are buffered in memory
Snapshot Snapshot // for read
}
以goleveldb为例:
us.Dirty是个内存缓存,没有提交的事务都会先写入在内存,在事务提交的时候批量写入到磁盘。
us.Snapshot是个只读的快照,从磁盘读取当前固定版本的数据,并缓存到内存。
// Seek implements the Snapshot Seek interface.
func (us *UnionStore) Seek(key []byte, txn Transaction) (Iterator, error) {
snapshotIt := us.Snapshot.NewIterator(key)
dirtyIt := us.Dirty.NewIterator(&util.Range{Start: key})
it := newUnionIter(dirtyIt, snapshotIt)
return it, nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/kv/union_store.go#L126
然后回到ScanMetaWithPrefix();
// ScanMetaWithPrefix scans metadata with the prefix.
func ScanMetaWithPrefix(txn kv.Transaction, prefix string, filter func([]byte, []byte) bool) error {
iter, err := txn.Seek([]byte(prefix), hasPrefix([]byte(prefix)))
if err != nil {
return err
}
defer iter.Close()
for {
if err != nil {
return err
}
if iter.Valid() && strings.HasPrefix(iter.Key(), prefix) {
if !filter([]byte(iter.Key()), iter.Value()) {
break
}
iter, err = iter.Next(hasPrefix([]byte(prefix)))
} else {
break
}
}
return nil
}
由于kv的key都是有序存储的(btree,跳表等增删查改的平均时间复杂度都是log(n)的数据结构),所以之前Seek返回的是>=StartKey的第一个数据value,所以接下来只要筛选到不存在这个前缀的数据即可退出循环。然后对取出来的数据json解码,解码实现在之前定义的filter().如果是第一次运行还没有创建test这个Schema了,这里是取不到任何值的。
接下来就回到了
// NewDomain creates a new domain.
func NewDomain(store kv.Storage) (d *Domain, err error) {
infoHandle := infoschema.NewHandle(store)
ddl := ddl.NewDDL(store, infoHandle)
d = &Domain{
store: store,
infoHandle: infoHandle,
ddl: ddl,
}
err = kv.RunInNewTxn(d.store, false, d.loadInfoSchema)
if err != nil {
return nil, errors.Trace(err)
}
return d, nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/domain/domain.go#L79
然后绑定一些参数到ctx和session后一直回到:
...
sess, err := CreateSession(store)
if err != nil {
return nil, errors.Trace(err)
}
s := sess.(*session)
defer d.lock()()
DBName := model.NewCIStr(name[strings.LastIndex(name, "/")+1:])
domain := sessionctx.GetDomain(s)
if !domain.InfoSchema().SchemaExists(DBName) {
err = domain.DDL().CreateSchema(s, DBName)
if err != nil {
return nil, errors.Trace(err)
}
}
return newDriverConn(s, driver, DBName.O)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L147
接下来取出Schema名字和之前创建的domain后就需要检查当前Schema是否存在了,这里就以默认的test为例,由于是第一次运行,所以肯定还没有创建Schema了,然后就会进入domain.DDL().CreateSchema(s, DBName):
func (d *ddl) CreateSchema(ctx context.Context, schema model.CIStr) (err error) {
is := d.GetInformationSchema()
_, ok := is.SchemaByName(schema)
if ok {
return ErrExists
}
info := &model.DBInfo{Name: schema}
info.ID, err = meta.GenGlobalID(d.store)
if err != nil {
return errors.Trace(err)
}
err = d.writeSchemaInfo(info)
if err != nil {
return errors.Trace(err)
}
newInfo := append(is.Clone(), info)
d.infoHandle.Set(newInfo)
return nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L78
这里又通过schema_name查了一次是否存在这个schema,然后进入meta.GenGlobalID(d.store)获取这个schema的全局唯一ID.
具体获取过程:
// GenGlobalID generates the next id in the store scope.
func GenGlobalID(store kv.Storage) (ID int64, err error) {
err = kv.RunInNewTxn(store, true, func(txn kv.Transaction) error {
ID, err = GenID(txn, []byte(nextGlobalIDPrefix), 1)
if err != nil {
return errors.Trace(err)
}
log.Info("Generate global id", ID)
return nil
})
return
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/meta/meta.go#L66
这里独立运行一个事务来获取ID,其中nextGlobalIDPrefix是
var (
nextGlobalIDPrefix = []byte("mNextGlobalID")
)
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/meta/meta.go#L32
这个KEY是固定的存放累加计数。
然后进入GenID(txn, []byte(nextGlobalIDPrefix), 1):
// GenID adds step to the value for key and returns the sum.
func GenID(txn kv.Transaction, key []byte, step int) (int64, error) {
if len(key) == 0 {
return 0, errors.New("Invalid key")
}
err := txn.LockKeys(key)
if err != nil {
return 0, err
}
id, err := txn.Inc(key, int64(step))
if err != nil {
return 0, errors.Trace(err)
}
return id, errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/meta/meta.go#L36
txn.LockKeys(key):加锁:从kv的Snapshot(快照)中取出当前key的值缓存到txn.snapshotVals,以后本次txn读取这个key都从这个缓存读取了。具体查看:
然后进入txn.LockKeys():
func (txn *dbTxn) Inc(k []byte, step int64) (int64, error) {
log.Debugf("Inc %s, step %d txn:%d", k, step, txn.tID)
k = kv.EncodeKey(k)
if err := txn.markOrigin(k); err != nil {
return 0, err
}
val, err := txn.UnionStore.Get(k)
if kv.IsErrNotFound(err) {
err = txn.UnionStore.Set(k, []byte(strconv.FormatInt(step, 10)))
if err != nil {
return 0, err
}
return step, nil
}
if err != nil {
return 0, err
}
intVal, err := strconv.ParseInt(string(val), 10, 0)
if err != nil {
return intVal, err
}
intVal += step
err = txn.UnionStore.Set(k, []byte(strconv.FormatInt(intVal, 10)))
if err != nil {
return 0, err
}
return intVal, nil
}
txn.markOrigin(k)先确保这个k缓存过了,接下来txn.UnionStore.Get(k)则从快照获取这个值,如果不存在就把当前step值赋给这个k,然后写入到UnionStore:txn.UnionStore.Set(k, []byte(strconv.FormatInt(step, 10)));(这里写入的是内存,成功commit才会写入磁盘嘛);当然如果这个k已经写入过,那么取出这个值加上step后再写回去就可以了。然后就得到ID啦。
这个时候分配DBInfo的Id完成了(由于单独起了个事务,所以这时已经提交,ID已经写入磁盘全局可见了,注意并不是把schema信息写入),一路往上回,直到:
...
info.ID, err = meta.GenGlobalID(d.store)
if err != nil {
return errors.Trace(err)
}
err = d.writeSchemaInfo(info)
if err != nil {
return errors.Trace(err)
}
newInfo := append(is.Clone(), info)
d.infoHandle.Set(newInfo)
return nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L86
然后这里的d.writeSchemaInfo(info)就是启动一个独立事务把Schema信息用json编码后写入到磁盘。
func (d *ddl) writeSchemaInfo(info *model.DBInfo) error {
var b []byte
b, err := json.Marshal(info)
if err != nil {
return errors.Trace(err)
}
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
key := []byte(meta.DBMetaKey(info.ID))
if err := txn.LockKeys(key); err != nil {
return errors.Trace(err)
}
return txn.Set(key, b)
})
log.Warn("save schema", string(b))
return errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L576
然后回到:
...
err = d.writeSchemaInfo(info)
if err != nil {
return errors.Trace(err)
}
newInfo := append(is.Clone(), info)
d.infoHandle.Set(newInfo)
return nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L90
这里把Schemainfo赋值到ddl的infoHandle。
接下来回到:
...
if !domain.InfoSchema().SchemaExists(DBName) {
err = domain.DDL().CreateSchema(s, DBName)
if err != nil {
return nil, errors.Trace(err)
}
}
return newDriverConn(s, driver, DBName.O)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L156
再进入newDriverConn(s, driver, DBName.O):
func newDriverConn(sess *session, d *sqlDriver, schema string) (driver.Conn, error) {
r := &driverConn{
driver: d,
stmts: map[string]driver.Stmt{},
s: sess,
}
_, err := r.s.Execute("use " + schema)
if err != nil {
return nil, errors.Trace(err)
}
return r, nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L174
创建driverConn并赋值后,这里默认运行了一个r.s.Execute(“use ” + schema)
func (s *session) Execute(sql string) ([]rset.Recordset, error) {
stmts, err := Compile(sql)
if err != nil {
log.Errorf("Compile sql error: %s - %s", sql, err)
return nil, errors.Trace(err)
}
var rs []rset.Recordset
for _, si := range stmts {
r, err := runStmt(s, si)
if err != nil {
log.Warnf("session:%v, err:%v", s, err)
return nil, errors.Trace(err)
}
if r != nil {
rs = append(rs, r)
}
}
return rs, nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/session.go#L120
Compile(sql)是把sql语句变成stmts结构(通过parser包的语法解析,goyacc相关的,这块暂时没有详细看)
然后进入runStmt():
func runStmt(ctx context.Context, s stmt.Statement, args ...interface{}) (rset.Recordset, error) {
var err error
var rs rset.Recordset
// before every execution, we must clear affectedrows.
variable.GetSessionVars(ctx).SetAffectedRows(0)
switch s.(type) {
case *stmts.PreparedStmt:
ps := s.(*stmts.PreparedStmt)
return runPreparedStmt(ctx, ps)
case *stmts.ExecuteStmt:
es := s.(*stmts.ExecuteStmt)
rs, err = runExecute(ctx, es, args...)
if err != nil {
return nil, errors.Trace(err)
}
default:
if s.IsDDL() {
err = ctx.FinishTxn(false)
if err != nil {
return nil, errors.Trace(err)
}
}
stmt.BindExecArgs(ctx, args)
rs, err = s.Exec(ctx)
stmt.ClearExecArgs(ctx)
}
// MySQL DDL should be auto-commit
if err == nil && (s.IsDDL() || variable.IsAutocommit(ctx)) {
err = ctx.FinishTxn(false)
}
return rs, errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L130
根据stmt的类型运行:
default:
if s.IsDDL() {
err = ctx.FinishTxn(false)
if err != nil {
return nil, errors.Trace(err)
}
}
stmt.BindExecArgs(ctx, args)
rs, err = s.Exec(ctx)
stmt.ClearExecArgs(ctx)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L153
进入到:
// Exec implements the stmt.Statement Exec interface.
func (s *UseStmt) Exec(ctx context.Context) (_ rset.Recordset, err error) {
dbname := model.NewCIStr(s.DBName)
if !sessionctx.GetDomain(ctx).InfoSchema().SchemaExists(dbname) {
return nil, errors.ErrDatabaseNotExist
}
db.BindCurrentSchema(ctx, dbname.O)
return nil, nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/use.go#L58
然后获取,验证,最后BindCurrentSchema(),设置下了session的当前Schema:
// BindCurrentSchema saves parameter schema as current schema name value into context
func BindCurrentSchema(ctx context.Context, schema string) {
ctx.SetValue(currentDBKey, schema)
}
然后一路回到:
... return newDriverConn(s, driver, DBName.O) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L161
到此sqlDriver.Open()结束了.
接下来根据sql包,会再执行到Begin():
// Begin starts and returns a new transaction.
func (c *driverConn) Begin() (driver.Tx, error) {
if c.s == nil {
return nil, errors.Errorf("Need init first")
}
if _, err := c.s.Execute(txBeginSQL); err != nil {
return nil, errors.Trace(err)
}
return c, nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L221
和之前的use schema一样,会进入到自己的Exec():
// Exec implements the stmt.Statement Exec interface.
func (s *BeginStmt) Exec(ctx context.Context) (_ rset.Recordset, err error) {
_, err = ctx.GetTxn(true)
// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
variable.GetSessionVars(ctx).DisableAutocommit = true
return
}
至此,整个Begin结束了。
小结
1.事务提交(goleveldb为例,其他类似):事务开始时会新建一个UnionStore,包含一个内存写入修改缓存和磁盘读取快照,在事务内的修改等操作先从快照读取数据,然后把修改缓存在内存,最后在事务提交的时候批量提交操作刷新到磁盘,事务回滚则清除内存缓存即可。
2.对于KV引擎来说,每个键值KEY是可以保证有序的,而且增删查改操作都是平均log(n)的时间复杂度,这样就可以用来查找>=某个KEY值的第一个数据,然后就可以把索引按按照键值编码规则写入到KV,value保存数据的行号(且TIDB的key编码规则保证编码前后大小排序不变),这样就可以直接利用对应的startkey和endkey在kv中找到startkey,然后遍历到endkey终止,找出的所有行数据都是按照索引筛选条件的。对点查和范围查询都很友好。
通过具体SQL来分析
在接下来的文章,我会跟随下列这些SQL的执行,一条条分析:
create table t (c varchar(30), r varchar(30));
CREATE INDEX index_name ON t(c);
insert into t values('c1','ret1');
SELECT r FROM t WHERE c = 'c1';
SELECT r FROM t WHERE c >= 'c1' and c <= 'c5';
DROP TABLE t;
今天就先写到这了。。。
支持!