Skip to content

Commit

Permalink
Merge pull request #4 from bigboss2063/ref-cdcobserver-init
Browse files Browse the repository at this point in the history
ref: the initialization process of CDCObserver
  • Loading branch information
elliotchenzichang authored Oct 11, 2024
2 parents f5540b5 + 2c5662d commit bf7bc3a
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 231 deletions.
97 changes: 0 additions & 97 deletions cdc-observer.go

This file was deleted.

70 changes: 0 additions & 70 deletions cdc-observer_test.go

This file was deleted.

100 changes: 100 additions & 0 deletions cdcobserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package cdcobserver

import (
"cdc-observer/constant"
dockerapi "cdc-observer/docker_api"
"cdc-observer/handler"
"context"
"errors"
"fmt"

"cdc-observer/database"

"github.com/go-mysql-org/go-mysql/canal"
)

type CDCObserver struct {
river *canal.Canal
dc *dockerapi.DockerClient
db *database.Database // Only support MySQL for now
}

func NewCDCObserver() (*CDCObserver, error) {
observer := &CDCObserver{}
dockerClient, err := dockerapi.NewDockerClient()
if err != nil {
return nil, err
}
observer.dc = dockerClient
return observer, nil
}

func (ob *CDCObserver) Start(ctx context.Context) error {
// Start the MySQL container
if err := ob.dc.StartMySQLContainer(ctx); err != nil {
return err
}

// Get the container name and port
containerName := ob.dc.ContainerName(constant.MysqlImageName)
port, err := ob.dc.ContainerPort(ctx, containerName)
if err != nil {
return err
}

// Create a new database connection
db, err := database.NewDatabase(port)
if err != nil {
return err
}
ob.db = db

// Configure and create a new Canal instance
cfg := canal.NewDefaultConfig()
cfg.Addr = fmt.Sprintf("%s:%s", constant.DatabaseHost, port)
cfg.User = constant.DatabaseUsername
cfg.Password = constant.DatabasePassword
cfg.Dump.TableDB = constant.DatabaseName
cfg.Dump.Tables = []string{}

c, err := canal.NewCanal(cfg)
if err != nil {
return err
}

// Check if the Canal instance was created successfully
if c == nil {
return errors.New("the river is empty, please check if your database enable the binlog and the log style is ROW")
}

// Set the event handler and start the Canal
c.SetEventHandler(&handler.CDCObserverHandler{})
ob.river = c
if err := ob.river.Run(); err != nil {
return err
}

return nil
}

func (ob *CDCObserver) Close(ctx context.Context) error {
ob.dc.StopAllContainers(ctx)
ob.river.Close()
return nil
}

func (ob *CDCObserver) AddTable(name string, table *database.Table) error {
return ob.db.AddTable(table)
}

func (ob *CDCObserver) DeleteTable(name string) error {
return ob.db.DeleteTable(name)
}

func (ob *CDCObserver) ApplyDB() error {
return ob.db.Apply()
}

func (ob *CDCObserver) Clean() error {
return ob.db.Clean()
}
24 changes: 24 additions & 0 deletions cdcobserver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package cdcobserver

import (
"context"
"testing"
)

func TestSyncCDCChangeFromDatabase(t *testing.T) {
ctx := context.Background()
cdcObserver, err := NewCDCObserver()
if err != nil {
t.Fatal(err)
}
err = cdcObserver.Start(ctx)
if err != nil {
t.Fatal(err)
}
t.Log("start to listen the change event from local MySQL")

err = cdcObserver.Close(ctx)
if err != nil {
t.Fatal(err)
}
}
4 changes: 2 additions & 2 deletions constant/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ const (
DatabaseHost = "0.0.0.0"
DatabaseName = "cdc-observer"
DatabaseUsername = "root"
DatabasePassword = "cdc-observer-password"
DatabasePassword = ""
)

// retry times and interval for the database connection
const (
RetryTimes = 10
RetryTimes = 30
RetryInterval = 1 * time.Second
)
56 changes: 0 additions & 56 deletions docker-api-poc/docker_api_hello_world.go

This file was deleted.

7 changes: 6 additions & 1 deletion docker_api/docker_interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,14 @@ func (dc *DockerClient) StartMySQLContainer(ctx context.Context) error {
resp, err := cli.ContainerCreate(ctx, &container.Config{
Image: constant.MysqlImageName,
Env: []string{
fmt.Sprintf("MYSQL_ROOT_PASSWORD=%s", constant.DatabasePassword),
"MYSQL_ALLOW_EMPTY_PASSWORD=true",
fmt.Sprintf("MYSQL_DATABASE=%s", constant.DatabaseName),
},
Cmd: []string{
"--server-id=1", // to avoid conflict with canal
"--log-bin=/var/lib/mysql/mysql-bin.log", // enable binlog
"--binlog-format=ROW", // set binlog format to ROW
},
Tty: false,
}, hostConfig, nil, nil, containerName)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion docker_api/docker_interaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ func TestStartMySQLContainer(t *testing.T) {
assert.NoError(t, err, "Failed to start MySQL container")

defer func() {
dockerClient.StopAllContainers(ctx)
dockerClient.RemoveAllContainers(ctx)
}()

Expand Down
Loading

0 comments on commit bf7bc3a

Please sign in to comment.