Skip to content

Commit

Permalink
patched ora batch
Browse files Browse the repository at this point in the history
  • Loading branch information
adrianwit@gmail.com authored and adrianwit@gmail.com committed Aug 5, 2019
1 parent 823aab1 commit f7a328f
Showing 1 changed file with 23 additions and 21 deletions.
44 changes: 23 additions & 21 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type batch struct {
writer *gzip.Writer
values []interface{}
placeholders string
columns string
dataIndexes []int
firstSeq int64
bulkInsertType string
Expand Down Expand Up @@ -80,34 +81,35 @@ func (b *batch) transformFirst(parametrizedSQL *ParametrizedSQL) error {
b.values = parametrizedSQL.Values
fragment := " VALUES"
valuesIndex := strings.Index(parametrizedSQL.SQL, fragment)
if beginIndex := strings.Index(parametrizedSQL.SQL, "("); beginIndex != -1 {
names := string(parametrizedSQL.SQL[beginIndex+1:])
if endIndex := strings.Index(names, ")"); endIndex != -1 {
b.columns = string(names[:endIndex])
}
}
b.placeholders = strings.Trim(strings.TrimSpace(string(parametrizedSQL.SQL[valuesIndex+7:])), "()")
switch b.bulkInsertType {

case CopyLocalInsert:
b.tempDir = b.manager.config.GetString("tempDir", os.TempDir())
if beginIndex := strings.Index(parametrizedSQL.SQL, "("); beginIndex != -1 {
names := string(parametrizedSQL.SQL[beginIndex+1:])
if endIndex := strings.Index(names, ")"); endIndex != -1 {
names = string(names[:endIndex])
file, err := ioutil.TempFile(b.tempDir, "temp")
if err != nil {
return err
}
b.tempFile = file.Name()
b.writer = gzip.NewWriter(file)
if _, err := b.writer.Write([]byte(b.expandedValues(parametrizedSQL))); err != nil {
return err
}
b.sql = fmt.Sprintf(`COPY %v(%v)
if b.columns == "" {
return fmt.Errorf("columns were empty")
}
file, err := ioutil.TempFile(b.tempDir, "temp")
if err != nil {
return err
}
b.tempFile = file.Name()
b.writer = gzip.NewWriter(file)
if _, err := b.writer.Write([]byte(b.expandedValues(parametrizedSQL))); err != nil {
return err
}
b.sql = fmt.Sprintf(`COPY %v(%v)
FROM LOCAL '%v' GZIP
DELIMITER ','
NULL AS 'null'
ENCLOSED BY ''''
`, b.table, names, file.Name())
}
}
b.values = []interface{}{}

`, b.table, b.columns, file.Name())
b.values = make([]interface{}, 0)
case UnionSelectInsert:
valuesIndex := strings.Index(parametrizedSQL.SQL, " VALUES")
selectAll := " SELECT " + b.expandedValues(parametrizedSQL)
Expand All @@ -131,7 +133,7 @@ func (b *batch) transformNext(parametrizedSQL *ParametrizedSQL) error {
case UnionSelectInsert:
b.sql += "\nUNION ALL SELECT " + b.expandedValues(parametrizedSQL)
case BulkInsertAllType:
b.sql += fmt.Sprintf("\nINTO %v(%v)", b.table, b.placeholders)
b.sql += fmt.Sprintf("\nINTO %v(%v) VALUES(%v)", b.table, b.columns, b.placeholders)
b.values = append(b.values, parametrizedSQL.Values...)
default:
b.sql += fmt.Sprintf(",(%v)", b.placeholders)
Expand Down

0 comments on commit f7a328f

Please sign in to comment.