继续说下一个SQL语句(昨天博客出了点问题,修了一天,今天补上):
CREATE INDEX index_name ON t(c);
入口和上一个语句一样,这里就不在描述了。
直接进入Exec():
// Exec implements the stmt.Statement Exec interface.
func (s *CreateIndexStmt) Exec(ctx context.Context) (rset.Recordset, error) {
err := sessionctx.GetDomain(ctx).DDL().CreateIndex(ctx, s.TableIdent.Full(ctx), s.Unique, model.NewCIStr(s.IndexName), s.IndexColNames)
if err != nil {
return nil, errors.Trace(err)
}
return nil, nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/create.go#L154-L161
获取session中的ddl以及表信息后进入:
func (d *ddl) CreateIndex(ctx context.Context, ti table.Ident, unique bool, indexName model.CIStr, idxColNames []*coldef.IndexColName) error {
is := d.infoHandle.Get()//获取ddl中的Handel对象
t, err := is.TableByName(ti.Schema, ti.Name)//检查是否存在,并获取表信息
if err != nil {
return errors.Trace(err)
}
if _, ok := is.IndexByName(ti.Schema, ti.Name, indexName); ok {//检查是否存在这个index
return errors.Errorf("CREATE INDEX: index already exist %s", indexName)
}
if is.ColumnExists(ti.Schema, ti.Name, indexName) {//确保索引名称不存在
return errors.Errorf("CREATE INDEX: index name collision with existing column: %s", indexName)
}
tbInfo := t.Meta()//把当前Table对象转换成TableInfo{}元数据
...
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L464-L477
接下来把SQL语句的索引对应到具体字段:
...
// build offsets
idxColumns := make([]*model.IndexColumn, 0, len(idxColNames))
for i, ic := range idxColNames {
col := column.FindCol(t.Cols(), ic.ColumnName)//获取对应的表字段信息
if col == nil {
return errors.Errorf("CREATE INDEX: column does not exist: %s", ic.ColumnName)
}
idxColumns = append(idxColumns, &model.IndexColumn{
Name: col.Name,
Offset: col.Offset,
Length: ic.Length,
})
// Set ColumnInfo flag
if i == 0 {
if unique && len(idxColNames) == 1 {
tbInfo.Columns[col.Offset].Flag |= mysql.UniqueKeyFlag
} else {
tbInfo.Columns[col.Offset].Flag |= mysql.MultipleKeyFlag
}
}
}
...
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L479-L499
然后新建索引对象:
...
// create index info
idxInfo := &model.IndexInfo{
Name: indexName,
Columns: idxColumns,
Unique: unique,
}
tbInfo.Indices = append(tbInfo.Indices, idxInfo)
...
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L500-L506
接着构建索引:
...
// build index
err = d.buildIndex(ctx, t, idxInfo, unique)
if err != nil {
return errors.Trace(err)
}
...
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L508-L512
传入表信息,索引信息和是否唯一索引后进入buildIndex:
func (d *ddl) buildIndex(ctx context.Context, t table.Table, idxInfo *model.IndexInfo, unique bool) error {
firstKey := t.FirstKey()//获取表的第一行数据的Key值
prefix := t.KeyPrefix()//获取表Key的前缀
txn, err := ctx.GetTxn(false)//获得当前事务
if err != nil {
return errors.Trace(err)
}
it, err := txn.Seek([]byte(firstKey), nil)//通过firstKey找到表第一行数据的迭代器
if err != nil {
return errors.Trace(err)
}
defer it.Close()
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L518-L530
然后遍历数据行,分别对索引字段建立索引:
for it.Valid() && strings.HasPrefix(it.Key(), prefix) {
var err error
h, err := util.DecodeHandleFromRowKey(it.Key())
log.Info("building index...", h)
if err != nil {
return errors.Trace(err)
}
// TODO: v is timestamp ?
// fetch datas
cols := t.Cols()
var vals []interface{}
for _, v := range idxInfo.Columns {
col := cols[v.Offset]
k := t.RecordKey(h, col)
data, err := txn.Get([]byte(k))
if err != nil {
return errors.Trace(err)
}
val, err := t.DecodeValue(data, col)
if err != nil {
return errors.Trace(err)
}
vals = append(vals, val)
}
// build index
kvX := kv.NewKVIndex(t.IndexPrefix(), idxInfo.Name.L, unique)
err = kvX.Create(txn, vals, h)
if err != nil {
return errors.Trace(err)
}
rk := []byte(t.RecordKey(h, nil))
it, err = kv.NextUntil(it, util.RowKeyPrefixFilter(rk))
if err != nil {
return errors.Trace(err)
}
}
return nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L531-L569
都创建完后回到这里:
// build index
err = d.buildIndex(ctx, t, idxInfo, unique)
if err != nil {
return errors.Trace(err)
}
// update InfoSchema
return d.updateInfoSchema(ctx, ti.Schema, tbInfo)
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L508-L516
然后通过d.updateInfoSchema把新增的索引信息更新到session.ddl.infoHandle以及把更新通过txn写入到当前事务(txn.UnionStore.Set):
func (d *ddl) updateInfoSchema(ctx context.Context, schema model.CIStr, tbInfo *model.TableInfo) error {
clonedInfo := d.GetInformationSchema().Clone()
for _, info := range clonedInfo {
if info.Name == schema {
var match bool
for i := range info.Tables {
if info.Tables[i].Name == tbInfo.Name {
info.Tables[i] = tbInfo
match = true
}
}
if !match {
info.Tables = append(info.Tables, tbInfo)
}
err := d.writeSchemaInfo(info)//通过txn.UnionStore.Set写入到事务(还未提交/落盘)
if err != nil {
return errors.Trace(err)
}
}
}
d.infoHandle.Set(clonedInfo)//更新到session.ddl.infoHandle
return nil
}
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L593-L615
然后又回到:
// update InfoSchema return d.updateInfoSchema(ctx, ti.Schema, tbInfo) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L514-L516
接着这个exec完成了:
err := sessionctx.GetDomain(ctx).DDL().CreateIndex(ctx, s.TableIdent.Full(ctx), s.Unique, model.NewCIStr(s.IndexName), s.IndexColNames)
if err != nil {
return nil, errors.Trace(err)
}
return nil, nil
}
小结
添加索引的步骤基本就是:检查索引是否存在->所有表中的数据行对应字段都加上索引->提交索引信息,并更新到内存->最终落盘索引信息