Skip to content

Commit

Permalink
driver: allow non-cached client
Browse files Browse the repository at this point in the history
Adds ability to opt-out usage of the cached client in driver
handler. This implies that the caller of Client without cache
needs to properly release the connection.

Signed-off-by: CrazyMax <[email protected]>
  • Loading branch information
crazy-max committed Oct 12, 2023
1 parent 13e1fba commit 855b686
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 22 deletions.
27 changes: 20 additions & 7 deletions builder/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func (b *Builder) Nodes() []Node {
type LoadNodesOption func(*loadNodesOptions)

type loadNodesOptions struct {
data bool
data bool
cachedClient bool
}

func WithData() LoadNodesOption {
Expand All @@ -54,11 +55,18 @@ func WithData() LoadNodesOption {
}
}

func WithCachedClient(enabled bool) LoadNodesOption {
return func(o *loadNodesOptions) {
o.cachedClient = enabled
}
}

// LoadNodes loads and returns nodes for this builder.
// TODO: this should be a method on a Node object and lazy load data for each driver.
func (b *Builder) LoadNodes(ctx context.Context, opts ...LoadNodesOption) (_ []Node, err error) {
lno := loadNodesOptions{
data: false,
data: false,
cachedClient: true,
}
for _, opt := range opts {
opt(&lno)
Expand Down Expand Up @@ -141,7 +149,7 @@ func (b *Builder) LoadNodes(ctx context.Context, opts ...LoadNodesOption) (_ []N
node.ImageOpt = imageopt

if lno.data {
if err := node.loadData(ctx); err != nil {
if err := node.loadData(ctx, lno.cachedClient); err != nil {
node.Err = err
}
}
Expand Down Expand Up @@ -192,7 +200,7 @@ func (b *Builder) LoadNodes(ctx context.Context, opts ...LoadNodesOption) (_ []N
return b.nodes, nil
}

func (n *Node) loadData(ctx context.Context) error {
func (n *Node) loadData(ctx context.Context, cachedClient bool) error {
if n.Driver == nil {
return nil
}
Expand All @@ -202,11 +210,15 @@ func (n *Node) loadData(ctx context.Context) error {
}
n.DriverInfo = info
if n.DriverInfo.Status == driver.Running {
driverClient, err := n.Driver.Client(ctx)
c, err := n.Driver.Client(ctx, driver.WithCachedClient(cachedClient))
if err != nil {
return err
}
workers, err := driverClient.ListWorkers(ctx)
if !cachedClient {
defer c.Close()
}

workers, err := c.ListWorkers(ctx)
if err != nil {
return errors.Wrap(err, "listing workers")
}
Expand All @@ -220,7 +232,8 @@ func (n *Node) loadData(ctx context.Context) error {
}
sort.Strings(n.IDs)
n.Platforms = platformutil.Dedupe(n.Platforms)
inf, err := driverClient.Info(ctx)

inf, err := c.Info(ctx)
if err != nil {
if st, ok := grpcerrors.AsGRPCStatus(err); ok && st.Code() == codes.Unimplemented {
n.Version, err = n.Driver.Version(ctx)
Expand Down
2 changes: 1 addition & 1 deletion driver/docker-container/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil
}

func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
func (d *Driver) Client(ctx context.Context, copts ...driver.ClientOption) (*client.Client, error) {
_, conn, err := d.exec(ctx, []string{"buildctl", "dial-stdio"})
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions driver/docker/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil
}

func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
func (d *Driver) Client(ctx context.Context, copts ...driver.ClientOption) (*client.Client, error) {
opts := []client.ClientOpt{
client.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", nil)
Expand Down Expand Up @@ -83,13 +83,13 @@ func (d *Driver) Features(ctx context.Context) map[driver.Feature]bool {
d.features.once.Do(func() {
var useContainerdSnapshotter bool
if c, err := d.Client(ctx); err == nil {
defer c.Close()
workers, _ := c.ListWorkers(ctx)
for _, w := range workers {
if _, ok := w.Labels["org.mobyproject.buildkit.worker.snapshotter"]; ok {
useContainerdSnapshotter = true
}
}
c.Close()
}
d.features.list = map[driver.Feature]bool{
driver.OCIExporter: useContainerdSnapshotter,
Expand Down
6 changes: 3 additions & 3 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ type Driver interface {
Version(context.Context) (string, error)
Stop(ctx context.Context, force bool) error
Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error
Client(ctx context.Context) (*client.Client, error)
Client(ctx context.Context, copts ...ClientOption) (*client.Client, error)
Features(ctx context.Context) map[Feature]bool
HostGatewayIP(ctx context.Context) (net.IP, error)
IsMobyDriver() bool
Config() InitConfig
}

func Boot(ctx, clientContext context.Context, d *DriverHandle, pw progress.Writer) (*client.Client, error) {
func Boot(ctx, clientContext context.Context, d *DriverHandle, pw progress.Writer, copts ...ClientOption) (*client.Client, error) {
try := 0
for {
info, err := d.Info(ctx)
Expand All @@ -83,7 +83,7 @@ func Boot(ctx, clientContext context.Context, d *DriverHandle, pw progress.Write
}
}

c, err := d.Client(clientContext)
c, err := d.Client(clientContext, copts...)
if err != nil {
if errors.Cause(err) == ErrNotRunning && try <= 2 {
continue
Expand Down
2 changes: 1 addition & 1 deletion driver/kubernetes/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil
}

func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
func (d *Driver) Client(ctx context.Context, copts ...driver.ClientOption) (*client.Client, error) {
restClient := d.clientset.CoreV1().RESTClient()
restClientConfig, err := d.KubeClientConfig.ClientConfig()
if err != nil {
Expand Down
35 changes: 28 additions & 7 deletions driver/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,18 +153,39 @@ type DriverHandle struct {
historyAPISupported bool
}

func (d *DriverHandle) Client(ctx context.Context) (*client.Client, error) {
d.once.Do(func() {
d.client, d.err = d.Driver.Client(ctx)
})
return d.client, d.err
func (d *DriverHandle) Client(ctx context.Context, copts ...ClientOption) (*client.Client, error) {
co := ClientOptions{
cachedClient: true,
}
for _, opt := range copts {
opt(&co)
}
if co.cachedClient {
d.once.Do(func() {
d.client, d.err = d.Driver.Client(ctx, copts...)
})
return d.client, d.err
}
return d.Driver.Client(ctx, copts...)
}

func (d *DriverHandle) HistoryAPISupported(ctx context.Context) bool {
func (d *DriverHandle) HistoryAPISupported(ctx context.Context, copts ...ClientOption) bool {
d.historyAPISupportedOnce.Do(func() {
if c, err := d.Client(ctx); err == nil {
if c, err := d.Client(ctx, copts...); err == nil {
d.historyAPISupported = historyAPISupported(ctx, c)
}
})
return d.historyAPISupported
}

type ClientOption func(*ClientOptions)

type ClientOptions struct {
cachedClient bool
}

func WithCachedClient(enabled bool) ClientOption {
return func(o *ClientOptions) {
o.cachedClient = enabled
}
}
2 changes: 1 addition & 1 deletion driver/remote/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil
}

func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
func (d *Driver) Client(ctx context.Context, copts ...driver.ClientOption) (*client.Client, error) {
opts := []client.ClientOpt{}

exp, err := detect.Exporter()
Expand Down

0 comments on commit 855b686

Please sign in to comment.