Skip to content

Commit

Permalink
Squash commits.
Browse files Browse the repository at this point in the history
  • Loading branch information
efritz committed Sep 20, 2024
1 parent 6cf6046 commit 43ab2ce
Show file tree
Hide file tree
Showing 117 changed files with 7,729 additions and 643 deletions.
8 changes: 8 additions & 0 deletions .envrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash

export PGHOST=localhost
export PGPORT=5432
export PGUSER=postgres
export PGPASSWORD=
export PGDATABASE=postgres
export TEMPLATEDB=template0
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright (c) 2022 Eric Fritz
Copyright (c) 2024 Eric Fritz

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
27 changes: 1 addition & 26 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Postgres utilities for use with nacelle.

### Usage

This library creates a [sqlx](https://github.com/jmoiron/sqlx) connection wrapped in a nacelle [logger](https://nacelle.dev/docs/core/log). The supplied initializer adds this connection into the nacelle [service container](https://nacelle.dev/docs/core/service) under the key `db`. The initializer will block until a ping succeeds.
This library creates a Postgres connection wrapped in a nacelle [logger](https://nacelle.dev/docs/core/log). The supplied initializer adds this connection into the nacelle [service container](https://nacelle.dev/docs/core/service) under the key `db`. The initializer will block until a ping succeeds.

```go
func setup(processes nacelle.ProcessContainer, services nacelle.ServiceContainer) error {
Expand All @@ -21,28 +21,6 @@ func setup(processes nacelle.ProcessContainer, services nacelle.ServiceContainer
}
```

This library uses [golang migrate](https://github.com/golang-migrate/migrate) to optionally run migrations on application startup. To configure migrations, supply a [source driver](https://github.com/golang-migrate/migrate#migration-sources) to the initializer, as follows.

```go
import (
_ "github.com/golang-migrate/migrate/v4/source/file"
"github.com/golang-migrate/migrate/v4/source"
)

func setup(processes nacelle.ProcessContainer, services nacelle.ServiceContainer) error {
migrationSourceDriver, err := source.Open("file:///migrations")
if err != nil {
return err
}

processes.RegisterInitializer(pgutil.NewInitializer(
pgutil.WithMigrationSourceDriver(migrationSourceDriver)
))

// ...
}
```

### Configuration

The default service behavior can be configured by the following environment variables.
Expand All @@ -51,6 +29,3 @@ The default service behavior can be configured by the following environment vari
| ------------------------------- | -------- | ----------------- | ---------------------------------------------------------------------------------------------------- |
| DATABASE_URL | yes | | The connection string of the remote database. |
| LOG_SQL_QUERIES | | false | Whether or not to log parameterized SQL queries. |
| MIGRATIONS_TABLE | | schema_migrations | The name of the migrations table. |
| MIGRATIONS_SCHEMA_NAME | | default | The name of the schema used during migrations. |
| FAIL_ON_NEWER_MIGRATION_VERSION | | false | If true, fail startup when the database migration version is newer than the known set of migrations. |
71 changes: 71 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package pgutil

import (
"context"
"fmt"
)

type BatchInserter struct {
db DB
numColumns int
maxBatchSize int
maxCapacity int
queryBuilder *batchQueryBuilder
returningScanner ScanFunc
values []any
}

const maxNumPostgresParameters = 65535

func NewBatchInserter(db DB, tableName string, columnNames []string, configs ...BatchInserterConfigFunc) *BatchInserter {
var (
options = getBatchInserterOptions(configs)
numColumns = len(columnNames)
maxBatchSize = int(maxNumPostgresParameters/numColumns) * numColumns
maxCapacity = maxBatchSize + numColumns
queryBuilder = newBatchQueryBuilder(tableName, columnNames, options.onConflictClause, options.returningClause)
returningScanner = options.returningScanner
)

return &BatchInserter{
db: db,
numColumns: numColumns,
maxBatchSize: maxBatchSize,
maxCapacity: maxCapacity,
queryBuilder: queryBuilder,
returningScanner: returningScanner,
values: make([]any, 0, maxCapacity),
}
}

func (i *BatchInserter) Insert(ctx context.Context, values ...any) error {
if len(values) != i.numColumns {
return fmt.Errorf("received %d values for %d columns", len(values), i.numColumns)
}

i.values = append(i.values, values...)

if len(i.values) >= i.maxBatchSize {
return i.Flush(ctx)
}

return nil
}

func (i *BatchInserter) Flush(ctx context.Context) error {
if len(i.values) == 0 {
return nil
}

n := i.maxBatchSize
if len(i.values) < i.maxBatchSize {
n = len(i.values)
}

batch := i.values[:n]
i.values = append(make([]any, 0, i.maxCapacity), i.values[n:]...)

batchSize := len(batch)
query := i.queryBuilder.build(batchSize)
return NewRowScanner(i.returningScanner)(i.db.Query(ctx, RawQuery(query, batch...)))
}
38 changes: 38 additions & 0 deletions batch_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package pgutil

import (
"fmt"
"strings"
)

type (
batchInserterOptions struct {
onConflictClause string
returningClause string
returningScanner ScanFunc
}

BatchInserterConfigFunc func(*batchInserterOptions)
)

func getBatchInserterOptions(configs []BatchInserterConfigFunc) *batchInserterOptions {
options := &batchInserterOptions{}
for _, f := range configs {
f(options)
}

return options
}

func WithBatchInserterOnConflict(clause string) BatchInserterConfigFunc {
return func(o *batchInserterOptions) {
o.onConflictClause = fmt.Sprintf("ON CONFLICT %s", clause)
}
}

func WithBatchInserterReturn(columns []string, scanner ScanFunc) BatchInserterConfigFunc {
return func(o *batchInserterOptions) {
o.returningClause = fmt.Sprintf("RETURNING %s", strings.Join(quoteColumnNames(columns), ", "))
o.returningScanner = scanner
}
}
102 changes: 102 additions & 0 deletions batch_queries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package pgutil

import (
"fmt"
"strings"
"sync"
)

var (
placeholders []string
placeholdersCache = map[int]string{}
placeholdersCacheMutex sync.Mutex
)

func init() {
placeholders = make([]string, 0, maxNumPostgresParameters)

for i := 0; i < maxNumPostgresParameters; i++ {
placeholders = append(placeholders, fmt.Sprintf("$%05d", i+1))
}
}

type batchQueryBuilder struct {
numColumns int
queryPrefix string
querySuffix string
placeholders string
}

func newBatchQueryBuilder(tableName string, columnNames []string, onConflictClause, returningClause string) *batchQueryBuilder {
var (
numColumns = len(columnNames)
queryPrefix = fmt.Sprintf("INSERT INTO %q (%s) VALUES", tableName, strings.Join(quoteColumnNames(columnNames), ", "))
querySuffix = fmt.Sprintf("%s %s", onConflictClause, returningClause)
all = makeBatchPlaceholdersString(numColumns)
)

return &batchQueryBuilder{
numColumns: numColumns,
queryPrefix: queryPrefix,
querySuffix: querySuffix,
placeholders: all,
}
}

func (b *batchQueryBuilder) build(batchSize int) string {
return fmt.Sprintf("%s %s %s", b.queryPrefix, b.placeholders[:placeholdersLen(b.numColumns, batchSize)], b.querySuffix)
}

func makeBatchPlaceholdersString(numColumns int) string {
placeholdersCacheMutex.Lock()
defer placeholdersCacheMutex.Unlock()
if placeholders, ok := placeholdersCache[numColumns]; ok {
return placeholders
}

var sb strings.Builder
sb.WriteString("(")
sb.WriteString(placeholders[0])
for i := 1; i < maxNumPostgresParameters; i++ {
if i%numColumns == 0 {
sb.WriteString("),(")
} else {
sb.WriteString(",")
}

sb.WriteString(placeholders[i])
}
sb.WriteString(")")

placeholders := sb.String()
placeholdersCache[numColumns] = placeholders
return placeholders
}

func placeholdersLen(numColumns, batchSize int) int {
var (
numRows = batchSize / numColumns
placeholderLen = 6 // e.g., `$00123`
rowLen = sequenceLen(numColumns, placeholderLen) + 2 // e.g., `($00123,$001234,...)`
totalLen = sequenceLen(numRows, rowLen)
)

return totalLen
}

func sequenceLen(num, len int) int {
return num*(len+1) - 1
}

func quoteColumnNames(names []string) []string {
quoted := make([]string, len(names))
for i, name := range names {
quoted[i] = quoteColumnName(name)
}

return quoted
}

func quoteColumnName(name string) string {
return fmt.Sprintf("%q", name)
}
Loading

0 comments on commit 43ab2ce

Please sign in to comment.