TIDB源码分析-从github第一次提交说起(2)

上一篇文章介绍了TiDB事务的提交和KV引擎的存储,这篇文章以下面这条建表语句为例,介绍SQL语句的具体执行过程:

create table t (c varchar(30), r varchar(30));

首先,入口仍然在/interpreter/main.go:

...
			err = executeLine(tx, l)
			if err != nil {
				log.Error(errors.ErrorStack(err))
				tx.Rollback()
			} else {
				tx.Commit()
			}
		}
	}
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/interpreter/main.go#L200

然后进入到executeLine:

// executeLine runs one line of SQL inside tx.
//
// Lines that tidb.IsQuery classifies as queries are run with tx.Query, and
// their rows are printed as a table. Every other statement is run with
// tx.Exec, and its result is discarded. Errors are returned wrapped by
// errors.Trace.
func executeLine(tx *sql.Tx, txnLine string) error {
	if tidb.IsQuery(txnLine) {
		rows, err := tx.Query(txnLine)
		if err != nil {
			return errors.Trace(err)
		}
		defer rows.Close()
		cols, err := rows.Columns()
		if err != nil {
			return errors.Trace(err)
		}

		// Scan every column as raw bytes: scanArgs[i] points at values[i],
		// so each rows.Scan call refills values in place.
		values := make([][]byte, len(cols))
		scanArgs := make([]interface{}, len(values))
		for i := range values {
			scanArgs[i] = &values[i]
		}

		var datas [][]string
		for rows.Next() {
			err := rows.Scan(scanArgs...)
			if err != nil {
				return errors.Trace(err)
			}

			// Copy the current row out as strings. database/sql stores a nil
			// []byte for SQL NULL, which is rendered as "NULL".
			data := make([]string, len(cols))
			for i, value := range values {
				if value == nil {
					data[i] = "NULL"
				} else {
					data[i] = string(value)
				}
			}

			datas = append(datas, data)
		}

		// `cols` and every `datas[i]` always have the same length,
		// so the error returned by GetPrintResult is not checked.
		// NOTE(review): the table is printed before rows.Err() is checked, so a
		// partial result may be shown when iteration stopped on an error.
		result, _ := printer.GetPrintResult(cols, datas)
		fmt.Printf("%s", result)

		if err := rows.Err(); err != nil {
			return errors.Trace(err)
		}
	} else {
		// TODO: rows affected and last insert id
		_, err := tx.Exec(txnLine)
		if err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/interpreter/main.go#L56

...
		// TODO: rows affected and last insert id
		_, err := tx.Exec(txnLine)
		if err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

由于这条语句不是查询(query),所以会执行到这里。从TODO注释可以看出,此时还没有实现rows affected和last insert id的返回。

接着执行_, err := tx.Exec(txnLine),它会先调用database/sql包中的tx.Exec(),随后进入TiDB驱动(driver.go)中的:

// Exec has the signature of database/sql/driver.Execer. It only delegates
// to driverExec.
func (c *driverConn) Exec(query string, args []driver.Value) (driver.Result, error) {
	return c.driverExec(query, args)

}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L263-L266

继而来到:

// driverExec executes query on the connection's session.
//
// Without args, query is run directly via c.s.Execute. Its record sets are
// discarded, and the returned result reports the session's LastInsertID and
// AffectedRows. With args, the statement is obtained from c.getStmt and
// executed with those args.
func (c *driverConn) driverExec(query string, args []driver.Value) (driver.Result, error) {
	if len(args) == 0 {
		if _, err := c.s.Execute(query); err != nil {
			return nil, errors.Trace(err)
		}
		r := &driverResult{}
		r.lastInsertID, r.rowsAffected = int64(c.s.LastInsertID()), int64(c.s.AffectedRows())
		return r, nil
	}
	stmt, err := c.getStmt(query)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return stmt.Exec(args)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L279-L293

然后进入c.s.Execute(query):

// Execute compiles sql into a list of statements and runs them one after
// another with runStmt.
//
// It returns the non-nil record sets produced by the statements, in order.
// A compile error or an execution error is logged and returned (wrapped by
// errors.Trace). On an execution error, the remaining statements are not
// run and no record sets are returned.
func (s *session) Execute(sql string) ([]rset.Recordset, error) {
	stmts, err := Compile(sql)
	if err != nil {
		log.Errorf("Compile sql error: %s - %s", sql, err)
		return nil, errors.Trace(err)
	}

	var rs []rset.Recordset

	for _, si := range stmts {
		r, err := runStmt(s, si)
		if err != nil {
			log.Warnf("session:%v, err:%v", s, err)
			return nil, errors.Trace(err)
		}

		// Statements without a result (e.g. DDL) return a nil record set,
		// which is not collected.
		if r != nil {
			rs = append(rs, r)
		}
	}

	return rs, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/session.go#L120-L142

首先,stmts, err := Compile(sql)把字符串形式的SQL编译成stmts语句列表(上一篇已经介绍过,这里暂不展开)。

然后通过r, err := runStmt(s, si)依次运行每条SQL语句(这里只有一条):

// runStmt executes the compiled statement s in the session context ctx and
// returns its record set.
//
// Prepared statements return directly from runPreparedStmt and skip the
// transaction-finishing step at the end. For every other statement, after a
// successful run, the transaction is finished when s is DDL or the session
// is in autocommit mode.
func runStmt(ctx context.Context, s stmt.Statement, args ...interface{}) (rset.Recordset, error) {
	var err error
	var rs rset.Recordset
	// before every execution, we must clear affectedrows.
	variable.GetSessionVars(ctx).SetAffectedRows(0)
	switch s.(type) {
	case *stmts.PreparedStmt:
		// Returns immediately, so the DDL/autocommit FinishTxn below is not reached.
		ps := s.(*stmts.PreparedStmt)
		return runPreparedStmt(ctx, ps)
	case *stmts.ExecuteStmt:
		es := s.(*stmts.ExecuteStmt)
		rs, err = runExecute(ctx, es, args...)
		if err != nil {
			return nil, errors.Trace(err)
		}
	default:
		// For DDL, first finish whatever transaction is already in progress,
		// presumably so the DDL is not part of it (see the auto-commit
		// comment below).
		if s.IsDDL() {
			err = ctx.FinishTxn(false)
			if err != nil {
				return nil, errors.Trace(err)
			}
		}
		// Bind args into ctx around s.Exec and clear them afterwards.
		stmt.BindExecArgs(ctx, args)
		rs, err = s.Exec(ctx)
		stmt.ClearExecArgs(ctx)
	}
	// MySQL DDL should be auto-commit
	if err == nil && (s.IsDDL() || variable.IsAutocommit(ctx)) {
		err = ctx.FinishTxn(false)
	}
	return rs, errors.Trace(err)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L130-L161

这里先把affectedRows清零;由于CREATE TABLE既不是PreparedStmt也不是ExecuteStmt,所以会进入default分支:

	default:
		if s.IsDDL() {
			err = ctx.FinishTxn(false)
			if err != nil {
				return nil, errors.Trace(err)
			}
		}
		stmt.BindExecArgs(ctx, args)
		rs, err = s.Exec(ctx)
		stmt.ClearExecArgs(ctx)
	}
	// MySQL DDL should be auto-commit
	if err == nil && (s.IsDDL() || variable.IsAutocommit(ctx)) {
		err = ctx.FinishTxn(false)
	}
	return rs, errors.Trace(err)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L145-L161

由于这是一条DDL语句,所以会先运行ctx.FinishTxn,结束(提交或回滚)当前正在执行的事务。

到这里为止,所有非query类语句的执行流程都基本一致,所以以后就不再重复介绍这部分exec sql的流程了。接下来执行rs, err = s.Exec(ctx):

// Exec implements the stmt.Statement Exec interface.
//
// It asks the domain's DDL component to create the table s.Ident with the
// parsed column definitions and constraints. If the table already exists,
// the statement succeeds silently when s.IfNotExists is set, and otherwise
// returns a "table exists" error. It never returns a record set.
func (s *CreateTableStmt) Exec(ctx context.Context) (_ rset.Recordset, err error) {
	err = sessionctx.GetDomain(ctx).DDL().CreateTable(ctx, s.Ident.Full(ctx), s.Cols, s.Constraints)
	if errors2.ErrorEqual(err, ddl.ErrExists) {
		if s.IfNotExists {
			return nil, nil
		}
		return nil, errors.Errorf("CREATE TABLE: table exists %s", s.Ident)
	}
	return nil, errors.Trace(err)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/create.go#L111-L121

这里先通过sessionctx.GetDomain(ctx).DDL()获取ddl对象,然后执行它的CreateTable():

// CreateTable validates a new table definition against the current
// information schema, builds its column and table metadata, and persists it
// through updateInfoSchema.
//
// All errors are wrapped by errors.Trace:
//   - qerror.ErrDatabaseNotExist if ident.Schema does not exist;
//   - ErrExists if a table named ident.Name already exists in that schema;
//   - the error from checkDuplicateColumn if colDefs repeat a column.
func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*coldef.ColumnDef, constraints []*coldef.TableConstraint) (err error) {
	// Fetch the current information schema.
	is := d.GetInformationSchema()
	// Check that the target schema exists.
	if !is.SchemaExists(ident.Schema) {
		return errors.Trace(qerror.ErrDatabaseNotExist)
	}
	// Check whether the table name is already taken in this schema.
	if is.TableExists(ident.Schema, ident.Name) {
		return errors.Trace(ErrExists)
	}
	// Check whether the new table declares the same column more than once.
	if err = checkDuplicateColumn(colDefs); err != nil {
		return errors.Trace(err)
	}

	cols, newConstraints, err := d.buildColumnsAndConstraints(colDefs, constraints)
	if err != nil {
		return errors.Trace(err)
	}

	tbInfo, err := d.buildTableInfo(ident.Name, cols, newConstraints)
	if err != nil {
		return errors.Trace(err)
	}
	log.Infof("New table: %+v", tbInfo)
	err = d.updateInfoSchema(ctx, ident.Schema, tbInfo)
	return errors.Trace(err)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L303-L327

各种检查完成后进入d.buildColumnsAndConstraints(colDefs, constraints),它会对每一列调用d.buildColumnAndConstraint(i, colDef):

// buildColumnAndConstraint converts the parsed definition of the column at
// position offset into a *column.Col. It returns the column together with
// the table constraints (cts) returned by coldef.ColumnDefToCol. The column
// receives a fresh ID from meta.GenGlobalID.
//
// When colDef has no explicit charset, colDef.Tp.Charset and
// colDef.Tp.Collate are filled in place, so the caller's ColumnDef is
// modified.
func (d *ddl) buildColumnAndConstraint(offset int, colDef *coldef.ColumnDef) (*column.Col, []*coldef.TableConstraint, error) {
	// set charset
	if len(colDef.Tp.Charset) == 0 {
		switch colDef.Tp.Tp {
		case mysql.TypeString, mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob:
			// String and blob types get the default charset and collation.
			colDef.Tp.Charset, colDef.Tp.Collate = getDefaultCharsetAndCollate()
		default:
			// NOTE(review): Collate is assigned charset.CharsetBin rather than a
			// collation constant; presumably both are "binary" — confirm.
			colDef.Tp.Charset = charset.CharsetBin
			colDef.Tp.Collate = charset.CharsetBin
		}
	}
	// convert colDef into col
	// The resulting col carries more attributes than the parsed colDef,
	// for use by later stages.
	col, cts, err := coldef.ColumnDefToCol(offset, colDef)
	if err != nil {
		return nil, nil, errors.Trace(err)
	}
	// Allocate the column ID. This is the same method used to allocate the
	// schema ID described in the first article of this series; both use
	// the same incrementing key prefix.
	col.ID, err = meta.GenGlobalID(d.store)
	if err != nil {
		return nil, nil, errors.Trace(err)
	}
	return col, cts, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L225-L246

构造完单个列及其约束后,返回到buildColumnsAndConstraints中:

...
		col, cts, err := d.buildColumnAndConstraint(i, colDef)
		if err != nil {
			return nil, nil, errors.Trace(err)
		}
		constraints = append(constraints, cts...)
		cols = append(cols, col)
		colMap[strings.ToLower(colDef.Name)] = col
	}
	// traverse table Constraints and set col.flag
	for _, v := range constraints {//这里有约束(如主键索引等)的话需要标记
		setColumnFlagWithConstraint(colMap, v)
	}
	return cols, constraints, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L210-L223

接着继续回到:

...
	cols, newConstraints, err := d.buildColumnsAndConstraints(colDefs, constraints)
	if err != nil {
		return errors.Trace(err)
	}

	tbInfo, err := d.buildTableInfo(ident.Name, cols, newConstraints)
	if err != nil {
		return errors.Trace(err)
	}
	log.Infof("New table: %+v", tbInfo)
	err = d.updateInfoSchema(ctx, ident.Schema, tbInfo)
	return errors.Trace(err)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L316-L316

构造完列信息后开始构造表信息变量d.buildTableInfo(ident.Name, cols, newConstraints):

// buildTableInfo assembles the model.TableInfo for a new table named
// tableName. It allocates a table ID with meta.GenGlobalID, records the
// ColumnInfo of every column in cols, and turns each constraint into an
// IndexInfo.
//
// Index construction by constraint type:
//   - a primary-key constraint becomes a unique, primary index named
//     column.PrimaryKeyName;
//   - ConstrUniq / ConstrUniqKey / ConstrUniqIndex become unique indexes;
//   - any other constraint type yields a non-unique index.
//
// An error is returned if a constraint references a column that is not in
// cols.
func (d *ddl) buildTableInfo(tableName model.CIStr, cols []*column.Col, constraints []*coldef.TableConstraint) (tbInfo *model.TableInfo, err error) {
	tbInfo = &model.TableInfo{
		Name: tableName,
	}
	// Allocate the table ID with the same method (meta.GenGlobalID).
	tbInfo.ID, err = meta.GenGlobalID(d.store)
	if err != nil {
		return nil, errors.Trace(err)
	}
	// cols holds pointers, so &v.ColumnInfo addresses each column's own
	// ColumnInfo rather than a per-iteration copy.
	for _, v := range cols {
		tbInfo.Columns = append(tbInfo.Columns, &v.ColumnInfo)
	}
	// Validate and add an index for each constraint, if any.
	for _, constr := range constraints {
		// 1. check that every key column exists
		// 2. add index
		indexColumns := make([]*model.IndexColumn, 0, len(constr.Keys))
		for _, key := range constr.Keys {
			col := column.FindCol(cols, key.ColumnName)
			if col == nil {
				return nil, errors.Errorf("No such column: %v", key)
			}
			indexColumns = append(indexColumns, &model.IndexColumn{
				Name:   model.NewCIStr(key.ColumnName),
				Offset: col.Offset,
				Length: key.Length,
			})
		}
		idxInfo := &model.IndexInfo{
			Name:    model.NewCIStr(constr.ConstrName),
			Columns: indexColumns,
		}
		switch constr.Tp {
		case coldef.ConstrPrimaryKey:
			idxInfo.Unique = true
			idxInfo.Primary = true
			idxInfo.Name = model.NewCIStr(column.PrimaryKeyName)
		case coldef.ConstrUniq, coldef.ConstrUniqKey, coldef.ConstrUniqIndex:
			idxInfo.Unique = true
		}
		tbInfo.Indices = append(tbInfo.Indices, idxInfo)
	}
	return
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L260-L301

然后又回到:

...
	tbInfo, err := d.buildTableInfo(ident.Name, cols, newConstraints)
	if err != nil {
		return errors.Trace(err)
	}
	log.Infof("New table: %+v", tbInfo)
	err = d.updateInfoSchema(ctx, ident.Schema, tbInfo)
	return errors.Trace(err)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L320-L327

然后进入d.updateInfoSchema开始把表信息更新到Schema并写入kv:

// updateInfoSchema stores tbInfo in the schema named schema.
//
// It adds tbInfo, or replaces a same-named table, in a clone of the current
// information schema. It then persists the changed schema with
// writeSchemaInfo and publishes the clone via d.infoHandle.Set. If
// writeSchemaInfo fails, its error is returned and d.infoHandle.Set is not
// called. ctx is not used in this body.
//
// NOTE(review): if no schema named `schema` is found, nothing is written,
// but the (unchanged) clone is still published.
func (d *ddl) updateInfoSchema(ctx context.Context, schema model.CIStr, tbInfo *model.TableInfo) error {
	// Clone the schema so the existing one is not affected.
	clonedInfo := d.GetInformationSchema().Clone()
	for _, info := range clonedInfo {
		if info.Name == schema {
			var match bool
			// Replace a table with the same name if one exists; otherwise
			// tbInfo is appended below.
			for i := range info.Tables {
				if info.Tables[i].Name == tbInfo.Name {
					info.Tables[i] = tbInfo
					match = true
				}
			}
			if !match {
				info.Tables = append(info.Tables, tbInfo)
			}
			err := d.writeSchemaInfo(info)
			if err != nil {
				return errors.Trace(err)
			}
		}
	}
	d.infoHandle.Set(clonedInfo)
	return nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L593-L615

然后运行d.writeSchemaInfo(info)把新的Schema信息写入到kv:

// writeSchemaInfo serializes info to JSON and stores it under
// meta.DBMetaKey(info.ID) inside a new transaction (kv.RunInNewTxn),
// locking the key before setting it.
//
// NOTE(review): the "save schema" log line is emitted even when the
// transaction failed. If log.Warn concatenates its arguments like
// fmt.Print, no space separates the two string operands.
func (d *ddl) writeSchemaInfo(info *model.DBInfo) error {
	var b []byte
	// `:=` compiles here because err is new; b is assigned, not redeclared.
	b, err := json.Marshal(info)
	if err != nil {
		return errors.Trace(err)
	}
	err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
		key := []byte(meta.DBMetaKey(info.ID))
		if err := txn.LockKeys(key); err != nil {
			return errors.Trace(err)
		}
		return txn.Set(key, b)
	})
	log.Warn("save schema", string(b))
	return errors.Trace(err)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L576-L591

写完Schema信息则回到:

			err := d.writeSchemaInfo(info)
			if err != nil {
				return errors.Trace(err)
			}
		}
	}
	d.infoHandle.Set(clonedInfo)//把新的DB(Schema)信息同步到这个session中
	return nil
}

更新完内存中的Schema信息后,一路返回到CreateTableStmt.Exec:

	if errors2.ErrorEqual(err, ddl.ErrExists) {
		if s.IfNotExists {
			return nil, nil
		}
		return nil, errors.Errorf("CREATE TABLE: table exists %s", s.Ident)
	}
	return nil, errors.Trace(err)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/create.go#L114-L121

然后回到runStmt,结束事务:

		rs, err = s.Exec(ctx)
		stmt.ClearExecArgs(ctx)
	}
	// MySQL DDL should be auto-commit
	if err == nil && (s.IsDDL() || variable.IsAutocommit(ctx)) {
		err = ctx.FinishTxn(false)
	}
	return rs, errors.Trace(err)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L153-L161

接着这条sql执行完,返回执行结果:

		r, err := runStmt(s, si)
		if err != nil {
			log.Warnf("session:%v, err:%v", s, err)
			return nil, errors.Trace(err)
		}

		if r != nil {
			rs = append(rs, r)
		}
	}

	return rs, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/session.go#L130-L142

继续返回:

		if _, err := c.s.Execute(query); err != nil {
			return nil, errors.Trace(err)
		}
		r := &driverResult{}
		r.lastInsertID, r.rowsAffected = int64(c.s.LastInsertID()), int64(c.s.AffectedRows())//由于返回的是nil,所以这个都是0
		return r, nil
	}
	stmt, err := c.getStmt(query)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return stmt.Exec(args)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L281-L293

接着就返回到了main.go:

			err = executeLine(tx, l)
			if err != nil {
				log.Error(errors.ErrorStack(err))
				tx.Rollback()
			} else {
				tx.Commit()
			}
		}
	}
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/interpreter/main.go#L207-L216

commit后就继续等待下一条SQL……

小结

这条语句的执行过程还是比较简单的。值得注意的是,在这个版本中,schema、table和column的ID都通过meta.GenGlobalID分配,共用同一个计数器递增。

发表评论

电子邮件地址不会被公开。 必填项已用*标注

请输入正确的验证码