Skip to content

Commit

Permalink
ddl: add ddl_reorg_batch_size variable to control ddl worker batch si…
Browse files Browse the repository at this point in the history
…ze and enlarge default batch size. (pingcap#8365)
  • Loading branch information
crazycs520 committed Dec 7, 2018
1 parent 4dad722 commit d1ff903
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 10 deletions.
14 changes: 9 additions & 5 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t tab
return &addIndexWorker{
id: id,
ddlWorker: worker,
batchCnt: DefaultTaskHandleCnt,
batchCnt: int(variable.GetDDLReorgBatchSize()),
sessCtx: sessCtx,
taskCh: make(chan *reorgIndexTask, 1),
resultCh: make(chan *addIndexResult, 1),
Expand Down Expand Up @@ -775,7 +775,8 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad
handleRange := *task
result := &addIndexResult{addedCount: 0, nextHandle: handleRange.startHandle, err: nil}
lastLogCount := 0
startTime := time.Now()
lastLogTime := time.Now()
startTime := lastLogTime

for {
taskCtx, err := w.backfillIndexInTxn(handleRange)
Expand All @@ -792,10 +793,11 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad
mergeAddIndexCtxToResult(&taskCtx, result)
w.ddlWorker.reorgCtx.increaseRowCount(int64(taskCtx.addedCount))

if result.scanCount-lastLogCount >= 30000 {
if num := result.scanCount - lastLogCount; num >= 30000 {
lastLogCount = result.scanCount
log.Infof("[ddl-reorg] worker(%v), finish batch addedCount:%v backfill, task addedCount:%v, task scanCount:%v, nextHandle:%v",
w.id, taskCtx.addedCount, result.addedCount, result.scanCount, taskCtx.nextHandle)
log.Infof("[ddl-reorg] worker(%v), finish batch addedCount:%v backfill, task addedCount:%v, task scanCount:%v, nextHandle:%v, avg row time(ms):%v",
w.id, taskCtx.addedCount, result.addedCount, result.scanCount, taskCtx.nextHandle, time.Since(lastLogTime).Seconds()*1000/float64(num))
lastLogTime = time.Now()
}

handleRange.startHandle = taskCtx.nextHandle
Expand Down Expand Up @@ -841,6 +843,8 @@ func (w *addIndexWorker) run(d *ddlCtx) {
// continue
//}

// Dynamically change the batch size.
w.batchCnt = int(variable.GetDDLReorgBatchSize())
result := w.handleBackfillTask(d, task)
w.resultCh <- result
}
Expand Down
29 changes: 29 additions & 0 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,3 +450,32 @@ func (s *testSuite) TestSetDDLReorgWorkerCnt(c *C) {
res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt")
res.Check(testkit.Rows("100"))
}

// TestSetDDLReorgBatchSize exercises SET statements on tidb_ddl_reorg_batch_size.
// It checks that out-of-range values produce a truncation warning, that the
// value reported by variable.GetDDLReorgBatchSize follows session-level SETs
// (clamped to the min/max bounds), that a non-numeric value is rejected, and
// that the global variable is only changed by an explicit @@global SET.
func (s *testSuite) TestSetDDLReorgBatchSize(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("use test")
	// The test expects the batch size to start out at the default.
	c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.DefTiDBDDLReorgBatchSize))

	// Below the minimum: warning, and the effective value is the minimum.
	tk.MustExec("set tidb_ddl_reorg_batch_size = 1")
	tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '1'"))
	c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.MinDDLReorgBatchSize))

	// Above the maximum: warning, and the effective value is the maximum.
	tooLarge := variable.MaxDDLReorgBatchSize + 1
	tk.MustExec(fmt.Sprintf("set tidb_ddl_reorg_batch_size = %v", tooLarge))
	wantWarning := fmt.Sprintf("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '%d'", tooLarge)
	tk.MustQuery("show warnings;").Check(testkit.Rows(wantWarning))
	c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.MaxDDLReorgBatchSize))

	// A non-numeric value is an error rather than a warning.
	_, err := tk.Exec("set tidb_ddl_reorg_batch_size = invalid_val")
	c.Assert(terror.ErrorEqual(err, variable.ErrWrongTypeForVar), IsTrue, Commentf("err %v", err))

	// An in-range value is taken as-is.
	tk.MustExec("set tidb_ddl_reorg_batch_size = 100")
	c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(100))

	// A negative value only has its warning checked here.
	tk.MustExec("set tidb_ddl_reorg_batch_size = -1")
	tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '-1'"))

	// The session-level variable reads back what was set.
	tk.MustExec("set tidb_ddl_reorg_batch_size = 100")
	tk.MustQuery("select @@tidb_ddl_reorg_batch_size").Check(testkit.Rows("100"))

	// The global variable still holds the default until it is set explicitly.
	const selectGlobal = "select @@global.tidb_ddl_reorg_batch_size"
	tk.MustQuery(selectGlobal).Check(testkit.Rows(fmt.Sprintf("%v", variable.DefTiDBDDLReorgBatchSize)))
	tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 1000")
	tk.MustQuery(selectGlobal).Check(testkit.Rows("1000"))
}
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,7 @@ const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variab
variable.TiDBBackoffLockFast + quoteCommaQuote +
variable.TiDBDDLReorgWorkerCount + quoteCommaQuote +
variable.TiDBOptInSubqUnFolding + quoteCommaQuote +
variable.TiDBDDLReorgBatchSize + quoteCommaQuote +
variable.TiDBDistSQLScanConcurrency + quoteCommaQuote +
variable.TiDBMaxChunkSize + quoteCommaQuote +
variable.TiDBRetryLimit + quoteCommaQuote +
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.EnableTablePartition = TiDBOptOn(val)
case TiDBDDLReorgWorkerCount:
SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount)))
case TiDBDDLReorgBatchSize:
SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize)))
case TiDBDDLReorgPriority:
s.setDDLReorgPriority(val)
case TiDBForcePriority:
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBQueryLogMaxLen, strconv.Itoa(logutil.DefaultQueryLogMaxLen)},
{ScopeSession, TiDBConfig, ""},
{ScopeGlobal | ScopeSession, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)},
{ScopeGlobal | ScopeSession, TiDBDDLReorgBatchSize, strconv.Itoa(DefTiDBDDLReorgBatchSize)},
{ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"},
{ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]},
}
Expand Down
18 changes: 13 additions & 5 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ const (
// tidb_ddl_reorg_worker_cnt defines the count of ddl reorg workers.
TiDBDDLReorgWorkerCount = "tidb_ddl_reorg_worker_cnt"

// tidb_ddl_reorg_batch_size defines the transaction batch size of ddl reorg workers.
TiDBDDLReorgBatchSize = "tidb_ddl_reorg_batch_size"

// tidb_ddl_reorg_priority defines the operations priority of adding indices.
// It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH
TiDBDDLReorgPriority = "tidb_ddl_reorg_priority"
Expand Down Expand Up @@ -239,6 +242,7 @@ const (
DefTiDBProjectionConcurrency = 4
DefTiDBOptimizerSelectivityLevel = 0
DefTiDBDDLReorgWorkerCount = 16
DefTiDBDDLReorgBatchSize = 1024
DefTiDBHashAggPartialConcurrency = 4
DefTiDBHashAggFinalConcurrency = 4
DefTiDBForcePriority = mysql.NoPriority
Expand All @@ -247,9 +251,13 @@ const (
// Process global variables.
var (
ProcessGeneralLog uint32
ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount
maxDDLReorgWorkerCount int32 = 128
DDLSlowOprThreshold uint32 = 300 // DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond.
ForcePriority = int32(DefTiDBForcePriority)
ServerHostname, _ = os.Hostname()
ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount
maxDDLReorgWorkerCount int32 = 128
ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize
// MaxDDLReorgBatchSize and MinDDLReorgBatchSize are exported for testing.
MaxDDLReorgBatchSize int32 = 10240
MinDDLReorgBatchSize int32 = 32
DDLSlowOprThreshold uint32 = 300 // DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond.
ForcePriority = int32(DefTiDBForcePriority)
ServerHostname, _ = os.Hostname()
)
19 changes: 19 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ func GetDDLReorgWorkerCounter() int32 {
return atomic.LoadInt32(&ddlReorgWorkerCounter)
}

// SetDDLReorgBatchSize stores cnt in the package-level ddlReorgBatchSize.
// A value above MaxDDLReorgBatchSize is lowered to MaxDDLReorgBatchSize and a
// value below MinDDLReorgBatchSize is raised to MinDDLReorgBatchSize before
// it is written with an atomic store.
func SetDDLReorgBatchSize(cnt int32) {
	size := cnt
	// The upper bound is applied before the lower bound.
	if MaxDDLReorgBatchSize < size {
		size = MaxDDLReorgBatchSize
	}
	if MinDDLReorgBatchSize > size {
		size = MinDDLReorgBatchSize
	}
	atomic.StoreInt32(&ddlReorgBatchSize, size)
}

// GetDDLReorgBatchSize returns the current value of the package-level
// ddlReorgBatchSize, read with an atomic load.
func GetDDLReorgBatchSize() int32 {
	size := atomic.LoadInt32(&ddlReorgBatchSize)
	return size
}

// GetSessionSystemVar gets a system variable.
// If it is a session only variable, use the default value defined in code.
// Returns error if there is no such variable.
Expand Down Expand Up @@ -303,6 +320,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return value, nil
}
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
case TiDBDDLReorgBatchSize:
return checkUInt64SystemVar(name, value, uint64(MinDDLReorgBatchSize), uint64(MaxDDLReorgBatchSize), vars)
case TiDBIndexLookupConcurrency, TiDBIndexLookupJoinConcurrency, TiDBIndexJoinBatchSize,
TiDBIndexLookupSize,
TiDBHashJoinConcurrency,
Expand Down

0 comments on commit d1ff903

Please sign in to comment.