继续说下一个SQL语句(昨天博客出了点问题,修了一天,今天补上):
CREATE INDEX index_name ON t(c);
入口和上一个语句一样,这里就不在描述了。
直接进入Exec():
// Exec implements the stmt.Statement Exec interface. func (s *CreateIndexStmt) Exec(ctx context.Context) (rset.Recordset, error) { err := sessionctx.GetDomain(ctx).DDL().CreateIndex(ctx, s.TableIdent.Full(ctx), s.Unique, model.NewCIStr(s.IndexName), s.IndexColNames) if err != nil { return nil, errors.Trace(err) } return nil, nil }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/stmt/stmts/create.go#L154-L161
获取session中的ddl以及表信息后进入:
func (d *ddl) CreateIndex(ctx context.Context, ti table.Ident, unique bool, indexName model.CIStr, idxColNames []*coldef.IndexColName) error { is := d.infoHandle.Get()//获取ddl中的Handel对象 t, err := is.TableByName(ti.Schema, ti.Name)//检查是否存在,并获取表信息 if err != nil { return errors.Trace(err) } if _, ok := is.IndexByName(ti.Schema, ti.Name, indexName); ok {//检查是否存在这个index return errors.Errorf("CREATE INDEX: index already exist %s", indexName) } if is.ColumnExists(ti.Schema, ti.Name, indexName) {//确保索引名称不存在 return errors.Errorf("CREATE INDEX: index name collision with existing column: %s", indexName) } tbInfo := t.Meta()//把当前Table对象转换成TableInfo{}元数据 ...
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L464-L477
接下来把SQL语句的索引对应到具体字段:
... // build offsets idxColumns := make([]*model.IndexColumn, 0, len(idxColNames)) for i, ic := range idxColNames { col := column.FindCol(t.Cols(), ic.ColumnName)//获取对应的表字段信息 if col == nil { return errors.Errorf("CREATE INDEX: column does not exist: %s", ic.ColumnName) } idxColumns = append(idxColumns, &model.IndexColumn{ Name: col.Name, Offset: col.Offset, Length: ic.Length, }) // Set ColumnInfo flag if i == 0 { if unique && len(idxColNames) == 1 { tbInfo.Columns[col.Offset].Flag |= mysql.UniqueKeyFlag } else { tbInfo.Columns[col.Offset].Flag |= mysql.MultipleKeyFlag } } } ...
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L479-L499
然后新建索引对象:
... // create index info idxInfo := &model.IndexInfo{ Name: indexName, Columns: idxColumns, Unique: unique, } tbInfo.Indices = append(tbInfo.Indices, idxInfo) ...
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L500-L506
接着构建索引:
... // build index err = d.buildIndex(ctx, t, idxInfo, unique) if err != nil { return errors.Trace(err) } ...
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L508-L512
传入表信息,索引信息和是否唯一索引后进入buildIndex:
func (d *ddl) buildIndex(ctx context.Context, t table.Table, idxInfo *model.IndexInfo, unique bool) error { firstKey := t.FirstKey()//获取表的第一行数据的Key值 prefix := t.KeyPrefix()//获取表Key的前缀 txn, err := ctx.GetTxn(false)//获得当前事务 if err != nil { return errors.Trace(err) } it, err := txn.Seek([]byte(firstKey), nil)//通过firstKey找到表第一行数据的迭代器 if err != nil { return errors.Trace(err) } defer it.Close()
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L518-L530
然后遍历数据行,分别对索引字段建立索引:
for it.Valid() && strings.HasPrefix(it.Key(), prefix) { var err error h, err := util.DecodeHandleFromRowKey(it.Key()) log.Info("building index...", h) if err != nil { return errors.Trace(err) } // TODO: v is timestamp ? // fetch datas cols := t.Cols() var vals []interface{} for _, v := range idxInfo.Columns { col := cols[v.Offset] k := t.RecordKey(h, col) data, err := txn.Get([]byte(k)) if err != nil { return errors.Trace(err) } val, err := t.DecodeValue(data, col) if err != nil { return errors.Trace(err) } vals = append(vals, val) } // build index kvX := kv.NewKVIndex(t.IndexPrefix(), idxInfo.Name.L, unique) err = kvX.Create(txn, vals, h) if err != nil { return errors.Trace(err) } rk := []byte(t.RecordKey(h, nil)) it, err = kv.NextUntil(it, util.RowKeyPrefixFilter(rk)) if err != nil { return errors.Trace(err) } } return nil }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L531-L569
都创建完后回到这里:
// build index err = d.buildIndex(ctx, t, idxInfo, unique) if err != nil { return errors.Trace(err) } // update InfoSchema return d.updateInfoSchema(ctx, ti.Schema, tbInfo) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L508-L516
然后通过d.updateInfoSchema把新增的索引信息更新到session.ddl.infoHandle以及把更新通过txn写入到当前事务(txn.UnionStore.Set):
func (d *ddl) updateInfoSchema(ctx context.Context, schema model.CIStr, tbInfo *model.TableInfo) error { clonedInfo := d.GetInformationSchema().Clone() for _, info := range clonedInfo { if info.Name == schema { var match bool for i := range info.Tables { if info.Tables[i].Name == tbInfo.Name { info.Tables[i] = tbInfo match = true } } if !match { info.Tables = append(info.Tables, tbInfo) } err := d.writeSchemaInfo(info)//通过txn.UnionStore.Set写入到事务(还未提交/落盘) if err != nil { return errors.Trace(err) } } } d.infoHandle.Set(clonedInfo)//更新到session.ddl.infoHandle return nil }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L593-L615
然后又回到:
// update InfoSchema return d.updateInfoSchema(ctx, ti.Schema, tbInfo) }
https://github.com/pingcap/tidb/blob/0d6f270068e8ff2aedc1c314e907771b6a508ebd/ddl/ddl.go#L514-L516
接着这个exec完成了:
err := sessionctx.GetDomain(ctx).DDL().CreateIndex(ctx, s.TableIdent.Full(ctx), s.Unique, model.NewCIStr(s.IndexName), s.IndexColNames) if err != nil { return nil, errors.Trace(err) } return nil, nil }
小结
添加索引的步骤基本就是:检查索引是否存在->所有表中的数据行对应字段都加上索引->提交索引信息,并更新到内存->最终落盘索引信息