Skip to content

Commit

Permalink
statistics: do not copy and paste the code for saving statistics (pin…
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Jul 31, 2024
1 parent 383643f commit 73e3425
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 52 deletions.
78 changes: 26 additions & 52 deletions pkg/executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,70 +393,44 @@ func (e *AnalyzeExec) handleResultsError(
partitionStatsConcurrency = min(taskNum, partitionStatsConcurrency)
// If partitionStatsConcurrency > 1, we will try to demand extra session from Domain to save Analyze results in concurrency.
// If there is no extra session we can use, we will save analyze results in single-thread.
dom := domain.GetDomain(e.Ctx())
internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
if partitionStatsConcurrency > 1 {
dom := domain.GetDomain(e.Ctx())
// FIXME: Since we don't use it either to save analysis results or to store job history, it has no effect. Please remove this :(
subSctxs := dom.FetchAnalyzeExec(partitionStatsConcurrency)
warningMessage := "Insufficient sessions to save analyze results. Consider increasing the 'analyze-partition-concurrency-quota' configuration to improve analyze performance. " +
"This value should typically be greater than or equal to the 'tidb_analyze_partition_concurrency' variable."
if len(subSctxs) < partitionStatsConcurrency {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackError(warningMessage))
logutil.BgLogger().Warn(
warningMessage,
zap.Int("sessionCount", len(subSctxs)),
zap.Int("needSessionCount", partitionStatsConcurrency),
)
}
if len(subSctxs) > 0 {
sessionCount := len(subSctxs)
logutil.BgLogger().Info("use multiple sessions to save analyze results", zap.Int("sessionCount", sessionCount))
defer func() {
dom.ReleaseAnalyzeExec(subSctxs)
}()
internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
err := e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh)
return err
return e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh)
}
}
logutil.BgLogger().Info("use single session to save analyze results")
failpoint.Inject("handleResultsErrorSingleThreadPanic", nil)
tableIDs := map[int64]struct{}{}

// save analyze results in single-thread.
statsHandle := domain.GetDomain(e.Ctx()).StatsHandle()
panicCnt := 0
for panicCnt < concurrency {
results, ok := <-resultsCh
if !ok {
break
}
if results.Err != nil {
err = results.Err
if isAnalyzeWorkerPanic(err) {
panicCnt++
} else {
logutil.Logger(ctx).Error("analyze failed", zap.Error(err))
}
finishJobWithLog(e.Ctx(), results.Job, err)
continue
}
handleGlobalStats(needGlobalStats, globalStatsMap, results)
tableIDs[results.TableID.GetStatisticsID()] = struct{}{}

if err1 := statsHandle.SaveTableStatsToStorage(results, e.Ctx().GetSessionVars().EnableAnalyzeSnapshot, handleutil.StatsMetaHistorySourceAnalyze); err1 != nil {
tableID := results.TableID.TableID
err = err1
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err), zap.Int64("tableID", tableID))
finishJobWithLog(e.Ctx(), results.Job, err)
} else {
finishJobWithLog(e.Ctx(), results.Job, nil)
}
if err := e.Ctx().GetSessionVars().SQLKiller.HandleSignal(); err != nil {
finishJobWithLog(e.Ctx(), results.Job, err)
results.DestroyAndPutToPool()
return err
}
results.DestroyAndPutToPool()
}
// Dump stats to historical storage.
for tableID := range tableIDs {
if err := recordHistoricalStats(e.Ctx(), tableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}

return err
subSctxs := []sessionctx.Context{e.Ctx()}
return e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh)
}

func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, statsConcurrency int, needGlobalStats bool,
func (e *AnalyzeExec) handleResultsErrorWithConcurrency(
ctx context.Context,
statsConcurrency int,
needGlobalStats bool,
subSctxs []sessionctx.Context,
globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error {
globalStatsMap globalStatsMap,
resultsCh <-chan *statistics.AnalyzeResults,
) error {
partitionStatsConcurrency := len(subSctxs)

wg := util.NewWaitGroupPool(e.gp)
Expand Down
2 changes: 2 additions & 0 deletions pkg/executor/analyze_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot b
}()
for results := range worker.resultsCh {
if err := worker.killer.HandleSignal(); err != nil {
finishJobWithLog(worker.sctx, results.Job, err)
results.DestroyAndPutToPool()
worker.errCh <- err
return
}
Expand Down

0 comments on commit 73e3425

Please sign in to comment.