-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
109 changed files
with
7,361 additions
and
643 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
#!/bin/bash | ||
|
||
export PGHOST=localhost | ||
export PGPORT=5432 | ||
export PGUSER=postgres | ||
export PGPASSWORD= | ||
export PGDATABASE=postgres | ||
export TEMPLATEDB=template0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
package pgutil | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
) | ||
|
||
type BatchInserter struct { | ||
db DB | ||
numColumns int | ||
maxBatchSize int | ||
maxCapacity int | ||
queryBuilder *batchQueryBuilder | ||
returningScanner ScanFunc | ||
values []any | ||
} | ||
|
||
const maxNumPostgresParameters = 65535 | ||
|
||
func NewBatchInserter(db DB, tableName string, columnNames []string, configs ...BatchInserterConfigFunc) *BatchInserter { | ||
var ( | ||
options = getBatchInserterOptions(configs) | ||
numColumns = len(columnNames) | ||
maxBatchSize = int(maxNumPostgresParameters/numColumns) * numColumns | ||
maxCapacity = maxBatchSize + numColumns | ||
queryBuilder = newBatchQueryBuilder(tableName, columnNames, options.onConflictClause, options.returningClause) | ||
returningScanner = options.returningScanner | ||
) | ||
|
||
return &BatchInserter{ | ||
db: db, | ||
numColumns: numColumns, | ||
maxBatchSize: maxBatchSize, | ||
maxCapacity: maxCapacity, | ||
queryBuilder: queryBuilder, | ||
returningScanner: returningScanner, | ||
values: make([]any, 0, maxCapacity), | ||
} | ||
} | ||
|
||
func (i *BatchInserter) Insert(ctx context.Context, values ...any) error { | ||
if len(values) != i.numColumns { | ||
return fmt.Errorf("received %d values for %d columns", len(values), i.numColumns) | ||
} | ||
|
||
i.values = append(i.values, values...) | ||
|
||
if len(i.values) >= i.maxBatchSize { | ||
return i.Flush(ctx) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (i *BatchInserter) Flush(ctx context.Context) error { | ||
if len(i.values) == 0 { | ||
return nil | ||
} | ||
|
||
n := i.maxBatchSize | ||
if len(i.values) < i.maxBatchSize { | ||
n = len(i.values) | ||
} | ||
|
||
batch := i.values[:n] | ||
i.values = append(make([]any, 0, i.maxCapacity), i.values[n:]...) | ||
|
||
batchSize := len(batch) | ||
query := i.queryBuilder.build(batchSize) | ||
return NewRowScanner(i.returningScanner)(i.db.Query(ctx, RawQuery(query, batch...))) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package pgutil | ||
|
||
import ( | ||
"fmt" | ||
"strings" | ||
) | ||
|
||
type ( | ||
batchInserterOptions struct { | ||
onConflictClause string | ||
returningClause string | ||
returningScanner ScanFunc | ||
} | ||
|
||
BatchInserterConfigFunc func(*batchInserterOptions) | ||
) | ||
|
||
func getBatchInserterOptions(configs []BatchInserterConfigFunc) *batchInserterOptions { | ||
options := &batchInserterOptions{} | ||
for _, f := range configs { | ||
f(options) | ||
} | ||
|
||
return options | ||
} | ||
|
||
func WithBatchInserterOnConflict(clause string) BatchInserterConfigFunc { | ||
return func(o *batchInserterOptions) { | ||
o.onConflictClause = fmt.Sprintf("ON CONFLICT %s", clause) | ||
} | ||
} | ||
|
||
func WithBatchInserterReturn(columns []string, scanner ScanFunc) BatchInserterConfigFunc { | ||
return func(o *batchInserterOptions) { | ||
o.returningClause = fmt.Sprintf("RETURNING %s", strings.Join(quoteColumnNames(columns), ", ")) | ||
o.returningScanner = scanner | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
package pgutil | ||
|
||
import ( | ||
"fmt" | ||
"strings" | ||
"sync" | ||
) | ||
|
||
var ( | ||
placeholders []string | ||
placeholdersCache = map[int]string{} | ||
placeholdersCacheMutex sync.Mutex | ||
) | ||
|
||
func init() { | ||
placeholders = make([]string, 0, maxNumPostgresParameters) | ||
|
||
for i := 0; i < maxNumPostgresParameters; i++ { | ||
placeholders = append(placeholders, fmt.Sprintf("$%05d", i+1)) | ||
} | ||
} | ||
|
||
type batchQueryBuilder struct { | ||
numColumns int | ||
queryPrefix string | ||
querySuffix string | ||
placeholders string | ||
} | ||
|
||
func newBatchQueryBuilder(tableName string, columnNames []string, onConflictClause, returningClause string) *batchQueryBuilder { | ||
var ( | ||
numColumns = len(columnNames) | ||
queryPrefix = fmt.Sprintf("INSERT INTO %q (%s) VALUES", tableName, strings.Join(quoteColumnNames(columnNames), ", ")) | ||
querySuffix = fmt.Sprintf("%s %s", onConflictClause, returningClause) | ||
all = makeBatchPlaceholdersString(numColumns) | ||
) | ||
|
||
return &batchQueryBuilder{ | ||
numColumns: numColumns, | ||
queryPrefix: queryPrefix, | ||
querySuffix: querySuffix, | ||
placeholders: all, | ||
} | ||
} | ||
|
||
func (b *batchQueryBuilder) build(batchSize int) string { | ||
return fmt.Sprintf("%s %s %s", b.queryPrefix, b.placeholders[:placeholdersLen(b.numColumns, batchSize)], b.querySuffix) | ||
} | ||
|
||
func makeBatchPlaceholdersString(numColumns int) string { | ||
placeholdersCacheMutex.Lock() | ||
defer placeholdersCacheMutex.Unlock() | ||
if placeholders, ok := placeholdersCache[numColumns]; ok { | ||
return placeholders | ||
} | ||
|
||
var sb strings.Builder | ||
sb.WriteString("(") | ||
sb.WriteString(placeholders[0]) | ||
for i := 1; i < maxNumPostgresParameters; i++ { | ||
if i%numColumns == 0 { | ||
sb.WriteString("),(") | ||
} else { | ||
sb.WriteString(",") | ||
} | ||
|
||
sb.WriteString(placeholders[i]) | ||
} | ||
sb.WriteString(")") | ||
|
||
placeholders := sb.String() | ||
placeholdersCache[numColumns] = placeholders | ||
return placeholders | ||
} | ||
|
||
func placeholdersLen(numColumns, batchSize int) int { | ||
var ( | ||
numRows = batchSize / numColumns | ||
placeholderLen = 6 // e.g., `$00123` | ||
rowLen = sequenceLen(numColumns, placeholderLen) + 2 // e.g., `($00123,$001234,...)` | ||
totalLen = sequenceLen(numRows, rowLen) | ||
) | ||
|
||
return totalLen | ||
} | ||
|
||
func sequenceLen(num, len int) int { | ||
return num*(len+1) - 1 | ||
} | ||
|
||
func quoteColumnNames(names []string) []string { | ||
quoted := make([]string, len(names)) | ||
for i, name := range names { | ||
quoted[i] = quoteColumnName(name) | ||
} | ||
|
||
return quoted | ||
} | ||
|
||
func quoteColumnName(name string) string { | ||
return fmt.Sprintf("%q", name) | ||
} |
Oops, something went wrong.