From 435fa9dddafacf8496a9da45955305353a87c195 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Mon, 13 Nov 2023 23:57:12 +0000 Subject: [PATCH] Add dial-stdio command This allows the buildx CLI to act a proxy to the configured instance. It allows external code to use buildx itself as a driver for connecting to buildkitd instances. Instance and node selection should follow the same semantics as as `buildx build`, including taking into account the `BUILDX_BUILDER` env var and the `--builder` global flag. Signed-off-by: Brian Goff --- build/dial.go | 46 ++++++++++++++++ commands/dial_stdio.go | 90 +++++++++++++++++++++++++++++++ commands/root.go | 1 + driver/docker-container/driver.go | 11 +++- driver/docker/driver.go | 4 ++ driver/driver.go | 1 + driver/kubernetes/driver.go | 7 ++- driver/remote/driver.go | 21 +++++++- tests/dialstdio.go | 71 ++++++++++++++++++++++++ tests/integration_test.go | 1 + 10 files changed, 248 insertions(+), 5 deletions(-) create mode 100644 build/dial.go create mode 100644 commands/dial_stdio.go create mode 100644 tests/dialstdio.go diff --git a/build/dial.go b/build/dial.go new file mode 100644 index 000000000000..96ace6421648 --- /dev/null +++ b/build/dial.go @@ -0,0 +1,46 @@ +package build + +import ( + "context" + "math/rand" + "net" + + "github.com/docker/buildx/builder" + "github.com/docker/buildx/driver" + "github.com/docker/buildx/util/progress" + "github.com/pkg/errors" +) + +func Dial(ctx context.Context, nodes []builder.Node, pw progress.Writer) (net.Conn, error) { + nodes, err := filterAvailableNodes(nodes) + if err != nil { + return nil, err + } + + if len(nodes) == 0 { + return nil, errors.New("no nodes available") + } + + rand.Shuffle(len(nodes), func(i, j int) { + nodes[i], nodes[j] = nodes[j], nodes[i] + }) + + // TODO: filter nodes by platform + for i := 0; i < len(nodes); i++ { + n := nodes[i] + + pw := progress.WithPrefix(pw, n.Name, false) + c, err := driver.Boot(ctx, ctx, n.Driver, pw) + if err != nil { + continue + } + c.Close() + + conn, err := n.Driver.Dial(ctx) + if err != nil { + continue + } + return conn, nil + } + return nil, errors.New("no nodes available") +} diff --git a/commands/dial_stdio.go b/commands/dial_stdio.go new file mode 100644 index 000000000000..282cb4434b34 --- /dev/null +++ b/commands/dial_stdio.go @@ -0,0 +1,90 @@ +package commands + +import ( + "io" + "os" + + "github.com/docker/buildx/build" + "github.com/docker/buildx/builder" + "github.com/docker/buildx/util/progress" + "github.com/docker/cli/cli/command" + "github.com/moby/buildkit/util/appcontext" + "github.com/moby/buildkit/util/progress/progressui" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" +) + +type stdioOptions struct { + builder string +} + +func runDialStdio(dockerCli command.Cli, opts stdioOptions) error { + ctx := appcontext.Context() + + contextPathHash, _ := os.Getwd() + b, err := builder.New(dockerCli, + builder.WithName(opts.builder), + builder.WithContextPathHash(contextPathHash), + ) + if err != nil { + return err + } + + if err = updateLastActivity(dockerCli, b.NodeGroup); err != nil { + return errors.Wrapf(err, "failed to update builder last activity time") + } + nodes, err := b.LoadNodes(ctx) + if err != nil { + return err + } + + eg, ctx := errgroup.WithContext(ctx) + + printer, err := progress.NewPrinter(ctx, os.Stderr, progressui.AutoMode) + if err != nil { + return err + } + + return progress.Wrap("[internal] Dialing builder: "+b.Name, printer.Write, func(sub progress.SubLogger) error { + conn, err := build.Dial(ctx, nodes, printer) + if err != nil { + return err + } + defer conn.Close() + + go func() { + <-ctx.Done() + conn.Close() + }() + + eg.Go(func() error { + io.Copy(conn, os.Stdin) + conn.Close() + return nil + }) + eg.Go(func() error { + io.Copy(os.Stdout, conn) + conn.Close() + return nil + }) + eg.Go(printer.Wait) + + return eg.Wait() + }) +} + +func dialStdioCmd(dockerCli command.Cli, rootOpts *rootOptions) *cobra.Command { + opts := stdioOptions{} + + cmd := &cobra.Command{ + Use: "dial-stdio", + Short: "Dial stdio", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + opts.builder = rootOpts.builder + return runDialStdio(dockerCli, opts) + }, + } + return cmd +} diff --git a/commands/root.go b/commands/root.go index 43a43a4c10c2..707ca97ad855 100644 --- a/commands/root.go +++ b/commands/root.go @@ -75,6 +75,7 @@ func addCommands(cmd *cobra.Command, dockerCli command.Cli) { buildCmd(dockerCli, opts, nil), bakeCmd(dockerCli, opts), createCmd(dockerCli), + dialStdioCmd(dockerCli, opts), rmCmd(dockerCli, opts), lsCmd(dockerCli), useCmd(dockerCli, opts), diff --git a/driver/docker-container/driver.go b/driver/docker-container/driver.go index b79e2f00e9f8..b6ad865d0117 100644 --- a/driver/docker-container/driver.go +++ b/driver/docker-container/driver.go @@ -380,13 +380,20 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error { return nil } -func (d *Driver) Client(ctx context.Context) (*client.Client, error) { +func (d *Driver) Dial(ctx context.Context) (net.Conn, error) { _, conn, err := d.exec(ctx, []string{"buildctl", "dial-stdio"}) if err != nil { return nil, err } - conn = demuxConn(conn) + return conn, nil +} + +func (d *Driver) Client(ctx context.Context) (*client.Client, error) { + conn, err := d.Dial(ctx) + if err != nil { + return nil, err + } exp, err := detect.Exporter() if err != nil { diff --git a/driver/docker/driver.go b/driver/docker/driver.go index 0bed41877822..4a09e654234d 100644 --- a/driver/docker/driver.go +++ b/driver/docker/driver.go @@ -55,6 +55,10 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error { return nil } +func (d *Driver) Dial(ctx context.Context) (net.Conn, error) { + return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", nil) +} + func (d *Driver) Client(ctx context.Context) (*client.Client, error) { opts := []client.ClientOpt{ client.WithContextDialer(func(context.Context, string) (net.Conn, error) { diff --git a/driver/driver.go b/driver/driver.go index 16d43d7af3ab..6d3c546737c9 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -59,6 +59,7 @@ type Driver interface { Version(context.Context) (string, error) Stop(ctx context.Context, force bool) error Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error + Dial(ctx context.Context) (net.Conn, error) Client(ctx context.Context) (*client.Client, error) Features(ctx context.Context) map[Feature]bool HostGatewayIP(ctx context.Context) (net.IP, error) diff --git a/driver/kubernetes/driver.go b/driver/kubernetes/driver.go index d46448be4eda..6467950b26dc 100644 --- a/driver/kubernetes/driver.go +++ b/driver/kubernetes/driver.go @@ -186,7 +186,7 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error { return nil } -func (d *Driver) Client(ctx context.Context) (*client.Client, error) { +func (d *Driver) Dial(ctx context.Context) (net.Conn, error) { restClient := d.clientset.CoreV1().RESTClient() restClientConfig, err := d.KubeClientConfig.ClientConfig() if err != nil { @@ -205,7 +205,10 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) { if err != nil { return nil, err } + return conn, nil +} +func (d *Driver) Client(ctx context.Context) (*client.Client, error) { exp, err := detect.Exporter() if err != nil { return nil, err @@ -213,7 +216,7 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) { var opts []client.ClientOpt opts = append(opts, client.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return conn, nil + return d.Dial(ctx) })) if td, ok := exp.(client.TracerDelegate); ok { opts = append(opts, client.WithTracerDelegate(td)) diff --git a/driver/remote/driver.go b/driver/remote/driver.go index 2efd22636516..43fc04a9e1ee 100644 --- a/driver/remote/driver.go +++ b/driver/remote/driver.go @@ -2,13 +2,15 @@ package remote import ( "context" - "errors" + "fmt" "net" + "strings" "github.com/docker/buildx/driver" "github.com/docker/buildx/util/progress" "github.com/moby/buildkit/client" "github.com/moby/buildkit/util/tracing/detect" + "github.com/pkg/errors" ) type Driver struct { @@ -84,6 +86,23 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) { return client.New(ctx, d.InitConfig.EndpointAddr, opts...) } +func (d *Driver) Dial(ctx context.Context) (net.Conn, error) { + if d.tlsOpts != nil { + // TODO: add TLS support + return nil, errors.New("TLS dialer is not supported yet") + } + + network, addr, ok := strings.Cut(d.InitConfig.EndpointAddr, "://") + if !ok { + return nil, fmt.Errorf("invalid endpoint address: %s", d.InitConfig.EndpointAddr) + } + conn, err := net.Dial(network, addr) + if err != nil { + return nil, errors.Wrap(err, "error ") + } + return conn, nil +} + func (d *Driver) Features(ctx context.Context) map[driver.Feature]bool { return map[driver.Feature]bool{ driver.OCIExporter: true, diff --git a/tests/dialstdio.go b/tests/dialstdio.go new file mode 100644 index 000000000000..643c4ed58c36 --- /dev/null +++ b/tests/dialstdio.go @@ -0,0 +1,71 @@ +package tests + +import ( + "bytes" + "context" + "net" + "os/exec" + "testing" + + "github.com/moby/buildkit/client" + "github.com/moby/buildkit/util/testutil/integration" + "github.com/pkg/errors" +) + +var dialstdioTests = []func(t *testing.T, sb integration.Sandbox){ + testDialStdio, +} + +func testDialStdio(t *testing.T, sb integration.Sandbox) { + errBuf := bytes.NewBuffer(nil) + defer func() { + if t.Failed() { + t.Log(errBuf.String()) + } + }() + c, err := client.New(sb.Context(), "", client.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) { + c1, c2 := net.Pipe() + cmd := buildxCmd(sb, withArgs("dial-stdio"), func(cmd *exec.Cmd) { + cmd.Stdin = c1 + cmd.Stdout = c1 + cmd.Stderr = errBuf + }) + + if err := cmd.Start(); err != nil { + c1.Close() + c2.Close() + return nil, errors.Wrap(err, errBuf.String()) + } + + return wrapCmdConn(c2, cmd), nil + })) + if err != nil { + t.Fatal(err) + } + defer c.Close() + + _, err = c.Info(sb.Context()) + if err != nil { + t.Fatal(err) + } +} + +type connWithCloser struct { + net.Conn + closer func() +} + +func (c *connWithCloser) Close() error { + err := c.Conn.Close() + c.closer() + return err +} + +func wrapCmdConn(conn net.Conn, cmd *exec.Cmd) net.Conn { + return &connWithCloser{ + Conn: conn, + closer: func() { + cmd.Process.Kill() + }, + } +} diff --git a/tests/integration_test.go b/tests/integration_test.go index db2e5d447b68..381ef25f244b 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -27,6 +27,7 @@ func TestIntegration(t *testing.T) { tests = append(tests, lsTests...) tests = append(tests, imagetoolsTests...) tests = append(tests, versionTests...) + tests = append(tests, dialstdioTests...) testIntegration(t, tests...) }