TIDB源码分析-从github第一次提交说起(1)

简介

一直想深入阅读tidb源码,由于本人水平有限,而且现在tidb源码已经超过11万行(申砾在Tech Day上说过他们公司内部也很少有人掌握全部代码),虽然贡献过3次代码,但都不涉及核心部分,所以还是只看了非常小的一部分源码;所以这次决定利用空闲时间从github上能查到的第一次提交开始,跟随提交记录去慢慢理解学习tidb并全部会记录到本博客,希望能坚持下去吧,而且作为一个弱渣,写的有什么错误的地方希望各位大大给我指出。

https://github.com/pingcap/tidb/tree/0d6f270068e8ff2aedc1c314e907771b6a508ebd 这是tidb在github的第一次提交(2015-9-6),据申砾说他们是从15年6月份开始开发的tidb,这应该是第一个能运行的版本;这个版本只是把tidb作为一个golang内置sql的驱动嵌入的,没有实现mysql-server的协议,只是实现了部分mysql的语法,当时也还没有tikv和pd了,只有单机kv引擎(当然就没有分布式计算框架,分布式事务,在线DDL等),所以相对来说代码量还是不大的,读起来相对容易点。

阅读本系列文章前建议先阅读PingCap官方对tidb原理的描述(虽然这些文章是基于后面版本的描述,但还是会有帮助理解这个版本),然后最好把代码运行起来一步一步跟着代码走:

https://zhuanlan.zhihu.com/p/24564238(TiDB 源码初探)

https://pingcap.com/bloglist-zh(三篇文章了解 TiDB 技术内幕 —— 说存储)

https://pingcap.com/blog-tidb-internal-2-zh(三篇文章了解 TiDB 技术内幕 —— 说计算)

分析

先分析下大致的包功能,然后跟着几个sql语句来具体分析:

编译/运行环境:

ubuntu 16.04LTS
golang 1.8.3
Gogland 1.0EAP

入口

首先,启动入口在包/interpreter的main.go.

func main() {
	printer.PrintTiDBInfo()

	flag.Parse()
	log.SetLevelByString(*logLevel)
	// support for signal notify
	runtime.GOMAXPROCS(runtime.NumCPU())

	line = liner.NewLiner()
	defer line.Close()

	line.SetCtrlCAborts(true)
	openHistory()

	mdb, err := sql.Open(tidb.DriverName, *store+"://"+*dbPath)
	if err != nil {
		log.Fatal(errors.ErrorStack(err))
	}

	for {
		l, err := readStatement("tidb> ")
		mayExit(err, l)
		line.AppendHistory(l)

		// if we're in transaction
		if strings.HasPrefix(l, "BEGIN") || strings.HasPrefix(l, "begin") {
			tx, err := mdb.Begin()
			if err != nil {
				log.Error(errors.ErrorStack(err))
				continue
			}
			for {
				txnLine, err := readStatement(">> ")
				mayExit(err, txnLine)
				line.AppendHistory(txnLine)

				if !strings.HasSuffix(txnLine, ";") {
					txnLine += ";"
				}

				if strings.HasPrefix(txnLine, "COMMIT") || strings.HasPrefix(txnLine, "commit") {
					err := tx.Commit()
					if err != nil {
						log.Error(errors.ErrorStack(err))
						tx.Rollback()
					}
					break
				}
				// normal sql statement
				err = executeLine(tx, txnLine)
				if err != nil {
					log.Error(errors.ErrorStack(err))
					tx.Rollback()
					break
				}
			}
		} else {
			tx, err := mdb.Begin()
			if err != nil {
				log.Error(errors.ErrorStack(err))
				continue
			}
			err = executeLine(tx, l)
			if err != nil {
				log.Error(errors.ErrorStack(err))
				tx.Rollback()
			} else {
				tx.Commit()
			}
		}
	}
}

其中

mdb, err := sql.Open(tidb.DriverName, *store+"://"+*dbPath)

driver.go打开了一个tidb的Driver,这个Driver事先已经注册到了sql包

func init() {
	RegisterDriver()
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L512

func RegisterDriver() {
	driverOnce.Do(func() { sql.Register(DriverName, tidbDriver) })
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L107

然后回到main.go:

打开db后来到了这个循环

for {
		l, err := readStatement("tidb> ")
		mayExit(err, l)
		line.AppendHistory(l)
...

它模拟了mysql的命令行输入,每次读取一条’;’结尾的sql语句,然后使用executeLine()来执行这条语句;当为BEGIN开始事务时,则多条sql组合成一个事务,否则一条语句就是一个事务。

接下来就来到了事务的begin():

tx, err := mdb.Begin()

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/interpreter/main.go#L195

在go的sql包中首先会调用到sqlDriver.Open()新建了Store的kv引擎(这里是goleveldb)

func (d *sqlDriver) Open(name string) (driver.Conn, error) {
	store, err := NewStore(name)
	if err != nil {
		return nil, errors.Trace(err)
	}
...

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L129

NewStore(name)调用了

s, err := d.Open(schema)

来打开本地kv:

func (d Driver) Open(schema string) (kv.Storage, error) {
	mc.mu.Lock()
	defer mc.mu.Unlock()
	if store, ok := mc.cache[schema]; ok {
		// TODO: check the cache store has the same engine with this Driver.
		log.Info("cache store", schema)
		return store, nil
	}

	db, err := d.Driver.Open(schema)
	if err != nil {
		return nil, errors.Trace(err)
	}

	log.Info("New store", schema)
	s := &dbStore{
		txns:       make(map[int64]*dbTxn),
		keysLocked: make(map[string]int64),
		uuid:       uuid.NewV4().String(),
		path:       schema,
		db:         db,
	}

	mc.cache[schema] = s

	return s, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/store/localstore/kv.go#L66

由于之前默认的schema(dbPath)是test,所以默认打开了test数据库;

然后继续回到:

sess, err := CreateSession(store)
	if err != nil {
		return nil, errors.Trace(err)
	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L146

这里开始新建一个session(后来tidb的session是对应一个客户端的连接)

进入CreateSession()

// CreateSession creates a new session environment.
func CreateSession(store kv.Storage) (Session, error) {
	s := &session{
		values: make(map[fmt.Stringer]interface{}),
		store:  store,
		sid:    atomic.AddInt64(&sessionID, 1),
	}
	domain, err := domap.Get(store)
...

可以看到domap.Get()来获取对应的Domain(domain 上会绑定 information schema 信息,可以获取schema 的信息以及进一步来获取表的基础信息:索引,列等。)

func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
	key := store.UUID()
	dm.mu.Lock()
	defer dm.mu.Unlock()
	d = dm.domains[key]
	if d != nil {
		return
	}
	d, err = domain.NewDomain(store)
	if err != nil {
		return nil, errors.Trace(err)
	}
	dm.domains[key] = d
	return
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L57

这里再进一步看一下domain.NewDomain()

// NewDomain creates a new domain.
func NewDomain(store kv.Storage) (d *Domain, err error) {
	infoHandle := infoschema.NewHandle(store)
	ddl := ddl.NewDDL(store, infoHandle)
	d = &Domain{
		store:      store,
		infoHandle: infoHandle,
		ddl:        ddl,
	}
	err = kv.RunInNewTxn(d.store, false, d.loadInfoSchema)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return d, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/domain/domain.go#L71

infoschema.NewHandle新建了一个infoschema.Handle{}(用来在内存获取和设置schema的相关信息)

// Handle handles information schema, including getting and setting.
type Handle struct {
	value atomic.Value
	store kv.Storage
}
...

回到:

// NewDomain creates a new domain.
func NewDomain(store kv.Storage) (d *Domain, err error) {
	infoHandle := infoschema.NewHandle(store)
	ddl := ddl.NewDDL(store, infoHandle)
	d = &Domain{
		store:      store,
		infoHandle: infoHandle,
		ddl:        ddl,
	}
...

接下来会执行ddl.NewDDL():

// NewDDL create new DDL
func NewDDL(store kv.Storage, infoHandle *infoschema.Handle) DDL {
	d := &ddl{
		store:      store,
		infoHandle: infoHandle,
	}
	return d
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L66

它包含了之前新建的infoHandle,为其扩展了DDL相关的操作。

回到:

...
        d = &Domain{
		store:      store,
		infoHandle: infoHandle,
		ddl:        ddl,
	}
	err = kv.RunInNewTxn(d.store, false, d.loadInfoSchema)
	if err != nil {
		return nil, errors.Trace(err)
	}
...

到这里一个初始的Domain创建完了,但是还没有实际上拿到这个schema真实的信息了吧,所以接下来就要从KV中取出对应的信息了。

这就要看err = kv.RunInNewTxn(d.store, false, d.loadInfoSchema)了,其中d.loadInfoSchema是获取Schema信息的方法。

// RunInNewTxn will run the f in a new transaction evnironment.
func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) error {
	for {
		txn, err := store.Begin()
		if err != nil {
			log.Error(err)
			return errors.Trace(err)
		}

		err = f(txn)
		if retryable && IsRetryableError(err) {
			log.Warnf("Retry txn %v", txn)
			txn.Rollback()
			continue
		}
		if err != nil {
			return errors.Trace(err)
		}

		err = txn.Commit()
		if retryable && IsRetryableError(err) {
			log.Warnf("Retry txn %v", txn)
			txn.Rollback()
			continue
		}
		if err != nil {
			return errors.Trace(err)
		}
		break
	}

	return nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/kv/txn.go#L36

其中RunInNewTxn是用来独立执行一个事务的,所以直接进入loadInfoSchema()观察吧:

func (do *Domain) loadInfoSchema(txn kv.Transaction) (err error) {
	var schemas []*model.DBInfo
	err = util.ScanMetaWithPrefix(txn, meta.SchemaMetaPrefix, func(key []byte, value []byte) bool {
		di := &model.DBInfo{}
		err := json.Unmarshal(value, di)
		if err != nil {
			log.Fatal(err)
		}
		schemas = append(schemas, di)
		return true
	})
	if err != nil {
		return errors.Trace(err)
	}
	do.infoHandle.Set(schemas)
	return
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/domain/domain.go#L37

然后调用ScanMetaWithPrefix()来获取库(Schema)的元信息:

// ScanMetaWithPrefix scans metadata with the prefix.
func ScanMetaWithPrefix(txn kv.Transaction, prefix string, filter func([]byte, []byte) bool) error {
	iter, err := txn.Seek([]byte(prefix), hasPrefix([]byte(prefix)))
	if err != nil {
		return err
	}
	defer iter.Close()
	for {
		if err != nil {
			return err
		}

		if iter.Valid() && strings.HasPrefix(iter.Key(), prefix) {
			if !filter([]byte(iter.Key()), iter.Value()) {
				break
			}
			iter, err = iter.Next(hasPrefix([]byte(prefix)))
		} else {
			break
		}
	}

	return nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/util/prefix_helper.go#L34

其中txn.Seek()通过数据种类前缀来从KV中获取元数据(所有的索引,行数据以及各种元数据都是以kv形式存储在kv中的,所以需要前缀来区分,大致思想查看,可能和这个版本不完全一样)

func (txn *dbTxn) Seek(k []byte, fnKeyCmp func([]byte) bool) (kv.Iterator, error) {
	log.Debugf("seek %s txn:%d", k, txn.tID)
	k = kv.EncodeKey(k)

	iter, err := txn.UnionStore.Seek(k, txn)
	if err != nil {
		return nil, err
	}

	if !iter.Valid() {
		return &kv.UnionIter{}, nil
	}

	if fnKeyCmp != nil {
		if fnKeyCmp([]byte(iter.Key())[:1]) {
			return &kv.UnionIter{}, nil
		}
	}

	return iter, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/store/localstore/txn.go#L137

k = kv.EncodeKey(k)获取对应元数据前缀的编码,然后通过txn.UnionStore.Seek从kv中获取对应的快照(us.Snapshot)和内存更新缓存(us.Dirty)的迭代过滤器。

// UnionStore is a implement of Store which contains a buffer for update.
type UnionStore struct {
	Dirty    *memdb.DB // updates are buffered in memory
	Snapshot Snapshot  // for read
}

以goleveldb为例:

us.Dirty是个内存缓存,没有提交的事务都会先写入在内存,在事务提交的时候批量写入到磁盘。

us.Snapshot是个只读的快照,从磁盘读取当前固定版本的数据,并缓存到内存。

// Seek implements the Snapshot Seek interface.
func (us *UnionStore) Seek(key []byte, txn Transaction) (Iterator, error) {
	snapshotIt := us.Snapshot.NewIterator(key)
	dirtyIt := us.Dirty.NewIterator(&util.Range{Start: key})
	it := newUnionIter(dirtyIt, snapshotIt)
	return it, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/kv/union_store.go#L126

然后回到ScanMetaWithPrefix();

// ScanMetaWithPrefix scans metadata with the prefix.
func ScanMetaWithPrefix(txn kv.Transaction, prefix string, filter func([]byte, []byte) bool) error {
	iter, err := txn.Seek([]byte(prefix), hasPrefix([]byte(prefix)))
	if err != nil {
		return err
	}
	defer iter.Close()
	for {
		if err != nil {
			return err
		}

		if iter.Valid() && strings.HasPrefix(iter.Key(), prefix) {
			if !filter([]byte(iter.Key()), iter.Value()) {
				break
			}
			iter, err = iter.Next(hasPrefix([]byte(prefix)))
		} else {
			break
		}
	}

	return nil
}

由于kv的key都是有序存储的(btree,跳表等增删查改的平均时间复杂度都是log(n)的数据结构),所以之前Seek返回的是>=StartKey的第一个数据value,所以接下来只要筛选到不存在这个前缀的数据即可退出循环。然后对取出来的数据json解码,解码实现在之前定义的filter().如果是第一次运行还没有创建test这个Schema了,这里是取不到任何值的。

接下来就回到了

// NewDomain creates a new domain.
func NewDomain(store kv.Storage) (d *Domain, err error) {
	infoHandle := infoschema.NewHandle(store)
	ddl := ddl.NewDDL(store, infoHandle)
	d = &Domain{
		store:      store,
		infoHandle: infoHandle,
		ddl:        ddl,
	}
	err = kv.RunInNewTxn(d.store, false, d.loadInfoSchema)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return d, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/domain/domain.go#L79

然后绑定一些参数到ctx和session后一直回到:

...
        sess, err := CreateSession(store)
	if err != nil {
		return nil, errors.Trace(err)
	}
	s := sess.(*session)
	defer d.lock()()
	DBName := model.NewCIStr(name[strings.LastIndex(name, "/")+1:])
	domain := sessionctx.GetDomain(s)
	if !domain.InfoSchema().SchemaExists(DBName) {
		err = domain.DDL().CreateSchema(s, DBName)
		if err != nil {
			return nil, errors.Trace(err)
		}
	}

	return newDriverConn(s, driver, DBName.O)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L147

接下来取出Schema名字和之前创建的domain后就需要检查当前Schema是否存在了,这里就以默认的test为例,由于是第一次运行,所以肯定还没有创建Schema了,然后就会进入domain.DDL().CreateSchema(s, DBName):

func (d *ddl) CreateSchema(ctx context.Context, schema model.CIStr) (err error) {
	is := d.GetInformationSchema()
	_, ok := is.SchemaByName(schema)
	if ok {
		return ErrExists
	}
	info := &model.DBInfo{Name: schema}
	info.ID, err = meta.GenGlobalID(d.store)
	if err != nil {
		return errors.Trace(err)
	}
	err = d.writeSchemaInfo(info)
	if err != nil {
		return errors.Trace(err)
	}
	newInfo := append(is.Clone(), info)
	d.infoHandle.Set(newInfo)
	return nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L78

这里又通过schema_name查了一次是否存在这个schema,然后进入meta.GenGlobalID(d.store)获取这个schema的全局唯一ID.

具体获取过程:

// GenGlobalID generates the next id in the store scope.
func GenGlobalID(store kv.Storage) (ID int64, err error) {
	err = kv.RunInNewTxn(store, true, func(txn kv.Transaction) error {
		ID, err = GenID(txn, []byte(nextGlobalIDPrefix), 1)
		if err != nil {
			return errors.Trace(err)
		}

		log.Info("Generate global id", ID)

		return nil
	})

	return
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/meta/meta.go#L66

这里独立运行一个事务来获取ID,其中nextGlobalIDPrefix是

var (
	nextGlobalIDPrefix = []byte("mNextGlobalID")
)

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/meta/meta.go#L32

这个KEY是固定的存放累加计数。

然后进入GenID(txn, []byte(nextGlobalIDPrefix), 1):

// GenID adds step to the value for key and returns the sum.
func GenID(txn kv.Transaction, key []byte, step int) (int64, error) {
	if len(key) == 0 {
		return 0, errors.New("Invalid key")
	}
	err := txn.LockKeys(key)
	if err != nil {
		return 0, err
	}
	id, err := txn.Inc(key, int64(step))
	if err != nil {
		return 0, errors.Trace(err)
	}

	return id, errors.Trace(err)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/meta/meta.go#L36

txn.LockKeys(key):加锁:从kv的Snapshot(快照)中取出当前key的值缓存到txn.snapshotVals,以后本次txn读取这个key都从这个缓存读取了。具体查看:

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/store/localstore/txn.go#L236

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/store/localstore/txn.go#L48

然后进入txn.LockKeys():

func (txn *dbTxn) Inc(k []byte, step int64) (int64, error) {
	log.Debugf("Inc %s, step %d txn:%d", k, step, txn.tID)
	k = kv.EncodeKey(k)

	if err := txn.markOrigin(k); err != nil {
		return 0, err
	}
	val, err := txn.UnionStore.Get(k)
	if kv.IsErrNotFound(err) {
		err = txn.UnionStore.Set(k, []byte(strconv.FormatInt(step, 10)))
		if err != nil {
			return 0, err
		}

		return step, nil
	}

	if err != nil {
		return 0, err
	}

	intVal, err := strconv.ParseInt(string(val), 10, 0)
	if err != nil {
		return intVal, err
	}

	intVal += step
	err = txn.UnionStore.Set(k, []byte(strconv.FormatInt(intVal, 10)))
	if err != nil {
		return 0, err
	}

	return intVal, nil
}

txn.markOrigin(k)先确保这个k缓存过了,接下来txn.UnionStore.Get(k)则从快照获取这个值,如果不存在就把当前step值赋给这个k,然后写入到UnionStore:txn.UnionStore.Set(k, []byte(strconv.FormatInt(step, 10)));(这里写入的是内存,成功commit才会写入磁盘嘛);当然如果这个k已经写入过,那么取出这个值加上step后再写回去就可以了。然后就得到ID啦。

这个时候分配DBInfo的Id完成了(由于单独起了个事务,所以这时已经提交,ID已经写入磁盘全局可见了,注意并不是把schema信息写入),一路往上回,直到:

...
	info.ID, err = meta.GenGlobalID(d.store)
        if err != nil {
		return errors.Trace(err)
	}
	err = d.writeSchemaInfo(info)
	if err != nil {
		return errors.Trace(err)
	}
	newInfo := append(is.Clone(), info)
	d.infoHandle.Set(newInfo)
	return nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L86

然后这里的d.writeSchemaInfo(info)就是启动一个独立事务把Schema信息用json编码后写入到磁盘。

func (d *ddl) writeSchemaInfo(info *model.DBInfo) error {
	var b []byte
	b, err := json.Marshal(info)
	if err != nil {
		return errors.Trace(err)
	}
	err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
		key := []byte(meta.DBMetaKey(info.ID))
		if err := txn.LockKeys(key); err != nil {
			return errors.Trace(err)
		}
		return txn.Set(key, b)
	})
	log.Warn("save schema", string(b))
	return errors.Trace(err)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L576

然后回到:

...
	err = d.writeSchemaInfo(info)
	if err != nil {
		return errors.Trace(err)
	}
	newInfo := append(is.Clone(), info)
	d.infoHandle.Set(newInfo)
	return nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L90

这里把Schemainfo赋值到ddl的infoHandle。

接下来回到:

...
if !domain.InfoSchema().SchemaExists(DBName) {
		err = domain.DDL().CreateSchema(s, DBName)
		if err != nil {
			return nil, errors.Trace(err)
		}
	}

	return newDriverConn(s, driver, DBName.O)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L156

再进入newDriverConn(s, driver, DBName.O):

func newDriverConn(sess *session, d *sqlDriver, schema string) (driver.Conn, error) {
	r := &driverConn{
		driver: d,
		stmts:  map[string]driver.Stmt{},
		s:      sess,
	}

	_, err := r.s.Execute("use " + schema)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return r, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L174

创建driverConn并赋值后,这里默认运行了一个r.s.Execute(“use ” + schema)

func (s *session) Execute(sql string) ([]rset.Recordset, error) {
	stmts, err := Compile(sql)
	if err != nil {
		log.Errorf("Compile sql error: %s - %s", sql, err)
		return nil, errors.Trace(err)
	}

	var rs []rset.Recordset

	for _, si := range stmts {
		r, err := runStmt(s, si)
		if err != nil {
			log.Warnf("session:%v, err:%v", s, err)
			return nil, errors.Trace(err)
		}

		if r != nil {
			rs = append(rs, r)
		}
	}

	return rs, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/session.go#L120

Compile(sql)是把sql语句变成stmts结构(通过parser包的语法解析,goyacc相关的,这块暂时没有详细看)

然后进入runStmt():

func runStmt(ctx context.Context, s stmt.Statement, args ...interface{}) (rset.Recordset, error) {
	var err error
	var rs rset.Recordset
	// before every execution, we must clear affectedrows.
	variable.GetSessionVars(ctx).SetAffectedRows(0)
	switch s.(type) {
	case *stmts.PreparedStmt:
		ps := s.(*stmts.PreparedStmt)
		return runPreparedStmt(ctx, ps)
	case *stmts.ExecuteStmt:
		es := s.(*stmts.ExecuteStmt)
		rs, err = runExecute(ctx, es, args...)
		if err != nil {
			return nil, errors.Trace(err)
		}
	default:
		if s.IsDDL() {
			err = ctx.FinishTxn(false)
			if err != nil {
				return nil, errors.Trace(err)
			}
		}
		stmt.BindExecArgs(ctx, args)
		rs, err = s.Exec(ctx)
		stmt.ClearExecArgs(ctx)
	}
	// MySQL DDL should be auto-commit
	if err == nil && (s.IsDDL() || variable.IsAutocommit(ctx)) {
		err = ctx.FinishTxn(false)
	}
	return rs, errors.Trace(err)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L130

根据stmt的类型运行:

default:
		if s.IsDDL() {
			err = ctx.FinishTxn(false)
			if err != nil {
				return nil, errors.Trace(err)
			}
		}
		stmt.BindExecArgs(ctx, args)
		rs, err = s.Exec(ctx)
		stmt.ClearExecArgs(ctx)
	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L153

进入到:

// Exec implements the stmt.Statement Exec interface.
func (s *UseStmt) Exec(ctx context.Context) (_ rset.Recordset, err error) {
	dbname := model.NewCIStr(s.DBName)
	if !sessionctx.GetDomain(ctx).InfoSchema().SchemaExists(dbname) {
		return nil, errors.ErrDatabaseNotExist
	}
	db.BindCurrentSchema(ctx, dbname.O)
	return nil, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/use.go#L58

然后获取,验证,最后BindCurrentSchema(),设置下了session的当前Schema:

// BindCurrentSchema saves parameter schema as current schema name value into context
func BindCurrentSchema(ctx context.Context, schema string) {
	ctx.SetValue(currentDBKey, schema)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/sessionctx/db/db.go#L36

然后一路回到:

...
	return newDriverConn(s, driver, DBName.O)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L161

到此sqlDriver.Open()结束了.

接下来根据sql包,会再执行到Begin():

// Begin starts and returns a new transaction.
func (c *driverConn) Begin() (driver.Tx, error) {
	if c.s == nil {
		return nil, errors.Errorf("Need init first")
	}

	if _, err := c.s.Execute(txBeginSQL); err != nil {
		return nil, errors.Trace(err)
	}

	return c, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L221

和之前的use schema一样,会进入到自己的Exec():

// Exec implements the stmt.Statement Exec interface.
func (s *BeginStmt) Exec(ctx context.Context) (_ rset.Recordset, err error) {
	_, err = ctx.GetTxn(true)
	// With START TRANSACTION, autocommit remains disabled until you end
	// the transaction with COMMIT or ROLLBACK. The autocommit mode then
	// reverts to its previous state.
	variable.GetSessionVars(ctx).DisableAutocommit = true
	return
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/transaction.go#L57

至此,整个Begin结束了。

小结

1.事务提交(goleveldb为例,其他类似):事务开始时会新建一个UnionStore,包含一个内存写入修改缓存和磁盘读取快照,在事务内的修改等操作先从快照读取数据,然后把修改缓存在内存,最后在事务提交的时候批量提交操作刷新到磁盘,事务回滚则清除内存缓存即可。

2.对于KV引擎来说,每个键值KEY是可以保证有序的,而且增删查改操作都是平均log(n)的时间复杂度,这样就可以用来查找>=某个KEY值的第一个数据,然后就可以把索引按按照键值编码规则写入到KV,value保存数据的行号(且TIDB的key编码规则保证编码前后大小排序不变),这样就可以直接利用对应的startkey和endkey在kv中找到startkey,然后遍历到endkey终止,找出的所有行数据都是按照索引筛选条件的。对点查和范围查询都很友好。

通过具体SQL来分析

在接下来的文章,我会跟随下列这些SQL的执行,一条条分析:

create table t (c varchar(30), r varchar(30));
CREATE INDEX index_name ON t(c);
insert into t values('c1','ret1');
SELECT r FROM t WHERE c = 'c1';
SELECT r FROM t WHERE c >= 'c1' and c <= 'c5';
DROP TABLE t;

今天就先写到这了。。。

《TIDB源码分析-从github第一次提交说起(1)》有1个想法

发表评论

电子邮件地址不会被公开。 必填项已用*标注

请输入正确的验证码