diff --git a/Makefile b/Makefile index faa0bc307..211b4c726 100644 --- a/Makefile +++ b/Makefile @@ -208,4 +208,12 @@ test-onchain: ${GOBIN} test -v -count 1 -tags="${BUILD_TAGS}" github.com/waku-org/go-waku/waku/v2/protocol/rln test-onchain-with-race: - ${GOBIN} test -race -v -count 1 -tags="${BUILD_TAGS}" github.com/waku-org/go-waku/waku/v2/protocol/rln \ No newline at end of file + ${GOBIN} test -race -v -count 1 -tags="${BUILD_TAGS}" github.com/waku-org/go-waku/waku/v2/protocol/rln + +test-postgres: PG_BUILD_TAGS = ${BUILD_TAGS} include_postgres_tests +test-postgres: + ${GOBIN} test -p 1 -v -count 1 -tags="${PG_BUILD_TAGS}" github.com/waku-org/go-waku/waku/persistence + +test-postgres-with-race: + ${GOBIN} test -race -p 1 -v -count 1 -tags="${PG_BUILD_TAGS}" github.com/waku-org/go-waku/waku/persistence + diff --git a/ci/Jenkinsfile.tests b/ci/Jenkinsfile.tests index 29e9b35eb..87cb3363a 100644 --- a/ci/Jenkinsfile.tests +++ b/ci/Jenkinsfile.tests @@ -30,6 +30,8 @@ pipeline { environment { TARGET = 'tests' + DB_CONT = "go-waku-test-db-${env.EXECUTOR_NUMBER.toInteger() + 1}" + DB_PORT = "${5432 + env.EXECUTOR_NUMBER.toInteger()}" REPO = "${env.WORKSPACE}/src/github.com/waku-org/go-waku" GOCACHE = "${env.WORKSPACE_TMP}/go-build" GOPATH = "${env.WORKSPACE}/go" @@ -58,6 +60,28 @@ pipeline { } } } } + stage('PostgresSQL') { + environment { + TEST_DB_PORT = "${env.DB_PORT}" + } + steps { script { dir(env.REPO) { + db = docker.image('postgres:9.6-alpine').withRun([ + "--name=${DB_CONT}", + "--env=POSTGRES_HOST_AUTH_METHOD=trust", + "--publish=${DB_PORT}:${DB_PORT}", + ].join(' '), "-p ${DB_PORT}") { c -> + if (params.RACE) { + nix.develop('make test-postgres-with-race', pure: false) + }else { + nix.develop('make test-postgres', pure: false) + } + } + } } } + post { cleanup { /* Leftover DB containers. */ + sh "docker rm ${DB_CONT} || true" + } } + } + stage('Ganache') { steps { script { ganache = docker.image( diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 0f5e68732..7edef3372 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -18,7 +18,6 @@ import ( "github.com/pbnjay/memory" "github.com/prometheus/client_golang/prometheus" - "github.com/waku-org/go-waku/waku/persistence/sqlite" dbutils "github.com/waku-org/go-waku/waku/persistence/utils" wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/rendezvous" @@ -203,7 +202,7 @@ func Execute(options NodeOptions) { if options.Store.Enable && options.PersistPeers { // Create persistent peerstore - queries, err := sqlite.NewQueries("peerstore", db) + queries, err := dbutils.NewQueries("peerstore", db) failOnErr(err, "Peerstore") datastore := dssql.NewDatastore(db, queries) diff --git a/waku/persistence/driver_type.go b/waku/persistence/driver_type.go new file mode 100644 index 000000000..86c784b47 --- /dev/null +++ b/waku/persistence/driver_type.go @@ -0,0 +1,22 @@ +package persistence + +import ( + "database/sql" + "reflect" +) + +const ( + UndefinedDriver = iota + PostgresDriver + SQLiteDriver +) + +func GetDriverType(db *sql.DB) int { + switch reflect.TypeOf(db.Driver()).String() { + case "*sqlite3.SQLiteDriver": + return SQLiteDriver + case "*stdlib.Driver": + return PostgresDriver + } + return UndefinedDriver +} diff --git a/waku/persistence/postgres/mock.go b/waku/persistence/postgres/mock.go new file mode 100644 index 000000000..ed9457bc7 --- /dev/null +++ b/waku/persistence/postgres/mock.go @@ -0,0 +1,50 @@ +package postgres + +import ( + "database/sql" + "fmt" + "log" + "os" +) + +var dbUrlTemplate = "postgres://postgres@localhost:%s/%s?sslmode=disable" + +func ResetDefaultTestPostgresDB(dropDBUrl string) error { + db, err := sql.Open("pgx", dropDBUrl) + if err != nil { + return err + } + + deletePrevConnectionsSql := ` + SELECT pid, pg_terminate_backend(pid) + FROM pg_stat_activity + WHERE datname in ('template1', 'postgres') AND pid <> pg_backend_pid();` + _, err = db.Exec(deletePrevConnectionsSql) + if err != nil { + return err + } + + _, err = db.Exec("DROP DATABASE IF EXISTS postgres;") + if err != nil { + return err + } + + _, err = db.Exec("CREATE DATABASE postgres;") + return err +} + +func NewMockPgDB() *sql.DB { + mockPgDBPort := os.Getenv("TEST_DB_PORT") + + // + dropDBUrl := fmt.Sprintf(dbUrlTemplate, mockPgDBPort, "template1") + if err := ResetDefaultTestPostgresDB(dropDBUrl); err != nil { + log.Fatalf("an error '%s' while reseting the db", err) + } + mockDBUrl := fmt.Sprintf(dbUrlTemplate, mockPgDBPort, "postgres") + db, err := sql.Open("pgx", mockDBUrl) + if err != nil { + log.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + return db +} diff --git a/waku/persistence/postgres/postgres.go b/waku/persistence/postgres/postgres.go index bda79b545..cfc372fe3 100644 --- a/waku/persistence/postgres/postgres.go +++ b/waku/persistence/postgres/postgres.go @@ -71,5 +71,5 @@ func NewQueries(tbl string, db *sql.DB) (*persistence.Queries, error) { if err != nil { return nil, err } - return persistence.CreateQueries(tbl, db), nil + return persistence.CreateQueries(tbl), nil } diff --git a/waku/persistence/postgres/postgres_test.go b/waku/persistence/postgres/postgres_test.go new file mode 100644 index 000000000..32ddc8dc4 --- /dev/null +++ b/waku/persistence/postgres/postgres_test.go @@ -0,0 +1,52 @@ +//go:build include_postgres_tests +// +build include_postgres_tests + +package postgres + +import ( + "testing" + + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/persistence" +) + +func TestQueries(t *testing.T) { + db := persistence.NewMockPgDB() + + queries, err := NewQueries("test_queries", db) + require.NoError(t, err) + + query := queries.Delete() + require.NotEmpty(t, query) + + query = queries.Exists() + require.NotEmpty(t, query) + + query = queries.Get() + require.NotEmpty(t, query) + + query = queries.Put() + require.NotEmpty(t, query) + + query = queries.Query() + require.NotEmpty(t, query) + + query = queries.Prefix() + require.NotEmpty(t, query) + + query = queries.Limit() + require.NotEmpty(t, query) + + query = queries.Offset() + require.NotEmpty(t, query) + + query = queries.GetSize() + require.NotEmpty(t, query) +} + +func TestCreateTable(t *testing.T) { + db := persistence.NewMockPgDB() + + err := CreateTable(db, "test_create_table") + require.NoError(t, err) +} diff --git a/waku/persistence/sql_queries.go b/waku/persistence/sql_queries.go index 2329187e4..730f0c957 100644 --- a/waku/persistence/sql_queries.go +++ b/waku/persistence/sql_queries.go @@ -1,7 +1,6 @@ package persistence import ( - "database/sql" "fmt" ) @@ -20,7 +19,7 @@ type Queries struct { // CreateQueries Function creates a set of queries for an SQL table. // Note: Do not use this function to create queries for a table, rather use .NewQueries to create table as well as queries. -func CreateQueries(tbl string, db *sql.DB) *Queries { +func CreateQueries(tbl string) *Queries { return &Queries{ deleteQuery: fmt.Sprintf("DELETE FROM %s WHERE key = $1", tbl), existsQuery: fmt.Sprintf("SELECT exists(SELECT 1 FROM %s WHERE key=$1)", tbl), diff --git a/waku/persistence/sqlite/mock.go b/waku/persistence/sqlite/mock.go new file mode 100644 index 000000000..be17cface --- /dev/null +++ b/waku/persistence/sqlite/mock.go @@ -0,0 +1,17 @@ +package sqlite + +import ( + "database/sql" + "log" + + _ "github.com/mattn/go-sqlite3" +) + +func NewMockSqliteDB() *sql.DB { + db, err := sql.Open("sqlite3", ":memory:") + if err != nil { + log.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + + return db +} diff --git a/waku/persistence/sqlite/sqlite.go b/waku/persistence/sqlite/sqlite.go index c7b845c49..75181608c 100644 --- a/waku/persistence/sqlite/sqlite.go +++ b/waku/persistence/sqlite/sqlite.go @@ -91,5 +91,5 @@ func NewQueries(tbl string, db *sql.DB) (*persistence.Queries, error) { if err != nil { return nil, err } - return persistence.CreateQueries(tbl, db), nil + return persistence.CreateQueries(tbl), nil } diff --git a/waku/persistence/sqlite/sqlite_test.go b/waku/persistence/sqlite/sqlite_test.go index 025d1b43a..8692f329c 100644 --- a/waku/persistence/sqlite/sqlite_test.go +++ b/waku/persistence/sqlite/sqlite_test.go @@ -1,24 +1,13 @@ package sqlite import ( - "database/sql" - "log" "testing" "github.com/stretchr/testify/require" ) -func NewMock() *sql.DB { - db, err := sql.Open("sqlite3", ":memory:") - if err != nil { - log.Fatalf("an error '%s' was not expected when opening a stub database connection", err) - } - - return db -} - func TestQueries(t *testing.T) { - db := NewMock() + db := NewMockSqliteDB() queries, err := NewQueries("test_queries", db) require.NoError(t, err) @@ -51,7 +40,7 @@ func TestQueries(t *testing.T) { } func TestCreateTable(t *testing.T) { - db := NewMock() + db := NewMockSqliteDB() err := CreateTable(db, "test_create_table") require.NoError(t, err) diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 2fc8405e2..e91868564 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -224,8 +224,8 @@ func (d *DBStore) cleanOlderRecords(ctx context.Context) error { // Limit number of records to a max N if d.maxMessages > 0 { start := time.Now() - sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC LIMIT -1 OFFSET $1)` - _, err := d.db.Exec(sqlStmt, d.maxMessages) + + _, err := d.db.Exec(d.getDeleteOldRowsQuery(), d.maxMessages) if err != nil { d.metrics.RecordError(retPolicyFailure) return err @@ -239,6 +239,17 @@ func (d *DBStore) cleanOlderRecords(ctx context.Context) error { return nil } +func (d *DBStore) getDeleteOldRowsQuery() string { + sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC %s OFFSET $1)` + switch GetDriverType(d.db) { + case SQLiteDriver: + sqlStmt = fmt.Sprintf(sqlStmt, "LIMIT -1") + case PostgresDriver: + sqlStmt = fmt.Sprintf(sqlStmt, "") + } + return sqlStmt +} + func (d *DBStore) checkForOlderRecords(ctx context.Context, t time.Duration) { defer d.wg.Done() diff --git a/waku/persistence/utils/db.go b/waku/persistence/utils/db.go index 96c5a9199..ad7e3a957 100644 --- a/waku/persistence/utils/db.go +++ b/waku/persistence/utils/db.go @@ -6,6 +6,7 @@ import ( "regexp" "strings" + "github.com/waku-org/go-waku/waku/persistence" "github.com/waku-org/go-waku/waku/persistence/postgres" "github.com/waku-org/go-waku/waku/persistence/sqlite" "go.uber.org/zap" @@ -65,3 +66,13 @@ func ExtractDBAndMigration(databaseURL string, dbSettings DBSettings, logger *za return db, migrationFn, nil } + +func NewQueries(tbl string, db *sql.DB) (*persistence.Queries, error) { + switch persistence.GetDriverType(db) { + case persistence.SQLiteDriver: + return sqlite.NewQueries(tbl, db) + case persistence.PostgresDriver: + return postgres.NewQueries(tbl, db) + } + return nil, errors.New("unsupported database engine") +} diff --git a/waku/persistence/store_test.go b/waku/persistence/utils/store_test.go similarity index 77% rename from waku/persistence/store_test.go rename to waku/persistence/utils/store_test.go index b1ecc52a7..6b7b8bbad 100644 --- a/waku/persistence/store_test.go +++ b/waku/persistence/utils/store_test.go @@ -1,4 +1,7 @@ -package persistence +//go:build include_postgres_tests +// +build include_postgres_tests + +package utils import ( "context" @@ -7,42 +10,49 @@ import ( "testing" "time" - "github.com/golang-migrate/migrate/v4/database/sqlite3" - _ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" - "github.com/waku-org/go-waku/waku/persistence/migrate" - sqlitemigrations "github.com/waku-org/go-waku/waku/persistence/sqlite/migrations" + "github.com/waku-org/go-waku/waku/persistence" + "github.com/waku-org/go-waku/waku/persistence/postgres" + "github.com/waku-org/go-waku/waku/persistence/sqlite" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" - "go.uber.org/zap" ) -func Migrate(db *sql.DB) error { - migrationDriver, err := sqlite3.WithInstance(db, &sqlite3.Config{ - MigrationsTable: "gowaku_" + sqlite3.DefaultMigrationsTable, - }) - if err != nil { - return err +func TestStore(t *testing.T) { + tests := []struct { + name string + fn func(t *testing.T, db *sql.DB, migrationFn func(*sql.DB) error) + }{ + {"testDbStore", testDbStore}, + {"testStoreRetention", testStoreRetention}, + {"testQuery", testQuery}, + } + for _, driverName := range []string{"postgres", "sqlite"} { + // all tests are run for each db + for _, tc := range tests { + db, migrationFn := getDB(driverName) + t.Run(driverName+"_"+tc.name, func(t *testing.T) { + tc.fn(t, db, migrationFn) + }) + } } - return migrate.Migrate(db, migrationDriver, sqlitemigrations.AssetNames(), sqlitemigrations.Asset) } -func NewMock() *sql.DB { - db, err := sql.Open("sqlite3", ":memory:") - if err != nil { - utils.Logger().Fatal("opening a stub database connection", zap.Error(err)) +func getDB(driver string) (*sql.DB, func(*sql.DB) error) { + switch driver { + case "postgres": + return postgres.NewMockPgDB(), postgres.Migrations + case "sqlite": + return sqlite.NewMockSqliteDB(), sqlite.Migrations } - - return db + return nil, nil } - -func TestDbStore(t *testing.T) { - db := NewMock() - store, err := NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), WithDB(db), WithMigrations(Migrate)) +func testDbStore(t *testing.T, db *sql.DB, migrationFn func(*sql.DB) error) { + store, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migrationFn)) require.NoError(t, err) err = store.Start(context.Background(), timesource.NewDefaultClock()) @@ -60,9 +70,8 @@ func TestDbStore(t *testing.T) { require.NotEmpty(t, res) } -func TestStoreRetention(t *testing.T) { - db := NewMock() - store, err := NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), WithDB(db), WithMigrations(Migrate), WithRetentionPolicy(5, 20*time.Second)) +func testStoreRetention(t *testing.T, db *sql.DB, migrationFn func(*sql.DB) error) { + store, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migrationFn), persistence.WithRetentionPolicy(5, 20*time.Second)) require.NoError(t, err) err = store.Start(context.Background(), timesource.NewDefaultClock()) @@ -85,7 +94,7 @@ func TestStoreRetention(t *testing.T) { // This step simulates starting go-waku again from scratch - store, err = NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), WithDB(db), WithRetentionPolicy(5, 40*time.Second)) + store, err = persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithRetentionPolicy(5, 40*time.Second)) require.NoError(t, err) err = store.Start(context.Background(), timesource.NewDefaultClock()) @@ -103,9 +112,8 @@ func TestStoreRetention(t *testing.T) { require.Equal(t, msgCount, 3) } -func TestQuery(t *testing.T) { - db := NewMock() - store, err := NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), WithDB(db), WithMigrations(Migrate), WithRetentionPolicy(5, 20*time.Second)) +func testQuery(t *testing.T, db *sql.DB, migrationFn func(*sql.DB) error) { + store, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migrationFn), persistence.WithRetentionPolicy(5, 20*time.Second)) require.NoError(t, err) insertTime := time.Now()