Revert "Add granular COPY progress reporting for GPDB 7"
Commit 6f71685 added the GPDB 7 feature where using --jobs shows N
progress bars, one per job, each tracking the progress of the table
that job is currently backing up. Unfortunately, that commit also
introduced an issue where gpbackup halts when run in the background
using &, e.g. `gpbackup --dbname test &`.

The halt happens because when the progress bar library
gopkg.in/cheggaaa/pb starts printing a pool of progress bars, it issues
a system command to turn off terminal echoing. The default behavior
when a background process tries to set a terminal mode is to stop the
process via SIGTTOU [1].
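
As a rough illustration (this is not gpbackup or cheggaaa/pb code, and
the stty-based approach is only an assumed stand-in for however the
library disables echo), a Go program that changes terminal modes from a
background job gets stopped the same way:

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
)

func main() {
	// Disable terminal echo, roughly what a progress bar library does
	// before rendering. Run in the foreground this succeeds; run in the
	// background (e.g. `./demo &`) the terminal driver sends SIGTTOU to
	// the job and the shell reports it as stopped.
	off := exec.Command("stty", "-echo")
	off.Stdin = os.Stdin
	if err := off.Run(); err != nil {
		fmt.Fprintln(os.Stderr, "stty failed:", err)
		return
	}
	fmt.Println("echo disabled; re-enabling")

	on := exec.Command("stty", "echo")
	on.Stdin = os.Stdin
	_ = on.Run()
}
```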

Also included in this commit is a one-line fix to a --data-only restore
bug that surfaced as a result of reverting "granular progress bars".
Sequence data restore is a special case because the value of a sequence
is set using metadata SQL statements. With the introduction of granular
progress bars, the default number of database connections was changed
from 1 to 2 (conn0 and conn1), because granular progress bars require
an additional connection to continuously poll for the progress of each
table backup/restore. Parallel restore was therefore built on the
assumption that conn0 is reserved for this polling: when assigning
statements to restore, the parallel logic will not assign any metadata
statements to conn0. With granular progress bars reverted, the default
number of connections drops back to 1, and restore fails because
sequence data restore statements were still being assigned to conn1,
which no longer exists.
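
For illustration only, a minimal Go sketch of that assignment problem;
the function and variable names are hypothetical, and the "fixed"
variant shows only one plausible shape of the one-line fix, not the
actual gpbackup change:

```go
package main

import "fmt"

// assignConn mirrors the described behavior: with granular progress bars,
// conn0 was reserved for progress polling, so metadata statements (such as
// the statements that set sequence values) were routed to conn1.
func assignConn(isMetadata bool) int {
	if isMetadata {
		return 1 // assumes a polling connection still occupies index 0
	}
	return 0
}

// assignConnFixed keeps the chosen connection inside the pool, so that with
// a single connection everything falls back to conn0. This is only a guess
// at what the one-line fix looks like.
func assignConnFixed(isMetadata bool, numConns int) int {
	conn := assignConn(isMetadata)
	if conn >= numConns {
		conn = numConns - 1
	}
	return conn
}

func main() {
	numConns := 1 // default pool size after the revert
	fmt.Println(assignConn(true))                // 1: no such connection, restore fails
	fmt.Println(assignConnFixed(true, numConns)) // 0: falls back to the only connection
}
```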

The following gpbackup commits are reverted:
6f71685 - Add granular COPY progress reporting for GPDB 7
greenplum-db/gpbackup@6f716850

08339c0 - Fix unfinished copy progress bars
greenplum-db/gpbackup@08339c04

[1] https://www.gnu.org/software/libc/manual/html_node/Job-Control-Signals.html
kyeap-vmware committed Feb 2, 2024
1 parent 8a02587 commit 80c4931
Showing 10 changed files with 132 additions and 554 deletions.
92 changes: 21 additions & 71 deletions backup/data.go
@@ -17,11 +17,11 @@ import (
"github.com/greenplum-db/gpbackup/options"
"github.com/greenplum-db/gpbackup/utils"
"github.com/jackc/pgconn"
"gopkg.in/cheggaaa/pb.v1"
)

var (
tableDelim = ","
progressBars *utils.MultiProgressBar
tableDelim = ","
)

func ConstructTableAttributesList(columnDefs []ColumnDefinition) string {
@@ -56,6 +56,12 @@ func AddTableDataEntriesToTOC(tables []Table, rowsCopiedMaps []map[uint32]int64)
}
}

type BackupProgressCounters struct {
NumRegTables int64
TotalRegTables int64
ProgressBar utils.ProgressBar
}

func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite string, connNum int) (int64, error) {
checkPipeExistsCommand := ""
customPipeThroughCommand := utils.GetPipeThroughProgram().OutputCommand
@@ -83,70 +89,19 @@ func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite

query := fmt.Sprintf("COPY %s%s TO %s WITH CSV DELIMITER '%s' ON SEGMENT IGNORE EXTERNAL PARTITIONS;", table.FQN(), columnNames, copyCommand, tableDelim)
gplog.Verbose("Worker %d: %s", connNum, query)

var done chan bool
var trackerTimer *time.Timer
var endTime time.Time
shouldTrackProgress := connectionPool.Version.AtLeast("7") && progressBars != nil

var tableNum int
if shouldTrackProgress {
// A true on this channel means the COPY succeeded. We explicitly signal this on a success, instead
// of just closing the channel, to allow the progress goroutine to handle cleanup properly based on
// success or failure. We don't give the channel a size because we want the send to block, so that
// we don't stop printing any progress bars before all goroutines are done and possibly make it look
// like we failed to back up some number of tuples.
done = make(chan bool, 1)
defer close(done)

// Normally, we pass connNum-1 for tableNum because we index into progressBars.TuplesBars using
// that value (progressBars.TablesBar effectively corresponds to connection 0, so it's off by 1)
// and use connection 0 to query the copy progress table; however, when dealing with deferred
// tables, those *must* use connection 0, so we use connection 1 for the query in that case.
whichConn := 0
tableNum = connNum - 1
if connNum == 0 {
whichConn = 1
tableNum = 0
}

// To prevent undue overhead when backing up many small tables, we only begin tracking
// progress after 5 seconds have elapsed. If the copy finishes before then, we stop the
// timer and move on.
trackerDelay := 5 * time.Second
trackerTimer = time.AfterFunc(trackerDelay, func() {
progressBars.TrackCopyProgress(table.FQN(), table.Oid, -1, connectionPool, whichConn, tableNum, done)
})
endTime = time.Now().Add(trackerDelay)
}

result, err := connectionPool.Exec(query, connNum)
if err != nil {
return 0, err
}
numRows, _ := result.RowsAffected()

if shouldTrackProgress {
if time.Now().Before(endTime) {
trackerTimer.Stop()
}
// send signal to channel whether tracking or not, just to avoid race condition weirdness
done <- true

// Manually set the progress to maximum if COPY succeeded, as we won't be able to get the last few tuples
// from the view (or any tuples, for especially small tables) and we don't want users to worry that any
// tuples were missed.
progressBar := progressBars.TuplesBars[tableNum]
progressBar.Set(progressBars.TuplesCounts[tableNum])
}
return numRows, nil
}

func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, progressBars *utils.MultiProgressBar, whichConn int) error {
func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters *BackupProgressCounters, whichConn int) error {
logMessage := fmt.Sprintf("Worker %d: Writing data for table %s to file", whichConn, table.FQN())
// Avoid race condition by incrementing progressBars in call to sprintf
current := progressBars.TablesBar.GetBar().Get()
tableCount := fmt.Sprintf(" (table %d of %d)", atomic.AddInt64(&current, 1), progressBars.TablesBar.GetBar().Total)
// Avoid race condition by incrementing counters in call to sprintf
tableCount := fmt.Sprintf(" (table %d of %d)", atomic.AddInt64(&counters.NumRegTables, 1), counters.TotalRegTables)
if gplog.GetVerbosity() > gplog.LOGINFO {
// No progress bar at this log level, so we note table count here
gplog.Verbose(logMessage + tableCount)
@@ -165,7 +120,7 @@ func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, progress
return err
}
rowsCopiedMap[table.Oid] = rowsCopied
progressBars.TablesBar.Increment()
counters.ProgressBar.Increment()
return nil
}

@@ -184,15 +139,9 @@ func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, progress
* another without, then extract common portions into their own functions.
*/
func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
numTuplesBars := 0
if connectionPool.Version.AtLeast("7") {
numTuplesBars = connectionPool.NumConns - 1
}
progressBars = utils.NewMultiProgressBar(len(tables), "Tables backed up: ", numTuplesBars, MustGetFlagBool(options.VERBOSE))
defer progressBars.Finish()
err := progressBars.Start()
gplog.FatalOnError(err)

counters := BackupProgressCounters{NumRegTables: 0, TotalRegTables: int64(len(tables))}
counters.ProgressBar = utils.NewProgressBar(int(counters.TotalRegTables), "Tables backed up: ", utils.PB_INFO)
counters.ProgressBar.Start()
rowsCopiedMaps := make([]map[uint32]int64, connectionPool.NumConns)
/*
* We break when an interrupt is received and rely on
@@ -236,7 +185,7 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
*/
for table := range tasks {
if wasTerminated || isErroredBackup.Load() {
progressBars.TablesBar.GetBar().NotPrint = true
counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
return
}
if backupSnapshot != "" && connectionPool.Tx[whichConn] == nil {
@@ -284,11 +233,11 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
break
}
}
err = BackupSingleTableData(table, rowsCopiedMaps[whichConn], progressBars, whichConn)
err = BackupSingleTableData(table, rowsCopiedMaps[whichConn], &counters, whichConn)
if err != nil {
// if copy isn't working, skip remaining backups, and let downstream panic
// handling deal with it
progressBars.TablesBar.GetBar().NotPrint = true
counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
isErroredBackup.Store(true)
gplog.Fatal(err, "")
} else {
@@ -324,7 +273,7 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
if state.(int) == Unknown {
time.Sleep(time.Millisecond * 50)
} else if state.(int) == Deferred {
err := BackupSingleTableData(table, rowsCopiedMaps[0], progressBars, 0)
err := BackupSingleTableData(table, rowsCopiedMaps[0], &counters, 0)
if err != nil {
isErroredBackup.Store(true)
gplog.Fatal(err, "")
@@ -358,7 +307,7 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
allWorkersTerminatedLogged := false
for _, table := range tables {
if wasTerminated || isErroredBackup.Load() {
progressBars.TablesBar.GetBar().NotPrint = true
counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
break
}
state, _ := oidMap.Load(table.Oid)
@@ -379,6 +328,7 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
gplog.Fatal(agentErr, "")
}

counters.ProgressBar.Finish()
return rowsCopiedMaps
}

16 changes: 10 additions & 6 deletions backup/data_test.go
@@ -11,6 +11,7 @@ import (
"github.com/greenplum-db/gpbackup/report"
"github.com/greenplum-db/gpbackup/toc"
"github.com/greenplum-db/gpbackup/utils"
"gopkg.in/cheggaaa/pb.v1"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -156,7 +157,7 @@ var _ = Describe("backup/data tests", func() {
var (
testTable backup.Table
rowsCopiedMap map[uint32]int64
progressBars *utils.MultiProgressBar
counters backup.BackupProgressCounters
copyFmtStr = "COPY(.*)%s(.*)"
)
BeforeEach(func() {
@@ -166,31 +167,34 @@
}
_ = cmdFlags.Set(options.SINGLE_DATA_FILE, "false")
rowsCopiedMap = make(map[uint32]int64)
progressBars = utils.NewMultiProgressBar(0, "", 0, false)
counters = backup.BackupProgressCounters{NumRegTables: 0, TotalRegTables: 1}
counters.ProgressBar = utils.NewProgressBar(int(counters.TotalRegTables), "Tables backed up: ", utils.PB_INFO)
counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
counters.ProgressBar.Start()
})
It("backs up a single regular table with single data file", func() {
_ = cmdFlags.Set(options.SINGLE_DATA_FILE, "true")

backupFile := fmt.Sprintf("<SEG_DATA_DIR>/gpbackup_<SEGID>_20170101010101_pipe_(.*)_%d", testTable.Oid)
copyCmd := fmt.Sprintf(copyFmtStr, backupFile)
mock.ExpectExec(copyCmd).WillReturnResult(sqlmock.NewResult(0, 10))
err := backup.BackupSingleTableData(testTable, rowsCopiedMap, progressBars, 0)
err := backup.BackupSingleTableData(testTable, rowsCopiedMap, &counters, 0)

Expect(err).ShouldNot(HaveOccurred())
Expect(rowsCopiedMap[0]).To(Equal(int64(10)))
Expect(progressBars.TablesBar.GetBar().Get()).To(Equal(int64(1)))
Expect(counters.NumRegTables).To(Equal(int64(1)))
})
It("backs up a single regular table without a single data file", func() {
_ = cmdFlags.Set(options.SINGLE_DATA_FILE, "false")

backupFile := fmt.Sprintf("<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_%d", testTable.Oid)
copyCmd := fmt.Sprintf(copyFmtStr, backupFile)
mock.ExpectExec(copyCmd).WillReturnResult(sqlmock.NewResult(0, 10))
err := backup.BackupSingleTableData(testTable, rowsCopiedMap, progressBars, 0)
err := backup.BackupSingleTableData(testTable, rowsCopiedMap, &counters, 0)

Expect(err).ShouldNot(HaveOccurred())
Expect(rowsCopiedMap[0]).To(Equal(int64(10)))
Expect(progressBars.TablesBar.GetBar().Get()).To(Equal(int64(1)))
Expect(counters.NumRegTables).To(Equal(int64(1)))
})
})
Describe("GetBackupDataSet", func() {
2 changes: 0 additions & 2 deletions go.sum
@@ -124,8 +124,6 @@ github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.19 h1:fhGleo2h1p8tVChob4I9HpmVFIAkKGpiukdrgQbWfGI=
github.com/mattn/go-sqlite3 v1.14.19/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U=
github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/nightlyone/lockfile v1.0.0 h1:RHep2cFKK4PonZJDdEl4GmkabuhbsRMgk/k3uAmxBiA=
github.com/nightlyone/lockfile v1.0.0/go.mod h1:rywoIealpdNse2r832aiD9jRk8ErCatROs6LzC841CI=
github.com/onsi/ginkgo/v2 v2.8.4 h1:gf5mIQ8cLFieruNLAdgijHF1PYfLphKm2dxxcUtcqK0=
50 changes: 11 additions & 39 deletions restore/data.go
@@ -8,7 +8,6 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/greenplum-db/gp-common-go-libs/cluster"
"github.com/greenplum-db/gp-common-go-libs/dbconn"
@@ -19,13 +18,14 @@ import (
"github.com/greenplum-db/gpbackup/utils"
"github.com/jackc/pgconn"
"github.com/pkg/errors"
"gopkg.in/cheggaaa/pb.v1"
)

var (
tableDelim = ","
)

func CopyTableIn(connectionPool *dbconn.DBConn, tableName string, entry toc.CoordinatorDataEntry, destinationToRead string, singleDataFile bool, whichConn int, dataProgressBar *utils.MultiProgressBar) (int64, error) {
func CopyTableIn(connectionPool *dbconn.DBConn, tableName string, tableAttributes string, destinationToRead string, singleDataFile bool, whichConn int) (int64, error) {
whichConn = connectionPool.ValidateConnNum(whichConn)
copyCommand := ""
readFromDestinationCommand := "cat"
@@ -41,22 +41,7 @@

copyCommand = fmt.Sprintf("PROGRAM '%s %s | %s'", readFromDestinationCommand, destinationToRead, customPipeThroughCommand)

query := fmt.Sprintf("COPY %s%s FROM %s WITH CSV DELIMITER '%s' ON SEGMENT;", tableName, entry.AttributeString, copyCommand, tableDelim)

var done chan bool
var trackerTimer *time.Timer
var endTime time.Time
shouldTrackProgress := connectionPool.Version.AtLeast("7") && dataProgressBar != nil

if shouldTrackProgress {
done = make(chan bool, 1)
defer close(done)
trackerDelay := 5 * time.Second
trackerTimer = time.AfterFunc(trackerDelay, func() {
dataProgressBar.TrackCopyProgress(tableName, 0, int(entry.RowsCopied), connectionPool, 0, whichConn-1, done)
})
endTime = time.Now().Add(trackerDelay)
}
query := fmt.Sprintf("COPY %s%s FROM %s WITH CSV DELIMITER '%s' ON SEGMENT;", tableName, tableAttributes, copyCommand, tableDelim)

var numRows int64
var err error
@@ -91,23 +76,10 @@ func CopyTableIn(connectionPool *dbconn.DBConn, tableName string, entry toc.Coor
numRows += rowsLoaded
}

if shouldTrackProgress {
if time.Now().Before(endTime) {
trackerTimer.Stop()
}
// send signal to channel whether tracking or not, just to avoid race condition weirdness
done <- true

// Manually set the progress to maximum if COPY succeeded, as we won't be able to get the last few tuples
// from the view (or any tuples, for especially small tables) and we don't want users to worry that any
// tuples were missed.
progressBar := dataProgressBar.TuplesBars[whichConn-1]
progressBar.Set(dataProgressBar.TuplesCounts[whichConn-1])
}
return numRows, err
}

func restoreSingleTableData(fpInfo *filepath.FilePathInfo, entry toc.CoordinatorDataEntry, tableName string, whichConn int, origSize int, destSize int, dataProgressBar *utils.MultiProgressBar) error {
func restoreSingleTableData(fpInfo *filepath.FilePathInfo, entry toc.CoordinatorDataEntry, tableName string, whichConn int, origSize int, destSize int) error {
resizeCluster := MustGetFlagBool(options.RESIZE_CLUSTER)
destinationToRead := ""
if backupConfig.SingleDataFile || resizeCluster {
@@ -116,7 +88,7 @@ func restoreSingleTableData(fpInfo *filepath.FilePathInfo, entry toc.Coordinator
destinationToRead = fpInfo.GetTableBackupFilePathForCopyCommand(entry.Oid, utils.GetPipeThroughProgram().Extension, backupConfig.SingleDataFile)
}
gplog.Debug("Reading from %s", destinationToRead)
numRowsRestored, err := CopyTableIn(connectionPool, tableName, entry, destinationToRead, backupConfig.SingleDataFile, whichConn, dataProgressBar)
numRowsRestored, err := CopyTableIn(connectionPool, tableName, entry.AttributeString, destinationToRead, backupConfig.SingleDataFile, whichConn)
if err != nil {
return err
}
@@ -186,7 +158,7 @@ func RedistributeTableData(tableName string, whichConn int) error {
}

func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.CoordinatorDataEntry,
gucStatements []toc.StatementWithType, dataProgressBar *utils.MultiProgressBar) int32 {
gucStatements []toc.StatementWithType, dataProgressBar utils.ProgressBar) int32 {
totalTables := len(dataEntries)
if totalTables == 0 {
gplog.Verbose("No data to restore for timestamp = %s", fpInfo.Timestamp)
@@ -243,7 +215,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
var mutex = &sync.Mutex{}
panicChan := make(chan error)

for i := 1; i < connectionPool.NumConns; i++ {
for i := 0; i < connectionPool.NumConns; i++ {
workerPool.Add(1)
go func(whichConn int) {
defer func() {
@@ -256,7 +228,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
setGUCsForConnection(gucStatements, whichConn)
for entry := range tasks {
if wasTerminated {
dataProgressBar.TablesBar.GetBar().NotPrint = true
dataProgressBar.(*pb.ProgressBar).NotPrint = true
return
}
tableName := utils.MakeFQN(entry.Schema, entry.Name)
@@ -269,7 +241,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
err = TruncateTable(tableName, whichConn)
}
if err == nil {
err = restoreSingleTableData(&fpInfo, entry, tableName, whichConn, origSize, destSize, dataProgressBar)
err = restoreSingleTableData(&fpInfo, entry, tableName, whichConn, origSize, destSize)

if gplog.GetVerbosity() > gplog.LOGINFO {
// No progress bar at this log level, so we note table count here
@@ -283,7 +255,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
gplog.Error(err.Error())
atomic.AddInt32(&numErrors, 1)
if !MustGetFlagBool(options.ON_ERROR_CONTINUE) {
dataProgressBar.TablesBar.GetBar().NotPrint = true
dataProgressBar.(*pb.ProgressBar).NotPrint = true
return
} else if connectionPool.Version.AtLeast("6") && backupConfig.SingleDataFile {
// inform segment helpers to skip this entry
@@ -302,7 +274,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
}
}

dataProgressBar.TablesBar.Increment()
dataProgressBar.Increment()
}
}(i)
}