TiDB源码分析-从github第一次提交说起(3)

继续说下一个SQL语句(昨天博客出了点问题,修了一天,今天补上):

CREATE INDEX index_name ON t(c);

入口和上一个语句一样,这里就不再描述了。

直接进入Exec():

// Exec implements the stmt.Statement Exec interface.
//
// It performs no work of its own: the whole operation is delegated to the
// CreateIndex method of the DDL handler reached through
// sessionctx.GetDomain(ctx).DDL(). The arguments passed are the table
// identifier produced by s.TableIdent.Full(ctx), the s.Unique flag, the index
// name wrapped with model.NewCIStr, and the s.IndexColNames list.
//
// The returned Recordset is always nil. If CreateIndex fails, its error is
// returned wrapped with errors.Trace; otherwise the error is nil.
func (s *CreateIndexStmt) Exec(ctx context.Context) (rset.Recordset, error) {
	err := sessionctx.GetDomain(ctx).DDL().CreateIndex(ctx, s.TableIdent.Full(ctx), s.Unique, model.NewCIStr(s.IndexName), s.IndexColNames)
	if err != nil {
		return nil, errors.Trace(err)
	}
	// The statement produces no rows, so there is no record set to return.
	return nil, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/create.go#L154-L161

获取session中的ddl以及表信息后进入:

func (d *ddl) CreateIndex(ctx context.Context, ti table.Ident, unique bool, indexName model.CIStr, idxColNames []*coldef.IndexColName) error {
	is := d.infoHandle.Get()//获取ddl中的Handle对象
	t, err := is.TableByName(ti.Schema, ti.Name)//检查是否存在,并获取表信息
	if err != nil {
		return errors.Trace(err)
	}
	if _, ok := is.IndexByName(ti.Schema, ti.Name, indexName); ok {//检查是否存在这个index
		return errors.Errorf("CREATE INDEX: index already exist %s", indexName)
	}
	if is.ColumnExists(ti.Schema, ti.Name, indexName) {//确保索引名称不与表中已有的列名冲突
		return errors.Errorf("CREATE INDEX: index name collision with existing column: %s", indexName)
	}

	tbInfo := t.Meta()//把当前Table对象转换成TableInfo{}元数据
...

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L464-L477

接下来把SQL语句的索引对应到具体字段:

...
	// build offsets
	idxColumns := make([]*model.IndexColumn, 0, len(idxColNames))
	for i, ic := range idxColNames {
		col := column.FindCol(t.Cols(), ic.ColumnName)//获取对应的表字段信息
		if col == nil {
			return errors.Errorf("CREATE INDEX: column does not exist: %s", ic.ColumnName)
		}
		idxColumns = append(idxColumns, &model.IndexColumn{
			Name:   col.Name,
			Offset: col.Offset,
			Length: ic.Length,
		})
		// Set ColumnInfo flag
		if i == 0 {
			if unique && len(idxColNames) == 1 {
				tbInfo.Columns[col.Offset].Flag |= mysql.UniqueKeyFlag
			} else {
				tbInfo.Columns[col.Offset].Flag |= mysql.MultipleKeyFlag
			}
		}
	}
...

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L479-L499

然后新建索引对象:

...
	// create index info
	idxInfo := &model.IndexInfo{
		Name:    indexName,
		Columns: idxColumns,
		Unique:  unique,
	}
	tbInfo.Indices = append(tbInfo.Indices, idxInfo)
...

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L500-L506

接着构建索引:

...
	// build index
	err = d.buildIndex(ctx, t, idxInfo, unique)
	if err != nil {
		return errors.Trace(err)
	}
...

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L508-L512

传入表信息,索引信息和是否唯一索引后进入buildIndex:

func (d *ddl) buildIndex(ctx context.Context, t table.Table, idxInfo *model.IndexInfo, unique bool) error {
	firstKey := t.FirstKey()//获取表的第一行数据的Key值
	prefix := t.KeyPrefix()//获取表Key的前缀

	txn, err := ctx.GetTxn(false)//获得当前事务
	if err != nil {
		return errors.Trace(err)
	}
	it, err := txn.Seek([]byte(firstKey), nil)//通过firstKey找到表第一行数据的迭代器
	if err != nil {
		return errors.Trace(err)
	}
	defer it.Close()

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L518-L530

然后遍历数据行,分别对索引字段建立索引:

	for it.Valid() && strings.HasPrefix(it.Key(), prefix) {
		var err error
		h, err := util.DecodeHandleFromRowKey(it.Key())
		log.Info("building index...", h)
		if err != nil {
			return errors.Trace(err)
		}
		// TODO: v is timestamp ?
		// fetch datas
		cols := t.Cols()
		var vals []interface{}
		for _, v := range idxInfo.Columns {
			col := cols[v.Offset]
			k := t.RecordKey(h, col)
			data, err := txn.Get([]byte(k))
			if err != nil {
				return errors.Trace(err)
			}
			val, err := t.DecodeValue(data, col)
			if err != nil {
				return errors.Trace(err)
			}
			vals = append(vals, val)
		}
		// build index
		kvX := kv.NewKVIndex(t.IndexPrefix(), idxInfo.Name.L, unique)
		err = kvX.Create(txn, vals, h)
		if err != nil {
			return errors.Trace(err)
		}

		rk := []byte(t.RecordKey(h, nil))
		it, err = kv.NextUntil(it, util.RowKeyPrefixFilter(rk))
		if err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L531-L569

都创建完后回到这里:

	// build index
	err = d.buildIndex(ctx, t, idxInfo, unique)
	if err != nil {
		return errors.Trace(err)
	}

	// update InfoSchema
	return d.updateInfoSchema(ctx, ti.Schema, tbInfo)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L508-L516

然后通过d.updateInfoSchema,先把新增的索引信息通过txn写入到当前事务(txn.UnionStore.Set),再把它更新到session.ddl.infoHandle:

// updateInfoSchema stores tbInfo under the schema named by the schema
// argument and publishes the result.
//
// It works on the value returned by d.GetInformationSchema().Clone(). For
// every entry whose Name equals schema, it replaces each table whose Name
// equals tbInfo.Name with tbInfo, or appends tbInfo when no table matched,
// and then passes that entry to d.writeSchemaInfo. After the loop the cloned
// value is installed with d.infoHandle.Set.
//
// The ctx parameter is not referenced in this function body. An error from
// d.writeSchemaInfo is returned wrapped with errors.Trace, and in that case
// d.infoHandle.Set is not reached.
func (d *ddl) updateInfoSchema(ctx context.Context, schema model.CIStr, tbInfo *model.TableInfo) error {
	clonedInfo := d.GetInformationSchema().Clone()
	for _, info := range clonedInfo {
		if info.Name == schema {
			var match bool
			// There is no break: every table with a matching name is replaced.
			for i := range info.Tables {
				if info.Tables[i].Name == tbInfo.Name {
					info.Tables[i] = tbInfo
					match = true
				}
			}
			// No existing table carried this name, so tbInfo is added as a new one.
			if !match {
				info.Tables = append(info.Tables, tbInfo)
			}
			err := d.writeSchemaInfo(info) // per the article's note: written into the transaction via txn.UnionStore.Set (not yet committed / flushed to disk)
			if err != nil {
				return errors.Trace(err)
			}
		}
	}
	d.infoHandle.Set(clonedInfo) // per the article's note: updates session.ddl.infoHandle
	return nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L593-L615

然后又回到:

	// update InfoSchema
	return d.updateInfoSchema(ctx, ti.Schema, tbInfo)
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L514-L516

接着这个exec完成了:

	err := sessionctx.GetDomain(ctx).DDL().CreateIndex(ctx, s.TableIdent.Full(ctx), s.Unique, model.NewCIStr(s.IndexName), s.IndexColNames)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return nil, nil
}

https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/create.go#L156-L161

小结

添加索引的步骤基本就是:检查索引是否已存在(以及索引名是否与列名冲突)->为表中所有数据行的对应字段建立索引->把索引信息写入当前事务,并更新到内存->最终随事务提交落盘索引信息

发表评论

邮箱地址不会被公开。 必填项已用*标注

请输入正确的验证码