上一篇文章介绍了 TiDB 事务的提交和 KV 引擎的存储,本文开始介绍 SQL 语句的具体执行过程,以下面这条建表语句为例:
create table t (c varchar(30), r varchar(30));
入口仍然在 interpreter/main.go 中:
... err = executeLine(tx, l) if err != nil { log.Error(errors.ErrorStack(err)) tx.Rollback() } else { tx.Commit() } } } }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/interpreter/main.go#L200
然后进入到executeLine:
// executeLine runs one line of SQL inside the transaction tx.
//
// If tidb.IsQuery reports the line as a query, it is run with tx.Query; every
// result row is collected as strings (a nil column value is printed as
// "NULL") and the whole result is printed via printer.GetPrintResult.
// Otherwise the line is run with tx.Exec and its result is discarded.
// All errors are returned wrapped with errors.Trace.
func executeLine(tx *sql.Tx, txnLine string) error {
	if tidb.IsQuery(txnLine) {
		rows, err := tx.Query(txnLine)
		if err != nil {
			return errors.Trace(err)
		}
		defer rows.Close()
		cols, err := rows.Columns()
		if err != nil {
			return errors.Trace(err)
		}

		// Scan every column into a []byte so any column type can be printed;
		// scanArgs holds a pointer to each slot of values.
		values := make([][]byte, len(cols))
		scanArgs := make([]interface{}, len(values))
		for i := range values {
			scanArgs[i] = &values[i]
		}

		var datas [][]string
		for rows.Next() {
			err := rows.Scan(scanArgs...)
			if err != nil {
				return errors.Trace(err)
			}

			// values is reused by every Scan, so copy the current row out as
			// strings before moving on.
			data := make([]string, len(cols))
			for i, value := range values {
				if value == nil {
					data[i] = "NULL"
				} else {
					data[i] = string(value)
				}
			}

			datas = append(datas, data)
		}

		// `cols` and every `datas[i]` always have the same length,
		// so the error returned by GetPrintResult is not checked.
		result, _ := printer.GetPrintResult(cols, datas)
		fmt.Printf("%s", result)

		// NOTE(review): rows.Err() is only checked after the (possibly
		// partial) result has already been printed.
		if err := rows.Err(); err != nil {
			return errors.Trace(err)
		}
	} else {
		// TODO: rows affected and last insert id
		_, err := tx.Exec(txnLine)
		if err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/interpreter/main.go#L56
... // TODO: rows affected and last insert id _, err := tx.Exec(txnLine) if err != nil { return errors.Trace(err) } } return nil }
由于 CREATE TABLE 不是查询语句,会进入 else 分支执行到这里。从 TODO 注释可以看出,rows affected 和 last insert id 目前还没有实现。
接着执行 `_, err := tx.Exec(txnLine)`:它先调用 database/sql 包中的 Tx.Exec(),随后进入 TiDB 驱动的 driverConn.Exec():
// Exec executes query with the given arguments by delegating to driverExec.
func (c *driverConn) Exec(query string, args []driver.Value) (driver.Result, error) {
	return c.driverExec(query, args)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L263-L266
继而来到:
// driverExec executes a statement that is not expected to return rows.
//
// With no arguments, the SQL text is run directly through the session. The
// session's LastInsertID and AffectedRows are then reported in the returned
// driverResult. With arguments, a statement is obtained from c.getStmt and
// executed with args.
func (c *driverConn) driverExec(query string, args []driver.Value) (driver.Result, error) {
	if len(args) == 0 {
		// Any record sets returned by Execute are discarded here.
		if _, err := c.s.Execute(query); err != nil {
			return nil, errors.Trace(err)
		}
		r := &driverResult{}
		r.lastInsertID, r.rowsAffected = int64(c.s.LastInsertID()), int64(c.s.AffectedRows())
		return r, nil
	}
	stmt, err := c.getStmt(query)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return stmt.Exec(args)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L279-L293
然后进入c.s.Execute(query):
// Execute compiles sql into statements and runs them one by one in this
// session. It returns every non-nil record set the statements produce.
//
// A compile error is logged and returned. Execution stops at the first
// statement that fails; that error is logged and returned.
func (s *session) Execute(sql string) ([]rset.Recordset, error) {
	stmts, err := Compile(sql)
	if err != nil {
		log.Errorf("Compile sql error: %s - %s", sql, err)
		return nil, errors.Trace(err)
	}

	var rs []rset.Recordset
	for _, si := range stmts {
		r, err := runStmt(s, si)
		if err != nil {
			log.Warnf("session:%v, err:%v", s, err)
			return nil, errors.Trace(err)
		}

		// Statements without a result (e.g. DDL) return a nil record set,
		// which is skipped.
		if r != nil {
			rs = append(rs, r)
		}
	}

	return rs, nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/session.go#L120-L142
首先,`stmts, err := Compile(sql)` 把 SQL 字符串编译成语句(stmt.Statement)列表(上一篇已经介绍过,这里不再展开)。
然后通过 `r, err := runStmt(s, si)` 逐条执行这些语句(本例中只有一条):
// runStmt executes a single compiled statement s in the session ctx, passing
// args to statements that take parameters.
//
// PreparedStmt and ExecuteStmt are dispatched to runPreparedStmt and
// runExecute. Every other statement runs through s.Exec, with args bound only
// for the duration of that call.
//
// Before a DDL statement runs, the transaction currently open in ctx is
// finished. After a successful statement, the transaction is finished again
// when the statement is DDL or autocommit is enabled.
func runStmt(ctx context.Context, s stmt.Statement, args ...interface{}) (rset.Recordset, error) {
	var err error
	var rs rset.Recordset
	// before every execution, we must clear affectedrows.
	variable.GetSessionVars(ctx).SetAffectedRows(0)
	switch s.(type) {
	case *stmts.PreparedStmt:
		// NOTE(review): this path returns directly, so the DDL/autocommit
		// FinishTxn at the end of the function is never reached for it.
		ps := s.(*stmts.PreparedStmt)
		return runPreparedStmt(ctx, ps)
	case *stmts.ExecuteStmt:
		es := s.(*stmts.ExecuteStmt)
		rs, err = runExecute(ctx, es, args...)
		if err != nil {
			return nil, errors.Trace(err)
		}
	default:
		if s.IsDDL() {
			// Finish the transaction that is in progress before running DDL.
			err = ctx.FinishTxn(false)
			if err != nil {
				return nil, errors.Trace(err)
			}
		}
		stmt.BindExecArgs(ctx, args)
		rs, err = s.Exec(ctx)
		stmt.ClearExecArgs(ctx)
	}
	// MySQL DDL should be auto-commit
	if err == nil && (s.IsDDL() || variable.IsAutocommit(ctx)) {
		err = ctx.FinishTxn(false)
	}
	return rs, errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L130-L161
runStmt 先把 affected rows 清零。CREATE TABLE 既不是 PreparedStmt 也不是 ExecuteStmt,因此会进入 default 分支:
default: if s.IsDDL() { err = ctx.FinishTxn(false) if err != nil { return nil, errors.Trace(err) } } stmt.BindExecArgs(ctx, args) rs, err = s.Exec(ctx) stmt.ClearExecArgs(ctx) } // MySQL DDL should be auto-commit if err == nil && (s.IsDDL() || variable.IsAutocommit(ctx)) { err = ctx.FinishTxn(false) } return rs, errors.Trace(err) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L145-L161
由于这是一条 DDL 语句(s.IsDDL() 为真),会先调用 ctx.FinishTxn(false) 结束(提交或回滚)当前正在进行的事务。
到这里为止,所有非查询类语句的执行流程基本一致,后文介绍其他语句时不再重复这一部分。接下来执行 `rs, err = s.Exec(ctx)`,对建表语句而言调用的是 CreateTableStmt.Exec:
// Exec implements the stmt.Statement Exec interface.
//
// It asks the DDL handler of the session's domain to create the table named
// by s.Ident, with s.Cols and s.Constraints. The name is resolved through
// s.Ident.Full(ctx), which presumably fills in the current database when none
// is given; TODO confirm.
//
// If the table already exists, Exec returns nil when IF NOT EXISTS was given
// and an error otherwise. It never returns a record set.
func (s *CreateTableStmt) Exec(ctx context.Context) (_ rset.Recordset, err error) {
	err = sessionctx.GetDomain(ctx).DDL().CreateTable(ctx, s.Ident.Full(ctx), s.Cols, s.Constraints)
	if errors2.ErrorEqual(err, ddl.ErrExists) {
		if s.IfNotExists {
			return nil, nil
		}
		return nil, errors.Errorf("CREATE TABLE: table exists %s", s.Ident)
	}
	return nil, errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/create.go#L111-L121
它从当前 session 所属的 Domain 中取得 DDL 对象,然后调用 CreateTable():
// CreateTable creates the table ident from the column definitions colDefs and
// the table-level constraints.
//
// It first checks three things:
//   - the schema exists (otherwise qerror.ErrDatabaseNotExist);
//   - the table does not exist yet (otherwise ErrExists);
//   - no column is duplicated (via checkDuplicateColumn).
//
// It then builds the column and table metadata and stores the new table in
// the information schema through updateInfoSchema.
func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*coldef.ColumnDef, constraints []*coldef.TableConstraint) (err error) {
	is := d.GetInformationSchema() // fetch the current information schema
	if !is.SchemaExists(ident.Schema) { // the target schema must exist
		return errors.Trace(qerror.ErrDatabaseNotExist)
	}
	if is.TableExists(ident.Schema, ident.Name) { // the table name must not already be used in this schema
		return errors.Trace(ErrExists)
	}
	if err = checkDuplicateColumn(colDefs); err != nil { // check whether the new table defines the same column twice
		return errors.Trace(err)
	}
	cols, newConstraints, err := d.buildColumnsAndConstraints(colDefs, constraints)
	if err != nil {
		return errors.Trace(err)
	}
	tbInfo, err := d.buildTableInfo(ident.Name, cols, newConstraints)
	if err != nil {
		return errors.Trace(err)
	}
	log.Infof("New table: %+v", tbInfo)
	err = d.updateInfoSchema(ctx, ident.Schema, tbInfo)
	return errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L303-L327
完成各项检查后调用 d.buildColumnsAndConstraints(colDefs, constraints),它会对每一个列定义调用 d.buildColumnAndConstraint(i, colDef):
// buildColumnAndConstraint converts the parsed column definition colDef, at
// position offset, into a column.Col. It returns the column together with the
// table constraints that coldef.ColumnDefToCol derives from the definition
// (presumably inline options such as PRIMARY KEY; confirm).
//
// Charset handling:
//   - string and blob columns without an explicit charset get the default
//     charset and collation;
//   - every other type without an explicit charset gets charset.CharsetBin.
//
// Note that this fills in colDef.Tp.Charset/Collate in place. The column is
// given an ID from meta.GenGlobalID.
func (d *ddl) buildColumnAndConstraint(offset int, colDef *coldef.ColumnDef) (*column.Col, []*coldef.TableConstraint, error) {
	// set charset
	if len(colDef.Tp.Charset) == 0 {
		switch colDef.Tp.Tp {
		case mysql.TypeString, mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob:
			colDef.Tp.Charset, colDef.Tp.Collate = getDefaultCharsetAndCollate() // use the default charset and collation
		default:
			colDef.Tp.Charset = charset.CharsetBin
			colDef.Tp.Collate = charset.CharsetBin
		}
	}
	// convert colDef into col
	col, cts, err := coldef.ColumnDefToCol(offset, colDef) // turn the parsed colDef into a Col that carries the extra attributes needed later
	if err != nil {
		return nil, nil, errors.Trace(err)
	}
	col.ID, err = meta.GenGlobalID(d.store) // column IDs come from the same global ID generator as schema IDs, so they share one counter
	if err != nil {
		return nil, nil, errors.Trace(err)
	}
	return col, cts, nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L225-L246
构造完单个列及其约束后,返回到 buildColumnsAndConstraints 的循环中:
... col, cts, err := d.buildColumnAndConstraint(i, colDef) if err != nil { return nil, nil, errors.Trace(err) } constraints = append(constraints, cts...) cols = append(cols, col) colMap[strings.ToLower(colDef.Name)] = col } // traverse table Constraints and set col.flag for _, v := range constraints {//这里有约束(如主键索引等)的话需要标记 setColumnFlagWithConstraint(colMap, v) } return cols, constraints, nil }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L210-L223
所有列处理完毕后,回到 CreateTable:
... cols, newConstraints, err := d.buildColumnsAndConstraints(colDefs, constraints) if err != nil { return errors.Trace(err) } tbInfo, err := d.buildTableInfo(ident.Name, cols, newConstraints) if err != nil { return errors.Trace(err) } log.Infof("New table: %+v", tbInfo) err = d.updateInfoSchema(ctx, ident.Schema, tbInfo) return errors.Trace(err) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L316-L316
构造完列信息后,调用 d.buildTableInfo(ident.Name, cols, newConstraints) 构造表信息:
// buildTableInfo builds the model.TableInfo for a new table named tableName
// from its columns and constraints. The table is given an ID from
// meta.GenGlobalID.
//
// Every constraint becomes a model.IndexInfo, and all of its key columns must
// exist in cols (otherwise an error is returned):
//   - PRIMARY KEY constraints are marked primary and unique, and are named
//     column.PrimaryKeyName;
//   - UNIQUE constraints are marked unique;
//   - all other constraint types get a non-unique index.
func (d *ddl) buildTableInfo(tableName model.CIStr, cols []*column.Col, constraints []*coldef.TableConstraint) (tbInfo *model.TableInfo, err error) {
	tbInfo = &model.TableInfo{
		Name: tableName,
	}
	tbInfo.ID, err = meta.GenGlobalID(d.store) // the table ID comes from the same global ID generator
	if err != nil {
		return nil, errors.Trace(err)
	}
	// tbInfo.Columns points at each Col's own ColumnInfo; it does not copy it.
	for _, v := range cols {
		tbInfo.Columns = append(tbInfo.Columns, &v.ColumnInfo)
	}
	for _, constr := range constraints { // validate and add an index for each constraint, if any
		// 1. check that every key column exists
		// 2. add the index
		indexColumns := make([]*model.IndexColumn, 0, len(constr.Keys))
		for _, key := range constr.Keys {
			col := column.FindCol(cols, key.ColumnName)
			if col == nil {
				return nil, errors.Errorf("No such column: %v", key)
			}
			indexColumns = append(indexColumns, &model.IndexColumn{
				Name:   model.NewCIStr(key.ColumnName),
				Offset: col.Offset,
				Length: key.Length,
			})
		}
		idxInfo := &model.IndexInfo{
			Name:    model.NewCIStr(constr.ConstrName),
			Columns: indexColumns,
		}
		switch constr.Tp {
		case coldef.ConstrPrimaryKey:
			idxInfo.Unique = true
			idxInfo.Primary = true
			idxInfo.Name = model.NewCIStr(column.PrimaryKeyName)
		case coldef.ConstrUniq, coldef.ConstrUniqKey, coldef.ConstrUniqIndex:
			idxInfo.Unique = true
		}
		tbInfo.Indices = append(tbInfo.Indices, idxInfo)
	}
	return
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L260-L301
然后又回到:
... tbInfo, err := d.buildTableInfo(ident.Name, cols, newConstraints) if err != nil { return errors.Trace(err) } log.Infof("New table: %+v", tbInfo) err = d.updateInfoSchema(ctx, ident.Schema, tbInfo) return errors.Trace(err) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L320-L327
然后进入d.updateInfoSchema开始把表信息更新到Schema并写入kv:
// updateInfoSchema puts tbInfo into the schema named schema. It replaces any
// existing table with the same name, or appends tbInfo if there is none.
//
// The updated schema is persisted with writeSchemaInfo, and the modified copy
// of the information schema is then installed with d.infoHandle.Set.
// ctx is not used here.
func (d *ddl) updateInfoSchema(ctx context.Context, schema model.CIStr, tbInfo *model.TableInfo) error {
	// Work on a clone so that the current schema is not affected.
	// NOTE(review): this relies on Clone copying each DBInfo and its Tables
	// slice. If the copy is shallow, the changes below also mutate the live
	// schema; confirm.
	clonedInfo := d.GetInformationSchema().Clone()
	for _, info := range clonedInfo {
		// NOTE(review): == compares the whole model.CIStr value. If CIStr keeps
		// the original spelling, names differing only in case will not match;
		// confirm.
		if info.Name == schema {
			var match bool
			for i := range info.Tables { // replace the table if it already exists; otherwise it is appended below
				if info.Tables[i].Name == tbInfo.Name {
					info.Tables[i] = tbInfo
					match = true
				}
			}
			if !match {
				info.Tables = append(info.Tables, tbInfo)
			}
			err := d.writeSchemaInfo(info)
			if err != nil {
				return errors.Trace(err)
			}
		}
	}
	d.infoHandle.Set(clonedInfo)
	return nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L593-L615
然后运行d.writeSchemaInfo(info)把新的Schema信息写入到kv:
// writeSchemaInfo serializes info as JSON and stores it in the KV store under
// meta.DBMetaKey(info.ID). The write happens in a new transaction that locks
// the key before setting it.
func (d *ddl) writeSchemaInfo(info *model.DBInfo) error {
	var b []byte
	b, err := json.Marshal(info)
	if err != nil {
		return errors.Trace(err)
	}
	err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
		key := []byte(meta.DBMetaKey(info.ID))
		if err := txn.LockKeys(key); err != nil {
			return errors.Trace(err)
		}
		return txn.Set(key, b)
	})
	// NOTE(review): this is logged even when the transaction above failed.
	log.Warn("save schema", string(b))
	return errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L576-L591
写完 Schema 信息后回到 updateInfoSchema:
err := d.writeSchemaInfo(info) if err != nil { return errors.Trace(err) } } } d.infoHandle.Set(clonedInfo)//把新的DB(Schema)信息同步到这个session中 return nil }
更新完内存中的 InfoSchema 后,一路返回到 CreateTableStmt.Exec:
if errors2.ErrorEqual(err, ddl.ErrExists) { if s.IfNotExists { return nil, nil } return nil, errors.Errorf("CREATE TABLE: table exists %s", s.Ident) } return nil, errors.Trace(err) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/create.go#L114-L121
接着回到 runStmt。由于是 DDL 语句,会再次调用 ctx.FinishTxn(false) 结束事务:
rs, err = s.Exec(ctx) stmt.ClearExecArgs(ctx) } // MySQL DDL should be auto-commit if err == nil && (s.IsDDL() || variable.IsAutocommit(ctx)) { err = ctx.FinishTxn(false) } return rs, errors.Trace(err) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L153-L161
至此这条 SQL 执行完毕,返回到 session.Execute:
r, err := runStmt(s, si) if err != nil { log.Warnf("session:%v, err:%v", s, err) return nil, errors.Trace(err) } if r != nil { rs = append(rs, r) } } return rs, nil }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/session.go#L130-L142
继续返回到 driverExec:
if _, err := c.s.Execute(query); err != nil { return nil, errors.Trace(err) } r := &driverResult{} r.lastInsertID, r.rowsAffected = int64(c.s.LastInsertID()), int64(c.s.AffectedRows())//由于返回的是nil,所以这个都是0 return r, nil } stmt, err := c.getStmt(query) if err != nil { return nil, errors.Trace(err) } return stmt.Exec(args) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L281-L293
接着就返回到了main.go:
err = executeLine(tx, l) if err != nil { log.Error(errors.ErrorStack(err)) tx.Rollback() } else { tx.Commit() } } } }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/interpreter/main.go#L207-L216
事务提交后,就等待下一条 SQL 语句……
小结
这条语句的执行流程比较简单。值得注意的是,当前版本中 schema、table 和 column 的 ID 都通过 meta.GenGlobalID 由同一个全局计数器递增分配。