TIDB源码分析-从github第一次提交说起(5)

接着说下一个SQL语句:

SELECT r FROM t WHERE c = 'c1';

这是个查询语句,和之前的Exec类型的语句不一样,所以从入口开始说起:

			err = executeLine(tx, l)
			if err != nil {
				log.Error(errors.ErrorStack(err))
				tx.Rollback()
			} else {
				tx.Commit()
			}
		}
	}
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/interpreter/main.go#L210-L219

接着进入:

func executeLine(tx *sql.Tx, txnLine string) error {
	if tidb.IsQuery(txnLine) {
		rows, err := tx.Query(txnLine)

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/interpreter/main.go#L56-L58

进入到sql包的tx.Query(txnLine)会调用Query():

// Queryer is an optional interface that may be implemented by a Conn.
//
// If a Conn does not implement Queryer, the sql package's DB.Query will first
// prepare a query, execute the statement, and then close the statement.
//
// Query may return driver.ErrSkip.
//
// Query itself is a thin wrapper: it passes query and args unchanged to
// c.driverQuery and returns whatever that call returns.
func (c *driverConn) Query(query string, args []driver.Value) (driver.Rows, error) {
	return c.driverQuery(query, args)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L295-L303

然后来到:

func (c *driverConn) driverQuery(query string, args []driver.Value) (driver.Rows, error) {
	if len(args) == 0 {
		rss, err := c.s.Execute(query)

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L305-L307

接着执行Execute(),这里跟之前一样:

// Execute compiles sql into statements with Compile and runs each of them in
// order through runStmt. Every non-nil record set returned by runStmt is
// appended to the returned slice. A compile error, or the first error from
// runStmt, is logged and returned wrapped by errors.Trace together with a nil
// slice.
func (s *session) Execute(sql string) ([]rset.Recordset, error) {
	stmts, err := Compile(sql)
	if err != nil {
		log.Errorf("Compile sql error: %s - %s", sql, err)
		return nil, errors.Trace(err)
	}

	var rs []rset.Recordset

	for _, si := range stmts {
		r, err := runStmt(s, si)
		if err != nil {
			log.Warnf("session:%v, err:%v", s, err)
			return nil, errors.Trace(err)
		}

		// Statements that yield no record set contribute nothing to rs.
		if r != nil {
			rs = append(rs, r)
		}
	}

	return rs, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/session.go#L120-L142

然后进入runStmt,随后来到:

// Exec implements the stmt.Statement Exec interface.
// It builds the plan for this statement via s.Plan and returns it wrapped,
// together with ctx, in an rsets.Recordset. A planning error is returned
// as-is with a nil record set.
func (s *SelectStmt) Exec(ctx context.Context) (rs rset.Recordset, err error) {
	log.Info("SelectStmt trx:")
	r, err := s.Plan(ctx)
	if err != nil {
		return nil, err
	}

	// The record set only carries the context and the plan.
	return rsets.Recordset{ctx, r}, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/select.go#L203-L212

制定查询计划

进入s.Plan(ctx)开始制定查询计划(并不会真正去KV取数据):

// Plan implements the plan.Planner interface.
// The whole phase for select is
// `from -> where -> lock -> group by -> having -> select fields -> distinct -> order by -> limit -> final`
func (s *SelectStmt) Plan(ctx context.Context) (plan.Plan, error) {
	var (
		r   plan.Plan
		err error
	)

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/select.go#L78-L85

首先根据from生成需要返回哪些字段:

	if s.From != nil {
		r, err = s.From.Plan(ctx)
		if err != nil {
			return nil, err
		}
	} else if s.Fields != nil {
		// Only evaluate fields values.
		fr := &rsets.FieldRset{Fields: s.Fields}
		r, err = fr.Plan(ctx)
		if err != nil {
			return nil, err
		}

	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/select.go#L87-L100

由于这条SQL有from,所以进入r, err = s.From.Plan(ctx):

// Plan gets JoinPlan.
// It resets r.tableNames to an empty map, creates an empty plans.JoinPlan and
// lets r.buildJoinPlan populate it starting from r itself. An error from
// buildJoinPlan is returned wrapped by errors.Trace.
func (r *JoinRset) Plan(ctx context.Context) (plan.Plan, error) {
	// Start from a fresh table-name set on every call.
	r.tableNames = make(map[string]bool)
	p := &plans.JoinPlan{}
	if err := r.buildJoinPlan(ctx, p, r); err != nil {
		return nil, errors.Trace(err)
	}

	return p, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/rset/rsets/join.go#L220-L229

进入r.buildJoinPlan(ctx, p, r):

func (r *JoinRset) buildJoinPlan(ctx context.Context, p *plans.JoinPlan, s *JoinRset) error {
	left, leftFields, err := r.buildPlan(ctx, s.Left)
	if err != nil {
		return errors.Trace(err)
	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/rset/rsets/join.go#L101-L105

这里由于是JoinPlan,所以有Left和Right表达式,并且会递归遍历这个语法树:

接着进入left, leftFields, err := r.buildPlan(ctx, s.Left):

// buildPlan returns the plan and result fields for one join node, chosen by
// the node's dynamic type:
//   - *JoinRset: an empty plans.JoinPlan placeholder and no fields;
//   - *TableSource: whatever r.buildSourcePlan returns for it;
//   - nil: nil plan, nil fields and no error;
//   - anything else: an "invalid join node" error.
func (r *JoinRset) buildPlan(ctx context.Context, node interface{}) (plan.Plan, []*field.ResultField, error) {
	switch t := node.(type) {
	case *JoinRset:
		// Only an empty JoinPlan is returned here; it is not filled in by this function.
		return &plans.JoinPlan{}, nil, nil
	case *TableSource:
		return r.buildSourcePlan(ctx, t)
	case nil:
		return nil, nil, nil
	default:
		return nil, nil, errors.Errorf("invalid join node %T", t)
	}
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/rset/rsets/join.go#L147-L158

现在node类型为TableSource,所以继续进入r.buildSourcePlan(ctx, t):

func (r *JoinRset) buildSourcePlan(ctx context.Context, t *TableSource) (plan.Plan, []*field.ResultField, error) {
	var (
		src interface{}
		tr  *TableRset
		err error
	)
	switch s := t.Source.(type) {
	case table.Ident:
		fullIdent := s.Full(ctx)//取出table.Ident对应的Schema
		tr, err = newTableRset(fullIdent.Schema.O, fullIdent.Name.O)//新建TableRset对象
		if err != nil {
			return nil, nil, errors.Trace(err)
		}
		src = tr
		if t.Name == "" {//在r中标记出现过的表名
			qualifiedTableName := tr.Schema + "." + tr.Name
			if r.tableNames[qualifiedTableName] {
				return nil, nil, errors.Errorf("%s: duplicate name %s", r.String(), s)
			}
			r.tableNames[qualifiedTableName] = true
		}
	case stmt.Statement:
		src = s
	default:
		return nil, nil, errors.Errorf("invalid table source %T", t.Source)
	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/rset/rsets/join.go#L160-L185

接着往下:

	var p plan.Plan
	switch x := src.(type) {
	case plan.Planner:
		if p, err = x.Plan(ctx); err != nil {
			return nil, nil, errors.Trace(err)
		}
	case plan.Plan:
		p = x
	default:
		return nil, nil, errors.Errorf("invalid table source %T, no Plan interface", t.Source)
	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/rset/rsets/join.go#L187-L197

会进入到x.Plan(ctx),这里返回了一个plans.TableDefaultPlan对象赋值给p,并填充上了Schema,Table和cols的完整信息。

继续往下:

	var fields []*field.ResultField
	dupNames := make(map[string]struct{}, len(p.GetFields()))
	for _, nf := range p.GetFields() {//检查是否有重复字段名
		f := nf.Clone()
		if t.Name != "" {
			f.TableName = t.Name
		}

		// duplicate column name in one table is not allowed.
		name := strings.ToLower(f.Name)
		if _, ok := dupNames[name]; ok {
			return nil, nil, errors.Errorf("Duplicate column name '%s'", name)
		}
		dupNames[name] = struct{}{}

		fields = append(fields, f)
	}

	return p, fields, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/rset/rsets/join.go#L199-L218

然后一直回到:

	left, leftFields, err := r.buildPlan(ctx, s.Left)
	if err != nil {
		return errors.Trace(err)
	}

	right, rightFields, err := r.buildPlan(ctx, s.Right)
	if err != nil {
		return errors.Trace(err)
	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/rset/rsets/join.go#L102-L110

由于这个SQL没有join所以right, rightFields, err := r.buildPlan(ctx, s.Right)返回nil,nil,nil。

接着往下

	p.Left = left
	p.Right = right
	p.On = s.On
	p.Type = s.Type.String()

	// if the left is not JoinPlan, the left is the leaf node,
	// we can stop recursion.
	if pl, ok := left.(*plans.JoinPlan); ok {
		if err := r.buildJoinPlan(ctx, pl, s.Left.(*JoinRset)); err != nil {
			return errors.Trace(err)
		}

		// use left JoinPlan fields.
		p.Fields = append(p.Fields, pl.GetFields()...)
	} else {
		// use left fields directly.
		p.Fields = append(p.Fields, leftFields...)
	}

	// if the right is not JoinPlan, the right is the leaf node,
	// we can stop recursion.
	if pr, ok := right.(*plans.JoinPlan); ok {
		if err := r.buildJoinPlan(ctx, pr, s.Right.(*JoinRset)); err != nil {
			return errors.Trace(err)
		}
		// use right JoinPlan fields.
		p.Fields = append(p.Fields, pr.GetFields()...)
	} else {
		// use right fields directly.
		p.Fields = append(p.Fields, rightFields...)
	}

	return nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/rset/rsets/join.go#L112-L145

这里一直递归这棵树,直到叶子结点,并把所有Fields加入到p.Fields。

然后一直回到:

		r, err = s.From.Plan(ctx)
		if err != nil {
			return nil, err
		}
	} else if s.Fields != nil {
		// Only evaluate fields values.
		fr := &rsets.FieldRset{Fields: s.Fields}
		r, err = fr.Plan(ctx)
		if err != nil {
			return nil, err
		}

	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/select.go#L88-L100

这里把s.From.Plan(ctx)生成的Plan赋值给了前面定义的r。

接着往下:

	if w := s.Where; w != nil {
		r, err = (&rsets.WhereRset{Expr: w.Expr, Src: r}).Plan(ctx)
		if err != nil {
			return nil, err
		}
	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/select.go#L102-L107

其中r, err = (&rsets.WhereRset{Expr: w.Expr, Src: r}).Plan(ctx),这里把上面得到的JoinPlan(r)作为rsets.WhereRset的src传入,然后再调用Plan()把返回结果又覆盖掉r。

然后进入Plan:

// Plan gets NullPlan/FilterDefaultPlan.
// It works on a clone of r.Expr. A static expression is handed to
// r.planStatic; otherwise the clone's concrete type selects a dedicated
// planner (planBinOp, planIdent, planIsNull, planUnaryOp). PatternIn,
// PatternLike, PatternRegexp and any unlisted type have no dedicated planner
// and end up in a plans.FilterDefaultPlan over r.Src with the cloned
// expression.
func (r *WhereRset) Plan(ctx context.Context) (plan.Plan, error) {
	expr, err := r.Expr.Clone()
	if err != nil {
		return nil, err
	}

	if expr.IsStatic() {
		// IsStatic means we have a const value for where condition, and we don't need any index.
		return r.planStatic(ctx, expr)
	}

	switch x := expr.(type) {
	case *expressions.BinaryOperation:
		return r.planBinOp(ctx, x)
	case *expressions.Ident:
		return r.planIdent(ctx, x)
	case *expressions.IsNull:
		return r.planIsNull(ctx, x)
	case *expressions.PatternIn:
		// TODO: optimize
		// TODO: show plan
	case *expressions.PatternLike:
		// TODO: optimize
	case *expressions.PatternRegexp:
		// TODO: optimize
	case *expressions.UnaryOperation:
		return r.planUnaryOp(ctx, x)
	default:
		log.Warnf("%v not supported in where rset now", r.Expr)
	}

	// Reached by the empty cases and the default case above.
	return &plans.FilterDefaultPlan{r.Src, expr}, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/rset/rsets/where.go#L180-L213

可以看到,这里还有些表达式(比如like)不支持,这类表达式会执行全表扫描。

接着进入到r.planBinOp(ctx, x):

// planBinOp plans a WHERE clause whose expression is a binary operation.
//
// For the comparison operators (EQ, GE, GT, LE, LT, NE) it asks the source
// plan to Filter the whole expression and returns the resulting plan if the
// source reports it as filtered.
//
// For AndAnd it flattens the nested AND tree into its conjuncts, offers each
// conjunct to Filter in turn (chaining onto the plan returned by the previous
// successful Filter), and wraps whatever conjuncts were not filtered in a
// plans.FilterDefaultPlan on top of the resulting plan.
//
// In every other case the result is a plans.FilterDefaultPlan over r.Src with
// the original expression x.
func (r *WhereRset) planBinOp(ctx context.Context, x *expressions.BinaryOperation) (plan.Plan, error) {
	var err error
	var p2 plan.Plan
	var filtered bool

	p := r.Src // the source plan (the JoinPlan built earlier in this walkthrough)
	switch x.Op {
	case opcode.EQ, opcode.GE, opcode.GT, opcode.LE, opcode.LT, opcode.NE:
		if p2, filtered, err = p.Filter(ctx, x); err != nil {
			return nil, err
		}
		if filtered {
			return p2, nil
		}
	case opcode.AndAnd:
		var in []expression.Expression
		var f func(expression.Expression)
		// f recursively collects the operands of nested AndAnd operations into in.
		f = func(e expression.Expression) {
			b, ok := e.(*expressions.BinaryOperation)
			if !ok || b.Op != opcode.AndAnd {
				in = append(in, e)
				return
			}

			f(b.L)
			f(b.R)
		}
		f(x)
		// out holds the conjuncts that Filter did not accept.
		out := []expression.Expression{}
		// This p shadows the outer p inside the AndAnd case.
		p := r.Src
		isNewPlan := false
		for _, e := range in {
			p2, filtered, err = p.Filter(ctx, e)
			if err != nil {
				return nil, err
			}

			if !filtered {
				out = append(out, e)
				continue
			}

			p = p2
			isNewPlan = true
		}

		// No conjunct was filtered: leave the switch and use the default plan below.
		if !isNewPlan {
			break
		}

		// Every conjunct was filtered: the filtered plan is the full answer.
		if len(out) == 0 {
			return p, nil
		}

		// Fold the remaining conjuncts back into a single AndAnd expression,
		// combining the last two elements until one is left.
		for len(out) > 1 {
			n := len(out)
			e := expressions.NewBinaryOperation(opcode.AndAnd, out[n-2], out[n-1])

			out = out[:n-1]
			out[n-2] = e
		}

		return &plans.FilterDefaultPlan{p, out[0]}, nil
	default:
		// TODO: better plan for `OR`.
		log.Warn("TODO: better plan for", x.Op)
	}

	return &plans.FilterDefaultPlan{p, x}, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/rset/rsets/where.go#L37-L106

由于条件是EQ类型的,所以进入p.Filter(ctx, x):

// Filter offers expr to the left child first and, if the left child does not
// filter it, to the right child. When a child filters the expression, that
// child is replaced by the plan it returned and r is returned with
// filtered == true. If neither child filters it, r is returned unchanged with
// filtered == false.
func (r *JoinPlan) Filter(ctx context.Context, expr expression.Expression) (plan.Plan, bool, error) {
	// TODO: do more optimization for join plan
	// now we only use where expression for Filter, but for join
	// we must use On expression too.

	p, filtered, err := r.filterNode(ctx, expr, r.Left)
	if err != nil {
		return nil, false, err
	}
	if filtered {
		r.Left = p
		return r, true, nil
	}

	p, filtered, err = r.filterNode(ctx, expr, r.Right)
	if err != nil {
		return nil, false, err
	}
	if filtered {
		r.Right = p
		return r, true, nil
	}
	return r, false, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/join.go#L106-L129

继续进入p, filtered, err := r.filterNode(ctx, expr, r.Left):

// filterNode applies a clone of expr to node via node.Filter and returns that
// call's result. A nil node is reported as "not filtered" with r as the plan.
func (r *JoinPlan) filterNode(ctx context.Context, expr expression.Expression, node plan.Plan) (plan.Plan, bool, error) {
	if node == nil {
		return r, false, nil
	}

	// The child receives its own copy of the expression, not expr itself.
	e2, err := expr.Clone()
	if err != nil {
		return nil, false, err
	}

	return node.Filter(ctx, e2)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/join.go#L91-L102

继续进入node.Filter(ctx, e2):

// Filter implements plan.Plan Filter interface.
// It delegates to r.filter with checkColumns set to true.
func (r *TableDefaultPlan) Filter(ctx context.Context, expr expression.Expression) (plan.Plan, bool, error) {
	return r.filter(ctx, expr, true)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/from.go#L216-L219

继续:

// filter tries to turn expr into a more specific plan for this table.
// With checkColumns set, it first returns (r, false, nil) if expr mentions a
// column that is not among r.Fields. Otherwise it dispatches on the concrete
// expression type: BinaryOperation -> filterBinOp, Ident -> filterIdent,
// IsNull -> filterIsNull, and a '!' UnaryOperation over an Ident ->
// filterIdent with a false flag. Anything else yields (r, false, nil).
func (r *TableDefaultPlan) filter(ctx context.Context, expr expression.Expression, checkColumns bool) (plan.Plan, bool, error) {
	if checkColumns { // make sure every column referenced by the condition is valid for this table
		colNames := expressions.MentionedColumns(expr)
		// make sure all mentioned column names are in Fields
		// if not, e.g. the expr has two table like t1.c1 = t2.c2, we can't use filter
		if !field.ContainAllFieldNames(colNames, r.Fields, field.DefaultFieldFlag) {
			return r, false, nil
		}
	}

	switch x := expr.(type) {
	case *expressions.BinaryOperation:
		return r.filterBinOp(ctx, x)
	case *expressions.Ident:
		return r.filterIdent(ctx, x, true)
	case *expressions.IsNull:
		return r.filterIsNull(ctx, x)
	case *expressions.UnaryOperation:
		// Only the '!' operator applied directly to an identifier is handled.
		if x.Op != '!' {
			break
		}
		if operand, ok := x.V.(*expressions.Ident); ok {
			return r.filterIdent(ctx, operand, false)
		}
	}
	return r, false, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/from.go#L221-L247

根据类型进入r.filterBinOp(ctx, x):

func (r *TableDefaultPlan) filterBinOp(ctx context.Context, x *expressions.BinaryOperation) (plan.Plan, bool, error) {
	ok, cn, rval, err := x.IsIdentRelOpVal()//检查并拆分出当前where条件
	if err != nil {
		return r, false, err
	}
	if !ok {
		return r, false, nil
	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/from.go#L104-L111

然后往下:

	t := r.T
	c := column.FindCol(t.Cols(), cn)
	if c == nil {
		return nil, false, errors.Errorf("No such column: %s", cn)
	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/from.go#L113-L117

column.FindCol(t.Cols(), cn):在表中根据cn(where条件的列名)查询出对应col信息。

继续往下:

	ix := t.FindIndexByColName(cn)
	if ix == nil { // Column cn has no index.
		return r, false, nil
	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/from.go#L119-L122

这里根据cn找出表中的索引字段。

继续往下:

	if rval, err = c.CastValue(ctx, rval); err != nil {//根据列的类型转换值
		return nil, false, err
	}

	if rval == nil {//过滤掉某些操作的nil值
		// if nil, any <, <=, >, >=, =, != operator will do nothing
		// any value compared null returns null
		// TODO: if we support <=> later, we must handle null
		return &NullPlan{r.GetFields()}, true, nil
	}
	return &indexPlan{
		src:     t,
		colName: cn,
		idxName: ix.Name.O,
		idx:     ix.X,
		spans:   toSpans(x.Op, rval),
	}, true, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/from.go#L124-L141

接着会构造一个indexPlan(索引计划),其中spans记录了互不重叠的扫描范围(获取数据时会按这些范围扫描索引,这里定义了每段扫描的startkey和endkey)。

进入toSpans(x.Op, rval):

// toSpans generates a slice of index spans from a comparison operator and a
// value. Every operator yields one span, except NE which yields two (one
// below val and one above it, both excluding val). Any operator other than
// EQ, LT, LE, GE, GT and NE causes a panic.
func toSpans(op opcode.Op, val interface{}) []*indexSpan {
	var spans []*indexSpan
	switch op {
	case opcode.EQ:
		// [val, val]: both bounds are val and both are inclusive.
		spans = append(spans, &indexSpan{
			lowVal:      val,
			lowExclude:  false,
			highVal:     val,
			highExclude: false,
		})
	case opcode.LT:
		// From minNotNullVal up to, but excluding, val.
		spans = append(spans, &indexSpan{
			lowVal:      minNotNullVal,
			highVal:     val,
			highExclude: true,
		})
	case opcode.LE:
		// From minNotNullVal up to and including val.
		spans = append(spans, &indexSpan{
			lowVal:      minNotNullVal,
			highVal:     val,
			highExclude: false,
		})
	case opcode.GE:
		// From val (inclusive) up to maxVal.
		spans = append(spans, &indexSpan{
			lowVal:     val,
			lowExclude: false,
			highVal:    maxVal,
		})
	case opcode.GT:
		// From val (exclusive) up to maxVal.
		spans = append(spans, &indexSpan{
			lowVal:     val,
			lowExclude: true,
			highVal:    maxVal,
		})
	case opcode.NE:
		// Two spans: everything below val, then everything above val.
		spans = append(spans, &indexSpan{
			lowVal:      minNotNullVal,
			highVal:     val,
			highExclude: true,
		})
		spans = append(spans, &indexSpan{
			lowVal:     val,
			lowExclude: true,
			highVal:    maxVal,
		})
	default:
		panic("should never happen")
	}
	return spans
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/index.go#L303-L353

这里定义了不同操作的各种范围,我们这里是EQ,所以lowVal(startkey)和highVal(endkey)都是val(其实就是点查询)。

然后一直回到:

p, filtered, err := r.filterNode(ctx, expr, r.Left)
	if err != nil {
		return nil, false, err
	}
	if filtered {
		r.Left = p
		return r, true, nil
	}

	p, filtered, err = r.filterNode(ctx, expr, r.Right)
	if err != nil {
		return nil, false, err
	}
	if filtered {
		r.Right = p
		return r, true, nil
	}
	return r, false, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/join.go#L111-L129

这里由于没有right了,所以直接回到了:

		r, err = (&rsets.WhereRset{Expr: w.Expr, Src: r}).Plan(ctx)
		if err != nil {
			return nil, err
		}
	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/select.go#L103-L107

接着往下:

	lock := s.Lock
	if variable.IsAutocommit(ctx) {//是否启用了Autocommit
		// Locking of rows for update using SELECT FOR UPDATE only applies when autocommit
		// is disabled (either by beginning transaction with START TRANSACTION or by setting
		// autocommit to 0. If autocommit is enabled, the rows matching the specification are not locked.
		// See: https://dev.mysql.com/doc/refman/5.7/en/innodb-locking-reads.html
		lock = coldef.SelectLockNone
	}
	r, err = (&rsets.SelectLockRset{Src: r, Lock: lock}).Plan(ctx)
	if err != nil {
		return nil, err
	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/select.go#L108-L119

然后把之前得到的计划r继续作为SelectLockRset的Src传入,这个Plan只是把Src和Lock包装成一个SelectLockPlan,其他什么也没做:

// Plan gets SelectLockPlan.
// It copies r.Src and r.Lock into a new plans.SelectLockPlan and never
// returns an error.
func (r *SelectLockRset) Plan(ctx context.Context) (plan.Plan, error) {
	return &plans.SelectLockPlan{Src: r.Src, Lock: r.Lock}, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/rset/rsets/lock.go#L33-L36

然后继续往下:

	// Get select list for futher field values evaluation.
	selectList, err := plans.ResolveSelectList(s.Fields, r.GetFields())
	if err != nil {
		return nil, errors.Trace(err)
	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/select.go#L121-L125

plans.ResolveSelectList(s.Fields, r.GetFields())用来获取字段和结果字段,并检查合法性和处理通配符。

接着:

	var groupBy []expression.Expression
	if s.GroupBy != nil {
		groupBy = s.GroupBy.By
	}

	if s.Having != nil {
		// `having` may contain aggregate functions, and we will add this to hidden fields.
		if err = s.Having.CheckAndUpdateSelectList(selectList, groupBy, r.GetFields()); err != nil {
			return nil, errors.Trace(err)
		}
	}

	if s.OrderBy != nil {
		// `order by` may contain aggregate functions, and we will add this to hidden fields.
		if err = s.OrderBy.CheckAndUpdateSelectList(selectList, r.GetFields()); err != nil {
			return nil, errors.Trace(err)
		}
	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/select.go#L127-L144

这里检查GroupBy、Having以及OrderBy,我这条SQL里都没有,所以先不展开了。

然后继续:

	switch {
	case !rsets.HasAggFields(selectList.Fields) && s.GroupBy == nil:
		// If no group by and no aggregate functions, we will use SelectFieldsPlan.
		if r, err = (&rsets.SelectFieldsRset{Src: r,
			SelectList: selectList}).Plan(ctx); err != nil {
			return nil, err
		}
	default:
		if r, err = (&rsets.GroupByRset{By: groupBy,
			Src:        r,
			SelectList: selectList}).Plan(ctx); err != nil {
			return nil, err
		}
	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/select.go#L146-L159

这里再套上一层SelectFieldsRset后会进入(&rsets.SelectFieldsRset{Src: r, SelectList: selectList}).Plan(ctx):

// Plan gets SrcPlan/SelectFieldsDefaultPlan.
// If all fields are equal to src plan fields, then gets SrcPlan.
// Default gets SelectFieldsDefaultPlan.
//
// "Equal" here means: the select list and the source have the same number of
// fields, and at every position the select field is a plain identifier whose
// x.L and whose field name both match the source field name,
// case-insensitively. When the source is a *plans.TableDefaultPlan and
// expressions.FastEval returns non-nil for every select field, the source is
// replaced by a plans.TableNilPlan over the same table.
func (r *SelectFieldsRset) Plan(ctx context.Context) (plan.Plan, error) {
	fields := r.SelectList.Fields
	srcFields := r.Src.GetFields()
	if len(fields) == len(srcFields) {
		match := true
		for i, v := range fields {
			// TODO: is it this check enough? e.g, the ident field is t.c.
			if x, ok := v.Expr.(*expressions.Ident); ok && strings.EqualFold(x.L, srcFields[i].Name) && strings.EqualFold(v.Name, srcFields[i].Name) {
				continue
			}

			match = false
			break
		}

		// The select list is exactly the source's fields: reuse the source plan.
		if match {
			return r.Src, nil
		}
	}

	src := r.Src
	if x, ok := src.(*plans.TableDefaultPlan); ok { // check whether the table plan can be dropped
		// check whether src plan will be set TableNilPlan, like `select 1, 2 from t`.
		isConst := true
		for _, v := range fields {
			// A nil FastEval result marks the field as not constant here.
			if expressions.FastEval(v.Expr) == nil {
				isConst = false
				break
			}
		}
		if isConst {
			src = &plans.TableNilPlan{x.T}
		}
	}

	p := &plans.SelectFieldsDefaultPlan{Src: src, SelectList: r.SelectList}
	return p, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/rset/rsets/fields.go#L37-L77

这里根据需要返回字段fields是否等于srcFields来决定使用SrcPlan(上一个计划)或者SelectFieldsDefaultPlan。

然后回到:

			SelectList: selectList}).Plan(ctx); err != nil {
			return nil, err
		}
	default:
		if r, err = (&rsets.GroupByRset{By: groupBy,
			Src:        r,
			SelectList: selectList}).Plan(ctx); err != nil {
			return nil, err
		}
	}

	if s := s.Having; s != nil {
		if r, err = (&rsets.HavingRset{
			Src:  r,
			Expr: s.Expr}).Plan(ctx); err != nil {
			return nil, err
		}
	}

	if s.Distinct {
		if r, err = (&rsets.DistinctRset{Src: r,
			SelectList: selectList}).Plan(ctx); err != nil {
			return nil, err
		}
	}

	if s := s.OrderBy; s != nil {
		if r, err = (&rsets.OrderByRset{By: s.By,
			Src:        r,
			SelectList: selectList}).Plan(ctx); err != nil {
			return nil, err
		}
	}

	if s := s.Offset; s != nil {
		if r, err = (&rsets.OffsetRset{s.Count, r}).Plan(ctx); err != nil {
			return nil, err
		}
	}
	if s := s.Limit; s != nil {
		if r, err = (&rsets.LimitRset{s.Count, r}).Plan(ctx); err != nil {
			return nil, err
		}
	}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/select.go#L150-L193

这些在本SQL都没用到,都先不展开了。

然后:

	if r, err = (&rsets.SelectFinalRset{Src: r,
		SelectList: selectList}).Plan(ctx); err != nil {
		return nil, err
	}

	return r, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/select.go#L195-L201

在这里给r再套上一层SelectFinalRset,这个Plan什么也没做。

然后计划制定完成(基本上就是把缺的东西填上),然后回到:

	r, err := s.Plan(ctx)
	if err != nil {
		return nil, err
	}

	return rsets.Recordset{ctx, r}, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/select.go#L206-L212

这里把制定好的计划r放进rsets.Recordset,然后继续返回:

		rs, err = s.Exec(ctx)
		stmt.ClearExecArgs(ctx)
	}
	// MySQL DDL should be auto-commit
	if err == nil && (s.IsDDL() || variable.IsAutocommit(ctx)) {
		err = ctx.FinishTxn(false)
	}
	return rs, errors.Trace(err)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/tidb.go#L153-L161

进一步返回:

		r, err := runStmt(s, si)
		if err != nil {
			log.Warnf("session:%v, err:%v", s, err)
			return nil, errors.Trace(err)
		}

		if r != nil {
			rs = append(rs, r)
		}
	}

	return rs, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/session.go#L130-L142

这里把每条SQL语句结果r添加到rs中,然后继续返回:

		rss, err := c.s.Execute(query)
		if err != nil {
			return nil, errors.Trace(err)
		}
		if len(rss) == 0 {
			return nil, errors.Trace(errNoResult)
		}
		return newdriverRows(rss[0]), nil
	}
	stmt, err := c.getStmt(query)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return stmt.Query(args)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L307-L321

至此,并没有真正地从KV中取出任何数据,只是做了一些计划,以及给定了需要返回的字段,并返回了一个似乎是用来获取数据的对象,真正的获取数据是从newdriverRows(rss[0])这里开始的:

// newdriverRows wraps rs in a driverRows and starts a goroutine that feeds
// its rows channel. The goroutine runs rs.Do, sending a clone of each row on
// r.rows, and afterwards sends one final error value: io.EOF if Do returned
// nil, otherwise the error Do returned. Both the per-row send and the final
// send are abandoned if r.done becomes ready first.
func newdriverRows(rs rset.Recordset) *driverRows {
	r := &driverRows{
		rs:   rs,
		done: make(chan int),
		// rows carries both row data ([]interface{}) and the terminating error.
		rows: make(chan interface{}, 500),
	}
	go func() {
		// io.EOF is the terminator sent when Do finishes without error.
		err := io.EOF
		if e := r.rs.Do(func(data []interface{}) (bool, error) {
			vv, err := types.Clone(data)
			if err != nil {
				return false, errors.Trace(err)
			}
			select {
			case r.rows <- vv:
				// Row delivered; ask Do to continue.
				return true, nil
			case <-r.done:
				// done fired before the send; ask Do to stop without an error.
				return false, nil
			}
		}); e != nil {
			err = e
		}

		select {
		case r.rows <- err:
		case <-r.done:
		}
	}()
	return r
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L347-L376

这里面通过启动一个goroutine来异步获取数据,并通过channel来发送数据,然后在这里通过调用Next等待数据:

for rows.Next() {
			err := rows.Scan(scanArgs...)
			if err != nil {
				return errors.Trace(err)
			}

			data := make([]string, len(cols))
			for i, value := range values {
				if value == nil {
					data[i] = "NULL"
				} else {
					data[i] = string(value)
				}
			}

			datas = append(datas, data)
		}

		// For `cols` and `datas[i]` always has the same length,
		// no need to check return validity.
		result, _ := printer.GetPrintResult(cols, datas)
		fmt.Printf("%s", result)

		if err := rows.Err(); err != nil {
			return errors.Trace(err)
		}
	} else {
		// TODO: rows affected and last insert id
		_, err := tx.Exec(txnLine)
		if err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/interpreter/main.go#L75-L109

具体Next等待在这里:

// Next waits for the next item on r.rows and copies it into dest.
//
// An error item is returned as-is. A row item ([]interface{}) must have
// exactly len(dest) values; each value is stored in dest after conversion:
// signed integer kinds become int64, unsigned integer kinds become uint64,
// mysql.Duration/Time/Decimal become their String() form, and nil, int64,
// float32, float64, bool, []byte and string are stored unchanged. Any other
// value type, or any other item type, produces an error. If r.done becomes
// ready first, io.EOF is returned.
func (r *driverRows) Next(dest []driver.Value) error {
	select {
	case rx := <-r.rows:
		switch x := rx.(type) {
		case error:
			return x
		case []interface{}:
			// g is the number of values received, e the number expected by the caller.
			if g, e := len(x), len(dest); g != e {
				return errors.Errorf("field count mismatch: got %d, need %d", g, e)
			}

			for i, xi := range x {
				switch v := xi.(type) {
				case nil, int64, float32, float64, bool, []byte, string:
					dest[i] = v
				case int8:
					dest[i] = int64(v)
				case int16:
					dest[i] = int64(v)
				case int32:
					dest[i] = int64(v)
				case int:
					dest[i] = int64(v)
				case uint8:
					dest[i] = uint64(v)
				case uint16:
					dest[i] = uint64(v)
				case uint32:
					dest[i] = uint64(v)
				case uint64:
					dest[i] = uint64(v)
				case uint:
					dest[i] = uint64(v)
				case mysql.Duration:
					dest[i] = v.String()
				case mysql.Time:
					dest[i] = v.String()
				case mysql.Decimal:
					dest[i] = v.String()
				default:
					return errors.Errorf("unable to handle type %T", xi)
				}
			}
			return nil
		default:
			return errors.Errorf("unable to handle type %T", rx)
		}
	case <-r.done:
		return io.EOF
	}
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L403-L453

从KV取数据

然后回到前面的goroutine,分析一下具体取数据的流程:

	r := &driverRows{
		rs:   rs,
		done: make(chan int),
		rows: make(chan interface{}, 500),
	}
	go func() {
		err := io.EOF
		if e := r.rs.Do(func(data []interface{}) (bool, error) {
			vv, err := types.Clone(data)
			if err != nil {
				return false, errors.Trace(err)
			}
			select {
			case r.rows <- vv:
				return true, nil
			case <-r.done:
				return false, nil
			}
		}); e != nil {
			err = e
		}

		select {
		case r.rows <- err:
		case <-r.done:
		}
	}()

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L348-L374

首先,这里的r.rs就是包装了之前制定的计划的Recordset。

然后看到r.rs.Do传递了一个函数作为参数,进入Do看看:

// Do implements rset.Recordset.
// It runs r.Plan.Do with r.Ctx and adapts the plan's row callback to f by
// dropping the row ID and passing only the row data.
func (r Recordset) Do(f func(data []interface{}) (bool, error)) error {
	return r.Plan.Do(r.Ctx, func(ID interface{}, data []interface{}) (bool, error) {
		return f(data)
	})
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/rset/rsets/rsets.go#L38-L43

这是最后嵌套的一层对象,这里又把之前传入的函数传给了里面那层Do:

// Do implements the plan.Plan Do interface, and sets result info field.
// For every row produced by r.Src it keeps only the first
// r.HiddenFieldOffset values, rewrites bool values to uint8 1/0, calls
// setResultFieldInfo once (on the first row of this run) and then forwards
// the row to f.
func (r *SelectFinalPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
	// Reset infered. For prepared statements, this plan may run many times.
	r.infered = false
	return r.Src.Do(ctx, func(rid interface{}, in []interface{}) (bool, error) {
		// we should not output hidden fields to client
		out := in[0:r.HiddenFieldOffset]
		for i, o := range out {
			switch v := o.(type) {
			case bool:
				// Convert bool field to int
				if v {
					out[i] = uint8(1)
				} else {
					out[i] = uint8(0)
				}
			}
		}
		// Only the first row of a run is passed to setResultFieldInfo.
		if !r.infered {
			setResultFieldInfo(r.ResultFields[0:r.HiddenFieldOffset], out)
			r.infered = true
		}
		return f(rid, out)
	})
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/final.go#L36-L60

这个SelectFinalPlan就是之前计划的最后套的那层,这里面每一层都会传入一个函数,相当于入栈的调用,所以直接进入最底层后分析:

// Do implements plan.Plan Do interface, it executes join method
// according to given type.
// With no right child it simply runs the left child's Do with f. Otherwise
// r.Type selects doLeftJoin, doRightJoin or doFullJoin, and every other type
// falls back to doCrossJoin.
func (r *JoinPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
	// Single-table case: nothing to join, iterate the left plan directly.
	if r.Right == nil {
		return r.Left.Do(ctx, f)
	}

	switch r.Type {
	case LeftJoin:
		return r.doLeftJoin(ctx, f)
	case RightJoin:
		return r.doRightJoin(ctx, f)
	case FullJoin:
		return r.doFullJoin(ctx, f)
	default:
		return r.doCrossJoin(ctx, f)
	}
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/join.go#L136-L153

然后进入r.Left.Do(ctx, f):

// Do implements plan.Plan Do interface.
// It scans a span from the lower bound to upper bound.
// The transaction is obtained from ctx.GetTxn(false); each span in r.spans is
// then processed in order by r.doSpan, and the first error stops the scan and
// is returned.
func (r *indexPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
	txn, err := ctx.GetTxn(false)
	if err != nil {
		return err
	}
	for _, span := range r.spans {
		err := r.doSpan(ctx, txn, span, f)
		if err != nil {
			return err
		}
	}
	return nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/index.go#L195-L209

获取当前session的事务txn后根据之前的spans(扫描范围)来开始扫描,进入doSpan():

func (r *indexPlan) doSpan(ctx context.Context, txn kv.Transaction, span *indexSpan, f plan.RowIterFunc) error {
	seekVal := span.lowVal
	if span.lowVal == minNotNullVal {
		seekVal = []byte{}
	}
	it, _, err := r.idx.Seek(txn, []interface{}{seekVal})
	if err != nil {
		return types.EOFAsNil(err)
	}
	defer it.Close()

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/index.go#L157-L166

首先seekVal := span.lowVal获得开始扫描的起始位置,然后r.idx.Seek(txn, []interface{}{seekVal})从KV查找>=seekVal 的迭代器it。

接着:

var skipLowCompare bool
	for {
		k, h, err := it.Next()
		if err != nil {
			return types.EOFAsNil(err)
		}
		val := k[0]
		if !skipLowCompare {
			if span.lowExclude && indexCompare(span.lowVal, val) == 0 {//不包含lowVal,现在的val==lowVal,则忽略改行数据
				continue
			}
			skipLowCompare = true
		}
		cmp := indexCompare(span.highVal, val)
		if cmp < 0 || (cmp == 0 && span.highExclude) {//当前行已经超出highVal,则直接退出
			return nil
		}
		data, err := r.src.Row(ctx, h)
		if err != nil {
			return err
		}

		if more, err := f(h, data); err != nil || !more {
			return err
		}
	}
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/index.go#L167-L193

这里通过迭代器it.Next(),每次取出一行数据的行号h,以及未经过编码的key值k(就是索引字段的值)。当扫描到当前行的key超过span.highVal(等于span.highVal时是否结束取决于span.highExclude)时就完成扫描,此时已经包含了全部数据。如何保证?前面已经说过KV库对Key是递增存储的,所以startkey可以在log(n)时间内定位到,并且遇到第一个超出span.highVal的行就说明扫描已经结束。

接下来r.src.Row(ctx, h)用行号获取这一行的数据,并且直接查询需要返回的列的数据即可(因为一行数据按照每一列一个KV对存储的,所以可以很方便的过滤列数据,但是现在没有这样做,把所有列都查出来后在后面过滤)。

接着把行号和当前行数据传递给之前传入进来的函数f(h, data):

// Do implements plan.Plan Do interface, acquiring locks.
// For every row from r.Src: if the last value of the row is a *RowKeyList it
// is removed from the row, and when r.Lock is coldef.SelectLockForUpdate each
// key in that list is locked through the transaction from ctx.GetTxn(false).
// The (possibly shortened) row is then forwarded to f.
func (r *SelectLockPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
	return r.Src.Do(ctx, func(rid interface{}, in []interface{}) (bool, error) {
		var rowKeys *RowKeyList
		if in != nil && len(in) > 0 {
			t := in[len(in)-1]
			switch vt := t.(type) {
			case *RowKeyList:
				rowKeys = vt
				// Remove row key list from data tail
				in = in[:len(in)-1]
			}
		}
		// Locks are only taken when a key list was found and FOR UPDATE was requested.
		if rowKeys != nil && r.Lock == coldef.SelectLockForUpdate {
			txn, err := ctx.GetTxn(false)
			if err != nil {
				return false, errors.Trace(err)
			}
			for _, k := range rowKeys.Keys {
				err = txn.LockKeys([]byte(k.Key))
				if err != nil {
					return false, errors.Trace(err)
				}
			}
		}
		return f(rid, in)
	})
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/lock.go#L37-L64

这里是用来对行加锁(比如SQL中的for update等),我们的SQL没有用到,所以继续返回上一层的f(rid, in):

// Do implements the plan.Plan Do interface, extracts specific values by
// r.Fields from row data.
// For every source row it evaluates each r.Fields[i].Expr, with identifiers
// resolved against that row through getIdentValue, and forwards the
// resulting values to f under the same row ID. An evaluation error stops the
// iteration.
func (r *SelectFieldsDefaultPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
	fields := r.Src.GetFields()
	// m is the evaluation argument map, shared across rows.
	m := map[interface{}]interface{}{}
	return r.Src.Do(ctx, func(rid interface{}, in []interface{}) (bool, error) {
		// Rebind the identifier lookup to the current row before evaluating.
		m[expressions.ExprEvalIdentFunc] = func(name string) (interface{}, error) {
			return getIdentValue(name, fields, in, field.DefaultFieldFlag)
		}

		out := make([]interface{}, len(r.Fields))
		for i, fld := range r.Fields {
			var err error
			if out[i], err = fld.Expr.Eval(ctx, m); err != nil {
				return false, err
			}
		}
		return f(rid, out)
	})
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/fields.go#L54-L73

这里用于取出需要返回的列放入out,然后传递给上一个f(rid, out):

// Do implements the plan.Plan Do interface, and sets result info field.
// Each row from r.Src is cut down to its first r.HiddenFieldOffset values,
// bool values are replaced by uint8 1/0, setResultFieldInfo is invoked for
// the first row of the run only, and the row is then handed to f.
func (r *SelectFinalPlan) Do(ctx context.Context, f plan.RowIterFunc) error {
	// Reset infered. For prepared statements, this plan may run many times.
	r.infered = false
	return r.Src.Do(ctx, func(rid interface{}, in []interface{}) (bool, error) {
		// we should not output hidden fields to client
		out := in[0:r.HiddenFieldOffset]
		for i, o := range out {
			switch v := o.(type) {
			case bool:
				// Convert bool field to int
				if v {
					out[i] = uint8(1)
				} else {
					out[i] = uint8(0)
				}
			}
		}
		// r.infered guards the one-time call per run.
		if !r.infered {
			setResultFieldInfo(r.ResultFields[0:r.HiddenFieldOffset], out)
			r.infered = true
		}
		return f(rid, out)
	})
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/plan/plans/final.go#L36-L60

这里过滤掉隐藏字段(比如没有主键时会增加的autoid的列):

接着会把数据传到:

		if e := r.rs.Do(func(data []interface{}) (bool, error) {
			vv, err := types.Clone(data)
			if err != nil {
				return false, errors.Trace(err)
			}
			select {
			case r.rows <- vv:
				return true, nil
			case <-r.done:
				return false, nil
			}
		}); e != nil {
			err = e
		}

		select {
		case r.rows <- err:
		case <-r.done:
		}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/driver.go#L355-L373

clone一份数据后发送到r.rows这个channel。如果有错误也会被传到r.rows。

然后之前在等待数据的地方就收到了。

至此,整个select大致介绍了一遍,还有一些细节可能没讲,以后会通过分析每次的commit来覆盖这些内容,现在先有个大体上的把握。

然后SELECT r FROM t WHERE c >= 'c1' and c <= 'c5';语句也基本上一样,只是spans的上限会改成c5。所以就不具体介绍了。

小结

整个流程大致为:编译SQL得到stmts -> 制定查询计划(根据各种条件选择查询范围等)-> 异步执行查询计划 -> 通过channel接收查询结果。

发表评论

电子邮件地址不会被公开。 必填项已用*标注

请输入正确的验证码