Skip to content

Commit

Permalink
Update schema and add migrations for mysql and pgsql
Browse files Browse the repository at this point in the history
Extends id column to bigint for new databases.

Existing databases will be migrated if the KINE_SCHEMA_MIGRATION environment variable is set to 1 or higher.

Signed-off-by: Brad Davidson <[email protected]>
  • Loading branch information
brandond committed Mar 1, 2024
1 parent abe1265 commit 57fbf01
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 18 deletions.
40 changes: 29 additions & 11 deletions pkg/drivers/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
cryptotls "crypto/tls"
"database/sql"
"fmt"
"os"
"strconv"

"github.com/go-sql-driver/mysql"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -27,12 +29,12 @@ var (
schema = []string{
`CREATE TABLE IF NOT EXISTS kine
(
id INTEGER AUTO_INCREMENT,
id BIGINT UNSIGNED AUTO_INCREMENT,
name VARCHAR(630) CHARACTER SET ascii,
created INTEGER,
deleted INTEGER,
create_revision INTEGER,
prev_revision INTEGER,
create_revision BIGINT UNSIGNED,
prev_revision BIGINT UNSIGNED,
lease INTEGER,
value MEDIUMBLOB,
old_value MEDIUMBLOB,
Expand All @@ -44,6 +46,9 @@ var (
`CREATE INDEX kine_prev_revision_index ON kine (prev_revision)`,
`CREATE UNIQUE INDEX kine_name_prev_revision_uindex ON kine (name, prev_revision)`,
}
schemaMigrations = []string{
`ALTER TABLE kine MODIFY COLUMN id BIGINT UNSIGNED AUTO_INCREMENT NOT NULL UNIQUE, MODIFY COLUMN create_revision BIGINT UNSIGNED, MODIFY COLUMN prev_revision BIGINT UNSIGNED`,
}
createDB = "CREATE DATABASE IF NOT EXISTS "
)

Expand Down Expand Up @@ -117,25 +122,40 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
}

func setup(db *sql.DB) error {
logrus.Infof("Configuring database table schema and indexes, this may take a moment...")
var exists bool
err := db.QueryRow("SELECT 1 FROM information_schema.TABLES WHERE table_schema = DATABASE() AND table_name = ?", "kine").Scan(&exists)
if err != nil && err != sql.ErrNoRows {
logrus.Warnf("failed to check existence of database table %s, going to attempt create: %v", "kine", err)
logrus.Warnf("Failed to check existence of database table %s, going to attempt create: %v", "kine", err)
}

if !exists {
logrus.Infof("Configuring database table schema and indexes, this may take a moment...")
for _, stmt := range schema {
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
_, err := db.Exec(stmt)
if err != nil {
if _, err := db.Exec(stmt); err != nil {
if mysqlError, ok := err.(*mysql.MySQLError); !ok || mysqlError.Number != 1061 {
return err
}
}
}
}

// Run enabled schama migrations.
// Note that the schema created by the `schema` var is always the latest revision;
// migrations should handle deltas between prior schema versions.
schemaVersion, _ := strconv.ParseUint(os.Getenv("KINE_SCHEMA_MIGRATION"), 10, 64)
for i, stmt := range schemaMigrations {
if i >= int(schemaVersion) {
break
}
logrus.Tracef("SETUP EXEC MIGRATION %d: %v", i, util.Stripped(stmt))
if _, err := db.Exec(stmt); err != nil {
if mysqlError, ok := err.(*mysql.MySQLError); !ok || mysqlError.Number != 1061 {
return err
}
}
}

logrus.Infof("Database tables and indexes are up to date")
return nil
}
Expand All @@ -159,8 +179,7 @@ func createDBIfNotExist(dataSourceName string) error {
}

if !exists {
_, err = db.Exec(createDB + dbName)
if err != nil {
if _, err = db.Exec(createDB + dbName); err != nil {
if mysqlError, ok := err.(*mysql.MySQLError); !ok || mysqlError.Number != 1049 {
return err
}
Expand All @@ -169,8 +188,7 @@ func createDBIfNotExist(dataSourceName string) error {
if err != nil {
return err
}
_, err = db.Exec(createDB + dbName)
if err != nil {
if _, err = db.Exec(createDB + dbName); err != nil {
return err
}
}
Expand Down
30 changes: 23 additions & 7 deletions pkg/drivers/pgsql/pgsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"net/url"
"os"
"regexp"
"strconv"
"strings"
Expand All @@ -30,12 +31,12 @@ var (
schema = []string{
`CREATE TABLE IF NOT EXISTS kine
(
id SERIAL PRIMARY KEY,
id BIGSERIAL PRIMARY KEY,
name VARCHAR(630),
created INTEGER,
deleted INTEGER,
create_revision INTEGER,
prev_revision INTEGER,
create_revision BIGINT,
prev_revision BIGINT,
lease INTEGER,
value bytea,
old_value bytea
Expand All @@ -46,6 +47,9 @@ var (
`CREATE INDEX IF NOT EXISTS kine_prev_revision_index ON kine (prev_revision)`,
`CREATE UNIQUE INDEX IF NOT EXISTS kine_name_prev_revision_uindex ON kine (name, prev_revision)`,
}
schemaMigrations = []string{
`ALTER TABLE kine ALTER COLUMN id SET DATA TYPE BIGINT, ALTER COLUMN create_revision SET DATA TYPE BIGINT, ALTER COLUMN prev_revision SET DATA TYPE BIGINT; ALTER SEQUENCE kine_id_seq AS BIGINT`,
}
createDB = "CREATE DATABASE "
)

Expand Down Expand Up @@ -117,8 +121,21 @@ func setup(db *sql.DB) error {

for _, stmt := range schema {
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
_, err := db.Exec(stmt)
if err != nil {
if _, err := db.Exec(stmt); err != nil {
return err
}
}

// Run enabled schama migrations.
// Note that the schema created by the `schema` var is always the latest revision;
// migrations should handle deltas between prior schema versions.
schemaVersion, _ := strconv.ParseUint(os.Getenv("KINE_SCHEMA_MIGRATION"), 10, 64)
for i, stmt := range schemaMigrations {
if i >= int(schemaVersion) {
break
}
logrus.Tracef("SETUP EXEC MIGRATION %d: %v", i, util.Stripped(stmt))
if _, err := db.Exec(stmt); err != nil {
return err
}
}
Expand Down Expand Up @@ -152,8 +169,7 @@ func createDBIfNotExist(dataSourceName string) error {

if !exists {
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
_, err = db.Exec(stmt)
if err != nil {
if _, err = db.Exec(stmt); err != nil {
logrus.Warnf("failed to create database %s: %v", dbName, err)
} else {
logrus.Tracef("created database: %s", dbName)
Expand Down

0 comments on commit 57fbf01

Please sign in to comment.