Skip to content

Commit

Permalink
Support parallelism in user-defined functions
Browse files Browse the repository at this point in the history
In GPDB 7+, PostgreSQL can devise query plans which can leverage
multiple CPUs in order to answer queries faster. Support backup of
Parallelism capabilities of user-defined functions.

Co-Authored-by: Kevin Yeap <kyeap@vmware.com>
Co-Authored-by: Marbin Tan
  • Loading branch information
kyeap-vmware authored and AJR-VMware committed Nov 22, 2022
1 parent 04c31a8 commit 08e1f84
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 1 deletion.
11 changes: 11 additions & 0 deletions backup/predata_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,17 @@ func PrintFunctionModifiers(metadataFile *utils.FileWithByteCount, funcDef Funct
if funcDef.Config != "" {
metadataFile.MustPrintf("\n%s", funcDef.Config)
}

switch funcDef.Parallel {
case "u":
metadataFile.MustPrintf(" PARALLEL UNSAFE")
case "s":
metadataFile.MustPrintf(" PARALLEL SAFE")
case "r":
metadataFile.MustPrintf(" PARALLEL RESTRICTED")
default:
gplog.Fatal(errors.Errorf("unrecognized proparallel value for function %s", funcDef.FQN()), "")
}
}

func PrintCreateAggregateStatement(metadataFile *utils.FileWithByteCount, toc *toc.TOC, aggDef Aggregate, funcInfoMap map[uint32]FunctionInfo, aggMetadata ObjectMetadata) {
Expand Down
23 changes: 23 additions & 0 deletions backup/predata_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,29 @@ $_$`)
backup.PrintFunctionModifiers(backupfile, funcDef)
testhelper.ExpectRegexp(buffer, "SET client_min_messages TO error")
})
Context("Parallel cases", func() {
It("prints 'u' as 'PARALLEL UNSAFE'", func() {
funcDef.Parallel = "u"
backup.PrintFunctionModifiers(backupfile, funcDef)
testhelper.ExpectRegexp(buffer, "PARALLEL UNSAFE")
})
It("prints 's' as 'PARALLEL SAFE'", func() {
funcDef.Parallel = "s"
backup.PrintFunctionModifiers(backupfile, funcDef)
testhelper.ExpectRegexp(buffer, "PARALLEL SAFE")
})
It("prints 'r' as 'PARALLEL RESTRICTED'", func() {
funcDef.Parallel = "r"
backup.PrintFunctionModifiers(backupfile, funcDef)
testhelper.ExpectRegexp(buffer, "PARALLEL RESTRICTED")
})
It("panics is there is an unrecognized parallel value", func() {
defer testhelper.ShouldPanicWithMessage("unrecognized proparallel value for function public.func_name")
funcDef.Parallel = "unknown_value"
backup.PrintFunctionModifiers(backupfile, funcDef)
})
})

})

})
Expand Down
3 changes: 2 additions & 1 deletion backup/queries_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Function struct {
PlannerSupport string `db:"prosupport"` // GPDB 7+
IsWindow bool `db:"proiswindow"` // before 7
ExecLocation string `db:"proexeclocation"`
Parallel string `db:"proparallel"` // GPDB 7+
}

func (f Function) GetMetadataEntry() (string, toc.MetadataEntry) {
Expand Down Expand Up @@ -113,7 +114,7 @@ func GetFunctions(connectionPool *dbconn.DBConn) []Function {
}
var query string
if connectionPool.Version.AtLeast("7") {
masterAtts = "proexeclocation,proleakproof,"
masterAtts = "proexeclocation,proleakproof,proparallel,"
query = fmt.Sprintf(`
SELECT p.oid,
quote_ident(nspname) AS schema,
Expand Down
26 changes: 26 additions & 0 deletions integration/predata_functions_create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,32 @@ var _ = Describe("backup integration create statement tests", func() {
structmatcher.ExpectStructsToMatchExcluding(&leakProofFunction, &resultFunctions[0], "Oid")
})
})
Context("Tests for GPDB 7", func() {
BeforeEach(func() {
testutils.SkipIfBefore7(connectionPool)
})
funcMetadata := backup.ObjectMetadata{}
It("creates a function with PARALLEL", func() {
ParallelFunction := backup.Function{
Schema: "public", Name: "add", ReturnsSet: false, FunctionBody: "SELECT $1 + $2",
BinaryPath: "", Arguments: sql.NullString{String: "integer, integer", Valid: true},
IdentArgs: sql.NullString{String: "integer, integer", Valid: true},
ResultType: sql.NullString{String: "integer", Valid: true},
Volatility: "v", IsStrict: false, IsLeakProof: false, IsSecurityDefiner: false, Config: "", Cost: 100, NumRows: 0, DataAccess: "c",
Language: "sql", IsWindow: false, ExecLocation: "a", PlannerSupport: "-", Kind: "f", Parallel: "u",
}

backup.PrintCreateFunctionStatement(backupfile, tocfile, ParallelFunction, funcMetadata)

testhelper.AssertQueryRuns(connectionPool, buffer.String())
defer testhelper.AssertQueryRuns(connectionPool, "DROP FUNCTION public.add(integer, integer)")

resultFunctions := backup.GetFunctionsAllVersions(connectionPool)

Expect(resultFunctions).To(HaveLen(1))
structmatcher.ExpectStructsToMatchExcluding(&ParallelFunction, &resultFunctions[0], "Oid")
})
})
})
Describe("PrintCreateAggregateStatement", func() {
emptyMetadata := backup.ObjectMetadata{}
Expand Down

0 comments on commit 08e1f84

Please sign in to comment.