Skip to content

Commit

Permalink
ATTACH partition indexes to its parent index
Browse files Browse the repository at this point in the history
In GPDB 7+, partition tables have been heavily changed. Along with the
partition tables themselves, the partition indexes are also now
structured differently. We now must attach partition indexes to parent
partition index. Topological sorting of indexes is done to ensure
that a parent index is printed before its child partition indexes.

Co-authored-by: Kevin Yeap <kyeap@vmware.com>
Co-authored-by: Andrew Repp <reppa@vmware.com>
  • Loading branch information
kyeap-vmware and AJR-VMware committed Nov 22, 2022
1 parent c314ea4 commit e28433d
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 21 deletions.
10 changes: 7 additions & 3 deletions backup/postdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ func PrintCreateIndexStatements(metadataFile *utils.FileWithByteCount, toc *toc.
toc.AddMetadataEntry(section, entry, start, metadataFile.ByteCount)

// Start INDEX metadata
indexFQN := utils.MakeFQN(index.OwningSchema, index.Name)
entry.ReferenceObject = indexFQN
entry.ReferenceObject = index.FQN()
entry.ObjectType = "INDEX METADATA"
if index.Tablespace != "" {
start := metadataFile.ByteCount
metadataFile.MustPrintf("\nALTER INDEX %s SET TABLESPACE %s;", indexFQN, index.Tablespace)
metadataFile.MustPrintf("\nALTER INDEX %s SET TABLESPACE %s;", index.FQN(), index.Tablespace)
toc.AddMetadataEntry(section, entry, start, metadataFile.ByteCount)
}
if index.ParentIndexFQN != "" && connectionPool.Version.AtLeast("7") {
start := metadataFile.ByteCount
metadataFile.MustPrintf("\nALTER INDEX %s ATTACH PARTITION %s;", index.ParentIndexFQN, index.FQN())
toc.AddMetadataEntry(section, entry, start, metadataFile.ByteCount)
}
tableFQN := utils.MakeFQN(index.OwningSchema, index.OwningTable)
Expand Down
15 changes: 15 additions & 0 deletions backup/postdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@ var _ = Describe("backup/postdata tests", func() {
"ALTER TABLE public.testtable REPLICA IDENTITY USING INDEX testindex;",
)
})
It("can print an index of a partition that is attached to a parent partition index", func() {
testutils.SkipIfBefore7(connectionPool)
parentIndex := backup.IndexDefinition{Oid: 2, Name: "parenttestindex", OwningSchema: "public", OwningTable: "parenttesttable", Def: sql.NullString{String: "CREATE INDEX parenttestindex ON public.parenttesttable USING btree(i)", Valid: true}}
index.ParentIndexFQN = "public.parenttestindex"
indexes := []backup.IndexDefinition{parentIndex, index}
backup.PrintCreateIndexStatements(backupfile, tocfile, indexes, emptyMetadataMap)
testutils.ExpectEntry(tocfile.PostdataEntries, 0, "public", "public.parenttesttable", "parenttestindex", "INDEX")
testutils.ExpectEntry(tocfile.PostdataEntries, 1, "public", "public.testtable", "testindex", "INDEX")
testutils.ExpectEntry(tocfile.PostdataEntries, 2, "public", "public.testtable", "testindex", "INDEX")
testutils.AssertBufferContents(tocfile.PostdataEntries, buffer,
"CREATE INDEX parenttestindex ON public.parenttesttable USING btree(i);",
"CREATE INDEX testindex ON public.testtable USING btree(i);",
"ALTER INDEX public.parenttestindex ATTACH PARTITION public.testindex;",
)
})
})
Context("PrintCreateRuleStatements", func() {
rule := backup.RuleDefinition{Oid: 1, Name: "testrule", OwningSchema: "public", OwningTable: "testtable", Def: sql.NullString{String: "CREATE RULE update_notify AS ON UPDATE TO testtable DO NOTIFY testtable;", Valid: true}}
Expand Down
108 changes: 90 additions & 18 deletions backup/queries_postdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type IndexDefinition struct {
IsReplicaIdentity bool
StatisticsColumns string
StatisticsValues string
ParentIndex uint32
ParentIndexFQN string
}

func (i IndexDefinition) GetMetadataEntry() (string, toc.MetadataEntry) {
Expand Down Expand Up @@ -79,7 +81,6 @@ func (i IndexDefinition) FQN() string {
* e.g. comments on implicitly created indexes
*/
func GetIndexes(connectionPool *dbconn.DBConn) []IndexDefinition {

var query string
if connectionPool.Version.Before("6") {
indexOidList := ConstructImplicitIndexOidList(connectionPool)
Expand Down Expand Up @@ -111,13 +112,7 @@ func GetIndexes(connectionPool *dbconn.DBConn) []IndexDefinition {
ORDER BY name`,
implicitIndexStr, relationAndSchemaFilterClause(), ExtensionFilterClause("c"))

} else {
// TODO: fix for gpdb7 partitioning
partitionRuleExcludeClause := ""
if connectionPool.Version.Before("7") {
partitionRuleExcludeClause = "AND NOT EXISTS (SELECT 1 FROM pg_partition_rule r WHERE r.parchildrelid = c.oid)"
}

} else if connectionPool.Version.Is("6") {
query = fmt.Sprintf(`
SELECT DISTINCT i.indexrelid AS oid,
quote_ident(ic.relname) AS name,
Expand All @@ -130,9 +125,7 @@ func GetIndexes(connectionPool *dbconn.DBConn) []IndexDefinition {
CASE
WHEN conindid > 0 THEN 't'
ELSE 'f'
END as supportsconstraint,
coalesce(array_to_string((SELECT pg_catalog.array_agg(attnum ORDER BY attnum) FROM pg_catalog.pg_attribute WHERE attrelid = i.indexrelid AND attstattarget >= 0), ','), '') as statisticscolumns,
coalesce(array_to_string((SELECT pg_catalog.array_agg(attstattarget ORDER BY attnum) FROM pg_catalog.pg_attribute WHERE attrelid = i.indexrelid AND attstattarget >= 0), ','), '') as statisticsvalues
END as supportsconstraint
FROM pg_index i
JOIN pg_class ic ON ic.oid = i.indexrelid
JOIN pg_namespace n ON ic.relnamespace = n.oid
Expand All @@ -143,29 +136,108 @@ func GetIndexes(connectionPool *dbconn.DBConn) []IndexDefinition {
AND i.indisvalid
AND i.indisready
AND i.indisprimary = 'f'
%s
AND NOT EXISTS (SELECT 1 FROM pg_partition_rule r WHERE r.parchildrelid = c.oid)
AND %s
ORDER BY name`,
relationAndSchemaFilterClause(), partitionRuleExcludeClause, ExtensionFilterClause("c")) // The index itself does not have a dependency on the extension, but the index's table does
relationAndSchemaFilterClause(), ExtensionFilterClause("c")) // The index itself does not have a dependency on the extension, but the index's table does

} else {
query = fmt.Sprintf(`
SELECT DISTINCT i.indexrelid AS oid,
coalesce(inh.inhparent, '0') AS parentindex,
quote_ident(ic.relname) AS name,
quote_ident(n.nspname) AS owningschema,
quote_ident(c.relname) AS owningtable,
coalesce(quote_ident(s.spcname), '') AS tablespace,
pg_get_indexdef(i.indexrelid) AS def,
i.indisclustered AS isclustered,
i.indisreplident AS isreplicaidentity,
CASE
WHEN conindid > 0 THEN 't'
ELSE 'f'
END as supportsconstraint,
coalesce(array_to_string((SELECT pg_catalog.array_agg(attnum ORDER BY attnum) FROM pg_catalog.pg_attribute WHERE attrelid = i.indexrelid AND attstattarget >= 0), ','), '') as statisticscolumns,
coalesce(array_to_string((SELECT pg_catalog.array_agg(attstattarget ORDER BY attnum) FROM pg_catalog.pg_attribute WHERE attrelid = i.indexrelid AND attstattarget >= 0), ','), '') as statisticsvalues
FROM pg_index i
JOIN pg_class ic ON ic.oid = i.indexrelid
JOIN pg_namespace n ON ic.relnamespace = n.oid
JOIN pg_class c ON c.oid = i.indrelid
LEFT JOIN pg_tablespace s ON ic.reltablespace = s.oid
LEFT JOIN pg_constraint con ON i.indexrelid = con.conindid
LEFT JOIN pg_catalog.pg_inherits inh ON inh.inhrelid = i.indexrelid
WHERE %s
AND i.indisvalid
AND i.indisready
AND i.indisprimary = 'f'
AND i.indexrelid >= %d
AND %s
ORDER BY name`,
relationAndSchemaFilterClause(), FIRST_NORMAL_OBJECT_ID, ExtensionFilterClause("c"))
}

resultIndexes := make([]IndexDefinition, 0)
err := connectionPool.Select(&resultIndexes, query)
gplog.FatalOnError(err)

// Remove all indexes that have NULL definitions. This can happen
// if a concurrent index drop happens before the associated table
// lock is acquired earlier during gpbackup execution.
verifiedResultIndexes := make([]IndexDefinition, 0)
for _, resultIndex := range resultIndexes {
if resultIndex.Def.Valid {
verifiedResultIndexes = append(verifiedResultIndexes, resultIndex)
indexMap := make(map[uint32]IndexDefinition, 0)
for _, index := range resultIndexes {
if index.Def.Valid {
verifiedResultIndexes = append(verifiedResultIndexes, index)
if connectionPool.Version.AtLeast("7") {
indexMap[index.Oid] = index // hash index for topological sort
}
} else {
gplog.Warn("Index '%s' on table '%s.%s' not backed up, most likely dropped after gpbackup had begun.",
resultIndex.Name, resultIndex.OwningSchema, resultIndex.OwningTable)
index.Name, index.OwningSchema, index.OwningTable)
}
}

if connectionPool.Version.Before("7") {
return verifiedResultIndexes
}

// Since GPDB 7+ partition indexes can now be ALTERED to attach to a parent
// index. Topological sort indexes to ensure parent indexes are printed
// before their child indexes.
visited := make(map[uint32]struct{})
sortedIndexes := make([]IndexDefinition, 0)
stack := make([]uint32, 0)
var seen struct{}
for _, index := range verifiedResultIndexes {
currIndex := index
// Depth-first search loop. Store visited indexes to a stack
for {
if _, indexWasVisited := visited[currIndex.Oid]; indexWasVisited {
break // exit DFS if a visited index is found.
}

stack = append(stack, currIndex.Oid)
visited[currIndex.Oid] = seen
if currIndex.ParentIndex == 0 {
break // exit DFS if index has no parent.
} else {
currIndex = indexMap[currIndex.ParentIndex]
}
}

// "Pop" indexes found by DFS
for i := len(stack) - 1; i >= 0; i-- {
indexOid := stack[i]
popIndex := indexMap[indexOid]
if popIndex.ParentIndex != 0 {
// Preprocess parent index FQN for GPDB 7+ partition indexes
popIndex.ParentIndexFQN = indexMap[popIndex.ParentIndex].FQN()
}
sortedIndexes = append(sortedIndexes, popIndex)
}
stack = stack[:0] // empty slice but keep memory allocation
}

return verifiedResultIndexes
return sortedIndexes
}

type RuleDefinition struct {
Expand Down
24 changes: 24 additions & 0 deletions integration/postdata_create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,30 @@ var _ = Describe("backup integration create statement tests", func() {
Expect(resultIndexes).To(HaveLen(1))
structmatcher.ExpectStructsToMatchExcluding(&resultIndexes[0], &indexes[0], "Oid")
})
It("creates a parition index and attaches it to the parent index", func() {
testutils.SkipIfBefore7(connectionPool)

testhelper.AssertQueryRuns(connectionPool, "CREATE TABLE public.foopart_new (a integer, b integer) PARTITION BY RANGE (b) DISTRIBUTED BY (a)")
defer testhelper.AssertQueryRuns(connectionPool, "DROP TABLE public.foopart_new")
testhelper.AssertQueryRuns(connectionPool, "CREATE TABLE public.foopart_new_p1 (a integer, b integer) DISTRIBUTED BY (a)")
testhelper.AssertQueryRuns(connectionPool, "ALTER TABLE ONLY public.foopart_new ATTACH PARTITION public.foopart_new_p1 FOR VALUES FROM (0) TO (1)")
testhelper.AssertQueryRuns(connectionPool, "CREATE INDEX fooidx ON ONLY public.foopart_new USING btree (b)")

partitionIndex := backup.IndexDefinition{Oid: 0, Name: "foopart_new_p1_b_idx", OwningSchema: "public", OwningTable: "foopart_new_p1", Def: sql.NullString{String: "CREATE INDEX foopart_new_p1_b_idx ON public.foopart_new_p1 USING btree (b)", Valid: true}, ParentIndexFQN: "public.fooidx"}

indexes := []backup.IndexDefinition{partitionIndex}
backup.PrintCreateIndexStatements(backupfile, tocfile, indexes, indexMetadataMap)

testhelper.AssertQueryRuns(connectionPool, buffer.String())
partitionIndex.Oid = testutils.OidFromObjectName(connectionPool, "", "foopart_new_p1_b_idx", backup.TYPE_INDEX)
partitionIndex.ParentIndex = testutils.OidFromObjectName(connectionPool, "", "fooidx", backup.TYPE_INDEX)

resultIndexes := backup.GetIndexes(connectionPool)
Expect(resultIndexes).To(HaveLen(2))
resultIndex := resultIndexes[1]

structmatcher.ExpectStructsToMatch(&resultIndex, &partitionIndex)
})
})
Describe("PrintCreateRuleStatements", func() {
var (
Expand Down
23 changes: 23 additions & 0 deletions integration/postdata_queries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,29 @@ PARTITION BY RANGE (date)

structmatcher.ExpectStructsToMatchExcluding(&index1, &results[0], "Oid")
})
It("returns a sorted slice of partition indexes ", func() {
testutils.SkipIfBefore7(connectionPool)
testhelper.AssertQueryRuns(connectionPool, "CREATE TABLE public.foopart_new (a integer, b integer) PARTITION BY RANGE (b) DISTRIBUTED BY (a)")
defer testhelper.AssertQueryRuns(connectionPool, "DROP TABLE public.foopart_new")
testhelper.AssertQueryRuns(connectionPool, "CREATE TABLE public.foopart_new_p1 (a integer, b integer) DISTRIBUTED BY (a); ALTER TABLE ONLY public.foopart_new ATTACH PARTITION public.foopart_new_p1 FOR VALUES FROM (0) TO (1);")
testhelper.AssertQueryRuns(connectionPool, "CREATE INDEX fooidx ON ONLY public.foopart_new USING btree (b)")
testhelper.AssertQueryRuns(connectionPool, "CREATE INDEX foopart_new_p1_b_idx ON public.foopart_new_p1 USING btree (b)")
testhelper.AssertQueryRuns(connectionPool, "ALTER INDEX public.fooidx ATTACH PARTITION public.foopart_new_p1_b_idx;")

index0 := backup.IndexDefinition{Oid: 0, Name: "fooidx", OwningSchema: "public", OwningTable: "foopart_new", Def: sql.NullString{String: "CREATE INDEX fooidx ON ONLY public.foopart_new USING btree (b)", Valid: true}}
index1 := backup.IndexDefinition{Oid: 0, Name: "foopart_new_p1_b_idx", OwningSchema: "public", OwningTable: "foopart_new_p1", Def: sql.NullString{String: "CREATE INDEX foopart_new_p1_b_idx ON public.foopart_new_p1 USING btree (b)", Valid: true}, ParentIndexFQN: "public.fooidx"}
index0.Oid = testutils.OidFromObjectName(connectionPool, "", "fooidx", backup.TYPE_INDEX)
index1.Oid = testutils.OidFromObjectName(connectionPool, "", "foopart_new_p1_b_idx", backup.TYPE_INDEX)
index1.ParentIndex = index0.Oid

results := backup.GetIndexes(connectionPool)

Expect(results).To(HaveLen(2))

structmatcher.ExpectStructsToMatchExcluding(&index0, &results[0])
structmatcher.ExpectStructsToMatchExcluding(&index1, &results[1])
})

})
Describe("GetRules", func() {
var (
Expand Down

0 comments on commit e28433d

Please sign in to comment.