Skip to content

Commit

Permalink
Merge pull request #3 from bigboss2063/master
Browse files Browse the repository at this point in the history
ref(database): improve database initialization and testing
  • Loading branch information
elliotchenzichang authored Oct 9, 2024
2 parents 60243f6 + d5088d5 commit f5540b5
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 67 deletions.
7 changes: 4 additions & 3 deletions cdc-observer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cdcobserver

import (
dockerapi "cdc-observer/docker_api"
"context"
"errors"
"time"
Expand All @@ -19,12 +20,12 @@ type CDCObserver struct {
addr string
dbName string
river *canal.Canal
dockerClient *DockerClient
dockerClient *dockerapi.DockerClient
// only one database is enough for the goal of this project
db *database.Database
}

func NewCDCObserver(ctx context.Context, opt *Options) (*CDCObserver, error) {
func NewCDCObserver(opt *Options) (*CDCObserver, error) {
if err := opt.validates(); err != nil {
return nil, err
}
Expand All @@ -33,7 +34,7 @@ func NewCDCObserver(ctx context.Context, opt *Options) (*CDCObserver, error) {
if opt.EnableDocker {
observer.enableDocker = true
observer.containername = opt.ContainerName
dockerClient, err := NewDockerClient()
dockerClient, err := dockerapi.NewDockerClient()
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cdc-observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestSyncCDCChangeFromDatabase(t *testing.T) {
DatabaseName: "elliot_test_database",
}
ctx := context.Background()
cdcObserver, err := NewCDCObserver(ctx, opt)
cdcObserver, err := NewCDCObserver(opt)
if err != nil {
t.Fatal(err)
}
Expand Down
17 changes: 17 additions & 0 deletions constant/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package constant

import "time"

// all the databases share this settings
const (
DatabaseHost = "0.0.0.0"
DatabaseName = "cdc-observer"
DatabaseUsername = "root"
DatabasePassword = "cdc-observer-password"
)

// retry times and interval for the database connection
const (
RetryTimes = 10
RetryInterval = 1 * time.Second
)
7 changes: 7 additions & 0 deletions constant/docker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package constant

// the prefix of the name of the cdc-observer containers, like /cdc-observer-mysql, /cdc-observer-pgsql
const ContainerNamePrefix = "cdc-observer-"

// the image name of the database instance in the container
const MysqlImageName = "mysql"
33 changes: 25 additions & 8 deletions database/database.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,49 @@
package database

import (
"cdc-observer/constant"
"fmt"
"time"

"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)

type Database struct {
Name string
dbClient *gorm.DB
tables map[string]*Table
pendingTables map[string]*Table
}

// todo if the database is already existed, it's suppose to sync the database schema to local
// todo maybe can try to create a database if the database is not existed
func NewDatabase(name string, addr string, port int, username string, password string) (*Database, error) {
func NewDatabase(port string) (*Database, error) {
db := &Database{
Name: name,
tables: map[string]*Table{},
pendingTables: map[string]*Table{},
}
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", username, password, addr, port, name)
dbClient, err := gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: logger.Default.LogMode(logger.Info),
})
dsn := fmt.Sprintf(
"%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
constant.DatabaseUsername,
constant.DatabasePassword,
constant.DatabaseHost,
port,
constant.DatabaseName,
)
var (
dbClient *gorm.DB
err error
)
// retry for RetryTimes, if the database is not ready, the database will be ready after 1 second
for i := 0; i < constant.RetryTimes; i++ {
dbClient, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: logger.Default.LogMode(logger.Info),
})
if err == nil {
break
}
time.Sleep(constant.RetryInterval)
}
if err != nil {
return nil, err
}
Expand Down
64 changes: 48 additions & 16 deletions database/database_test.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,56 @@
package database

import "testing"
import (
"cdc-observer/constant"
dockerapi "cdc-observer/docker_api"
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestNewDatabaseAndAddNewTable(t *testing.T) {
db, err := NewDatabase("elliot_test_database", "127.0.0.1", 3307, "root", "123456")
if err != nil {
t.Fatal(err)
}
table, err := NewTableBuilder("test_table", db.dbClient).AddFieldInt("test_field_int").AddFieldVarchar("test_field_string").Submit()
if err != nil {
t.Fatal(err)
}
// Create a new Docker client
dockerClient, err := dockerapi.NewDockerClient()
assert.NoError(t, err, "Failed to create Docker client")

// Start the MySQL container and get the assigned port
ctx := context.Background()
err = dockerClient.StartMySQLContainer(ctx)
assert.NoError(t, err, "Failed to start MySQL container")
defer func() {
dockerClient.StopAllContainers(ctx)
dockerClient.RemoveAllContainers(ctx)
}()

containerName := dockerClient.ContainerName(constant.MysqlImageName)
port, err := dockerClient.ContainerPort(ctx, containerName)
assert.NoError(t, err, "Failed to get MySQL container port")

// Initialize the database
db, err := NewDatabase(port)
assert.NoError(t, err, "Failed to create new database")

// Create a new table
table, err := NewTableBuilder("test_table", db.dbClient).
AddFieldInt("test_field_int").
AddFieldVarchar("test_field_string").
Submit()
assert.NoError(t, err, "Failed to create new table")

// Add the table to the database
err = db.AddTable(table)
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err, "Failed to add table to database")

// Apply changes
err = db.Apply()
if err != nil {
t.Fatal(err)
}
r := NewRowBuilder().AddField("test_field_int", int64(1)).AddField("test_field_string", "test string").Submit()
assert.NoError(t, err, "Failed to apply changes")

// Add a row to the table
r := NewRowBuilder().
AddField("test_field_int", int64(1)).
AddField("test_field_string", "test string").
Submit()
table.AddRow(r)
assert.NoError(t, err, "Failed to add row to table")
}
12 changes: 0 additions & 12 deletions docker-interaction_test.go

This file was deleted.

73 changes: 47 additions & 26 deletions docker-interaction.go → docker_api/docker_interaction.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cdcobserver
package dockerapi

import (
"cdc-observer/constant"
"context"
"fmt"
"io"
Expand All @@ -15,21 +16,6 @@ import (
"github.com/docker/go-connections/nat"
)

// the prefix of the name of the cdc-observer containers, like /cdc-observer-mysql, /cdc-observer-pgsql
const ContainerNamePrefix = "cdc-observer-"

// the image name of the database instance in the container
const (
MysqlImageName = "mysql"
)

// all the databases share this settings
const (
DatabaseName = "cdc-observer"
DatabaseUsername = "root"
DatabasePassword = "cdc-observer-password"
)

type DockerClient struct {
client *client.Client
}
Expand All @@ -48,8 +34,8 @@ func NewDockerClient() (*DockerClient, error) {
func (dc *DockerClient) StartMySQLContainer(ctx context.Context) error {
cli := dc.client
// todo implement a common function to check if the image had been download
if !dc.checkImageExistence(ctx, MysqlImageName) {
reader, err := cli.ImagePull(ctx, MysqlImageName, image.PullOptions{})
if !dc.checkImageExistence(ctx, constant.MysqlImageName) {
reader, err := cli.ImagePull(ctx, constant.MysqlImageName, image.PullOptions{})
if err != nil {
return err
}
Expand All @@ -64,14 +50,14 @@ func (dc *DockerClient) StartMySQLContainer(ctx context.Context) error {
PortBindings: nat.PortMap{
"3306/tcp": []nat.PortBinding{
{
HostIP: "0.0.0.0",
HostIP: constant.DatabaseHost,
HostPort: "0", // use 0 to let docker automatically choose a free port
},
},
},
}

containerName := fmt.Sprintf("/%s%s", ContainerNamePrefix, MysqlImageName)
containerName := dc.ContainerName(constant.MysqlImageName)

// Check if the container named /cdc-observer-mysql already exists
if exists, err := dc.checkContainerExistence(ctx, containerName); err != nil {
Expand All @@ -85,10 +71,10 @@ func (dc *DockerClient) StartMySQLContainer(ctx context.Context) error {

// If the container doesn't exist, create it
resp, err := cli.ContainerCreate(ctx, &container.Config{
Image: MysqlImageName,
Image: constant.MysqlImageName,
Env: []string{
fmt.Sprintf("MYSQL_ROOT_PASSWORD=%s", DatabasePassword),
fmt.Sprintf("MYSQL_DATABASE=%s", DatabaseName),
fmt.Sprintf("MYSQL_ROOT_PASSWORD=%s", constant.DatabasePassword),
fmt.Sprintf("MYSQL_DATABASE=%s", constant.DatabaseName),
},
Tty: false,
}, hostConfig, nil, nil, containerName)
Expand All @@ -113,6 +99,10 @@ func (dc *DockerClient) StartMySQLContainer(ctx context.Context) error {
return nil
}

func (dc *DockerClient) ContainerInfo(ctx context.Context, containerName string) (types.ContainerJSON, error) {
return dc.client.ContainerInspect(ctx, containerName)
}

func (dc *DockerClient) StopAllContainers(ctx context.Context) {
containers, err := dc.containers(ctx)
if err != nil {
Expand All @@ -131,13 +121,29 @@ func (dc *DockerClient) StopAllContainers(ctx context.Context) {
}
}

func (dc *DockerClient) RemoveAllContainers(ctx context.Context) {
containers, err := dc.containers(ctx)
if err != nil {
log.Printf("failed to list containers: %v", err)
}

for _, c := range containers {
if err := dc.client.ContainerRemove(ctx, c.ID, container.RemoveOptions{}); err != nil {
log.Printf("failed to remove container %s: %v", c.ID, err)
} else {
log.Printf("removed container %s\n", c.ID)
}
}
}

func (dc *DockerClient) checkImageExistence(ctx context.Context, imageName string) bool {
_, _, err := dc.client.ImageInspectWithRaw(ctx, imageName)
return err == nil
}

func (dc *DockerClient) checkContainerExistence(ctx context.Context, containerName string) (bool, error) {
if _, err := dc.client.ContainerInspect(ctx, containerName); err != nil {
_, err := dc.ContainerInfo(ctx, containerName)
if err != nil {
if client.IsErrNotFound(err) {
return false, nil
}
Expand All @@ -152,7 +158,7 @@ func (dc *DockerClient) containers(ctx context.Context) ([]types.Container, erro

func (dc *DockerClient) handleExistingContainer(ctx context.Context, containerName string) error {
// Check the container's state
cj, err := dc.client.ContainerInspect(ctx, containerName)
cj, err := dc.ContainerInfo(ctx, containerName)
if err != nil {
return err
}
Expand All @@ -171,7 +177,7 @@ func (dc *DockerClient) handleExistingContainer(ctx context.Context, containerNa
// Wait for the container to finish restarting
for cj.State.Status == "restarting" {
time.Sleep(100 * time.Millisecond)
cj, err = dc.client.ContainerInspect(ctx, containerName)
cj, err = dc.ContainerInfo(ctx, containerName)
if err != nil {
return err
}
Expand All @@ -190,3 +196,18 @@ func (dc *DockerClient) handleExistingContainer(ctx context.Context, containerNa
return nil
}
}

func (dc *DockerClient) ContainerPort(ctx context.Context, containerName string) (string, error) {
containerInfo, err := dc.ContainerInfo(ctx, containerName)
if err != nil {
return "", err
}

// Extract the assigned port
assignedPort := containerInfo.NetworkSettings.Ports["3306/tcp"][0].HostPort
return assignedPort, nil
}

func (dc *DockerClient) ContainerName(imageName string) string {
return fmt.Sprintf("/%s%s", constant.ContainerNamePrefix, imageName)
}
33 changes: 33 additions & 0 deletions docker_api/docker_interaction_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package dockerapi

import (
"cdc-observer/constant"
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestStartMySQLContainer(t *testing.T) {
dockerClient, err := NewDockerClient()
assert.NoError(t, err, "Failed to create Docker client")

ctx := context.Background()
err = dockerClient.StartMySQLContainer(ctx)
assert.NoError(t, err, "Failed to start MySQL container")

defer func() {
dockerClient.StopAllContainers(ctx)
dockerClient.RemoveAllContainers(ctx)
}()

containerName := dockerClient.ContainerName(constant.MysqlImageName)
cj, err := dockerClient.ContainerInfo(ctx, containerName)
assert.NoError(t, err, "Failed to get MySQL container info")
assert.True(t, cj.State.Running, "MySQL container should be running")

dockerClient.StopAllContainers(ctx)
cj, err = dockerClient.ContainerInfo(ctx, containerName)
assert.NoError(t, err, "Failed to get MySQL container info")
assert.False(t, cj.State.Running, "MySQL container should be stopped")
}
Loading

0 comments on commit f5540b5

Please sign in to comment.