Skip to content
This repository has been archived by the owner on Nov 19, 2020. It is now read-only.

Commit

Permalink
plugin: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
justenwalker committed Sep 3, 2019
1 parent ea95a86 commit 596ac5e
Show file tree
Hide file tree
Showing 14 changed files with 991 additions and 5 deletions.
18 changes: 18 additions & 0 deletions cmd/plugin/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package main

import (
log "github.com/hashicorp/go-hclog"

"github.com/hashicorp/nomad/plugins"
"github.com/jet/damon/plugin"
)

func main() {
// Serve the plugin
plugins.Serve(factory)
}

// factory returns a new instance of the LXC driver plugin
func factory(log log.Logger) interface{} {
return plugin.NewDriverPlugin(log)
}
45 changes: 43 additions & 2 deletions container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package container
import (
"context"
"fmt"
"os"
"os/exec"
"runtime"
"time"
Expand Down Expand Up @@ -38,8 +39,10 @@ const MinimumCPUMHz = 100
type Container struct {
Name string
StartTime time.Time
PID int
Logger Logger
doneCh chan struct{}
token *win32.Token
job *win32.JobObject
proc *win32.Process
result Result
Expand Down Expand Up @@ -117,7 +120,7 @@ func RunContained(cmd *exec.Cmd, cfg *Config) (*Container, error) {
}
container.Logger = logger
if cfg.RestrictedToken {
cfg.Logger.Logln("creating restricted token")
logger.Logln("creating restricted token")
rt, err := token.CreateRestrictedToken(win32.TokenRestrictions{
DisableMaxPrivilege: true,
LUAToken: true,
Expand All @@ -132,7 +135,7 @@ func RunContained(cmd *exec.Cmd, cfg *Config) (*Container, error) {
token = rt
}
defer logger.CloseLogError(token, "couldn't closed process token")

container.token = token
proc, err := win32.StartProcess(cmd, win32.AccessToken(token), win32.Suspended)
if err != nil {
return nil, errors.Wrapf(err, "unable to start process")
Expand All @@ -142,6 +145,7 @@ func RunContained(cmd *exec.Cmd, cfg *Config) (*Container, error) {
return nil, err
}
container.proc = proc
container.PID = int(container.proc.Pid())
eli := &win32.ExtendedLimitInformation{
KillOnJobClose: true,
}
Expand Down Expand Up @@ -183,6 +187,7 @@ func RunContained(cmd *exec.Cmd, cfg *Config) (*Container, error) {
logger.CloseLogError(job, "failed to close JobObject")
return nil, errors.Wrapf(err, "container: Could not resume process main thread")
}
container.StartTime = time.Now()
container.doneCh = make(chan struct{})
go (&container).wait()
return &container, nil
Expand Down Expand Up @@ -328,6 +333,42 @@ func (c *Container) Done() <-chan struct{} {
return c.doneCh
}

func (c *Container) Signal(sig os.Signal) error {
return c.proc.Signal(sig)
}

// Task is a program run in the context of a Container's job object
// that is not the main program

func (c *Container) Exec(cfg TaskConfig) (*Task, error) {
if len(cfg.Command) == 0 {
return nil, fmt.Errorf("exec requires at least 1 argument")
}
name := cfg.Command[0]
var args []string
if len(cfg.Command) > 1 {
args = cfg.Command[1:]
}
ec := exec.Command(name, args...)
ec.Env = cfg.EnvList
ec.Stderr = cfg.Stderr
ec.Stdout = cfg.Stdout
ec.Dir = cfg.Dir
proc, err := win32.StartProcess(ec, win32.AccessToken(c.token), win32.Suspended)
if err != nil {
return nil, err
}
if err := c.job.Assign(proc); err != nil {
c.Logger.Error(proc.Kill(), "unable to kill exec process")
return nil, err
}
if err := proc.Resume(); err != nil {
c.Logger.Error(proc.Kill(), "unable to kill exec process")
return nil, err
}
return &Task{osProcess: proc}, nil
}

func (c *Container) killOnError(err error) error {
if err != nil {
c.Logger.Error(c.proc.Kill(), "unable to kill child process")
Expand Down
49 changes: 49 additions & 0 deletions container/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package container

import (
"context"
"io"

"github.com/jet/damon/win32"
)

type TaskConfig struct {
Command []string
Dir string
EnvList []string
Stdout io.Writer
Stderr io.Writer
}

type Task struct {
osProcess *win32.Process
}

func (t *Task) Wait(ctx context.Context) (int, error) {
exitCh := make(chan int, 1)
errCh := make(chan error, 1)
go func() {
defer close(exitCh)
defer close(errCh)
res, err := t.osProcess.Wait()
if err != nil {
errCh <- err
return
}
if res.Err != nil {
errCh <- res.Err
return
}
exitCh <- res.ExitStatus
}()
select {
case err := <-errCh:
t.osProcess.Kill()
return -1, err
case res := <-exitCh:
return res, nil
case <-ctx.Done():
t.osProcess.Kill()
return -1, ctx.Err()
}
}
8 changes: 6 additions & 2 deletions make.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ Param(
[Parameter(ParameterSetName='Build')]
[switch]$Build,

[Parameter(ParameterSetName='Build')]
[string]$OutFile = "damon.exe"
[Parameter(ParameterSetName = 'Build')]
[string]$OutFile = "damon.exe",

[Parameter(ParameterSetName = 'Build')]
[string]$PluginOutFile = "damon-plugin.exe"
)

$GOLANG_LINT_VERSION="1.10.2"
Expand Down Expand Up @@ -78,6 +81,7 @@ if($Build) {

Write-Host $gcflags
go.exe build -o $OutFile -ldflags="$ldflags" ./cmd/standalone
go.exe build -o $PluginOutFile -ldflags="$ldflags" ./cmd/plugin
exit $LASTEXITCODE
}

2 changes: 1 addition & 1 deletion metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (c *CPUCollector) Sample(m CPUMeasurement) CPUSample {
c.lock.Unlock()

// total cpu time = total time * num cores
ttime := (m.TotalTime - t0) * time.Duration(c.Cores)
ttime := (m.TotalTime - t0)
tmhz := c.MHzPerCore * float64(c.Cores)

kperc := float64(m.KernelTime-k0) / float64(ttime)
Expand Down
148 changes: 148 additions & 0 deletions plugin/damon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package plugin

import (
// "fmt"

"fmt"
"io"
"os/exec"
"strings"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/lib/fifo"
"github.com/jet/damon/container"

"github.com/hashicorp/nomad/plugins/drivers"
)

type damonExec struct {
cmd *exec.Cmd
ccfg *container.Config
taskConfig TaskConfig
cfg *drivers.TaskConfig
stdout io.WriteCloser
stderr io.WriteCloser
logger log.Logger
}

type hcLogWrapper struct {
Logger log.Logger
}

func (l hcLogWrapper) Logln(v ...interface{}) {
if l.Logger == nil {
return
}
l.Logger.Debug(fmt.Sprint(v...))
}
func (l hcLogWrapper) Error(err error, msg string) {
if l.Logger == nil {
return
}
l.Logger.Error(msg, "error", err)
}

func getCPUMHz(cfg *drivers.TaskConfig, taskConfig TaskConfig, logger log.Logger) int {
if taskConfig.CPULimit > 0 {
return taskConfig.CPULimit
}
return int(cfg.Resources.NomadResources.Cpu.CpuShares)
}

func getMemoryMB(cfg *drivers.TaskConfig, taskConfig TaskConfig, logger log.Logger) int {
if taskConfig.MemoryLimit > 0 {
return taskConfig.MemoryLimit
}
return int(cfg.Resources.NomadResources.Memory.MemoryMB)
}

func newDamonExec(cfg *drivers.TaskConfig, taskConfig TaskConfig, logger log.Logger) (*damonExec, error) {
var d damonExec
d.ccfg = &container.Config{
Name: cfg.ID,
EnforceCPU: taskConfig.EnforceCPULimit,
CPUMHzLimit: getCPUMHz(cfg, taskConfig, logger),
EnforceMemory: taskConfig.EnforceMemoryLimit,
MemoryMBLimit: getMemoryMB(cfg, taskConfig, logger),
RestrictedToken: taskConfig.RestrictedToken,
CPUHardCap: true,
Logger: hcLogWrapper{Logger: logger},
}
d.cfg = cfg
d.taskConfig = taskConfig
d.cmd = exec.Command(taskConfig.Command, taskConfig.Args...)
d.cmd.Dir = cfg.TaskDir().Dir
d.cmd.Env = cfg.EnvList()
d.logger = logger
return &d, nil
}

func (d *damonExec) startContainer(commandCfg *drivers.TaskConfig) (*taskHandle, error) {
d.logger.Debug("running executable", "task_id", commandCfg.ID, "command", d.cmd.Path, "args", strings.Join(d.cmd.Args, " "))
stdout, err := d.Stdout()
if err != nil {
return nil, err
}
stderr, err := d.Stderr()
if err != nil {
return nil, err
}
cmd := d.cmd
cmd.Stdout = stdout
cmd.Stderr = stderr
c, err := container.RunContained(d.cmd, d.ccfg)
if err != nil {
defer d.Close()
return nil, err
}
return &taskHandle{
container: c,
pid: c.PID,
logger: d.logger,
taskConfig: commandCfg,
startedAt: c.StartTime,
procState: drivers.TaskStateRunning,
}, nil
}

func (d *damonExec) Stdout() (io.Writer, error) {
if d.stdout == nil {
if d.cfg.StdoutPath == "" {
return DevNull, nil
}
stdout, err := fifo.OpenWriter(d.cfg.StdoutPath)
if err != nil {
return nil, fmt.Errorf("failed to open stdout fifo '%s': %v", d.cfg.StdoutPath, err)
}
d.logger.Trace("stdout fifo opened", "path", "task_id", d.cfg.ID, d.cfg.StderrPath)
d.stdout = stdout
}
return d.stdout, nil
}

func (d *damonExec) Stderr() (io.Writer, error) {
if d.stderr == nil {
if d.cfg.StderrPath == "" {
return DevNull, nil
}
stderr, err := fifo.OpenWriter(d.cfg.StderrPath)
if err != nil {
return nil, fmt.Errorf("failed to open stderr fifo '%s': %v", d.cfg.StderrPath, err)
}
d.logger.Trace("stderr fifo opened", "path", "task_id", d.cfg.ID, d.cfg.StderrPath)
d.stderr = stderr
}
return d.stderr, nil
}

func (d *damonExec) Close() {
d.logger.Trace("damon closed", "task_id", d.cfg.ID)
if d.stdout != nil {
d.logger.Trace("stdout fifo closed", "task_id", d.cfg.ID)
d.stdout.Close()
}
if d.stderr != nil {
d.logger.Trace("stderr fifo closed", "task_id", d.cfg.ID)
d.stderr.Close()
}
}
49 changes: 49 additions & 0 deletions plugin/dev_null.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package plugin

type devNull int

// DevNull is a no-op io device
// It implements:
// - io.Reader
// - io.Writer
// - io.Closer (implicitly: io.ReadCloser & io.WriteCloser)
// - io.Seeker (implicitly: io.ReadSeeker & io.WriteSeeker)
// - io.ReaderAt
// - io.WriterAt
// - io.StringWriter
const DevNull = devNull(0)

// Read implements a no-op io.Reader
func (n devNull) Read(p []byte) (int, error) {
return 0, nil
}

// ReadAt implements a no-op io.ReaderAt
func (n devNull) ReadAt(p []byte, off int64) (int, error) {
return 0, nil
}

//Write implements a discarding io.Writer
func (n devNull) Write(p []byte) (int, error) {
return len(p), nil
}

//WriteAt implements a discarding io.WriterAt
func (n devNull) WriteAt(p []byte, off int64) (int, error) {
return len(p), nil
}

// WriteString implements a discarding io.StringWriter
func (n devNull) WriteString(s string) (int, error) {
return len(s), nil
}

// Close implements a no-op io.Closer
func (n devNull) Close() error {
return nil
}

// Seek implements a no-op io.Seeker
func (n devNull) Seek(offset, whence int) (int64, error) {
return 0, nil
}
Loading

0 comments on commit 596ac5e

Please sign in to comment.