接着说下一个SQL语句:
SELECT r FROM t WHERE c = 'c1';
这是个查询语句,和之前的Exec类型的语句不一样,所以从入口开始说起:
err = executeLine(tx, l) if err != nil { log.Error(errors.ErrorStack(err)) tx.Rollback() } else { tx.Commit() } } } }
接着进入:
func executeLine(tx *sql.Tx, txnLine string) error { if tidb.IsQuery(txnLine) { rows, err := tx.Query(txnLine)
进入到sql包的tx.Query(txnLine)会调用Query():
// Queryer is an optional interface that may be implemented by a Conn.
//
// If a Conn does not implement Queryer, the sql package's DB.Query will first
// prepare a query, execute the statement, and then close the statement.
//
// Query may return driver.ErrSkip.
//
// driverConn.Query itself does no work: it simply delegates to driverQuery.
func (c *driverConn) Query(query string, args []driver.Value) (driver.Rows, error) {
	return c.driverQuery(query, args)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L295-L303
然后来到:
func (c *driverConn) driverQuery(query string, args []driver.Value) (driver.Rows, error) { if len(args) == 0 { rss, err := c.s.Execute(query)
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L305-L307
接着执行Execute(),这里跟之前一样:
// Execute compiles sql into one or more statements and runs them in order
// via runStmt. Every non-nil record set produced is collected and returned.
// A compile error or the first statement error aborts the whole call; the
// error is logged and returned wrapped with errors.Trace.
func (s *session) Execute(sql string) ([]rset.Recordset, error) {
	stmts, err := Compile(sql)
	if err != nil {
		log.Errorf("Compile sql error: %s - %s", sql, err)
		return nil, errors.Trace(err)
	}

	var rs []rset.Recordset

	for _, si := range stmts {
		r, err := runStmt(s, si)
		if err != nil {
			log.Warnf("session:%v, err:%v", s, err)
			return nil, errors.Trace(err)
		}

		// Statements that produce no result set (r == nil) are not collected.
		if r != nil {
			rs = append(rs, r)
		}
	}

	return rs, nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/session.go#L120-L142
然后进入runStmt,随后来到:
// Exec implements the stmt.Statement Exec interface.
//
// Exec only builds the execution plan for the SELECT; it does not read any
// rows. The plan is wrapped, together with ctx, in an rsets.Recordset, and
// rows are produced later when that record set's Do method is driven.
func (s *SelectStmt) Exec(ctx context.Context) (rs rset.Recordset, err error) {
	log.Info("SelectStmt trx:")
	r, err := s.Plan(ctx)
	if err != nil {
		return nil, err
	}

	return rsets.Recordset{ctx, r}, nil
}
制定查询计划
进入s.Plan(ctx)开始制定查询计划(这一步并不会真正去KV取数据):
// Plan implements the plan.Planner interface. // The whole phase for select is // `from -> where -> lock -> group by -> having -> select fields -> distinct -> order by -> limit -> final` func (s *SelectStmt) Plan(ctx context.Context) (plan.Plan, error) { var ( r plan.Plan err error )
首先根据from生成需要返回哪些字段:
if s.From != nil { r, err = s.From.Plan(ctx) if err != nil { return nil, err } } else if s.Fields != nil { // Only evaluate fields values. fr := &rsets.FieldRset{Fields: s.Fields} r, err = fr.Plan(ctx) if err != nil { return nil, err } }
由于这条SQL有from,所以进入r, err = s.From.Plan(ctx):
// Plan gets JoinPlan.
//
// It resets r.tableNames (the set of table names already seen in this FROM
// clause, which buildSourcePlan uses to reject duplicate names) and then
// recursively fills a fresh JoinPlan via buildJoinPlan.
func (r *JoinRset) Plan(ctx context.Context) (plan.Plan, error) {
	r.tableNames = make(map[string]bool)
	p := &plans.JoinPlan{}
	if err := r.buildJoinPlan(ctx, p, r); err != nil {
		return nil, errors.Trace(err)
	}
	return p, nil
}
进入r.buildJoinPlan(ctx, p, r):
func (r *JoinRset) buildJoinPlan(ctx context.Context, p *plans.JoinPlan, s *JoinRset) error { left, leftFields, err := r.buildPlan(ctx, s.Left) if err != nil { return errors.Trace(err) }
这里由于是JoinPlan,所以有Left和Right表达式,并且会递归遍历这个语法树:
接着进入left, leftFields, err := r.buildPlan(ctx, s.Left):
// buildPlan builds the plan for one side (Left or Right) of a join node.
//
// Depending on the node type:
//   - *JoinRset: returns an empty JoinPlan placeholder with no fields. The
//     caller (buildJoinPlan) fills it by recursing into that sub-join.
//   - *TableSource: delegates to buildSourcePlan, which returns the leaf plan
//     and its result fields.
//   - nil: the side is absent (e.g. no Right side for a single-table FROM),
//     so all return values are nil.
//
// Any other node type is reported as an error.
func (r *JoinRset) buildPlan(ctx context.Context, node interface{}) (plan.Plan, []*field.ResultField, error) {
	switch t := node.(type) {
	case *JoinRset:
		return &plans.JoinPlan{}, nil, nil
	case *TableSource:
		return r.buildSourcePlan(ctx, t)
	case nil:
		return nil, nil, nil
	default:
		return nil, nil, errors.Errorf("invalid join node %T", t)
	}
}
现在node类型为TableSource,所以继续进入r.buildSourcePlan(ctx, t):
func (r *JoinRset) buildSourcePlan(ctx context.Context, t *TableSource) (plan.Plan, []*field.ResultField, error) { var ( src interface{} tr *TableRset err error ) switch s := t.Source.(type) { case table.Ident: fullIdent := s.Full(ctx)//取出table.Ident对应的Schema tr, err = newTableRset(fullIdent.Schema.O, fullIdent.Name.O)//新建TableRset对象 if err != nil { return nil, nil, errors.Trace(err) } src = tr if t.Name == "" {//在r中标记出现过的表名 qualifiedTableName := tr.Schema + "." + tr.Name if r.tableNames[qualifiedTableName] { return nil, nil, errors.Errorf("%s: duplicate name %s", r.String(), s) } r.tableNames[qualifiedTableName] = true } case stmt.Statement: src = s default: return nil, nil, errors.Errorf("invalid table source %T", t.Source) }
接着往下:
var p plan.Plan switch x := src.(type) { case plan.Planner: if p, err = x.Plan(ctx); err != nil { return nil, nil, errors.Trace(err) } case plan.Plan: p = x default: return nil, nil, errors.Errorf("invalid table source %T, no Plan interface", t.Source) }
会进入到x.Plan(ctx),这里返回了一个plans.TableDefaultPlan对象赋值给p,并填充上了Schema,Table和cols的完整信息。
继续往下:
var fields []*field.ResultField dupNames := make(map[string]struct{}, len(p.GetFields())) for _, nf := range p.GetFields() {//检查是否有重复字段名 f := nf.Clone() if t.Name != "" { f.TableName = t.Name } // duplicate column name in one table is not allowed. name := strings.ToLower(f.Name) if _, ok := dupNames[name]; ok { return nil, nil, errors.Errorf("Duplicate column name '%s'", name) } dupNames[name] = struct{}{} fields = append(fields, f) } return p, fields, nil }
然后一直回到:
left, leftFields, err := r.buildPlan(ctx, s.Left) if err != nil { return errors.Trace(err) } right, rightFields, err := r.buildPlan(ctx, s.Right) if err != nil { return errors.Trace(err) }
由于这个SQL没有join所以right, rightFields, err := r.buildPlan(ctx, s.Right)返回nil,nil,nil。
接着往下
p.Left = left p.Right = right p.On = s.On p.Type = s.Type.String() // if the left is not JoinPlan, the left is the leaf node, // we can stop recursion. if pl, ok := left.(*plans.JoinPlan); ok { if err := r.buildJoinPlan(ctx, pl, s.Left.(*JoinRset)); err != nil { return errors.Trace(err) } // use left JoinPlan fields. p.Fields = append(p.Fields, pl.GetFields()...) } else { // use left fields directly. p.Fields = append(p.Fields, leftFields...) } // if the right is not JoinPlan, the right is the leaf node, // we can stop recursion. if pr, ok := right.(*plans.JoinPlan); ok { if err := r.buildJoinPlan(ctx, pr, s.Right.(*JoinRset)); err != nil { return errors.Trace(err) } // use right JoinPlan fields. p.Fields = append(p.Fields, pr.GetFields()...) } else { // use right fields directly. p.Fields = append(p.Fields, rightFields...) } return nil }
这里一直递归这棵树,直到叶子结点,并把所有Fields加入到p.Fields。
然后一直回到:
r, err = s.From.Plan(ctx) if err != nil { return nil, err } } else if s.Fields != nil { // Only evaluate fields values. fr := &rsets.FieldRset{Fields: s.Fields} r, err = fr.Plan(ctx) if err != nil { return nil, err } }
这里把s.From.Plan(ctx)生成的Plan赋值给了前面定义的r。
接着往下:
if w := s.Where; w != nil { r, err = (&rsets.WhereRset{Expr: w.Expr, Src: r}).Plan(ctx) if err != nil { return nil, err } }
其中r, err = (&rsets.WhereRset{Expr: w.Expr, Src: r}).Plan(ctx),这里把上面得到的JoinPlan(r)作为rsets.WhereRset的src传入,然后再调用Plan()把返回结果又覆盖掉r。
然后进入Plan:
// Plan gets NullPlan/FilterDefaultPlan.
//
// It works on a clone of the WHERE expression. Static (constant) conditions
// are handled by planStatic. Binary operations, identifiers, IS NULL and
// unary operations are dispatched to dedicated planners that may push the
// condition down into r.Src (e.g. to use an index). IN / LIKE / REGEXP have
// no optimization yet. Those and any unsupported expression fall through to
// a FilterDefaultPlan that evaluates the condition row by row on top of
// r.Src.
func (r *WhereRset) Plan(ctx context.Context) (plan.Plan, error) {
	expr, err := r.Expr.Clone()
	if err != nil {
		return nil, err
	}

	if expr.IsStatic() {
		// IsStatic means we have a const value for where condition, and we don't need any index.
		return r.planStatic(ctx, expr)
	}

	switch x := expr.(type) {
	case *expressions.BinaryOperation:
		return r.planBinOp(ctx, x)
	case *expressions.Ident:
		return r.planIdent(ctx, x)
	case *expressions.IsNull:
		return r.planIsNull(ctx, x)
	case *expressions.PatternIn:
		// TODO: optimize
		// TODO: show plan
	case *expressions.PatternLike:
		// TODO: optimize
	case *expressions.PatternRegexp:
		// TODO: optimize
	case *expressions.UnaryOperation:
		return r.planUnaryOp(ctx, x)
	default:
		log.Warnf("%v not supported in where rset now", r.Expr)
	}

	// No specialised plan applied: filter every source row with expr.
	return &plans.FilterDefaultPlan{r.Src, expr}, nil
}
可以看到,这里还有些表达式(比如like)不支持,这类表达式会执行全表扫描。
接着进入到r.planBinOp(ctx, x):
// planBinOp plans a WHERE condition whose top-level node is a binary
// operation.
//
//   - Comparison (=, >=, >, <=, <, !=): asks the source plan to absorb the
//     condition via Filter (for example, turning it into an index plan). If it
//     does, that new plan is returned directly.
//   - AND (&&): flattens the conjunction into its leaf conditions and tries to
//     push each one into the (possibly already filtered) plan. Conditions that
//     could not be pushed down are re-joined with AND and evaluated by a
//     FilterDefaultPlan on top of the new plan. If nothing was pushed down, it
//     falls back to filtering with the whole expression.
//   - Anything else (e.g. OR) is not optimized yet.
//
// The fallback is a FilterDefaultPlan over r.Src evaluating x for every row.
func (r *WhereRset) planBinOp(ctx context.Context, x *expressions.BinaryOperation) (plan.Plan, error) {
	var err error
	var p2 plan.Plan
	var filtered bool

	p := r.Src // source plan; for a plain FROM clause this is the JoinPlan built from it
	switch x.Op {
	case opcode.EQ, opcode.GE, opcode.GT, opcode.LE, opcode.LT, opcode.NE:
		if p2, filtered, err = p.Filter(ctx, x); err != nil {
			return nil, err
		}
		if filtered {
			return p2, nil
		}
	case opcode.AndAnd:
		// Collect the leaves of a left/right-nested chain of ANDs into `in`.
		var in []expression.Expression
		var f func(expression.Expression)
		f = func(e expression.Expression) {
			b, ok := e.(*expressions.BinaryOperation)
			if !ok || b.Op != opcode.AndAnd {
				in = append(in, e)
				return
			}

			f(b.L)
			f(b.R)
		}

		f(x)
		out := []expression.Expression{}
		// Shadows the outer p; each successfully pushed-down condition
		// replaces it with the narrower plan returned by Filter.
		p := r.Src
		isNewPlan := false
		for _, e := range in {
			p2, filtered, err = p.Filter(ctx, e)
			if err != nil {
				return nil, err
			}

			if !filtered {
				// Could not be pushed down; must be evaluated per row later.
				out = append(out, e)
				continue
			}

			p = p2
			isNewPlan = true
		}

		if !isNewPlan {
			break
		}

		if len(out) == 0 {
			return p, nil
		}

		// Re-combine the remaining conditions into a single AND expression,
		// folding from the right end of the slice.
		for len(out) > 1 {
			n := len(out)
			e := expressions.NewBinaryOperation(opcode.AndAnd, out[n-2], out[n-1])

			out = out[:n-1]
			out[n-2] = e
		}

		return &plans.FilterDefaultPlan{p, out[0]}, nil
	default:
		// TODO: better plan for `OR`.
		log.Warn("TODO: better plan for", x.Op)
	}

	return &plans.FilterDefaultPlan{p, x}, nil
}
由于条件是EQ类型的,所以进入p.Filter(ctx, x):
// Filter implements the plan.Plan Filter interface for JoinPlan.
//
// It offers expr first to the Left child and then to the Right child. The
// first child that accepts it (filtered == true) is replaced in place by the
// returned plan, and r itself is returned with filtered == true. If neither
// child can use expr, r is returned unchanged with filtered == false.
func (r *JoinPlan) Filter(ctx context.Context, expr expression.Expression) (plan.Plan, bool, error) {
	// TODO: do more optimization for join plan
	// now we only use where expression for Filter, but for join
	// we must use On expression too.

	p, filtered, err := r.filterNode(ctx, expr, r.Left)
	if err != nil {
		return nil, false, err
	}
	if filtered {
		r.Left = p
		return r, true, nil
	}

	p, filtered, err = r.filterNode(ctx, expr, r.Right)
	if err != nil {
		return nil, false, err
	}
	if filtered {
		r.Right = p
		return r, true, nil
	}

	return r, false, nil
}
继续进入p, filtered, err := r.filterNode(ctx, expr, r.Left):
// filterNode asks a single child node to absorb expr.
//
// A nil node (an absent join side) cannot filter anything, so it reports
// filtered == false. Otherwise the child receives a clone of expr rather than
// expr itself.
func (r *JoinPlan) filterNode(ctx context.Context, expr expression.Expression, node plan.Plan) (plan.Plan, bool, error) {
	if node == nil {
		return r, false, nil
	}

	e2, err := expr.Clone()
	if err != nil {
		return nil, false, err
	}

	return node.Filter(ctx, e2)
}
继续进入node.Filter(ctx, e2):
// Filter implements plan.Plan Filter interface.
//
// It delegates to filter with checkColumns enabled, so expr is only used when
// every column it mentions belongs to this table.
func (r *TableDefaultPlan) Filter(ctx context.Context, expr expression.Expression) (plan.Plan, bool, error) {
	return r.filter(ctx, expr, true)
}
继续:
// filter tries to turn expr into a more specific plan for this table.
//
// When checkColumns is true, expr is rejected (filtered == false) unless all
// of its mentioned columns are fields of this table. Supported shapes:
// binary operations, bare identifiers, IS NULL, and `!ident`. Any other
// expression leaves the plan unchanged with filtered == false.
func (r *TableDefaultPlan) filter(ctx context.Context, expr expression.Expression, checkColumns bool) (plan.Plan, bool, error) {
	if checkColumns { // make sure every column referenced in the condition belongs to this table
		colNames := expressions.MentionedColumns(expr)
		// make sure all mentioned column names are in Fields
		// if not, e.g. the expr has two table like t1.c1 = t2.c2, we can't use filter
		if !field.ContainAllFieldNames(colNames, r.Fields, field.DefaultFieldFlag) {
			return r, false, nil
		}
	}
	switch x := expr.(type) {
	case *expressions.BinaryOperation:
		return r.filterBinOp(ctx, x)
	case *expressions.Ident:
		return r.filterIdent(ctx, x, true)
	case *expressions.IsNull:
		return r.filterIsNull(ctx, x)
	case *expressions.UnaryOperation:
		// Only logical NOT applied directly to a column identifier is handled.
		if x.Op != '!' {
			break
		}

		if operand, ok := x.V.(*expressions.Ident); ok {
			return r.filterIdent(ctx, operand, false)
		}
	}

	return r, false, nil
}
根据类型进入r.filterBinOp(ctx, x):
func (r *TableDefaultPlan) filterBinOp(ctx context.Context, x *expressions.BinaryOperation) (plan.Plan, bool, error) { ok, cn, rval, err := x.IsIdentRelOpVal()//检查并拆分出当前where条件 if err != nil { return r, false, err } if !ok { return r, false, nil }
然后往下:
t := r.T c := column.FindCol(t.Cols(), cn) if c == nil { return nil, false, errors.Errorf("No such column: %s", cn) }
column.FindCol(t.Cols(), cn):在表中根据cn(where条件的列名)查询出对应col信息。
继续往下:
ix := t.FindIndexByColName(cn) if ix == nil { // Column cn has no index. return r, false, nil }
这里根据列名cn查找表中建在该列上的索引;如果该列没有索引,就无法用索引过滤,直接返回原计划(filtered为false)。
继续往下:
if rval, err = c.CastValue(ctx, rval); err != nil {//根据列的类型转换值 return nil, false, err } if rval == nil {//过滤掉某些操作的nil值 // if nil, any <, <=, >, >=, =, != operator will do nothing // any value compared null returns null // TODO: if we support <=> later, we must handle null return &NullPlan{r.GetFields()}, true, nil } return &indexPlan{ src: t, colName: cn, idxName: ix.Name.O, idx: ix.X, spans: toSpans(x.Op, rval), }, true, nil }
接着会构造一个indexPlan索引计划,其中spans记录了互不重叠的扫描范围(取数据时会按这些范围扫描索引,这里定义了每段扫描的startkey和endkey)。
进入toSpans(x.Op, rval):
// generate a slice of span from operator and value.
//
// Each span is an index range [lowVal, highVal], where lowExclude and
// highExclude make the corresponding bound open:
//   - EQ:     [val, val] (a point lookup)
//   - LT, LE: [minNotNullVal, val) / [minNotNullVal, val]
//   - GE, GT: [val, maxVal] / (val, maxVal]
//   - NE:     two spans, [minNotNullVal, val) and (val, maxVal]
//
// Any other operator panics; callers only pass comparison operators.
func toSpans(op opcode.Op, val interface{}) []*indexSpan {
	var spans []*indexSpan
	switch op {
	case opcode.EQ:
		spans = append(spans, &indexSpan{
			lowVal:      val,
			lowExclude:  false,
			highVal:     val,
			highExclude: false,
		})
	case opcode.LT:
		spans = append(spans, &indexSpan{
			lowVal:      minNotNullVal,
			highVal:     val,
			highExclude: true,
		})
	case opcode.LE:
		spans = append(spans, &indexSpan{
			lowVal:      minNotNullVal,
			highVal:     val,
			highExclude: false,
		})
	case opcode.GE:
		spans = append(spans, &indexSpan{
			lowVal:     val,
			lowExclude: false,
			highVal:    maxVal,
		})
	case opcode.GT:
		spans = append(spans, &indexSpan{
			lowVal:     val,
			lowExclude: true,
			highVal:    maxVal,
		})
	case opcode.NE:
		spans = append(spans, &indexSpan{
			lowVal:      minNotNullVal,
			highVal:     val,
			highExclude: true,
		})
		spans = append(spans, &indexSpan{
			lowVal:     val,
			lowExclude: true,
			highVal:    maxVal,
		})
	default:
		panic("should never happen")
	}
	return spans
}
这里定义了不同操作的各种范围,我们这里是EQ,所以lowVal(startkey)和highVal(endkey)都是val(其实就是点查询)。
然后一直回到:
p, filtered, err := r.filterNode(ctx, expr, r.Left) if err != nil { return nil, false, err } if filtered { r.Left = p return r, true, nil } p, filtered, err = r.filterNode(ctx, expr, r.Right) if err != nil { return nil, false, err } if filtered { r.Right = p return r, true, nil } return r, false, nil }
这里由于没有right了,所以直接回到了:
r, err = (&rsets.WhereRset{Expr: w.Expr, Src: r}).Plan(ctx) if err != nil { return nil, err } }
接着往下:
lock := s.Lock if variable.IsAutocommit(ctx) {//是否启用了Autocommit // Locking of rows for update using SELECT FOR UPDATE only applies when autocommit // is disabled (either by beginning transaction with START TRANSACTION or by setting // autocommit to 0. If autocommit is enabled, the rows matching the specification are not locked. // See: https://dev.mysql.com/doc/refman/5.7/en/innodb-locking-reads.html lock = coldef.SelectLockNone } r, err = (&rsets.SelectLockRset{Src: r, Lock: lock}).Plan(ctx) if err != nil { return nil, err }
然后把之前得到的计划r继续作为SelectLockRset的src传入,然后这个Plan什么也没做:
// Plan gets SelectLockPlan.
//
// Planning does no work here: it only wraps r.Src together with the lock
// mode. Any locking happens later, in SelectLockPlan.Do.
func (r *SelectLockRset) Plan(ctx context.Context) (plan.Plan, error) {
	return &plans.SelectLockPlan{Src: r.Src, Lock: r.Lock}, nil
}
然后继续往下:
// Get select list for futher field values evaluation. selectList, err := plans.ResolveSelectList(s.Fields, r.GetFields()) if err != nil { return nil, errors.Trace(err) }
plans.ResolveSelectList(s.Fields, r.GetFields())用来获取字段和结果字段,并检查合法性和处理通配符。
接着:
var groupBy []expression.Expression if s.GroupBy != nil { groupBy = s.GroupBy.By } if s.Having != nil { // `having` may contain aggregate functions, and we will add this to hidden fields. if err = s.Having.CheckAndUpdateSelectList(selectList, groupBy, r.GetFields()); err != nil { return nil, errors.Trace(err) } } if s.OrderBy != nil { // `order by` may contain aggregate functions, and we will add this to hidden fields. if err = s.OrderBy.CheckAndUpdateSelectList(selectList, r.GetFields()); err != nil { return nil, errors.Trace(err) } }
这里检查group by、having以及order by,我这条SQL都没有用到,所以先不展开了。
然后继续:
switch { case !rsets.HasAggFields(selectList.Fields) && s.GroupBy == nil: // If no group by and no aggregate functions, we will use SelectFieldsPlan. if r, err = (&rsets.SelectFieldsRset{Src: r, SelectList: selectList}).Plan(ctx); err != nil { return nil, err } default: if r, err = (&rsets.GroupByRset{By: groupBy, Src: r, SelectList: selectList}).Plan(ctx); err != nil { return nil, err } }
这里再套上一层SelectFieldsRset后会进入(&rsets.SelectFieldsRset{Src: r, SelectList: selectList}).Plan(ctx):
// Plan gets SrcPlan/SelectFieldsDefaultPlan.
// If all fields are equal to src plan fields, then gets SrcPlan.
// Default gets SelectFieldsDefaultPlan.
//
// "Equal" means each select field is a plain identifier whose name and
// output name both match (case-insensitively) the source field at the same
// position. If the source is a TableDefaultPlan and every select field is a
// constant (FastEval succeeds), the table scan is replaced by a TableNilPlan.
func (r *SelectFieldsRset) Plan(ctx context.Context) (plan.Plan, error) {
	fields := r.SelectList.Fields
	srcFields := r.Src.GetFields()
	if len(fields) == len(srcFields) {
		match := true
		for i, v := range fields {
			// TODO: is it this check enough? e.g, the ident field is t.c.
			if x, ok := v.Expr.(*expressions.Ident); ok &&
				strings.EqualFold(x.L, srcFields[i].Name) &&
				strings.EqualFold(v.Name, srcFields[i].Name) {
				continue
			}

			match = false
			break
		}

		if match {
			return r.Src, nil
		}
	}

	src := r.Src
	if x, ok := src.(*plans.TableDefaultPlan); ok { // check whether the table scan can be dropped
		// check whether src plan will be set TableNilPlan, like `select 1, 2 from t`.
		isConst := true
		for _, v := range fields {
			if expressions.FastEval(v.Expr) == nil {
				isConst = false
				break
			}
		}
		if isConst {
			src = &plans.TableNilPlan{x.T}
		}
	}

	p := &plans.SelectFieldsDefaultPlan{Src: src, SelectList: r.SelectList}
	return p, nil
}
这里根据需要返回字段fields是否等于srcFields来决定使用SrcPlan(上一个计划)或者SelectFieldsDefaultPlan。
然后回到:
SelectList: selectList}).Plan(ctx); err != nil { return nil, err } default: if r, err = (&rsets.GroupByRset{By: groupBy, Src: r, SelectList: selectList}).Plan(ctx); err != nil { return nil, err } } if s := s.Having; s != nil { if r, err = (&rsets.HavingRset{ Src: r, Expr: s.Expr}).Plan(ctx); err != nil { return nil, err } } if s.Distinct { if r, err = (&rsets.DistinctRset{Src: r, SelectList: selectList}).Plan(ctx); err != nil { return nil, err } } if s := s.OrderBy; s != nil { if r, err = (&rsets.OrderByRset{By: s.By, Src: r, SelectList: selectList}).Plan(ctx); err != nil { return nil, err } } if s := s.Offset; s != nil { if r, err = (&rsets.OffsetRset{s.Count, r}).Plan(ctx); err != nil { return nil, err } } if s := s.Limit; s != nil { if r, err = (&rsets.LimitRset{s.Count, r}).Plan(ctx); err != nil { return nil, err } }
这些在本SQL都没用到,都先不展开了。
然后:
if r, err = (&rsets.SelectFinalRset{Src: r, SelectList: selectList}).Plan(ctx); err != nil { return nil, err } return r, nil }
在这里给r再套上一层SelectFinalRset。制定计划时这一层只是简单包装,不做额外处理;它的Do方法在取数据时才起作用(后面会讲到)。
然后计划制定完成(基本上就是把缺的东西填上),然后回到:
r, err := s.Plan(ctx) if err != nil { return nil, err } return rsets.Recordset{ctx, r}, nil }
这里把制定好的计划r放进rsets.Recordset,然后继续返回:
rs, err = s.Exec(ctx) stmt.ClearExecArgs(ctx) } // MySQL DDL should be auto-commit if err == nil && (s.IsDDL() || variable.IsAutocommit(ctx)) { err = ctx.FinishTxn(false) } return rs, errors.Trace(err) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L153-L161
进一步返回:
r, err := runStmt(s, si) if err != nil { log.Warnf("session:%v, err:%v", s, err) return nil, errors.Trace(err) } if r != nil { rs = append(rs, r) } } return rs, nil }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/session.go#L130-L142
这里把每条SQL语句结果r添加到rs中,然后继续返回:
rss, err := c.s.Execute(query) if err != nil { return nil, errors.Trace(err) } if len(rss) == 0 { return nil, errors.Trace(errNoResult) } return newdriverRows(rss[0]), nil } stmt, err := c.getStmt(query) if err != nil { return nil, errors.Trace(err) } return stmt.Query(args) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L307-L321
至此,并没有真正从kv中取出任何数据,只是制定了查询计划,确定了需要返回的字段,并返回了一个用于获取数据的对象。真正的取数据是从newdriverRows(rss[0])这里开始的:
// newdriverRows wraps a record set in a driverRows and starts a producer
// goroutine that drives rs.Do.
//
// The goroutine clones each row and sends it on r.rows (buffered, capacity
// 500). When iteration ends it sends one terminal value on r.rows: io.EOF on
// normal completion, or the error returned by Do. Every send also selects on
// r.done, so the producer stops once r.done becomes readable. Presumably
// r.done is closed by driverRows.Close; that is not shown here, so confirm it.
func newdriverRows(rs rset.Recordset) *driverRows {
	r := &driverRows{
		rs:   rs,
		done: make(chan int),
		rows: make(chan interface{}, 500),
	}
	go func() {
		err := io.EOF
		if e := r.rs.Do(func(data []interface{}) (bool, error) {
			// Copy the row so later reuse of data by the plan cannot
			// affect the value handed to the consumer.
			vv, err := types.Clone(data)
			if err != nil {
				return false, errors.Trace(err)
			}

			select {
			case r.rows <- vv:
				return true, nil
			case <-r.done:
				// Consumer is gone: stop iterating without error.
				return false, nil
			}
		}); e != nil {
			err = e
		}

		// Deliver the terminal value (io.EOF or the iteration error).
		select {
		case r.rows <- err:
		case <-r.done:
		}
	}()
	return r
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L347-L376
这里面通过启动一个goroutine来异步获取数据,并通过channel来发送数据,然后在这里通过调用Next等待数据:
for rows.Next() { err := rows.Scan(scanArgs...) if err != nil { return errors.Trace(err) } data := make([]string, len(cols)) for i, value := range values { if value == nil { data[i] = "NULL" } else { data[i] = string(value) } } datas = append(datas, data) } // For `cols` and `datas[i]` always has the same length, // no need to check return validity. result, _ := printer.GetPrintResult(cols, datas) fmt.Printf("%s", result) if err := rows.Err(); err != nil { return errors.Trace(err) } } else { // TODO: rows affected and last insert id _, err := tx.Exec(txnLine) if err != nil { return errors.Trace(err) } } return nil }
具体Next等待在这里:
// Next is the driver.Rows Next method used by database/sql's rows.Next.
//
// It blocks until the producer goroutine sends either a row or a terminal
// error on r.rows, or until r.done becomes readable (then io.EOF). A received
// error (io.EOF at normal end) is returned as-is. A received row must have
// exactly len(dest) columns. Each value is then converted into dest:
// signed integers become int64, unsigned integers become uint64, and
// mysql.Duration / mysql.Time / mysql.Decimal become their String() form.
// Any other type is an error.
func (r *driverRows) Next(dest []driver.Value) error {
	select {
	case rx := <-r.rows:
		switch x := rx.(type) {
		case error:
			return x
		case []interface{}:
			if g, e := len(x), len(dest); g != e {
				return errors.Errorf("field count mismatch: got %d, need %d", g, e)
			}

			for i, xi := range x {
				switch v := xi.(type) {
				case nil, int64, float32, float64, bool, []byte, string:
					dest[i] = v
				case int8:
					dest[i] = int64(v)
				case int16:
					dest[i] = int64(v)
				case int32:
					dest[i] = int64(v)
				case int:
					dest[i] = int64(v)
				case uint8:
					dest[i] = uint64(v)
				case uint16:
					dest[i] = uint64(v)
				case uint32:
					dest[i] = uint64(v)
				case uint64:
					dest[i] = uint64(v)
				case uint:
					dest[i] = uint64(v)
				case mysql.Duration:
					dest[i] = v.String()
				case mysql.Time:
					dest[i] = v.String()
				case mysql.Decimal:
					dest[i] = v.String()
				default:
					return errors.Errorf("unable to handle type %T", xi)
				}
			}
			return nil
		default:
			return errors.Errorf("unable to handle type %T", rx)
		}
	case <-r.done:
		return io.EOF
	}
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L403-L453
从KV取数据
然后回到前面的goroutine,分析一下具体取数据的流程:
r := &driverRows{ rs: rs, done: make(chan int), rows: make(chan interface{}, 500), } go func() { err := io.EOF if e := r.rs.Do(func(data []interface{}) (bool, error) { vv, err := types.Clone(data) if err != nil { return false, errors.Trace(err) } select { case r.rows <- vv: return true, nil case <-r.done: return false, nil } }); e != nil { err = e } select { case r.rows <- err: case <-r.done: } }()
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L348-L374
首先,这里的r.rs就是之前制定的计划(包装在rsets.Recordset中)。
然后看到r.rs.Do传递了一个函数作为参数,进入Do看看:
// Do implements rset.Recordset.
//
// It runs the wrapped plan with the stored context and adapts the callback:
// the plan's row ID argument is dropped and only the row data is passed to f.
func (r Recordset) Do(f func(data []interface{}) (bool, error)) error {
	return r.Plan.Do(r.Ctx, func(ID interface{}, data []interface{}) (bool, error) {
		return f(data)
	})
}
这是最外层的对象,这里又把之前传入的函数包装一层,传给了里面那层的Do:
// Do implements the plan.Plan Do interface, and sets result info field.
//
// For every source row it keeps only the first r.HiddenFieldOffset values,
// dropping the hidden fields, and converts bool values to uint8 1/0. On the
// first row of each run it also records result field info from the
// values. Because out is a sub-slice of in, the bool conversion writes into
// the caller's row slice as well.
func (r *SelectFinalPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
	// Reset infered. For prepared statements, this plan may run many times.
	r.infered = false
	return r.Src.Do(ctx, func(rid interface{}, in []interface{}) (bool, error) {
		// we should not output hidden fields to client
		out := in[0:r.HiddenFieldOffset]
		for i, o := range out {
			switch v := o.(type) {
			case bool:
				// Convert bool field to int
				if v {
					out[i] = uint8(1)
				} else {
					out[i] = uint8(0)
				}
			}
		}

		if !r.infered {
			setResultFieldInfo(r.ResultFields[0:r.HiddenFieldOffset], out)
			r.infered = true
		}
		return f(rid, out)
	})
}
这个SelectFinalPlan就是之前计划的最后套的那层,这里面每一层都会传入一个函数,相当于入栈的调用,所以直接进入最底层后分析:
// Do implements plan.Plan Do interface, it executes join method
// according to given type.
//
// With no Right side (a single-table FROM) it simply runs the Left plan.
// Otherwise it dispatches on r.Type; any type other than left/right/full
// join is executed as a cross join.
func (r *JoinPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
	if r.Right == nil {
		return r.Left.Do(ctx, f)
	}

	switch r.Type {
	case LeftJoin:
		return r.doLeftJoin(ctx, f)
	case RightJoin:
		return r.doRightJoin(ctx, f)
	case FullJoin:
		return r.doFullJoin(ctx, f)
	default:
		return r.doCrossJoin(ctx, f)
	}
}
然后进入r.Left.Do(ctx, f):
// Do implements plan.Plan Do interface.
// It scans a span from the lower bound to upper bound.
//
// It fetches the session's current transaction and scans each span in
// r.spans in order, stopping at the first error.
func (r *indexPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
	txn, err := ctx.GetTxn(false)
	if err != nil {
		return err
	}
	for _, span := range r.spans {
		err := r.doSpan(ctx, txn, span, f)
		if err != nil {
			return err
		}
	}
	return nil
}
获取当前session的事务txn后根据之前的spans(扫描范围)来开始扫描,进入doSpan():
func (r *indexPlan) doSpan(ctx context.Context, txn kv.Transaction, span *indexSpan, f plan.RowIterFunc) error { seekVal := span.lowVal if span.lowVal == minNotNullVal { seekVal = []byte{} } it, _, err := r.idx.Seek(txn, []interface{}{seekVal}) if err != nil { return types.EOFAsNil(err) } defer it.Close()
首先seekVal := span.lowVal获得扫描的起始位置(如果lowVal是minNotNullVal,则改用空字节串),然后r.idx.Seek(txn, []interface{}{seekVal})在KV中定位到第一个>=seekVal的索引项,得到迭代器it。
接着:
var skipLowCompare bool for { k, h, err := it.Next() if err != nil { return types.EOFAsNil(err) } val := k[0] if !skipLowCompare { if span.lowExclude && indexCompare(span.lowVal, val) == 0 {//不包含lowVal,现在的val==lowVal,则忽略改行数据 continue } skipLowCompare = true } cmp := indexCompare(span.highVal, val) if cmp < 0 || (cmp == 0 && span.highExclude) {//当前行已经超出highVal,则直接退出 return nil } data, err := r.src.Row(ctx, h) if err != nil { return err } if more, err := f(h, data); err != nil || !more { return err } } }
这里通过迭代器it.Next()每次取出一条索引项:行号h,以及未经编码的索引值k(k[0]就是索引字段的值)。如果span.lowExclude为true,会跳过开头等于lowVal的项;当当前值大于span.highVal(若span.highExclude为true,则等于highVal时也算超出)时就结束扫描。为什么这样能保证取到全部数据?前面已经说过KV库中的Key是有序递增存储的,所以起始位置可以在O(log n)时间内定位到;而一旦出现第一个超出span.highVal的项,后面的项也必然超出范围,因此可以直接结束扫描。
接下来r.src.Row(ctx, h)用行号取出这一行的数据。理论上只需要查询需要返回的列(因为一行数据是按照每一列一个KV对存储的,可以很方便地只取部分列),但目前并没有这样做,而是把所有列都查出来,再在后面过滤。
接着把行号和当前行数据传递给之前传入进来的函数f(h, data):
// Do implements plan.Plan Do interface, acquiring locks.
//
// If the last value of a source row is a *RowKeyList, it is stripped from
// the row before the row is passed on. When the lock mode is
// SelectLockForUpdate, every key in that list is locked in the current
// transaction first. In any other mode the row keys are only stripped.
func (r *SelectLockPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
	return r.Src.Do(ctx, func(rid interface{}, in []interface{}) (bool, error) {
		var rowKeys *RowKeyList
		if in != nil && len(in) > 0 {
			t := in[len(in)-1]
			switch vt := t.(type) {
			case *RowKeyList:
				rowKeys = vt
				// Remove row key list from data tail
				in = in[:len(in)-1]
			}
		}
		if rowKeys != nil && r.Lock == coldef.SelectLockForUpdate {
			txn, err := ctx.GetTxn(false)
			if err != nil {
				return false, errors.Trace(err)
			}
			for _, k := range rowKeys.Keys {
				err = txn.LockKeys([]byte(k.Key))
				if err != nil {
					return false, errors.Trace(err)
				}
			}
		}
		return f(rid, in)
	})
}
这里是用来对行加锁(比如SQL中的for update等),我们的SQL没有用到,所以继续返回上一层的f(rid, in):
// Do implements the plan.Plan Do interface, extracts specific values by
// r.Fields from row data.
//
// For each source row, identifier lookups during expression evaluation are
// resolved against the current row via getIdentValue. Each select-list
// expression is evaluated into a freshly allocated output row, which is
// passed to f. The first evaluation error aborts iteration.
func (r *SelectFieldsDefaultPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
	fields := r.Src.GetFields()
	m := map[interface{}]interface{}{}
	return r.Src.Do(ctx, func(rid interface{}, in []interface{}) (bool, error) {
		// Rebind the identifier resolver to the current row `in`.
		m[expressions.ExprEvalIdentFunc] = func(name string) (interface{}, error) {
			return getIdentValue(name, fields, in, field.DefaultFieldFlag)
		}
		out := make([]interface{}, len(r.Fields))
		for i, fld := range r.Fields {
			var err error
			if out[i], err = fld.Expr.Eval(ctx, m); err != nil {
				return false, err
			}
		}
		return f(rid, out)
	})
}
这里用于取出需要返回的列放入out,然后传递给上一个f(rid, out):
// Do implements the plan.Plan Do interface, and sets result info field.
//
// For every source row it keeps only the first r.HiddenFieldOffset values,
// dropping the hidden fields, and converts bool values to uint8 1/0. On the
// first row of each run it also records result field info from the
// values. Because out is a sub-slice of in, the bool conversion writes into
// the caller's row slice as well.
func (r *SelectFinalPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
	// Reset infered. For prepared statements, this plan may run many times.
	r.infered = false
	return r.Src.Do(ctx, func(rid interface{}, in []interface{}) (bool, error) {
		// we should not output hidden fields to client
		out := in[0:r.HiddenFieldOffset]
		for i, o := range out {
			switch v := o.(type) {
			case bool:
				// Convert bool field to int
				if v {
					out[i] = uint8(1)
				} else {
					out[i] = uint8(0)
				}
			}
		}

		if !r.infered {
			setResultFieldInfo(r.ResultFields[0:r.HiddenFieldOffset], out)
			r.infered = true
		}
		return f(rid, out)
	})
}
这里截掉隐藏字段(比如为了处理HAVING或ORDER BY中的聚合函数而额外添加到选择列表末尾的字段),并把bool类型的值转换为uint8的1或0。
接着会把数据传到:
if e := r.rs.Do(func(data []interface{}) (bool, error) { vv, err := types.Clone(data) if err != nil { return false, errors.Trace(err) } select { case r.rows <- vv: return true, nil case <-r.done: return false, nil } }); e != nil { err = e } select { case r.rows <- err: case <-r.done: }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L355-L373
clone一份数据后发送到r.rows这个channel。扫描结束后,还会再向r.rows发送io.EOF(如果执行过程中出错,则发送该错误);如果消费端已经结束(r.done可读),则停止发送。
然后之前在等待数据的地方就收到了。
至此,整个select大致介绍了一遍,还有一些细节可能没讲,以后会通过分析每次的commit来覆盖这些内容,现在先有个大体上的把握。
然后SELECT r FROM t WHERE c >= 'c1' and c <= 'c5';语句也基本上一样,只是扫描范围的上限会变成c5,所以就不具体介绍了。
小结
整个流程大致为:编译sql得到stmts -> 制定查询计划(根据各种条件选择扫描范围等) -> 在goroutine中异步执行查询计划 -> 通过channel接收查询结果。