接着说下一个SQL语句:
SELECT r FROM t WHERE c = 'c1';
这是个查询语句,和之前的Exec类型的语句不一样,所以还是从入口开始说起:
err = executeLine(tx, l)
if err != nil {
log.Error(errors.ErrorStack(err))
tx.Rollback()
} else {
tx.Commit()
}
}
}
}
接着进入:
func executeLine(tx *sql.Tx, txnLine string) error {
if tidb.IsQuery(txnLine) {
rows, err := tx.Query(txnLine)
这里调用的是database/sql包的tx.Query(txnLine),它最终会调用到TiDB驱动实现的Query():
// Queryer is an optional interface that may be implemented by a Conn.
//
// If a Conn does not implement Queryer, the sql package's DB.Query will first
// prepare a query, execute the statement, and then close the statement.
//
// Query may return driver.ErrSkip.
//
// Query implements that optional interface for driverConn: it simply
// delegates to driverQuery with the same query text and arguments.
func (c *driverConn) Query(query string, args []driver.Value) (driver.Rows, error) {
return c.driverQuery(query, args)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L295-L303
然后来到:
func (c *driverConn) driverQuery(query string, args []driver.Value) (driver.Rows, error) {
if len(args) == 0 {
rss, err := c.s.Execute(query)
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L305-L307
接着执行Execute(),这里跟之前一样:
// Execute compiles sql into a list of statements and runs them one by one
// with runStmt. Every non-nil Recordset produced is appended to the result
// in statement order. Execution stops at the first compile or run error,
// which is logged and returned (traced) with no record sets.
func (s *session) Execute(sql string) ([]rset.Recordset, error) {
stmts, err := Compile(sql)
if err != nil {
log.Errorf("Compile sql error: %s - %s", sql, err)
return nil, errors.Trace(err)
}
var rs []rset.Recordset
for _, si := range stmts {
r, err := runStmt(s, si)
if err != nil {
log.Warnf("session:%v, err:%v", s, err)
return nil, errors.Trace(err)
}
// Statements that produce no result set (r == nil) are not collected.
if r != nil {
rs = append(rs, r)
}
}
return rs, nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/session.go#L120-L142
然后进入runStmt,随后来到:
// Exec implements the stmt.Statement Exec interface.
// For SELECT it only builds the query plan via s.Plan; no row is read here.
// The returned rsets.Recordset pairs ctx with that plan, and rows are
// produced later, when the Recordset's Do drives the plan.
func (s *SelectStmt) Exec(ctx context.Context) (rs rset.Recordset, err error) {
log.Info("SelectStmt trx:")
r, err := s.Plan(ctx)
if err != nil {
return nil, err
}
return rsets.Recordset{ctx, r}, nil
}
制定查询计划
进入s.Plan(ctx)开始制定查询计划(这一步并不会真正去KV取数据):
// Plan implements the plan.Planner interface.
// The whole phase for select is
// `from -> where -> lock -> group by -> having -> select fields -> distinct -> order by -> limit -> final`
func (s *SelectStmt) Plan(ctx context.Context) (plan.Plan, error) {
var (
r plan.Plan
err error
)
首先根据from生成需要返回哪些字段:
if s.From != nil {
r, err = s.From.Plan(ctx)
if err != nil {
return nil, err
}
} else if s.Fields != nil {
// Only evaluate fields values.
fr := &rsets.FieldRset{Fields: s.Fields}
r, err = fr.Plan(ctx)
if err != nil {
return nil, err
}
}
由于这条SQL有from,所以进入r, err = s.From.Plan(ctx):
// Plan gets JoinPlan.
// It resets r.tableNames (the set of unaliased tables seen so far, which
// buildSourcePlan uses to reject duplicates) and then fills a fresh
// JoinPlan by recursively walking the join tree with buildJoinPlan.
func (r *JoinRset) Plan(ctx context.Context) (plan.Plan, error) {
r.tableNames = make(map[string]bool)
p := &plans.JoinPlan{}
if err := r.buildJoinPlan(ctx, p, r); err != nil {
return nil, errors.Trace(err)
}
return p, nil
}
进入r.buildJoinPlan(ctx, p, r):
func (r *JoinRset) buildJoinPlan(ctx context.Context, p *plans.JoinPlan, s *JoinRset) error {
left, leftFields, err := r.buildPlan(ctx, s.Left)
if err != nil {
return errors.Trace(err)
}
这里由于是JoinPlan,所以有Left和Right两个子节点,并且会递归遍历这棵语法树:
接着进入left, leftFields, err := r.buildPlan(ctx, s.Left):
// buildPlan creates the plan for one side (Left or Right) of a join node and,
// for table sources, the result fields of that side:
//   - *JoinRset: an empty JoinPlan placeholder; the caller (buildJoinPlan)
//     recurses into it and fills it in.
//   - *TableSource: planned by buildSourcePlan.
//   - nil: the side is absent (e.g. no join), so nil plan and nil fields.
//
// Any other node type is an error.
func (r *JoinRset) buildPlan(ctx context.Context, node interface{}) (plan.Plan, []*field.ResultField, error) {
switch t := node.(type) {
case *JoinRset:
return &plans.JoinPlan{}, nil, nil
case *TableSource:
return r.buildSourcePlan(ctx, t)
case nil:
return nil, nil, nil
default:
return nil, nil, errors.Errorf("invalid join node %T", t)
}
}
现在node类型为TableSource,所以继续进入r.buildSourcePlan(ctx, t):
func (r *JoinRset) buildSourcePlan(ctx context.Context, t *TableSource) (plan.Plan, []*field.ResultField, error) {
var (
src interface{}
tr *TableRset
err error
)
switch s := t.Source.(type) {
case table.Ident:
fullIdent := s.Full(ctx)//取出table.Ident对应的Schema
tr, err = newTableRset(fullIdent.Schema.O, fullIdent.Name.O)//新建TableRset对象
if err != nil {
return nil, nil, errors.Trace(err)
}
src = tr
if t.Name == "" {//在r中标记出现过的表名
qualifiedTableName := tr.Schema + "." + tr.Name
if r.tableNames[qualifiedTableName] {
return nil, nil, errors.Errorf("%s: duplicate name %s", r.String(), s)
}
r.tableNames[qualifiedTableName] = true
}
case stmt.Statement:
src = s
default:
return nil, nil, errors.Errorf("invalid table source %T", t.Source)
}
接着往下:
var p plan.Plan
switch x := src.(type) {
case plan.Planner:
if p, err = x.Plan(ctx); err != nil {
return nil, nil, errors.Trace(err)
}
case plan.Plan:
p = x
default:
return nil, nil, errors.Errorf("invalid table source %T, no Plan interface", t.Source)
}
会进入到x.Plan(ctx),这里返回了一个plans.TableDefaultPlan对象赋值给p,并填充上了Schema,Table和cols的完整信息。
继续往下:
var fields []*field.ResultField
dupNames := make(map[string]struct{}, len(p.GetFields()))
for _, nf := range p.GetFields() {//检查是否有重复字段名
f := nf.Clone()
if t.Name != "" {
f.TableName = t.Name
}
// duplicate column name in one table is not allowed.
name := strings.ToLower(f.Name)
if _, ok := dupNames[name]; ok {
return nil, nil, errors.Errorf("Duplicate column name '%s'", name)
}
dupNames[name] = struct{}{}
fields = append(fields, f)
}
return p, fields, nil
}
然后一直回到:
left, leftFields, err := r.buildPlan(ctx, s.Left)
if err != nil {
return errors.Trace(err)
}
right, rightFields, err := r.buildPlan(ctx, s.Right)
if err != nil {
return errors.Trace(err)
}
由于这个SQL没有join所以right, rightFields, err := r.buildPlan(ctx, s.Right)返回nil,nil,nil。
接着往下
p.Left = left
p.Right = right
p.On = s.On
p.Type = s.Type.String()
// if the left is not JoinPlan, the left is the leaf node,
// we can stop recursion.
if pl, ok := left.(*plans.JoinPlan); ok {
if err := r.buildJoinPlan(ctx, pl, s.Left.(*JoinRset)); err != nil {
return errors.Trace(err)
}
// use left JoinPlan fields.
p.Fields = append(p.Fields, pl.GetFields()...)
} else {
// use left fields directly.
p.Fields = append(p.Fields, leftFields...)
}
// if the right is not JoinPlan, the right is the leaf node,
// we can stop recursion.
if pr, ok := right.(*plans.JoinPlan); ok {
if err := r.buildJoinPlan(ctx, pr, s.Right.(*JoinRset)); err != nil {
return errors.Trace(err)
}
// use right JoinPlan fields.
p.Fields = append(p.Fields, pr.GetFields()...)
} else {
// use right fields directly.
p.Fields = append(p.Fields, rightFields...)
}
return nil
}
这里一直递归这棵树,直到叶子结点,并把所有Fields加入到p.Fields。
然后一直回到:
r, err = s.From.Plan(ctx)
if err != nil {
return nil, err
}
} else if s.Fields != nil {
// Only evaluate fields values.
fr := &rsets.FieldRset{Fields: s.Fields}
r, err = fr.Plan(ctx)
if err != nil {
return nil, err
}
}
这里把s.From.Plan(ctx)生成的Plan赋值给了前面定义的r。
接着往下:
if w := s.Where; w != nil {
r, err = (&rsets.WhereRset{Expr: w.Expr, Src: r}).Plan(ctx)
if err != nil {
return nil, err
}
}
其中r, err = (&rsets.WhereRset{Expr: w.Expr, Src: r}).Plan(ctx),这里把上面得到的JoinPlan(r)作为rsets.WhereRset的src传入,然后再调用Plan()把返回结果又覆盖掉r。
然后进入Plan:
// Plan gets NullPlan/FilterDefaultPlan.
// It works on a clone of r.Expr. A static (constant) condition is handled by
// planStatic. Binary operations, identifiers, IS NULL and unary operations
// go to the matching plan* helper, which may push the condition down into
// r.Src. Every other expression falls back to a FilterDefaultPlan that wraps
// r.Src with the cloned expression.
func (r *WhereRset) Plan(ctx context.Context) (plan.Plan, error) {
expr, err := r.Expr.Clone()
if err != nil {
return nil, err
}
if expr.IsStatic() {
// IsStatic means we have a const value for where condition, and we don't need any index.
return r.planStatic(ctx, expr)
}
switch x := expr.(type) {
case *expressions.BinaryOperation:
return r.planBinOp(ctx, x)
case *expressions.Ident:
return r.planIdent(ctx, x)
case *expressions.IsNull:
return r.planIsNull(ctx, x)
// The three pattern cases below are empty. Go switch cases do not fall
// through, so these simply leave the switch (without the warning logged
// in default) and reach the FilterDefaultPlan fallback.
case *expressions.PatternIn:
// TODO: optimize
// TODO: show plan
case *expressions.PatternLike:
// TODO: optimize
case *expressions.PatternRegexp:
// TODO: optimize
case *expressions.UnaryOperation:
return r.planUnaryOp(ctx, x)
default:
log.Warnf("%v not supported in where rset now", r.Expr)
}
return &plans.FilterDefaultPlan{r.Src, expr}, nil
}
可以看到,这里还有些表达式(比如like)不能利用索引优化,这类表达式会直接回退到FilterDefaultPlan,对数据源逐行过滤,相当于全表扫描。
接着进入到r.planBinOp(ctx, x):
// planBinOp plans a WHERE condition whose top-level node is a binary operation.
//
// For a single comparison (=, >=, >, <=, <, !=) it asks the source plan to
// absorb the condition via Filter (e.g. turning a table scan into an index
// scan). If the source declines, the condition is kept as a FilterDefaultPlan.
//
// For AND it flattens the nested conjunction into its conjuncts and offers
// each one to Filter in turn, feeding each accepted result into the next
// Filter call. Conjuncts that were not absorbed are re-applied as a
// FilterDefaultPlan on top of the new plan.
//
// Any other operator (e.g. OR) is applied as a FilterDefaultPlan over r.Src.
func (r *WhereRset) planBinOp(ctx context.Context, x *expressions.BinaryOperation) (plan.Plan, error) {
var err error
var p2 plan.Plan
var filtered bool
p := r.Src // source plan to filter (for the query in this article: the JoinPlan built from FROM)
switch x.Op {
case opcode.EQ, opcode.GE, opcode.GT, opcode.LE, opcode.LT, opcode.NE:
if p2, filtered, err = p.Filter(ctx, x); err != nil {
return nil, err
}
if filtered {
return p2, nil
}
case opcode.AndAnd:
// Flatten `a AND (b AND c) ...` into in = [a, b, c, ...].
var in []expression.Expression
var f func(expression.Expression)
f = func(e expression.Expression) {
b, ok := e.(*expressions.BinaryOperation)
if !ok || b.Op != opcode.AndAnd {
in = append(in, e)
return
}
f(b.L)
f(b.R)
}
f(x)
out := []expression.Expression{}
// NOTE: this declares a new p scoped to this case. It shadows the outer p,
// which stays equal to r.Src for the final return after the switch.
p := r.Src
isNewPlan := false
for _, e := range in {
p2, filtered, err = p.Filter(ctx, e)
if err != nil {
return nil, err
}
if !filtered {
// Not absorbed by the plan: keep it for the residual filter.
out = append(out, e)
continue
}
p = p2
isNewPlan = true
}
// Nothing could be pushed down: fall back to filtering with the whole x.
if !isNewPlan {
break
}
// Every conjunct was absorbed, so the filtered plan alone is enough.
if len(out) == 0 {
return p, nil
}
// Re-join the leftover conjuncts right-nested: [a, b, c] -> a AND (b AND c).
for len(out) > 1 {
n := len(out)
e := expressions.NewBinaryOperation(opcode.AndAnd, out[n-2], out[n-1])
out = out[:n-1]
out[n-2] = e
}
return &plans.FilterDefaultPlan{p, out[0]}, nil
default:
// TODO: better plan for `OR`.
log.Warn("TODO: better plan for", x.Op)
}
return &plans.FilterDefaultPlan{p, x}, nil
}
由于条件是EQ类型的,所以进入p.Filter(ctx, x):
// Filter tries to push expr down into one child of the join: first Left,
// then Right. On success the child is replaced in place with the filtered
// plan, and r itself is returned with true. Otherwise r is returned
// unchanged with false.
func (r *JoinPlan) Filter(ctx context.Context, expr expression.Expression) (plan.Plan, bool, error) {
// TODO: do more optimization for join plan
// now we only use where expression for Filter, but for join
// we must use On expression too.
p, filtered, err := r.filterNode(ctx, expr, r.Left)
if err != nil {
return nil, false, err
}
if filtered {
r.Left = p
return r, true, nil
}
p, filtered, err = r.filterNode(ctx, expr, r.Right)
if err != nil {
return nil, false, err
}
if filtered {
r.Right = p
return r, true, nil
}
return r, false, nil
}
继续进入p, filtered, err := r.filterNode(ctx, expr, r.Left):
// filterNode offers expr to a single child plan of the join. A nil child
// (e.g. the missing Right side when there is no join) is reported as not
// filtered. The child receives a clone of expr, presumably so it can rewrite
// its copy without affecting the caller's expression.
func (r *JoinPlan) filterNode(ctx context.Context, expr expression.Expression, node plan.Plan) (plan.Plan, bool, error) {
if node == nil {
return r, false, nil
}
e2, err := expr.Clone()
if err != nil {
return nil, false, err
}
return node.Filter(ctx, e2)
}
继续进入node.Filter(ctx, e2):
// Filter implements plan.Plan Filter interface.
// It delegates to filter with column checking enabled, so a condition that
// mentions columns outside this table's fields is never absorbed here.
func (r *TableDefaultPlan) Filter(ctx context.Context, expr expression.Expression) (plan.Plan, bool, error) {
return r.filter(ctx, expr, true)
}
继续:
// filter tries to turn expr into a narrower plan for this table (for example
// an index scan). It returns the new plan and true when expr was absorbed,
// otherwise r and false. Only binary operations, identifiers, IS NULL and
// `!ident` are handled; anything else is left to the caller.
func (r *TableDefaultPlan) filter(ctx context.Context, expr expression.Expression, checkColumns bool) (plan.Plan, bool, error) {
if checkColumns { // ensure every column mentioned in the condition belongs to this table
colNames := expressions.MentionedColumns(expr)
// make sure all mentioned column names are in Fields
// if not, e.g. the expr has two table like t1.c1 = t2.c2, we can't use filter
if !field.ContainAllFieldNames(colNames, r.Fields, field.DefaultFieldFlag) {
return r, false, nil
}
}
switch x := expr.(type) {
case *expressions.BinaryOperation:
return r.filterBinOp(ctx, x)
case *expressions.Ident:
return r.filterIdent(ctx, x, true)
case *expressions.IsNull:
return r.filterIsNull(ctx, x)
case *expressions.UnaryOperation:
// Only logical NOT applied directly to a column (`!c`) is handled; it is
// passed to filterIdent with false, versus true for a bare column above.
if x.Op != '!' {
break
}
if operand, ok := x.V.(*expressions.Ident); ok {
return r.filterIdent(ctx, operand, false)
}
}
return r, false, nil
}
根据类型进入r.filterBinOp(ctx, x):
func (r *TableDefaultPlan) filterBinOp(ctx context.Context, x *expressions.BinaryOperation) (plan.Plan, bool, error) {
ok, cn, rval, err := x.IsIdentRelOpVal()//检查并拆分出当前where条件
if err != nil {
return r, false, err
}
if !ok {
return r, false, nil
}
然后往下:
t := r.T
c := column.FindCol(t.Cols(), cn)
if c == nil {
return nil, false, errors.Errorf("No such column: %s", cn)
}
column.FindCol(t.Cols(), cn):在表中根据cn(where条件的列名)查询出对应col信息。
继续往下:
ix := t.FindIndexByColName(cn)
if ix == nil { // Column cn has no index.
return r, false, nil
}
这里根据cn找出该列上的索引;如果该列没有索引,就无法用索引过滤,直接返回r, false, nil。
继续往下:
if rval, err = c.CastValue(ctx, rval); err != nil {//根据列的类型转换值
return nil, false, err
}
if rval == nil {//过滤掉某些操作的nil值
// if nil, any <, <=, >, >=, =, != operator will do nothing
// any value compared null returns null
// TODO: if we support <=> later, we must handle null
return &NullPlan{r.GetFields()}, true, nil
}
return &indexPlan{
src: t,
colName: cn,
idxName: ix.Name.O,
idx: ix.X,
spans: toSpans(x.Op, rval),
}, true, nil
}
接着会构造一个indexPlan的索引计划,其中spans记录了互不重叠的扫描范围(真正取数据时会按这些范围扫描索引,每个span定义了一次扫描的起始值和结束值)。
进入toSpans(x.Op, rval):
// toSpans generates the index scan spans for the predicate `column op val`.
// Each span covers [lowVal, highVal]. An end is inclusive unless its
// lowExclude/highExclude flag is true; unset flags are false, i.e. inclusive.
// Ranges unbounded below start at minNotNullVal (by its name, presumably the
// smallest non-NULL index value, so NULL keys are skipped; confirm), and
// ranges unbounded above end at maxVal. NE yields two spans, one on each side
// of val. Any other operator panics.
func toSpans(op opcode.Op, val interface{}) []*indexSpan {
var spans []*indexSpan
switch op {
case opcode.EQ:
// Point lookup: val <= key <= val.
spans = append(spans, &indexSpan{
lowVal: val,
lowExclude: false,
highVal: val,
highExclude: false,
})
case opcode.LT:
spans = append(spans, &indexSpan{
lowVal: minNotNullVal,
highVal: val,
highExclude: true,
})
case opcode.LE:
spans = append(spans, &indexSpan{
lowVal: minNotNullVal,
highVal: val,
highExclude: false,
})
case opcode.GE:
spans = append(spans, &indexSpan{
lowVal: val,
lowExclude: false,
highVal: maxVal,
})
case opcode.GT:
spans = append(spans, &indexSpan{
lowVal: val,
lowExclude: true,
highVal: maxVal,
})
case opcode.NE:
// key != val is split into key < val and key > val.
spans = append(spans, &indexSpan{
lowVal: minNotNullVal,
highVal: val,
highExclude: true,
})
spans = append(spans, &indexSpan{
lowVal: val,
lowExclude: true,
highVal: maxVal,
})
default:
panic("should never happen")
}
return spans
}
这里定义了不同操作的各种范围,我们这里是EQ,所以lowVal(startkey)和highVal(endkey)都是val(其实就是点查询)。
然后一直回到:
p, filtered, err := r.filterNode(ctx, expr, r.Left)
if err != nil {
return nil, false, err
}
if filtered {
r.Left = p
return r, true, nil
}
p, filtered, err = r.filterNode(ctx, expr, r.Right)
if err != nil {
return nil, false, err
}
if filtered {
r.Right = p
return r, true, nil
}
return r, false, nil
}
这里由于没有right了,所以直接回到了:
r, err = (&rsets.WhereRset{Expr: w.Expr, Src: r}).Plan(ctx)
if err != nil {
return nil, err
}
}
接着往下:
lock := s.Lock
if variable.IsAutocommit(ctx) {//是否启用了Autocommit
// Locking of rows for update using SELECT FOR UPDATE only applies when autocommit
// is disabled (either by beginning transaction with START TRANSACTION or by setting
// autocommit to 0. If autocommit is enabled, the rows matching the specification are not locked.
// See: https://dev.mysql.com/doc/refman/5.7/en/innodb-locking-reads.html
lock = coldef.SelectLockNone
}
r, err = (&rsets.SelectLockRset{Src: r, Lock: lock}).Plan(ctx)
if err != nil {
return nil, err
}
然后把之前得到的计划r继续作为SelectLockRset的src传入,这个Plan除了包装成SelectLockPlan之外什么也没做:
// Plan gets SelectLockPlan.
// It only wraps r.Src together with the requested lock mode. The locking
// itself happens while rows are iterated, in SelectLockPlan.Do.
func (r *SelectLockRset) Plan(ctx context.Context) (plan.Plan, error) {
return &plans.SelectLockPlan{Src: r.Src, Lock: r.Lock}, nil
}
然后继续往下:
// Get select list for futher field values evaluation.
selectList, err := plans.ResolveSelectList(s.Fields, r.GetFields())
if err != nil {
return nil, errors.Trace(err)
}
plans.ResolveSelectList(s.Fields, r.GetFields())用来获取字段和结果字段,并检查合法性和处理通配符。
接着:
var groupBy []expression.Expression
if s.GroupBy != nil {
groupBy = s.GroupBy.By
}
if s.Having != nil {
// `having` may contain aggregate functions, and we will add this to hidden fields.
if err = s.Having.CheckAndUpdateSelectList(selectList, groupBy, r.GetFields()); err != nil {
return nil, errors.Trace(err)
}
}
if s.OrderBy != nil {
// `order by` may contain aggregate functions, and we will add this to hidden fields.
if err = s.OrderBy.CheckAndUpdateSelectList(selectList, r.GetFields()); err != nil {
return nil, errors.Trace(err)
}
}
这里检查group by、having以及order by,我这条SQL都没有用到,所以先不展开了。
然后继续:
switch {
case !rsets.HasAggFields(selectList.Fields) && s.GroupBy == nil:
// If no group by and no aggregate functions, we will use SelectFieldsPlan.
if r, err = (&rsets.SelectFieldsRset{Src: r,
SelectList: selectList}).Plan(ctx); err != nil {
return nil, err
}
default:
if r, err = (&rsets.GroupByRset{By: groupBy,
Src: r,
SelectList: selectList}).Plan(ctx); err != nil {
return nil, err
}
}
这里再套上一层SelectFieldsRset后会进入(&rsets.SelectFieldsRset{Src: r, SelectList: selectList}).Plan(ctx):
// Plan gets SrcPlan/SelectFieldsDefaultPlan.
// If all fields are equal to src plan fields, then gets SrcPlan.
// Default gets SelectFieldsDefaultPlan.
//
// "Equal" here means: the same number of fields, and each select field is a
// plain identifier whose identifier and output name both match the source
// field name at the same position (case-insensitively). The projection would
// then be a no-op, so r.Src is returned directly.
func (r *SelectFieldsRset) Plan(ctx context.Context) (plan.Plan, error) {
fields := r.SelectList.Fields
srcFields := r.Src.GetFields()
if len(fields) == len(srcFields) {
match := true
for i, v := range fields {
// TODO: is it this check enough? e.g, the ident field is t.c.
if x, ok := v.Expr.(*expressions.Ident); ok && strings.EqualFold(x.L, srcFields[i].Name) && strings.EqualFold(v.Name, srcFields[i].Name) {
continue
}
match = false
break
}
if match {
return r.Src, nil
}
}
src := r.Src
if x, ok := src.(*plans.TableDefaultPlan); ok { // if every select field is a constant, the table's column data is not needed
// check whether src plan will be set TableNilPlan, like `select 1, 2 from t`.
isConst := true
for _, v := range fields {
// A nil result from FastEval marks the field as non-constant.
if expressions.FastEval(v.Expr) == nil {
isConst = false
break
}
}
if isConst {
src = &plans.TableNilPlan{x.T}
}
}
p := &plans.SelectFieldsDefaultPlan{Src: src, SelectList: r.SelectList}
return p, nil
}
这里根据需要返回字段fields是否等于srcFields来决定使用SrcPlan(上一个计划)或者SelectFieldsDefaultPlan。
然后回到:
SelectList: selectList}).Plan(ctx); err != nil {
return nil, err
}
default:
if r, err = (&rsets.GroupByRset{By: groupBy,
Src: r,
SelectList: selectList}).Plan(ctx); err != nil {
return nil, err
}
}
if s := s.Having; s != nil {
if r, err = (&rsets.HavingRset{
Src: r,
Expr: s.Expr}).Plan(ctx); err != nil {
return nil, err
}
}
if s.Distinct {
if r, err = (&rsets.DistinctRset{Src: r,
SelectList: selectList}).Plan(ctx); err != nil {
return nil, err
}
}
if s := s.OrderBy; s != nil {
if r, err = (&rsets.OrderByRset{By: s.By,
Src: r,
SelectList: selectList}).Plan(ctx); err != nil {
return nil, err
}
}
if s := s.Offset; s != nil {
if r, err = (&rsets.OffsetRset{s.Count, r}).Plan(ctx); err != nil {
return nil, err
}
}
if s := s.Limit; s != nil {
if r, err = (&rsets.LimitRset{s.Count, r}).Plan(ctx); err != nil {
return nil, err
}
}
这些在本SQL都没用到,都先不展开了。
然后:
if r, err = (&rsets.SelectFinalRset{Src: r,
SelectList: selectList}).Plan(ctx); err != nil {
return nil, err
}
return r, nil
}
在这里给r再套上一层SelectFinalRset,这个Plan本身什么也没做。
然后计划制定完成(基本上就是把缺的东西填上),然后回到:
r, err := s.Plan(ctx)
if err != nil {
return nil, err
}
return rsets.Recordset{ctx, r}, nil
}
这里把制定好的计划r放进rsets.Recordset,然后继续返回:
rs, err = s.Exec(ctx)
stmt.ClearExecArgs(ctx)
}
// MySQL DDL should be auto-commit
if err == nil && (s.IsDDL() || variable.IsAutocommit(ctx)) {
err = ctx.FinishTxn(false)
}
return rs, errors.Trace(err)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L153-L161
进一步返回:
r, err := runStmt(s, si)
if err != nil {
log.Warnf("session:%v, err:%v", s, err)
return nil, errors.Trace(err)
}
if r != nil {
rs = append(rs, r)
}
}
return rs, nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/session.go#L130-L142
这里把每条SQL语句结果r添加到rs中,然后继续返回:
rss, err := c.s.Execute(query)
if err != nil {
return nil, errors.Trace(err)
}
if len(rss) == 0 {
return nil, errors.Trace(errNoResult)
}
return newdriverRows(rss[0]), nil
}
stmt, err := c.getStmt(query)
if err != nil {
return nil, errors.Trace(err)
}
return stmt.Query(args)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L307-L321
至此,还没有真正从KV中取出任何数据,只是制定了计划、确定了需要返回的字段,并返回了一个用来获取数据的对象。真正的取数据是从newdriverRows(rss[0])这里开始的:
// newdriverRows wraps rs in a driverRows and starts a producer goroutine that
// iterates rs with Do. Each row is cloned with types.Clone and sent on the
// buffered r.rows channel (capacity 500). When Do returns, one final value is
// sent on r.rows: io.EOF if iteration finished normally, otherwise Do's error.
// Every send also watches r.done. Once done is ready (presumably when the
// rows are closed; confirm against driverRows.Close), the goroutine stops
// sending, and the row callback returns false so Do stops iterating.
func newdriverRows(rs rset.Recordset) *driverRows {
r := &driverRows{
rs: rs,
done: make(chan int),
rows: make(chan interface{}, 500),
}
go func() {
err := io.EOF
if e := r.rs.Do(func(data []interface{}) (bool, error) {
// This err is local to the callback and shadows the outer err.
vv, err := types.Clone(data)
if err != nil {
return false, errors.Trace(err)
}
select {
case r.rows <- vv:
return true, nil
case <-r.done:
return false, nil
}
}); e != nil {
err = e
}
// Deliver the terminal value (io.EOF or the iteration error) to Next.
select {
case r.rows <- err:
case <-r.done:
}
}()
return r
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L347-L376
这里面通过启动一个goroutine来异步获取数据,并通过channel来发送数据,然后在这里通过调用Next等待数据:
for rows.Next() {
err := rows.Scan(scanArgs...)
if err != nil {
return errors.Trace(err)
}
data := make([]string, len(cols))
for i, value := range values {
if value == nil {
data[i] = "NULL"
} else {
data[i] = string(value)
}
}
datas = append(datas, data)
}
// For `cols` and `datas[i]` always has the same length,
// no need to check return validity.
result, _ := printer.GetPrintResult(cols, datas)
fmt.Printf("%s", result)
if err := rows.Err(); err != nil {
return errors.Trace(err)
}
} else {
// TODO: rows affected and last insert id
_, err := tx.Exec(txnLine)
if err != nil {
return errors.Trace(err)
}
}
return nil
}
具体Next等待在这里:
// Next has the driver.Rows Next signature. It blocks until the producer
// goroutine started by newdriverRows delivers a row or a terminal value, or
// until r.done is ready (then it returns io.EOF). A delivered error (io.EOF
// at the normal end of data) is returned as is. A row is copied into dest:
// signed integers are widened to int64, unsigned integers to uint64, and
// mysql.Duration, mysql.Time and mysql.Decimal are rendered as strings.
//
// NOTE(review): database/sql/driver documents nil, int64, float64, bool,
// []byte, string and time.Time as the standard Value types. float32 and
// uint64 are passed through as-is here; confirm that database/sql's Scan
// conversion accepts them for the destinations callers use.
func (r *driverRows) Next(dest []driver.Value) error {
select {
case rx := <-r.rows:
switch x := rx.(type) {
case error:
return x
case []interface{}:
if g, e := len(x), len(dest); g != e {
return errors.Errorf("field count mismatch: got %d, need %d", g, e)
}
for i, xi := range x {
switch v := xi.(type) {
case nil, int64, float32, float64, bool, []byte, string:
dest[i] = v
case int8:
dest[i] = int64(v)
case int16:
dest[i] = int64(v)
case int32:
dest[i] = int64(v)
case int:
dest[i] = int64(v)
case uint8:
dest[i] = uint64(v)
case uint16:
dest[i] = uint64(v)
case uint32:
dest[i] = uint64(v)
case uint64:
// v is already uint64; the conversion is a no-op.
dest[i] = uint64(v)
case uint:
dest[i] = uint64(v)
case mysql.Duration:
dest[i] = v.String()
case mysql.Time:
dest[i] = v.String()
case mysql.Decimal:
dest[i] = v.String()
default:
return errors.Errorf("unable to handle type %T", xi)
}
}
return nil
default:
return errors.Errorf("unable to handle type %T", rx)
}
case <-r.done:
return io.EOF
}
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L403-L453
从KV取数据
然后回到前面的goroutine,分析一下具体取数据的流程:
r := &driverRows{
rs: rs,
done: make(chan int),
rows: make(chan interface{}, 500),
}
go func() {
err := io.EOF
if e := r.rs.Do(func(data []interface{}) (bool, error) {
vv, err := types.Clone(data)
if err != nil {
return false, errors.Trace(err)
}
select {
case r.rows <- vv:
return true, nil
case <-r.done:
return false, nil
}
}); e != nil {
err = e
}
select {
case r.rows <- err:
case <-r.done:
}
}()
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L348-L374
首先,这里的r.rs就是之前制定的计划(包装在rsets.Recordset中)。
然后看到r.rs.Do传递了一个函数作为参数,进入DO看看:
// Do implements rset.Recordset.
// It runs r.Plan with r.Ctx and adapts f to the plan's row callback by
// dropping the row ID argument. f's return values are passed through.
func (r Recordset) Do(f func(data []interface{}) (bool, error)) error {
return r.Plan.Do(r.Ctx, func(ID interface{}, data []interface{}) (bool, error) {
return f(data)
})
}
这是最外层的对象,这里又把之前传入的函数包装了一下,传给里面那层Plan的Do:
// Do implements the plan.Plan Do interface, and sets result info field.
// It iterates r.Src. For each row it keeps only the leading
// r.HiddenFieldOffset values (the visible select fields), converts bool
// values to uint8 1/0, calls setResultFieldInfo once on the first row of the
// run, and passes the trimmed row to f.
func (r *SelectFinalPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
// Reset infered. For prepared statements, this plan may run many times.
r.infered = false
return r.Src.Do(ctx, func(rid interface{}, in []interface{}) (bool, error) {
// we should not output hidden fields to client
// out shares in's backing array, so the bool conversion below also
// rewrites the corresponding elements of in.
out := in[0:r.HiddenFieldOffset]
for i, o := range out {
switch v := o.(type) {
case bool:
// Convert bool field to int
if v {
out[i] = uint8(1)
} else {
out[i] = uint8(0)
}
}
}
if !r.infered {
setResultFieldInfo(r.ResultFields[0:r.HiddenFieldOffset], out)
r.infered = true
}
return f(rid, out)
})
}
这个SelectFinalPlan就是之前计划的最后套的那层,这里面每一层都会传入一个函数,相当于入栈的调用,所以直接进入最底层后分析:
// Do implements plan.Plan Do interface, it executes join method
// according to given type.
// With no Right side (a FROM without a join), it just iterates Left.
// Otherwise it dispatches on r.Type; any type other than left, right or full
// join is executed as a cross join.
func (r *JoinPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
if r.Right == nil {
return r.Left.Do(ctx, f)
}
switch r.Type {
case LeftJoin:
return r.doLeftJoin(ctx, f)
case RightJoin:
return r.doRightJoin(ctx, f)
case FullJoin:
return r.doFullJoin(ctx, f)
default:
return r.doCrossJoin(ctx, f)
}
}
然后进入r.Left.Do(ctx, f):
// Do implements plan.Plan Do interface.
// It scans a span from the lower bound to upper bound.
// The spans are scanned one after another, in the order stored in r.spans,
// using the transaction returned by ctx.GetTxn(false). The first error from
// any span aborts the whole scan.
func (r *indexPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
txn, err := ctx.GetTxn(false)
if err != nil {
return err
}
for _, span := range r.spans {
err := r.doSpan(ctx, txn, span, f)
if err != nil {
return err
}
}
return nil
}
获取当前session的事务txn后根据之前的spans(扫描范围)来开始扫描,进入doSpan():
func (r *indexPlan) doSpan(ctx context.Context, txn kv.Transaction, span *indexSpan, f plan.RowIterFunc) error {
seekVal := span.lowVal
if span.lowVal == minNotNullVal {
seekVal = []byte{}
}
it, _, err := r.idx.Seek(txn, []interface{}{seekVal})
if err != nil {
return types.EOFAsNil(err)
}
defer it.Close()
首先seekVal := span.lowVal获得开始扫描的起始位置,然后r.idx.Seek(txn, []interface{}{seekVal})从KV查找>=seekVal 的迭代器it。
接着:
var skipLowCompare bool
for {
k, h, err := it.Next()
if err != nil {
return types.EOFAsNil(err)
}
val := k[0]
if !skipLowCompare {
if span.lowExclude && indexCompare(span.lowVal, val) == 0 {//不包含lowVal,现在的val==lowVal,则忽略改行数据
continue
}
skipLowCompare = true
}
cmp := indexCompare(span.highVal, val)
if cmp < 0 || (cmp == 0 && span.highExclude) {//当前行已经超出highVal,则直接退出
return nil
}
data, err := r.src.Row(ctx, h)
if err != nil {
return err
}
if more, err := f(h, data); err != nil || !more {
return err
}
}
}
这里通过迭代器it.Next()每次取出一行的行号h,以及解码后的索引key值k(就是索引字段的值)。当扫描到的值大于span.highVal(如果span.highExclude为true,则等于span.highVal时也算)时就结束扫描,此时已经包含了全部数据。如何保证?前面已经说过KV库中的Key是按顺序递增存储的,所以可以在log(n)时间内定位到起始key,并且第一个超出span.highVal的行就说明扫描已经结束。
接下来r.src.Row(ctx, h)用行号获取这一行的数据。理论上可以只查询需要返回的列(因为一行数据是按每一列一个KV对存储的,很容易按列过滤),但目前的实现并没有这样做,而是把所有列都查出来,在后面再过滤。
接着把行号和当前行数据传递给之前传入进来的函数f(h, data):
// Do implements plan.Plan Do interface, acquiring locks.
// If the last element of a source row is a *RowKeyList, it is removed from
// the row before f sees it, regardless of the lock mode. Only when the lock
// mode is SelectLockForUpdate are those row keys locked in the current
// transaction, with one LockKeys call per key.
func (r *SelectLockPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
return r.Src.Do(ctx, func(rid interface{}, in []interface{}) (bool, error) {
var rowKeys *RowKeyList
if in != nil && len(in) > 0 {
t := in[len(in)-1]
switch vt := t.(type) {
case *RowKeyList:
rowKeys = vt
// Remove row key list from data tail
in = in[:len(in)-1]
}
}
if rowKeys != nil && r.Lock == coldef.SelectLockForUpdate {
txn, err := ctx.GetTxn(false)
if err != nil {
return false, errors.Trace(err)
}
for _, k := range rowKeys.Keys {
err = txn.LockKeys([]byte(k.Key))
if err != nil {
return false, errors.Trace(err)
}
}
}
return f(rid, in)
})
}
这里是用来对行加锁(比如SQL中的for update等),我们的SQL没有用到,所以继续返回上一层的f(rid, in):
// Do implements the plan.Plan Do interface, extracts specific values by
// r.Fields from row data.
// For each source row it evaluates every select field expression, resolving
// identifiers against that row's values, and passes the results to f.
// The first evaluation error aborts the iteration.
func (r *SelectFieldsDefaultPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
fields := r.Src.GetFields()
// m is shared by all rows. Its identifier resolver is replaced for every
// row so that it closes over the current row's values.
m := map[interface{}]interface{}{}
return r.Src.Do(ctx, func(rid interface{}, in []interface{}) (bool, error) {
m[expressions.ExprEvalIdentFunc] = func(name string) (interface{}, error) {
return getIdentValue(name, fields, in, field.DefaultFieldFlag)
}
out := make([]interface{}, len(r.Fields))
for i, fld := range r.Fields {
var err error
if out[i], err = fld.Expr.Eval(ctx, m); err != nil {
return false, err
}
}
return f(rid, out)
})
}
这里用于取出需要返回的列放入out,然后传递给上一个f(rid, out):
// Do implements the plan.Plan Do interface, and sets result info field.
// It iterates r.Src. For each row it keeps only the leading
// r.HiddenFieldOffset values (the visible select fields), converts bool
// values to uint8 1/0, calls setResultFieldInfo once on the first row of the
// run, and passes the trimmed row to f.
func (r *SelectFinalPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
// Reset infered. For prepared statements, this plan may run many times.
r.infered = false
return r.Src.Do(ctx, func(rid interface{}, in []interface{}) (bool, error) {
// we should not output hidden fields to client
// out shares in's backing array, so the bool conversion below also
// rewrites the corresponding elements of in.
out := in[0:r.HiddenFieldOffset]
for i, o := range out {
switch v := o.(type) {
case bool:
// Convert bool field to int
if v {
out[i] = uint8(1)
} else {
out[i] = uint8(0)
}
}
}
if !r.infered {
setResultFieldInfo(r.ResultFields[0:r.HiddenFieldOffset], out)
r.infered = true
}
return f(rid, out)
})
}
这里去掉隐藏字段(即前面处理having和order by时,为其中的聚合函数等额外追加到select列表末尾的字段),并把bool值转换成uint8的1或0:
接着会把数据传到:
if e := r.rs.Do(func(data []interface{}) (bool, error) {
vv, err := types.Clone(data)
if err != nil {
return false, errors.Trace(err)
}
select {
case r.rows <- vv:
return true, nil
case <-r.done:
return false, nil
}
}); e != nil {
err = e
}
select {
case r.rows <- err:
case <-r.done:
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L355-L373
这里clone一份数据后发送到r.rows这个channel。Do结束后还会再向r.rows发送一个结束值:正常结束时是io.EOF,出错时就是该错误。
然后之前在等待数据的地方就收到了。
至此,整个select大致介绍了一遍,还有一些细节可能没讲,以后会通过分析每次的commit来覆盖这些内容,现在先有个大体上的把握。
然后SELECT r FROM t WHERE c >= 'c1' and c <= 'c5';语句也基本上一样。区别在于where条件是AndAnd,planBinOp会把它拆成c >= 'c1'和c <= 'c5'两个子条件依次调用Filter,最终spans的上限会变成c5。所以就不具体介绍了。
小结
整个流程大致为:把sql编译成stmts->制定查询计划(根据各种条件选择查询范围等)->异步执行查询计划->通过channel接收查询结果。