Skip to content

Commit

Permalink
feat: improve the startup process of the MySQL container
Browse files Browse the repository at this point in the history
  • Loading branch information
bigboss2063 committed Oct 8, 2024
1 parent 400c5cf commit 8a6d05d
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 35 deletions.
7 changes: 5 additions & 2 deletions cdc-observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ func NewCDCObserver(ctx context.Context, opt *Options) (*CDCObserver, error) {
if opt.EnableDocker {
observer.enableDocker = true
observer.containername = opt.ContainerName
dockerClient := NewDockerClient(observer.dbName)
dockerClient, err := NewDockerClient()
if err != nil {
return nil, err
}
observer.dockerClient = dockerClient
}
observer.containerPort = opt.ContainerPort
Expand Down Expand Up @@ -70,7 +73,7 @@ func (ob *CDCObserver) Start(ctx context.Context) error {

func (ob *CDCObserver) Close(ctx context.Context) error {
if ob.enableDocker {
ob.dockerClient.StopAllContianer(ctx)
ob.dockerClient.StopAllContainers(ctx)
}
ob.river.Close()
return nil
Expand Down
130 changes: 101 additions & 29 deletions docker-interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,50 @@ import (
"io"
"log"
"os"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
)

const ImageName = "mysql"
// the prefix of the name of the cdc-observer containers, like /cdc-observer-mysql, /cdc-observer-pgsql
const ContainerNamePrefix = "cdc-observer-"

// the image name of the database instance in the container
const (
MysqlImageName = "mysql"
)

// all the databases share this settings
const (
DatabaseName = "cdc-observer"
DatabaseUsername = "root"
DatabasePassword = "cdc-observer-password"
)

type DockerClient struct {
dbName string
client *client.Client
}

func NewDockerClient(dbName string) *DockerClient {
func NewDockerClient() (*DockerClient, error) {
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
panic(err)
return nil, err
}
dockerClient := &DockerClient{
client: cli,
dbName: dbName,
}
return dockerClient
return dockerClient, nil
}

func (dc *DockerClient) StartMySQLContainer(ctx context.Context) error {
cli := dc.client
// todo implement a common function to check if the image had been download
if !dc.checkIamgeExistence(ctx, ImageName) {
reader, err := cli.ImagePull(ctx, ImageName, image.PullOptions{})
if !dc.checkImageExistence(ctx, MysqlImageName) {
reader, err := cli.ImagePull(ctx, MysqlImageName, image.PullOptions{})
if err != nil {
return err
}
Expand All @@ -52,25 +65,33 @@ func (dc *DockerClient) StartMySQLContainer(ctx context.Context) error {
"3306/tcp": []nat.PortBinding{
{
HostIP: "0.0.0.0",
HostPort: "3307",
HostPort: "0", // use 0 to let docker automatically choose a free port
},
},
},
}

// todo can only create one container and then start it in the future
containerName := fmt.Sprintf("/%s%s", ContainerNamePrefix, MysqlImageName)

// Check if the container named /cdc-observer-mysql already exists
if exists, err := dc.checkContainerExistence(ctx, containerName); err != nil {
return err
} else if exists {
if err := dc.handleExistingContainer(ctx, containerName); err != nil {
return err
}
return nil
}

// If the container doesn't exist, create it
resp, err := cli.ContainerCreate(ctx, &container.Config{
Image: ImageName,
// todo check why the this is not actually the container name, and also find the approach to set the customized container name
Hostname: "mysql:cdc-observer:" + RandStringBytesMaskImpr(10),
Image: MysqlImageName,
Env: []string{
// todo find out why the username setting here does work, but it's not a big matter
"MYSQL_ROOT_USERNAME=elliot_test",
"MYSQL_ROOT_PASSWORD=123456",
fmt.Sprintf("MYSQL_DATABASE=%s", dc.dbName),
fmt.Sprintf("MYSQL_ROOT_PASSWORD=%s", DatabasePassword),
fmt.Sprintf("MYSQL_DATABASE=%s", DatabaseName),
},
Tty: false,
}, hostConfig, nil, nil, "")
}, hostConfig, nil, nil, containerName)
if err != nil {
return err
}
Expand All @@ -90,31 +111,82 @@ func (dc *DockerClient) StartMySQLContainer(ctx context.Context) error {

// stdcopy.StdCopy(os.Stdout, os.Stderr, out)
return nil

}

func (dc *DockerClient) StopAllContianer(ctx context.Context) {
cli := dc.client

containers, err := cli.ContainerList(ctx, container.ListOptions{All: true})
func (dc *DockerClient) StopAllContainers(ctx context.Context) {
containers, err := dc.containers(ctx)
if err != nil {
log.Fatalf("failed to list containers: %v", err)
log.Printf("failed to list containers: %v", err)
}

for _, c := range containers {
if c.State == "running" {
err := cli.ContainerStop(ctx, c.ID, container.StopOptions{})
err := dc.client.ContainerStop(ctx, c.ID, container.StopOptions{})
if err != nil {
log.Printf("failed to stop container %s: %v", c.ID, err)
} else {
fmt.Printf("stopped container %s\n", c.ID)
log.Printf("stopped container %s\n", c.ID)
}
}
}
}

func (dc *DockerClient) checkIamgeExistence(ctx context.Context, imageName string) bool {
cli := dc.client
_, _, err := cli.ImageInspectWithRaw(ctx, imageName)
func (dc *DockerClient) checkImageExistence(ctx context.Context, imageName string) bool {
_, _, err := dc.client.ImageInspectWithRaw(ctx, imageName)
return err == nil
}

func (dc *DockerClient) checkContainerExistence(ctx context.Context, containerName string) (bool, error) {
if _, err := dc.client.ContainerInspect(ctx, containerName); err != nil {
if client.IsErrNotFound(err) {
return false, nil
}
return false, err
}
return true, nil
}

func (dc *DockerClient) containers(ctx context.Context) ([]types.Container, error) {
return dc.client.ContainerList(ctx, container.ListOptions{All: true})
}

func (dc *DockerClient) handleExistingContainer(ctx context.Context, containerName string) error {
// Check the container's state
cj, err := dc.client.ContainerInspect(ctx, containerName)
if err != nil {
return err
}

switch cj.State.Status {
case "running":
// If the container is running, do nothing
return nil
case "exited", "created", "paused":
// If the container exists but is not running, start it
if err := dc.client.ContainerStart(ctx, containerName, container.StartOptions{}); err != nil {
return err
}
return nil
case "restarting":
// Wait for the container to finish restarting
for cj.State.Status == "restarting" {
time.Sleep(100 * time.Millisecond)
cj, err = dc.client.ContainerInspect(ctx, containerName)
if err != nil {
return err
}
}
if cj.State.Status != "running" {
if err := dc.client.ContainerStart(ctx, containerName, container.StartOptions{}); err != nil {
return err
}
}
return nil
default:
// For any other state, attempt to start the container
if err := dc.client.ContainerStart(ctx, containerName, container.StartOptions{}); err != nil {
return err
}
return nil
}
}
4 changes: 2 additions & 2 deletions docker-interaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func TestStartMySQLContainer(t *testing.T) {
dockerClient := DockerClient{}
dockerClient, _ := NewDockerClient()
ctx := context.Background()
dockerClient.StartMySQLContainer(ctx)
_ = dockerClient.StartMySQLContainer(ctx)
}
4 changes: 2 additions & 2 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func main() {
dockerClient := cdc.NewDockerClient("elliot_test_name")
dockerClient, _ := cdc.NewDockerClient()
ctx := context.Background()
dockerClient.StartMySQLContainer(ctx)
_ = dockerClient.StartMySQLContainer(ctx)
}

0 comments on commit 8a6d05d

Please sign in to comment.