diff --git a/cdc-observer.go b/cdc-observer.go index 9edcf3c..f682484 100644 --- a/cdc-observer.go +++ b/cdc-observer.go @@ -33,7 +33,10 @@ func NewCDCObserver(ctx context.Context, opt *Options) (*CDCObserver, error) { if opt.EnableDocker { observer.enableDocker = true observer.containername = opt.ContainerName - dockerClient := NewDockerClient(observer.dbName) + dockerClient, err := NewDockerClient() + if err != nil { + return nil, err + } observer.dockerClient = dockerClient } observer.containerPort = opt.ContainerPort @@ -70,7 +73,7 @@ func (ob *CDCObserver) Start(ctx context.Context) error { func (ob *CDCObserver) Close(ctx context.Context) error { if ob.enableDocker { - ob.dockerClient.StopAllContianer(ctx) + ob.dockerClient.StopAllContainers(ctx) } ob.river.Close() return nil diff --git a/docker-interaction.go b/docker-interaction.go index 902122a..a702d29 100644 --- a/docker-interaction.go +++ b/docker-interaction.go @@ -6,37 +6,50 @@ import ( "io" "log" "os" + "time" + "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/image" "github.com/docker/docker/client" "github.com/docker/go-connections/nat" ) -const ImageName = "mysql" +// the prefix of the name of the cdc-observer containers, like /cdc-observer-mysql, /cdc-observer-pgsql +const ContainerNamePrefix = "cdc-observer-" + +// the image name of the database instance in the container +const ( + MysqlImageName = "mysql" +) + +// all the databases share this settings +const ( + DatabaseName = "cdc-observer" + DatabaseUsername = "root" + DatabasePassword = "cdc-observer-password" +) type DockerClient struct { - dbName string client *client.Client } -func NewDockerClient(dbName string) *DockerClient { +func NewDockerClient() (*DockerClient, error) { cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) if err != nil { - panic(err) + return nil, err } dockerClient := &DockerClient{ client: cli, - dbName: dbName, } - return dockerClient + return dockerClient, nil } func (dc *DockerClient) StartMySQLContainer(ctx context.Context) error { cli := dc.client // todo implement a common function to check if the image had been download - if !dc.checkIamgeExistence(ctx, ImageName) { - reader, err := cli.ImagePull(ctx, ImageName, image.PullOptions{}) + if !dc.checkImageExistence(ctx, MysqlImageName) { + reader, err := cli.ImagePull(ctx, MysqlImageName, image.PullOptions{}) if err != nil { return err } @@ -52,25 +65,33 @@ func (dc *DockerClient) StartMySQLContainer(ctx context.Context) error { "3306/tcp": []nat.PortBinding{ { HostIP: "0.0.0.0", - HostPort: "3307", + HostPort: "0", // use 0 to let docker automatically choose a free port }, }, }, } - // todo can only create one container and then start it in the future + containerName := fmt.Sprintf("/%s%s", ContainerNamePrefix, MysqlImageName) + + // Check if the container named /cdc-observer-mysql already exists + if exists, err := dc.checkContainerExistence(ctx, containerName); err != nil { + return err + } else if exists { + if err := dc.handleExistingContainer(ctx, containerName); err != nil { + return err + } + return nil + } + + // If the container doesn't exist, create it resp, err := cli.ContainerCreate(ctx, &container.Config{ - Image: ImageName, - // todo check why the this is not actually the container name, and also find the approach to set the customized container name - Hostname: "mysql:cdc-observer:" + RandStringBytesMaskImpr(10), + Image: MysqlImageName, Env: []string{ - // todo find out why the username setting here does work, but it's not a big matter - "MYSQL_ROOT_USERNAME=elliot_test", - "MYSQL_ROOT_PASSWORD=123456", - fmt.Sprintf("MYSQL_DATABASE=%s", dc.dbName), + fmt.Sprintf("MYSQL_ROOT_PASSWORD=%s", DatabasePassword), + fmt.Sprintf("MYSQL_DATABASE=%s", DatabaseName), }, Tty: false, - }, hostConfig, nil, nil, "") + }, hostConfig, nil, nil, containerName) if err != nil { return err } @@ -90,31 +111,82 @@ func (dc *DockerClient) StartMySQLContainer(ctx context.Context) error { // stdcopy.StdCopy(os.Stdout, os.Stderr, out) return nil - } -func (dc *DockerClient) StopAllContianer(ctx context.Context) { - cli := dc.client - - containers, err := cli.ContainerList(ctx, container.ListOptions{All: true}) +func (dc *DockerClient) StopAllContainers(ctx context.Context) { + containers, err := dc.containers(ctx) if err != nil { - log.Fatalf("failed to list containers: %v", err) + log.Printf("failed to list containers: %v", err) } for _, c := range containers { if c.State == "running" { - err := cli.ContainerStop(ctx, c.ID, container.StopOptions{}) + err := dc.client.ContainerStop(ctx, c.ID, container.StopOptions{}) if err != nil { log.Printf("failed to stop container %s: %v", c.ID, err) } else { - fmt.Printf("stopped container %s\n", c.ID) + log.Printf("stopped container %s\n", c.ID) } } } } -func (dc *DockerClient) checkIamgeExistence(ctx context.Context, imageName string) bool { - cli := dc.client - _, _, err := cli.ImageInspectWithRaw(ctx, imageName) +func (dc *DockerClient) checkImageExistence(ctx context.Context, imageName string) bool { + _, _, err := dc.client.ImageInspectWithRaw(ctx, imageName) return err == nil } + +func (dc *DockerClient) checkContainerExistence(ctx context.Context, containerName string) (bool, error) { + if _, err := dc.client.ContainerInspect(ctx, containerName); err != nil { + if client.IsErrNotFound(err) { + return false, nil + } + return false, err + } + return true, nil +} + +func (dc *DockerClient) containers(ctx context.Context) ([]types.Container, error) { + return dc.client.ContainerList(ctx, container.ListOptions{All: true}) +} + +func (dc *DockerClient) handleExistingContainer(ctx context.Context, containerName string) error { + // Check the container's state + cj, err := dc.client.ContainerInspect(ctx, containerName) + if err != nil { + return err + } + + switch cj.State.Status { + case "running": + // If the container is running, do nothing + return nil + case "exited", "created", "paused": + // If the container exists but is not running, start it + if err := dc.client.ContainerStart(ctx, containerName, container.StartOptions{}); err != nil { + return err + } + return nil + case "restarting": + // Wait for the container to finish restarting + for cj.State.Status == "restarting" { + time.Sleep(100 * time.Millisecond) + cj, err = dc.client.ContainerInspect(ctx, containerName) + if err != nil { + return err + } + } + if cj.State.Status != "running" { + if err := dc.client.ContainerStart(ctx, containerName, container.StartOptions{}); err != nil { + return err + } + } + return nil + default: + // For any other state, attempt to start the container + if err := dc.client.ContainerStart(ctx, containerName, container.StartOptions{}); err != nil { + return err + } + return nil + } +} diff --git a/docker-interaction_test.go b/docker-interaction_test.go index 8caa90f..b1da55d 100644 --- a/docker-interaction_test.go +++ b/docker-interaction_test.go @@ -6,7 +6,7 @@ import ( ) func TestStartMySQLContainer(t *testing.T) { - dockerClient := DockerClient{} + dockerClient, _ := NewDockerClient() ctx := context.Background() - dockerClient.StartMySQLContainer(ctx) + _ = dockerClient.StartMySQLContainer(ctx) } diff --git a/example/main.go b/example/main.go index c252b8e..06de08b 100644 --- a/example/main.go +++ b/example/main.go @@ -6,7 +6,7 @@ import ( ) func main() { - dockerClient := cdc.NewDockerClient("elliot_test_name") + dockerClient, _ := cdc.NewDockerClient() ctx := context.Background() - dockerClient.StartMySQLContainer(ctx) + _ = dockerClient.StartMySQLContainer(ctx) }