Skip to content

Commit

Permalink
feat: add sample snippets + transaction retry helper
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Nov 18, 2024
1 parent ffd9196 commit 280231e
Show file tree
Hide file tree
Showing 14 changed files with 1,013 additions and 23 deletions.
41 changes: 35 additions & 6 deletions migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sort"
"strings"

spannerdriver "github.com/googleapis/go-sql-spanner"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"gorm.io/gorm/migrator"
Expand All @@ -33,6 +34,7 @@ const (
type SpannerMigrator interface {
gorm.Migrator

AutoMigrateDryRun(values ...interface{}) ([]string, error)
StartBatchDDL() error
RunBatch() error
AbortBatch() error
Expand All @@ -41,6 +43,7 @@ type SpannerMigrator interface {
type spannerMigrator struct {
migrator.Migrator
Dialector
dryRun bool
}

type spannerColumnType struct {
Expand All @@ -60,21 +63,47 @@ func (m spannerMigrator) CurrentDatabase() (name string) {
return ""
}

func (m spannerMigrator) AutoMigrateDryRun(values ...interface{}) ([]string, error) {
return m.autoMigrate( /* dryRun = */ true, values...)
}

func (m spannerMigrator) AutoMigrate(values ...interface{}) error {
if !m.Dialector.Config.DisableAutoMigrateBatching {
_, err := m.autoMigrate( /* dryRun = */ false, values...)
return err
}

func (m spannerMigrator) autoMigrate(dryRun bool, values ...interface{}) ([]string, error) {
if dryRun || !m.Dialector.Config.DisableAutoMigrateBatching {
if err := m.StartBatchDDL(); err != nil {
return err
return nil, err
}
}
err := m.Migrator.AutoMigrate(values...)
if err == nil {
if m.Dialector.Config.DisableAutoMigrateBatching {
return nil
if !dryRun && m.Dialector.Config.DisableAutoMigrateBatching {
return nil, nil
} else if dryRun {
connPool := m.DB.Statement.ConnPool
conn, ok := connPool.(*sql.Conn)
if !ok {
return nil, fmt.Errorf("unexpected ConnPool type")
}
if err := conn.Raw(func(driverConn any) error {
spannerConn, ok := driverConn.(spannerdriver.SpannerConn)
if !ok {
return fmt.Errorf("dry-run is only supported for Spanner")
}
spannerConn.InDDLBatch()
return nil
}); err != nil {
return nil, err
}
return nil, m.AbortBatch()
} else {
return m.RunBatch()
return nil, m.RunBatch()
}
}
return fmt.Errorf("unexpected return value type: %v", err)
return nil, fmt.Errorf("unexpected return value type: %v", err)
}

func (m spannerMigrator) StartBatchDDL() error {
Expand Down
51 changes: 51 additions & 0 deletions retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2024 Google LLC.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gorm

import (
"context"
"database/sql"

"cloud.google.com/go/spanner"
"github.com/googleapis/gax-go/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gorm.io/gorm"
)

// TransactionWithRetryOnAborted executes a transaction on Spanner using the given
// gorm database, and retries the transaction if it is aborted by Spanner.
func TransactionWithRetryOnAborted(
ctx context.Context,
db *gorm.DB,
fc func(tx *gorm.DB) error,
opts ...*sql.TxOptions) error {
for {
err := db.Transaction(fc, opts...)
if err == nil {
return nil
}
s, ok := status.FromError(err)
if !ok || s.Code() != codes.Aborted {
return err
}
delay, ok := spanner.ExtractRetryDelay(err)
if ok {
if err := gax.Sleep(ctx, delay); err != nil {
return err
}
}
}
}
34 changes: 19 additions & 15 deletions samples/emulator/emulator_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var containerId string
// 4. Stop the Docker container with the emulator.
func RunSampleOnEmulator(sample func(string, string, string) error, ddlStatements ...string) {
var err error
if err = startEmulator(); err != nil {
if _, _, err = startEmulator(); err != nil {
log.Fatalf("failed to start emulator: %v", err)
}
projectId, instanceId, databaseId := "my-project", "my-instance", "my-database"
Expand All @@ -64,42 +64,39 @@ func RunSampleOnEmulator(sample func(string, string, string) error, ddlStatement
}
}

func startEmulator() error {
func startEmulator() (host, port string, err error) {
ctx := context.Background()
if err := os.Setenv("SPANNER_EMULATOR_HOST", "localhost:9010"); err != nil {
return err
}

// Initialize a Docker client.
var err error
cli, err = client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
return err
return "", "", err
}
// Pull the Spanner Emulator docker image.
reader, err := cli.ImagePull(ctx, "gcr.io/cloud-spanner-emulator/emulator", image.PullOptions{})
if err != nil {
return err
return "", "", err
}
defer func() { _ = reader.Close() }()
// cli.ImagePull is asynchronous.
// The reader needs to be read completely for the pull operation to complete.
if _, err := io.Copy(io.Discard, reader); err != nil {
return err
return "", "", err
}
// Create and start a container with the emulator.
resp, err := cli.ContainerCreate(ctx, &container.Config{
Image: "gcr.io/cloud-spanner-emulator/emulator",
ExposedPorts: nat.PortSet{"9010": {}},
}, &container.HostConfig{
PortBindings: map[nat.Port][]nat.PortBinding{"9010": {{HostIP: "0.0.0.0", HostPort: "9010"}}},
AutoRemove: true,
PortBindings: map[nat.Port][]nat.PortBinding{"9010": {{HostIP: "0.0.0.0", HostPort: ""}}},
}, nil, nil, "")
if err != nil {
return err
return "", "", err
}
containerId = resp.ID
if err := cli.ContainerStart(ctx, containerId, container.StartOptions{}); err != nil {
return err
return "", "", err
}
// Wait max 10 seconds or until the emulator is running.
for c := 0; c < 20; c++ {
Expand All @@ -108,14 +105,21 @@ func startEmulator() error {
<-time.After(500 * time.Millisecond)
resp, err := cli.ContainerInspect(ctx, containerId)
if err != nil {
return fmt.Errorf("failed to inspect container state: %v", err)
return "", "", fmt.Errorf("failed to inspect container state: %v", err)
}
if resp.State.Running {
host = resp.NetworkSettings.Ports["9010/tcp"][0].HostIP
port = resp.NetworkSettings.Ports["9010/tcp"][0].HostPort
break
}
}

return nil
if host == "" || port == "" {
return "", "", fmt.Errorf("emulator did not start successfully")
}
if err := os.Setenv("SPANNER_EMULATOR_HOST", fmt.Sprintf("%s:%s", host, port)); err != nil {
return "", "", err
}
return
}

func createInstance(projectId, instanceId string) error {
Expand Down
47 changes: 45 additions & 2 deletions samples/run_sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,57 @@
package main

import (
_ "embed"
"fmt"
"os"
"strings"

"github.com/googleapis/go-gorm-spanner/samples/emulator"
samples "github.com/googleapis/go-gorm-spanner/samples/sample_application"
"github.com/googleapis/go-gorm-spanner/samples/snippets"
)

//go:embed snippets/sample_model/data_model.sql
var createDataModelSQL string

func main() {
emulator.RunSampleOnEmulator(func(project string, instance string, database string) error {
return samples.RunSample(os.Stdout, "projects/"+project+"/instances/"+instance+"/databases/"+database)
// Run the larger sample application.
if len(os.Args) == 1 {
emulator.RunSampleOnEmulator(func(project string, instance string, database string) error {
return samples.RunSample(os.Stdout, "projects/"+project+"/instances/"+instance+"/databases/"+database)
})
return
}

// Get the DDL statements for the sample data model.
ddlStatements := strings.FieldsFunc(createDataModelSQL, func(r rune) bool {
return r == ';'
})
// Skip the last (empty) statement.
ddlStatements = ddlStatements[0 : len(ddlStatements)-1]

// Run one of the sample snippets.
sample := os.Args[1]

switch sample {
case "hello_world":
emulator.RunSampleOnEmulator(snippets.HelloWorld, ddlStatements...)
case "insert_data":
emulator.RunSampleOnEmulator(snippets.InsertData, ddlStatements...)
case "auto_save_associations":
emulator.RunSampleOnEmulator(snippets.AutoSaveAssociations, ddlStatements...)
case "interleaved_tables":
emulator.RunSampleOnEmulator(snippets.InterleavedTables, ddlStatements...)
case "read_only_transaction":
emulator.RunSampleOnEmulator(snippets.ReadOnlyTransaction, ddlStatements...)
case "read_write_transaction":
emulator.RunSampleOnEmulator(snippets.ReadWriteTransaction, ddlStatements...)
case "aborted_transaction":
emulator.RunSampleOnEmulator(snippets.AbortedTransaction, ddlStatements...)
case "migrations":
emulator.RunSampleOnEmulator(snippets.Migrations)
default:
fmt.Printf("unknown sample: %s\n", sample)
os.Exit(1)
}
}
108 changes: 108 additions & 0 deletions samples/snippets/aborted_transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2024 Google LLC.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package snippets

import (
"context"
"fmt"

spannergorm "github.com/googleapis/go-gorm-spanner"
"github.com/googleapis/go-gorm-spanner/samples/snippets/sample_model"
"gorm.io/gorm"
)

// AbortedTransaction shows how transaction retries work on Spanner.
// Read/write transactions guarantee the consistency and atomicity of multiple queries
// and updates on Spanner. Read/write transactions take locks on the rows that are read
// and updated. Spanner can abort any read/write transaction due to lock conflicts or
// due to transient failures (e.g. network errors, machine restarts, etc.).
//
// Transactions that fail with an Aborted error should be retried. The Spanner gorm
// dialect provides the helper function `spannergorm.TransactionWithRetryOnAborted`
// for this. It is recommended to run all read/write transactions using this helper
// function, or add a similar retry function to your own application.
//
// Execute the sample with the command `go run run_sample.go aborted_transaction`
// from the samples directory.
func AbortedTransaction(projectId, instanceId, databaseId string) error {
db, err := gorm.Open(spannergorm.New(spannergorm.Config{
DriverName: "spanner",
DSN: fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectId, instanceId, databaseId),
}), &gorm.Config{PrepareStmt: true})
if err != nil {
return fmt.Errorf("failed to open database connection: %v\n", err)
}

// Insert a test row.
if err := insertVenue(db); err != nil {
return err
}

// TransactionWithRetryOnAborted automatically retries the transaction if it
// is aborted by Spanner. It is recommended to use this helper function for
// all read/write transactions.
attempt := 0
if err := spannergorm.TransactionWithRetryOnAborted(context.Background(), db, func(tx *gorm.DB) error {
attempt++
fmt.Printf("Executing attempt %d of the first transaction\n", attempt)
// Select the venue row in this transaction.
var venue sample_model.Venue
if err := tx.First(&venue).Error; err != nil {
return err
}
if attempt == 1 {
// Execute another read/write transaction that reads and updates the same row.
// This will cause this transaction to be aborted by Spanner.
if err := readAndUpdateVenueInTransaction(db); err != nil {
return err
}
}
venue.Name = venue.Name + " - Updated in first transaction"
if err := tx.Updates(&venue).Error; err != nil {
return err
}

return nil
}); err != nil {
return err
}

fmt.Printf("First transaction succeeded after %d attempt(s)\n", attempt)

return nil
}

func readAndUpdateVenueInTransaction(db *gorm.DB) error {
attempt := 0
if err := spannergorm.TransactionWithRetryOnAborted(context.Background(), db, func(tx *gorm.DB) error {
attempt++
fmt.Printf("Executing attempt %d of the second transaction\n", attempt)
var venue sample_model.Venue
if err := tx.First(&venue).Error; err != nil {
return err
}
venue.Name = venue.Name + " - Updated in second transaction"
if err := tx.Updates(&venue).Error; err != nil {
return err
}
return nil
}); err != nil {
return err
}

fmt.Printf("Second transaction succeeded after %d attempt(s)\n", attempt)

return nil
}
Loading

0 comments on commit 280231e

Please sign in to comment.