ddl: decouple executor part out from ddl (pingcap#54858)
D3Hunter committed Jul 24, 2024
1 parent 7ebd781 commit 8f98b4e
Showing 55 changed files with 1,311 additions and 1,315 deletions.
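
This commit splits the statement-execution half of the `ddl` package out of the `DDL` interface: `ddl_api.go` becomes `executor.go`, and callers that previously went through `domain.GetDomain(...).DDL()` for statement-level operations now go through `domain.GetDomain(...).DDLExecutor()`. The sketch below illustrates the shape of that split. It is a self-contained toy: apart from `DDL`, `DDLExecutor`, and the two methods visible in this diff, every name and signature is a simplified stand-in, not TiDB's actual definition.

```go
// Toy sketch of the interface split performed by this commit:
// lifecycle concerns stay on DDL, statement-level entry points move
// to a separate Executor, and the domain exposes each one directly.
package main

import "fmt"

// DDL keeps lifecycle/job-management methods (simplified stand-ins).
type DDL interface {
	Start() error
	Stop() error
}

// Executor carries statement-level operations; these two appear in
// the diff as methods reached via DDLExecutor().
type Executor interface {
	CreatePlacementPolicyWithInfo(policy string) error
	UpdateTableReplicaInfo(physicalID int64, available bool) error
}

type ddlImpl struct{}

func (*ddlImpl) Start() error { return nil }
func (*ddlImpl) Stop() error  { return nil }

type executor struct{}

func (*executor) CreatePlacementPolicyWithInfo(policy string) error {
	fmt.Println("create placement policy:", policy)
	return nil
}

func (*executor) UpdateTableReplicaInfo(physicalID int64, available bool) error {
	fmt.Printf("replica of %d available=%v\n", physicalID, available)
	return nil
}

// Domain hands the two halves out separately, mirroring domain.DDL()
// and the new domain.DDLExecutor() used throughout this diff.
type Domain struct {
	ddl  DDL
	exec Executor
}

func (do *Domain) DDL() DDL              { return do.ddl }
func (do *Domain) DDLExecutor() Executor { return do.exec }

func main() {
	do := &Domain{ddl: &ddlImpl{}, exec: &executor{}}
	_ = do.DDLExecutor().UpdateTableReplicaInfo(42, true)
}
```

The test changes in `db_change_failpoints_test.go` and `db_table_test.go` below follow the same pattern: they fetch `dom.DDLExecutor()` where they used to fetch `dom.DDL()`.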
2 changes: 1 addition & 1 deletion br/pkg/gluetidb/glue.go
@@ -204,7 +204,7 @@ func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo)
 
 // CreatePlacementPolicy implements glue.Session.
 func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error {
-	d := domain.GetDomain(gs.se).DDL()
+	d := domain.GetDomain(gs.se).DDLExecutor()
 	gs.se.SetValue(sessionctx.QueryString, gs.showCreatePlacementPolicy(policy))
 	// the default behaviour is ignoring duplicated policy during restore.
 	return d.CreatePlacementPolicyWithInfo(gs.se, policy, ddl.OnExistIgnore)
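
BR's restore path passes `ddl.OnExistIgnore` here so that restoring a placement policy that already exists is a no-op rather than an error. Below is a minimal self-contained sketch of that on-exist dispatch; only `OnExistIgnore` is confirmed by this diff, and the other two modes are assumptions based on the usual error/ignore/replace trio.

```go
// Minimal sketch of OnExist-style conflict handling (modes assumed).
package main

import (
	"errors"
	"fmt"
)

type OnExist uint8

const (
	OnExistError   OnExist = iota // fail if the object already exists (assumed)
	OnExistIgnore                 // keep the existing object, return nil
	OnExistReplace                // drop and recreate (assumed)
)

var policies = map[string]bool{"p1": true}

func createPolicyWithInfo(name string, onExist OnExist) error {
	if policies[name] {
		switch onExist {
		case OnExistIgnore:
			return nil // the restore path above relies on this branch
		case OnExistReplace:
			delete(policies, name)
		default:
			return errors.New("placement policy already exists: " + name)
		}
	}
	policies[name] = true
	return nil
}

func main() {
	fmt.Println(createPolicyWithInfo("p1", OnExistIgnore)) // <nil>: duplicate ignored
}
```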
3 changes: 1 addition & 2 deletions pkg/ddl/BUILD.bazel
@@ -28,7 +28,6 @@ go_library(
         "constraint.go",
         "ddl.go",
         "ddl_algorithm.go",
-        "ddl_api.go",
         "ddl_history.go",
         "ddl_running_jobs.go",
         "ddl_tiflash_api.go",
@@ -38,6 +37,7 @@ go_library(
         "delete_range_util.go",
         "dist_owner.go",
         "doc.go",
+        "executor.go",
         "foreign_key.go",
         "generated_column.go",
         "index.go",
@@ -220,7 +220,6 @@ go_test(
         "db_table_test.go",
         "db_test.go",
         "ddl_algorithm_test.go",
-        "ddl_api_test.go",
         "ddl_error_test.go",
         "ddl_history_test.go",
         "ddl_running_jobs_test.go",
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_dist_executor.go
@@ -99,7 +99,7 @@ func (s *backfillDistExecutor) newBackfillSubtaskExecutor(
 
 	// TODO getTableByTxn is using DDL ctx which is never cancelled except when shutdown.
 	// we should move this operation out of GetStepExecutor, and put into Init.
-	_, tblIface, err := ddlObj.getTableByTxn((*asAutoIDRequirement)(ddlObj.ddlCtx), jobMeta.SchemaID, jobMeta.TableID)
+	_, tblIface, err := ddlObj.getTableByTxn(ddlObj.ddlCtx.getAutoIDRequirement(), jobMeta.SchemaID, jobMeta.TableID)
 	if err != nil {
 		return nil, err
 	}
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_dist_scheduler.go
@@ -248,7 +248,7 @@ func generateNonPartitionPlan(
 	useCloud bool,
 	instanceCnt int,
 ) (metas [][]byte, err error) {
-	tbl, err := getTable((*asAutoIDRequirement)(d.ddlCtx), job.SchemaID, tblInfo)
+	tbl, err := getTable(d.ddlCtx.getAutoIDRequirement(), job.SchemaID, tblInfo)
 	if err != nil {
 		return nil, err
 	}
16 changes: 13 additions & 3 deletions pkg/ddl/column.go
@@ -764,7 +764,7 @@ func (w *worker) doModifyColumnTypeWithData(
 		job.SnapshotVer = 0
 		job.SchemaState = model.StateWriteReorganization
 	case model.StateWriteReorganization:
-		tbl, err := getTable((*asAutoIDRequirement)(d), dbInfo.ID, tblInfo)
+		tbl, err := getTable(d.getAutoIDRequirement(), dbInfo.ID, tblInfo)
 		if err != nil {
 			return ver, errors.Trace(err)
 		}
@@ -1672,7 +1672,17 @@ func checkNewAutoRandomBits(idAccessors meta.AutoIDAccessors, oldCol *model.ColumnInfo,
 	return nil
 }
 
-type asAutoIDRequirement ddlCtx
+func (d *ddlCtx) getAutoIDRequirement() autoid.Requirement {
+	return &asAutoIDRequirement{
+		store:     d.store,
+		autoidCli: d.autoidCli,
+	}
+}
+
+type asAutoIDRequirement struct {
+	store     kv.Storage
+	autoidCli *autoid.ClientDiscover
+}
 
 var _ autoid.Requirement = &asAutoIDRequirement{}
 
@@ -1693,7 +1703,7 @@ func applyNewAutoRandomBits(d *ddlCtx, m *meta.Meta, dbInfo *model.DBInfo,
 	if !needMigrateFromAutoIncToAutoRand {
 		return nil
 	}
-	autoRandAlloc := autoid.NewAllocatorsFromTblInfo((*asAutoIDRequirement)(d), dbInfo.ID, tblInfo).Get(autoid.AutoRandomType)
+	autoRandAlloc := autoid.NewAllocatorsFromTblInfo(d.getAutoIDRequirement(), dbInfo.ID, tblInfo).Get(autoid.AutoRandomType)
 	if autoRandAlloc == nil {
 		errMsg := fmt.Sprintf(autoid.AutoRandomAllocatorNotFound, dbInfo.Name.O, tblInfo.Name.O)
 		return dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(errMsg)
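
The hunk above is the core of the `autoid` plumbing change: `asAutoIDRequirement` used to be a defined type over the entire `ddlCtx` (`type asAutoIDRequirement ddlCtx`), forcing call sites into whole-struct conversions like `(*asAutoIDRequirement)(d)`. It is now a two-field struct built by `getAutoIDRequirement()`, so anything holding a `store` and an `autoidCli` can satisfy `autoid.Requirement` without owning a full `ddlCtx`. Here is a self-contained sketch of that narrowing pattern, with `Requirement`, `Storage`, and `ClientDiscover` as simplified stand-ins for the TiDB types:

```go
// Sketch of the adapter-narrowing refactor: satisfy an interface with
// a small purpose-built struct instead of converting a large context.
package main

import "fmt"

type Storage interface{ Name() string }

type ClientDiscover struct{ endpoint string }

// Requirement stands in for autoid.Requirement: the two accessors an
// allocator needs from its caller.
type Requirement interface {
	Store() Storage
	AutoIDClient() *ClientDiscover
}

type mockStore struct{}

func (mockStore) Name() string { return "mock-kv" }

// ddlCtx stands in for the real DDL context, which has many more fields.
type ddlCtx struct {
	store     Storage
	autoidCli *ClientDiscover
	// ... many unrelated fields in the real type
}

// asAutoIDRequirement now carries only what Requirement needs; the old
// `type asAutoIDRequirement ddlCtx` whole-struct conversion disappears.
type asAutoIDRequirement struct {
	store     Storage
	autoidCli *ClientDiscover
}

func (r *asAutoIDRequirement) Store() Storage                { return r.store }
func (r *asAutoIDRequirement) AutoIDClient() *ClientDiscover { return r.autoidCli }

func (d *ddlCtx) getAutoIDRequirement() Requirement {
	return &asAutoIDRequirement{store: d.store, autoidCli: d.autoidCli}
}

func main() {
	d := &ddlCtx{store: mockStore{}, autoidCli: &ClientDiscover{endpoint: "pd:2379"}}
	req := d.getAutoIDRequirement()
	fmt.Println(req.Store().Name(), req.AutoIDClient().endpoint)
}
```

The same narrowing shows up at the `getTable(...)` call sites in `backfilling_dist_executor.go` and `backfilling_dist_scheduler.go` above, which presumably eases the executor split this commit performs.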
4 changes: 2 additions & 2 deletions pkg/ddl/db_change_failpoints_test.go
@@ -105,12 +105,12 @@ func TestParallelUpdateTableReplica(t *testing.T) {
 	var wg util.WaitGroupWrapper
 	wg.Run(func() {
 		// Mock for table tiflash replica was available.
-		err1 = domain.GetDomain(tk1.Session()).DDL().UpdateTableReplicaInfo(tk1.Session(), t1.Meta().ID, true)
+		err1 = domain.GetDomain(tk1.Session()).DDLExecutor().UpdateTableReplicaInfo(tk1.Session(), t1.Meta().ID, true)
 	})
 	wg.Run(func() {
 		<-ch
 		// Mock for table tiflash replica was available.
-		err2 = domain.GetDomain(tk2.Session()).DDL().UpdateTableReplicaInfo(tk2.Session(), t1.Meta().ID, true)
+		err2 = domain.GetDomain(tk2.Session()).DDLExecutor().UpdateTableReplicaInfo(tk2.Session(), t1.Meta().ID, true)
 	})
 	wg.Wait()
 	require.NoError(t, err1)
10 changes: 3 additions & 7 deletions pkg/ddl/db_change_test.go
@@ -27,7 +27,6 @@ import (
 	"github.com/pingcap/tidb/pkg/ddl/util/callback"
 	"github.com/pingcap/tidb/pkg/domain"
 	"github.com/pingcap/tidb/pkg/executor"
-	"github.com/pingcap/tidb/pkg/infoschema"
 	"github.com/pingcap/tidb/pkg/kv"
 	"github.com/pingcap/tidb/pkg/parser"
 	"github.com/pingcap/tidb/pkg/parser/ast"
@@ -39,6 +38,7 @@ import (
 	"github.com/pingcap/tidb/pkg/sessiontxn"
 	"github.com/pingcap/tidb/pkg/testkit"
 	"github.com/pingcap/tidb/pkg/testkit/external"
+	"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
 	"github.com/pingcap/tidb/pkg/util"
 	"github.com/pingcap/tidb/pkg/util/sqlexec"
 	"github.com/stretchr/testify/require"
@@ -1527,24 +1527,20 @@ func TestParallelDDLBeforeRunDDLJob(t *testing.T) {
 	sessionToStart.Add(2)
 	firstDDLFinished := make(chan struct{})
 
-	intercept.OnGetInfoSchemaExported = func(ctx sessionctx.Context, is infoschema.InfoSchema) infoschema.InfoSchema {
+	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterGetSchemaAndTableByIdent", func(ctx sessionctx.Context) {
 		// The following code is for testing.
 		// Make sure the two sessions get the same information schema before executing DDL.
 		// After the first session executes its DDL, then the second session executes its DDL.
-		var info infoschema.InfoSchema
 		sessionToStart.Done()
 		sessionToStart.Wait()
-		info = is
 
 		// Make sure the two session have got the same information schema. And the first session can continue to go on,
 		// or the first session finished this SQL(seCnt = finishedCnt), then other sessions can continue to go on.
 		currID := ctx.GetSessionVars().ConnectionID
 		if currID != 1 {
 			<-firstDDLFinished
 		}
-
-		return info
-	}
+	})
 	d := dom.DDL()
 	d.(ddl.DDLForTest).SetInterceptor(intercept)
 
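
The test above swaps a hand-rolled infoschema interceptor for a failpoint-based hook: `testfailpoint.EnableCall` arms the named failpoint (`.../pkg/ddl/afterGetSchemaAndTableByIdent`) for the duration of the test and runs the given callback whenever DDL execution passes that point. Because the callback no longer has to return an `infoschema.InfoSchema`, the `var info` / `return info` bookkeeping disappears. Below is a self-contained sketch of the mechanism only, a registry of named test hooks invoked inline by production code, not TiDB's actual failpoint implementation:

```go
// Toy version of a callback-style test failpoint: tests register a
// function under a name; production code calls Eval at the hook point.
package main

import (
	"fmt"
	"sync"
)

var (
	mu    sync.RWMutex
	hooks = map[string]func(connID uint64){}
)

// EnableCall registers fn under name and returns a disable function
// (the real helper takes *testing.T and cleans up via t.Cleanup).
func EnableCall(name string, fn func(connID uint64)) (disable func()) {
	mu.Lock()
	hooks[name] = fn
	mu.Unlock()
	return func() {
		mu.Lock()
		delete(hooks, name)
		mu.Unlock()
	}
}

// Eval is the production-side hook point; it is a no-op unless a test
// has armed the failpoint.
func Eval(name string, connID uint64) {
	mu.RLock()
	fn := hooks[name]
	mu.RUnlock()
	if fn != nil {
		fn(connID)
	}
}

func main() {
	disable := EnableCall("ddl/afterGetSchemaAndTableByIdent", func(connID uint64) {
		fmt.Println("hook hit by connection", connID)
	})
	defer disable()
	Eval("ddl/afterGetSchemaAndTableByIdent", 1) // simulated DDL call site
}
```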
4 changes: 2 additions & 2 deletions pkg/ddl/db_table_test.go
@@ -315,7 +315,7 @@ func TestCreateTableWithInfo(t *testing.T) {
 	tk.MustExec("use test")
 	tk.Session().SetValue(sessionctx.QueryString, "skip")
 
-	d := dom.DDL()
+	d := dom.DDLExecutor()
 	require.NotNil(t, d)
 	info := []*model.TableInfo{{
 		ID: 42042, // Note, we must ensure the table ID is globally unique!
@@ -356,7 +356,7 @@ func TestBatchCreateTable(t *testing.T) {
 	tk.MustExec("drop table if exists tables_2")
 	tk.MustExec("drop table if exists tables_3")
 
-	d := dom.DDL()
+	d := dom.DDLExecutor()
 	infos := []*model.TableInfo{}
 	infos = append(infos, &model.TableInfo{
 		Name: model.NewCIStr("tables_1"),
(diff truncated: 8 of the 55 changed files are shown above)
