上一篇文章写到了tidb事务的提交和kv引擎的存储,这篇文章开始介绍SQL语句的具体执行过程,以下面这条建表语句为例:
create table t (c varchar(30), r varchar(30));
首先入口还是在/interpreter/main.go
...
err = executeLine(tx, l)
if err != nil {
log.Error(errors.ErrorStack(err))
tx.Rollback()
} else {
tx.Commit()
}
}
}
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/interpreter/main.go#L200
然后进入到executeLine:
// executeLine runs the single statement txnLine inside the transaction tx.
//
// When tidb.IsQuery reports a query, the rows are fetched with tx.Query,
// formatted by printer.GetPrintResult and written with fmt.Printf. Otherwise
// the statement is run with tx.Exec and its result is discarded. Every error
// is returned wrapped by errors.Trace; nil is returned on success.
func executeLine(tx *sql.Tx, txnLine string) error {
if tidb.IsQuery(txnLine) {
rows, err := tx.Query(txnLine)
if err != nil {
return errors.Trace(err)
}
defer rows.Close()
cols, err := rows.Columns()
if err != nil {
return errors.Trace(err)
}
// Scan every column into its own []byte; scanArgs holds pointers to the
// elements of values so that rows.Scan fills values in place.
values := make([][]byte, len(cols))
scanArgs := make([]interface{}, len(values))
for i := range values {
scanArgs[i] = &values[i]
}
var datas [][]string
for rows.Next() {
err := rows.Scan(scanArgs...)
if err != nil {
return errors.Trace(err)
}
// Convert the scanned row to strings; a nil value is shown as "NULL".
data := make([]string, len(cols))
for i, value := range values {
if value == nil {
data[i] = "NULL"
} else {
data[i] = string(value)
}
}
datas = append(datas, data)
}
// `cols` and every `datas[i]` always have the same length, so the second
// return value of GetPrintResult is deliberately ignored.
result, _ := printer.GetPrintResult(cols, datas)
fmt.Printf("%s", result)
// NOTE(review): iteration errors are only checked here, after the
// (possibly partial) result has already been printed.
if err := rows.Err(); err != nil {
return errors.Trace(err)
}
} else {
// TODO: rows affected and last insert id
_, err := tx.Exec(txnLine)
if err != nil {
return errors.Trace(err)
}
}
return nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/interpreter/main.go#L56
...
// TODO: rows affected and last insert id
_, err := tx.Exec(txnLine)
if err != nil {
return errors.Trace(err)
}
}
return nil
}
由于不是query,接着会运行到这里。从TODO可以看出,rows affected 和 last insert id 的输出还没有实现。
接着,进入_, err := tx.Exec(txnLine),这个先会调用sql包中的tx.Exec(),随后进入到
// Exec executes query with args by delegating directly to driverExec.
// NOTE(review): the signature matches database/sql/driver's Execer interface;
// presumably that is what this method implements — confirm.
func (c *driverConn) Exec(query string, args []driver.Value) (driver.Result, error) {
return c.driverExec(query, args)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L263-L266
继而来到:
// driverExec runs query on the connection's session c.s.
//
// With no args the query is executed directly through c.s.Execute and the
// returned driverResult carries the session's last insert ID and affected-row
// count. With args, a statement is obtained from c.getStmt and executed with
// them. Errors are wrapped by errors.Trace.
func (c *driverConn) driverExec(query string, args []driver.Value) (driver.Result, error) {
if len(args) == 0 {
// The record sets returned by Execute are discarded; only the error matters.
if _, err := c.s.Execute(query); err != nil {
return nil, errors.Trace(err)
}
r := &driverResult{}
r.lastInsertID, r.rowsAffected = int64(c.s.LastInsertID()), int64(c.s.AffectedRows())
return r, nil
}
stmt, err := c.getStmt(query)
if err != nil {
return nil, errors.Trace(err)
}
return stmt.Exec(args)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L279-L293
然后进入c.s.Execute(query):
// Execute compiles sql into statements and runs them in order with runStmt,
// collecting every non-nil Recordset. It stops at the first failure and
// returns nil together with the traced error.
func (s *session) Execute(sql string) ([]rset.Recordset, error) {
stmts, err := Compile(sql)
if err != nil {
// Compile failures are logged at error level before being returned.
log.Errorf("Compile sql error: %s - %s", sql, err)
return nil, errors.Trace(err)
}
var rs []rset.Recordset
for _, si := range stmts {
r, err := runStmt(s, si)
if err != nil {
log.Warnf("session:%v, err:%v", s, err)
return nil, errors.Trace(err)
}
// Statements that produce a nil record set contribute nothing to rs.
if r != nil {
rs = append(rs, r)
}
}
return rs, nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/session.go#L120-L142
首先,stmts, err := Compile(sql)是把string类型的字符串sql编译成stmts结构(上一篇也说过,这里暂时不展开了)
然后就r, err := runStmt(s, si)运行每条sql语句了(这里只有一条):
// runStmt executes the statement s in ctx and returns its record set.
//
// The session's affected-row counter is reset first. A *stmts.PreparedStmt is
// handed to runPreparedStmt and that result is returned immediately. A
// *stmts.ExecuteStmt goes through runExecute with args. Any other statement
// is executed with s.Exec, with args bound before and cleared after the call.
// After a successful run, ctx.FinishTxn(false) is called when s is DDL or
// variable.IsAutocommit(ctx) is true.
func runStmt(ctx context.Context, s stmt.Statement, args ...interface{}) (rset.Recordset, error) {
var err error
var rs rset.Recordset
// before every execution, we must clear affectedrows.
variable.GetSessionVars(ctx).SetAffectedRows(0)
switch s.(type) {
case *stmts.PreparedStmt:
// Returns directly, so the FinishTxn step at the end of this function
// is skipped for prepared statements.
ps := s.(*stmts.PreparedStmt)
return runPreparedStmt(ctx, ps)
case *stmts.ExecuteStmt:
es := s.(*stmts.ExecuteStmt)
rs, err = runExecute(ctx, es, args...)
if err != nil {
return nil, errors.Trace(err)
}
default:
// For DDL, call ctx.FinishTxn before the statement itself is executed.
// NOTE(review): the meaning of the false argument is not visible here;
// presumably it selects commit rather than rollback — confirm.
if s.IsDDL() {
err = ctx.FinishTxn(false)
if err != nil {
return nil, errors.Trace(err)
}
}
stmt.BindExecArgs(ctx, args)
rs, err = s.Exec(ctx)
stmt.ClearExecArgs(ctx)
}
// MySQL DDL should be auto-commit
if err == nil && (s.IsDDL() || variable.IsAutocommit(ctx)) {
err = ctx.FinishTxn(false)
}
return rs, errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L130-L161
这里清除affectedrows后会运行:
default:
if s.IsDDL() {
err = ctx.FinishTxn(false)
if err != nil {
return nil, errors.Trace(err)
}
}
stmt.BindExecArgs(ctx, args)
rs, err = s.Exec(ctx)
stmt.ClearExecArgs(ctx)
}
// MySQL DDL should be auto-commit
if err == nil && (s.IsDDL() || variable.IsAutocommit(ctx)) {
err = ctx.FinishTxn(false)
}
return rs, errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L145-L161
由于create table是ddl语句,所以会先运行ctx.FinishTxn以结束(提交或回滚)正在执行的事务。
到这里为止,所有非query类语句的执行流程都基本一致,所以以后的exec sql就不再重复介绍了。接下来执行rs, err = s.Exec(ctx):
// Exec implements the stmt.Statement Exec interface.
//
// It asks the DDL handler of the domain bound to ctx to create the table
// described by the statement. When the result equals ddl.ErrExists, the call
// succeeds silently if s.IfNotExists is set and otherwise returns a
// "table exists" error. The returned Recordset is always nil.
func (s *CreateTableStmt) Exec(ctx context.Context) (_ rset.Recordset, err error) {
err = sessionctx.GetDomain(ctx).DDL().CreateTable(ctx, s.Ident.Full(ctx), s.Cols, s.Constraints)
if errors2.ErrorEqual(err, ddl.ErrExists) {
if s.IfNotExists {
return nil, nil
}
return nil, errors.Errorf("CREATE TABLE: table exists %s", s.Ident)
}
// Any other outcome, including success (err == nil), is passed through errors.Trace.
return nil, errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/create.go#L111-L121
然后获取ddl对象后执行CreateTable():
// CreateTable creates the table ident from colDefs and constraints.
//
// It returns qerror.ErrDatabaseNotExist when the schema is missing and
// ErrExists when the table is already present. Otherwise it rejects duplicate
// columns, builds the columns, constraints and table info, logs the table
// info, and hands it to updateInfoSchema. All errors are wrapped by
// errors.Trace.
func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*coldef.ColumnDef, constraints []*coldef.TableConstraint) (err error) {
is := d.GetInformationSchema() // Fetch the schema information.
if !is.SchemaExists(ident.Schema) {// Make sure the schema exists.
return errors.Trace(qerror.ErrDatabaseNotExist)
}
if is.TableExists(ident.Schema, ident.Name) {// Make sure the table does not already exist in this schema.
return errors.Trace(ErrExists)
}
if err = checkDuplicateColumn(colDefs); err != nil {// Reject duplicated columns in the new table.
return errors.Trace(err)
}
cols, newConstraints, err := d.buildColumnsAndConstraints(colDefs, constraints)
if err != nil {
return errors.Trace(err)
}
tbInfo, err := d.buildTableInfo(ident.Name, cols, newConstraints)
if err != nil {
return errors.Trace(err)
}
log.Infof("New table: %+v", tbInfo)
err = d.updateInfoSchema(ctx, ident.Schema, tbInfo)
return errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L303-L327
各种检查完后进入d.buildColumnsAndConstraints(colDefs, constraints),它会对每个列定义调用d.buildColumnAndConstraint:
// buildColumnAndConstraint converts one parsed column definition into a
// column.Col plus the table constraints produced by coldef.ColumnDefToCol.
//
// offset is passed through to coldef.ColumnDefToCol. When colDef carries no
// charset, string and blob types receive the default charset and collation,
// and every other type receives charset.CharsetBin for both. colDef is
// modified in place by this defaulting. The new column is given an ID from
// meta.GenGlobalID.
func (d *ddl) buildColumnAndConstraint(offset int, colDef *coldef.ColumnDef) (*column.Col, []*coldef.TableConstraint, error) {
// set charset
if len(colDef.Tp.Charset) == 0 {
switch colDef.Tp.Tp {
case mysql.TypeString, mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob:
colDef.Tp.Charset, colDef.Tp.Collate = getDefaultCharsetAndCollate()// Use the default charset and collation.
default:
colDef.Tp.Charset = charset.CharsetBin
colDef.Tp.Collate = charset.CharsetBin
}
}
// convert colDef into col
col, cts, err := coldef.ColumnDefToCol(offset, colDef)// Turn the parsed colDef into the richer col structure used later on.
if err != nil {
return nil, nil, errors.Trace(err)
}
col.ID, err = meta.GenGlobalID(d.store)// Allocate the column ID from meta.GenGlobalID (per the first article in this series, schema IDs come from the same generator).
if err != nil {
return nil, nil, errors.Trace(err)
}
return col, cts, nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L225-L246
然后构造完列和约束后返回到:
...
col, cts, err := d.buildColumnAndConstraint(i, colDef)
if err != nil {
return nil, nil, errors.Trace(err)
}
constraints = append(constraints, cts...)
cols = append(cols, col)
colMap[strings.ToLower(colDef.Name)] = col
}
// traverse table Constraints and set col.flag
for _, v := range constraints {//这里有约束(如主键索引等)的话需要标记
setColumnFlagWithConstraint(colMap, v)
}
return cols, constraints, nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L210-L223
接着继续回到:
...
cols, newConstraints, err := d.buildColumnsAndConstraints(colDefs, constraints)
if err != nil {
return errors.Trace(err)
}
tbInfo, err := d.buildTableInfo(ident.Name, cols, newConstraints)
if err != nil {
return errors.Trace(err)
}
log.Infof("New table: %+v", tbInfo)
err = d.updateInfoSchema(ctx, ident.Schema, tbInfo)
return errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L316-L316
构造完列信息后开始构造表信息变量d.buildTableInfo(ident.Name, cols, newConstraints):
// buildTableInfo assembles the model.TableInfo for tableName.
//
// It allocates a table ID with meta.GenGlobalID, records the ColumnInfo of
// every column in cols, and turns each constraint into an index over the
// constraint's key columns. A key that names a column not found in cols is an
// error. Primary-key constraints yield a unique, primary index named
// column.PrimaryKeyName; the unique constraint kinds yield a unique index;
// any other kind is added with neither flag set.
//
// tbInfo and err are named results; the final bare return yields the built
// tbInfo with a nil err.
func (d *ddl) buildTableInfo(tableName model.CIStr, cols []*column.Col, constraints []*coldef.TableConstraint) (tbInfo *model.TableInfo, err error) {
tbInfo = &model.TableInfo{
Name: tableName,
}
tbInfo.ID, err = meta.GenGlobalID(d.store)// Allocate the table ID from meta.GenGlobalID.
if err != nil {
return nil, errors.Trace(err)
}
for _, v := range cols {
tbInfo.Columns = append(tbInfo.Columns, &v.ColumnInfo)
}
for _, constr := range constraints {// For each constraint, if any, validate its columns and add an index.
// 1. check that the column exists
// 2. add index
indexColumns := make([]*model.IndexColumn, 0, len(constr.Keys))
for _, key := range constr.Keys {
col := column.FindCol(cols, key.ColumnName)
if col == nil {
return nil, errors.Errorf("No such column: %v", key)
}
indexColumns = append(indexColumns, &model.IndexColumn{
Name: model.NewCIStr(key.ColumnName),
Offset: col.Offset,
Length: key.Length,
})
}
idxInfo := &model.IndexInfo{
Name: model.NewCIStr(constr.ConstrName),
Columns: indexColumns,
}
switch constr.Tp {
case coldef.ConstrPrimaryKey:
// The primary key overrides the constraint's own name.
idxInfo.Unique = true
idxInfo.Primary = true
idxInfo.Name = model.NewCIStr(column.PrimaryKeyName)
case coldef.ConstrUniq, coldef.ConstrUniqKey, coldef.ConstrUniqIndex:
idxInfo.Unique = true
}
tbInfo.Indices = append(tbInfo.Indices, idxInfo)
}
return
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L260-L301
然后又回到:
...
tbInfo, err := d.buildTableInfo(ident.Name, cols, newConstraints)
if err != nil {
return errors.Trace(err)
}
log.Infof("New table: %+v", tbInfo)
err = d.updateInfoSchema(ctx, ident.Schema, tbInfo)
return errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L320-L327
然后进入d.updateInfoSchema开始把表信息更新到Schema并写入kv:
// updateInfoSchema stores tbInfo in the database named schema.
//
// Working on a clone of the current information schema, it replaces the
// table entry that has the same name as tbInfo, or appends tbInfo when there
// is none, and persists that database with writeSchemaInfo. The clone is
// installed through d.infoHandle.Set only after every write succeeded; a
// write error is returned immediately, wrapped by errors.Trace.
// ctx is not used by this function.
func (d *ddl) updateInfoSchema(ctx context.Context, schema model.CIStr, tbInfo *model.TableInfo) error {
clonedInfo := d.GetInformationSchema().Clone()// Work on a clone so the current schema information is not affected.
for _, info := range clonedInfo {
if info.Name == schema {
var match bool
for i := range info.Tables {// Replace the table if it is already listed; otherwise it is appended below.
if info.Tables[i].Name == tbInfo.Name {
info.Tables[i] = tbInfo
match = true
}
}
if !match {
info.Tables = append(info.Tables, tbInfo)
}
err := d.writeSchemaInfo(info)
if err != nil {
return errors.Trace(err)
}
}
}
d.infoHandle.Set(clonedInfo)
return nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L593-L615
然后运行d.writeSchemaInfo(info)把新的Schema信息写入到kv:
// writeSchemaInfo serializes info as JSON and stores it under the key
// meta.DBMetaKey(info.ID) in a new transaction on d.store, locking the key
// before writing it. Marshalling and transaction errors are returned wrapped
// by errors.Trace.
func (d *ddl) writeSchemaInfo(info *model.DBInfo) error {
var b []byte
b, err := json.Marshal(info)
if err != nil {
return errors.Trace(err)
}
// NOTE(review): the meaning of the false argument to kv.RunInNewTxn is not
// visible here — confirm against its definition.
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
key := []byte(meta.DBMetaKey(info.ID))
if err := txn.LockKeys(key); err != nil {
return errors.Trace(err)
}
return txn.Set(key, b)
})
// NOTE(review): this is logged at warn level and before err is checked, so
// it is emitted whether or not the transaction succeeded.
log.Warn("save schema", string(b))
return errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L576-L591
写完Schema信息则回到:
err := d.writeSchemaInfo(info)
if err != nil {
return errors.Trace(err)
}
}
}
d.infoHandle.Set(clonedInfo)//把新的DB(Schema)信息同步到这个session中
return nil
}
刷完内存则一路回到:
if errors2.ErrorEqual(err, ddl.ErrExists) {
if s.IfNotExists {
return nil, nil
}
return nil, errors.Errorf("CREATE TABLE: table exists %s", s.Ident)
}
return nil, errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/create.go#L114-L121
然后回到结束事务:
rs, err = s.Exec(ctx)
stmt.ClearExecArgs(ctx)
}
// MySQL DDL should be auto-commit
if err == nil && (s.IsDDL() || variable.IsAutocommit(ctx)) {
err = ctx.FinishTxn(false)
}
return rs, errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L153-L161
接着这条sql执行完,返回执行结果:
r, err := runStmt(s, si)
if err != nil {
log.Warnf("session:%v, err:%v", s, err)
return nil, errors.Trace(err)
}
if r != nil {
rs = append(rs, r)
}
}
return rs, nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/session.go#L130-L142
继续返回:
if _, err := c.s.Execute(query); err != nil {
return nil, errors.Trace(err)
}
r := &driverResult{}
r.lastInsertID, r.rowsAffected = int64(c.s.LastInsertID()), int64(c.s.AffectedRows())//由于返回的是nil,所以这个都是0
return r, nil
}
stmt, err := c.getStmt(query)
if err != nil {
return nil, errors.Trace(err)
}
return stmt.Exec(args)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L281-L293
接着就返回到了main.go:
err = executeLine(tx, l)
if err != nil {
log.Error(errors.ErrorStack(err))
tx.Rollback()
} else {
tx.Commit()
}
}
}
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/interpreter/main.go#L207-L216
commit后就等待下一条SQL……
小结
这个语句还是比较简单的,现在这个版本的schema,table,col的ID都是共用同一个计数器累加的。