Skip to content

Commit

Permalink
Add dial-stdio command
Browse files Browse the repository at this point in the history
This allows the buildx CLI to act a proxy to the configured instance.
It allows external code to use buildx itself as a driver for connecting
to buildkitd instances.

Instance and node selection should follow the same semantics as as
`buildx build`, including taking into account the `BUILDX_BUILDER` env
var and the `--builder` global flag.

Signed-off-by: Brian Goff <[email protected]>
  • Loading branch information
cpuguy83 committed Nov 13, 2023
1 parent 0408f3a commit 435fa9d
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 5 deletions.
46 changes: 46 additions & 0 deletions build/dial.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package build

import (
"context"
"math/rand"
"net"

"github.com/docker/buildx/builder"
"github.com/docker/buildx/driver"
"github.com/docker/buildx/util/progress"
"github.com/pkg/errors"
)

func Dial(ctx context.Context, nodes []builder.Node, pw progress.Writer) (net.Conn, error) {
nodes, err := filterAvailableNodes(nodes)
if err != nil {
return nil, err
}

if len(nodes) == 0 {
return nil, errors.New("no nodes available")
}

rand.Shuffle(len(nodes), func(i, j int) {
nodes[i], nodes[j] = nodes[j], nodes[i]
})

// TODO: filter nodes by platform
for i := 0; i < len(nodes); i++ {
n := nodes[i]

pw := progress.WithPrefix(pw, n.Name, false)
c, err := driver.Boot(ctx, ctx, n.Driver, pw)
if err != nil {
continue
}
c.Close()

conn, err := n.Driver.Dial(ctx)
if err != nil {
continue
}
return conn, nil
}
return nil, errors.New("no nodes available")
}
90 changes: 90 additions & 0 deletions commands/dial_stdio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package commands

import (
"io"
"os"

"github.com/docker/buildx/build"
"github.com/docker/buildx/builder"
"github.com/docker/buildx/util/progress"
"github.com/docker/cli/cli/command"
"github.com/moby/buildkit/util/appcontext"
"github.com/moby/buildkit/util/progress/progressui"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)

type stdioOptions struct {
builder string
}

func runDialStdio(dockerCli command.Cli, opts stdioOptions) error {
ctx := appcontext.Context()

contextPathHash, _ := os.Getwd()
b, err := builder.New(dockerCli,
builder.WithName(opts.builder),
builder.WithContextPathHash(contextPathHash),
)
if err != nil {
return err
}

if err = updateLastActivity(dockerCli, b.NodeGroup); err != nil {
return errors.Wrapf(err, "failed to update builder last activity time")
}
nodes, err := b.LoadNodes(ctx)
if err != nil {
return err
}

eg, ctx := errgroup.WithContext(ctx)

printer, err := progress.NewPrinter(ctx, os.Stderr, progressui.AutoMode)
if err != nil {
return err
}

return progress.Wrap("[internal] Dialing builder: "+b.Name, printer.Write, func(sub progress.SubLogger) error {
conn, err := build.Dial(ctx, nodes, printer)
if err != nil {
return err
}
defer conn.Close()

go func() {
<-ctx.Done()
conn.Close()
}()

eg.Go(func() error {
io.Copy(conn, os.Stdin)
conn.Close()
return nil
})
eg.Go(func() error {
io.Copy(os.Stdout, conn)
conn.Close()
return nil
})
eg.Go(printer.Wait)

return eg.Wait()
})
}

func dialStdioCmd(dockerCli command.Cli, rootOpts *rootOptions) *cobra.Command {
opts := stdioOptions{}

cmd := &cobra.Command{
Use: "dial-stdio",
Short: "Dial stdio",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
opts.builder = rootOpts.builder
return runDialStdio(dockerCli, opts)
},
}
return cmd
}
1 change: 1 addition & 0 deletions commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func addCommands(cmd *cobra.Command, dockerCli command.Cli) {
buildCmd(dockerCli, opts, nil),
bakeCmd(dockerCli, opts),
createCmd(dockerCli),
dialStdioCmd(dockerCli, opts),
rmCmd(dockerCli, opts),
lsCmd(dockerCli),
useCmd(dockerCli, opts),
Expand Down
11 changes: 9 additions & 2 deletions driver/docker-container/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,13 +380,20 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil
}

func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
func (d *Driver) Dial(ctx context.Context) (net.Conn, error) {
_, conn, err := d.exec(ctx, []string{"buildctl", "dial-stdio"})
if err != nil {
return nil, err
}

conn = demuxConn(conn)
return conn, nil
}

func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
conn, err := d.Dial(ctx)
if err != nil {
return nil, err
}

exp, err := detect.Exporter()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions driver/docker/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil
}

func (d *Driver) Dial(ctx context.Context) (net.Conn, error) {
return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", nil)
}

func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
opts := []client.ClientOpt{
client.WithContextDialer(func(context.Context, string) (net.Conn, error) {
Expand Down
1 change: 1 addition & 0 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Driver interface {
Version(context.Context) (string, error)
Stop(ctx context.Context, force bool) error
Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error
Dial(ctx context.Context) (net.Conn, error)
Client(ctx context.Context) (*client.Client, error)
Features(ctx context.Context) map[Feature]bool
HostGatewayIP(ctx context.Context) (net.IP, error)
Expand Down
7 changes: 5 additions & 2 deletions driver/kubernetes/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil
}

func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
func (d *Driver) Dial(ctx context.Context) (net.Conn, error) {
restClient := d.clientset.CoreV1().RESTClient()
restClientConfig, err := d.KubeClientConfig.ClientConfig()
if err != nil {
Expand All @@ -205,15 +205,18 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
if err != nil {
return nil, err
}
return conn, nil
}

func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
exp, err := detect.Exporter()
if err != nil {
return nil, err
}

var opts []client.ClientOpt
opts = append(opts, client.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return conn, nil
return d.Dial(ctx)
}))
if td, ok := exp.(client.TracerDelegate); ok {
opts = append(opts, client.WithTracerDelegate(td))
Expand Down
21 changes: 20 additions & 1 deletion driver/remote/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package remote

import (
"context"
"errors"
"fmt"
"net"
"strings"

"github.com/docker/buildx/driver"
"github.com/docker/buildx/util/progress"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/tracing/detect"
"github.com/pkg/errors"
)

type Driver struct {
Expand Down Expand Up @@ -84,6 +86,23 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
return client.New(ctx, d.InitConfig.EndpointAddr, opts...)
}

func (d *Driver) Dial(ctx context.Context) (net.Conn, error) {
if d.tlsOpts != nil {
// TODO: add TLS support
return nil, errors.New("TLS dialer is not supported yet")
}

network, addr, ok := strings.Cut(d.InitConfig.EndpointAddr, "://")
if !ok {
return nil, fmt.Errorf("invalid endpoint address: %s", d.InitConfig.EndpointAddr)
}
conn, err := net.Dial(network, addr)
if err != nil {
return nil, errors.Wrap(err, "error ")
}
return conn, nil
}

func (d *Driver) Features(ctx context.Context) map[driver.Feature]bool {
return map[driver.Feature]bool{
driver.OCIExporter: true,
Expand Down
71 changes: 71 additions & 0 deletions tests/dialstdio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package tests

import (
"bytes"
"context"
"net"
"os/exec"
"testing"

"github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/testutil/integration"
"github.com/pkg/errors"
)

var dialstdioTests = []func(t *testing.T, sb integration.Sandbox){
testDialStdio,
}

func testDialStdio(t *testing.T, sb integration.Sandbox) {
errBuf := bytes.NewBuffer(nil)
defer func() {
if t.Failed() {
t.Log(errBuf.String())
}
}()
c, err := client.New(sb.Context(), "", client.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
c1, c2 := net.Pipe()
cmd := buildxCmd(sb, withArgs("dial-stdio"), func(cmd *exec.Cmd) {
cmd.Stdin = c1
cmd.Stdout = c1
cmd.Stderr = errBuf
})

if err := cmd.Start(); err != nil {
c1.Close()
c2.Close()
return nil, errors.Wrap(err, errBuf.String())
}

return wrapCmdConn(c2, cmd), nil
}))
if err != nil {
t.Fatal(err)
}
defer c.Close()

_, err = c.Info(sb.Context())
if err != nil {
t.Fatal(err)
}
}

type connWithCloser struct {
net.Conn
closer func()
}

func (c *connWithCloser) Close() error {
err := c.Conn.Close()
c.closer()
return err
}

func wrapCmdConn(conn net.Conn, cmd *exec.Cmd) net.Conn {
return &connWithCloser{
Conn: conn,
closer: func() {
cmd.Process.Kill()
},
}
}
1 change: 1 addition & 0 deletions tests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestIntegration(t *testing.T) {
tests = append(tests, lsTests...)
tests = append(tests, imagetoolsTests...)
tests = append(tests, versionTests...)
tests = append(tests, dialstdioTests...)
testIntegration(t, tests...)
}

Expand Down

0 comments on commit 435fa9d

Please sign in to comment.