Skip to content

Commit

Permalink
client/tso: fix the bug that TSO may hang when switching mode (#7937)
Browse files Browse the repository at this point in the history
ref #7849

Because the old `(*tsoClient).dispatchRequest` only checked the request's context,
this could potentially cause a `tsoRequest` to be sent to a batch channel that has already
finished `revokePendingTokenRequest`, thus being left there forever without returning.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Mar 19, 2024
1 parent f06401e commit d3b94c9
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 21 deletions.
39 changes: 29 additions & 10 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,23 +782,42 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
req := tsoReqPool.Get().(*tsoRequest)
req.requestCtx = ctx
req.clientCtx = c.ctx
tsoClient := c.getTSOClient()
req.start = time.Now()
req.dcLocation = dcLocation

if tsoClient == nil {
req.done <- errs.ErrClientGetTSO.FastGenByArgs("tso client is nil")
return req
if err := c.dispatchTSORequestWithRetry(req); err != nil {
req.done <- err
}
return req
}

const (
dispatchRetryDelay = 50 * time.Millisecond
dispatchRetryCount = 2
)

if err := tsoClient.dispatchRequest(ctx, dcLocation, req); err != nil {
// Wait for a while and try again
time.Sleep(50 * time.Millisecond)
if err = tsoClient.dispatchRequest(ctx, dcLocation, req); err != nil {
req.done <- err
func (c *client) dispatchTSORequestWithRetry(req *tsoRequest) error {
var (
retryable bool
err error
)
for i := 0; i < dispatchRetryCount; i++ {
// Do not delay for the first time.
if i > 0 {
time.Sleep(dispatchRetryDelay)
}
// Get the tsoClient each time, as it may be initialized or switched during the process.
tsoClient := c.getTSOClient()
if tsoClient == nil {
err = errs.ErrClientGetTSO.FastGenByArgs("tso client is nil")
continue
}
retryable, err = tsoClient.dispatchRequest(req)
if !retryable {
break
}
}
return req
return err
}

func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err error) {
Expand Down
2 changes: 1 addition & 1 deletion client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type tsoClient struct {

// tsoDispatcher is used to dispatch different TSO requests to
// the corresponding dc-location TSO channel.
tsoDispatcher sync.Map // Same as map[string]chan *tsoRequest
tsoDispatcher sync.Map // Same as map[string]*tsoDispatcher
// dc-location -> deadline
tsDeadline sync.Map // Same as map[string]chan deadline
// dc-location -> *tsoInfo while the tsoInfo is the last TSO info
Expand Down
27 changes: 18 additions & 9 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,31 @@ func (c *tsoClient) scheduleUpdateTSOConnectionCtxs() {
}
}

func (c *tsoClient) dispatchRequest(ctx context.Context, dcLocation string, request *tsoRequest) error {
dispatcher, ok := c.tsoDispatcher.Load(dcLocation)
func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) {
dispatcher, ok := c.tsoDispatcher.Load(request.dcLocation)
if !ok {
err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", dcLocation))
log.Error("[tso] dispatch tso request error", zap.String("dc-location", dcLocation), errs.ZapError(err))
err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", request.dcLocation))
log.Error("[tso] dispatch tso request error", zap.String("dc-location", request.dcLocation), errs.ZapError(err))
c.svcDiscovery.ScheduleCheckMemberChanged()
return err
// New dispatcher could be created in the meantime, which is retryable.
return true, err
}

defer trace.StartRegion(request.requestCtx, "pdclient.tsoReqEnqueue").End()
select {
case <-ctx.Done():
return ctx.Err()
case dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request:
case <-request.requestCtx.Done():
// Caller cancelled the request, no need to retry.
return false, request.requestCtx.Err()
case <-request.clientCtx.Done():
// Client is closed, no need to retry.
return false, request.clientCtx.Err()
case <-c.ctx.Done():
// tsoClient is closed due to the PD service mode switch, which is retryable.
return true, c.ctx.Err()
default:
dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request
}
return nil
return false, nil
}

// TSFuture is a future which promises to return a TSO.
Expand Down
16 changes: 15 additions & 1 deletion tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,9 +1062,23 @@ func TestCloseClient(t *testing.T) {
defer cluster.Destroy()
endpoints := runServer(re, cluster)
cli := setupCli(re, ctx, endpoints)
cli.GetTSAsync(context.TODO())
ts := cli.GetTSAsync(context.TODO())
time.Sleep(time.Second)
cli.Close()
physical, logical, err := ts.Wait()
if err == nil {
re.Greater(physical, int64(0))
re.Greater(logical, int64(0))
} else {
re.ErrorIs(err, context.Canceled)
re.Zero(physical)
re.Zero(logical)
}
ts = cli.GetTSAsync(context.TODO())
physical, logical, err = ts.Wait()
re.ErrorIs(err, context.Canceled)
re.Zero(physical)
re.Zero(logical)
}

type idAllocator struct {
Expand Down

0 comments on commit d3b94c9

Please sign in to comment.