diff --git a/cli/run.go b/cli/run.go index 6aeb6282f..0a886f9c6 100644 --- a/cli/run.go +++ b/cli/run.go @@ -23,7 +23,6 @@ import ( // serverless registrations "github.com/yomorun/yomo/cli/serverless" - _ "github.com/yomorun/yomo/cli/serverless/deno" _ "github.com/yomorun/yomo/cli/serverless/exec" _ "github.com/yomorun/yomo/cli/serverless/golang" _ "github.com/yomorun/yomo/cli/serverless/nodejs" diff --git a/cli/serverless/deno/mod/mod.ts b/cli/serverless/deno/mod/mod.ts deleted file mode 100644 index 17805706a..000000000 --- a/cli/serverless/deno/mod/mod.ts +++ /dev/null @@ -1,116 +0,0 @@ -import { Reader, Writer } from "https://deno.land/std/types.d.ts"; -import { - readVarnum, - varnumBytes, - VarnumOptions, -} from "https://deno.land/std/encoding/binary.ts"; -import { loadSync } from "https://deno.land/std/dotenv/mod.ts"; - -export class Context { - tag: number; - input: Uint8Array; - private writer: Writer; - - constructor(tag: number, input: Uint8Array, writer: Writer) { - this.tag = tag; - this.input = input; - this.writer = writer; - } - - async write(tag: number, data: Uint8Array) { - await this.writer.write(numberToBytes(tag)); - await writeData(this.writer, data); - } -} - -const VARNUM_OPTIONS: VarnumOptions = { - "dataType": "uint32", - "endian": "little", -}; - -function numberToBytes(val: number): Uint8Array { - return varnumBytes(val, VARNUM_OPTIONS); -} - -async function readNumber(reader: Reader): Promise { - try { - return await readVarnum(reader, VARNUM_OPTIONS); - } catch (e) { - if (e instanceof Deno.errors.UnexpectedEof) { - return null; - } - throw e; - } -} - -async function readData(reader: Reader): Promise { - const length = await readNumber(reader); - if (length == null) { - return null; - } - const buf = new Uint8Array(length); - const n = await reader.read(buf); - if (n == null || n !== length) { - return null; - } - return buf; -} - -async function writeData(writer: Writer, data: Uint8Array) { - await writer.write(numberToBytes(data.length)); - await writer.write(data); -} - -export async function run( - observed: [number], - handler: (ctx: Context) => Promise, -) { - let sock = "./sfn.sock"; - let env = null; - if (Deno.args.length > 0) { - sock = Deno.args[0]; - if (Deno.args.length > 1) { - env = Deno.args[1]; - } - } - - if (env != null) { - loadSync({ - envPath: env, - defaultsPath: "", - examplePath: "", - export: true, - allowEmptyValues: true, - }); - } - - const conn: Deno.UnixConn = await Deno.connect({ - path: sock, - transport: "unix", - }); - - await conn.write(numberToBytes(observed.length)); - for (const tag of observed) { - await conn.write(numberToBytes(tag)); - } - - for (;;) { - const tag = await readNumber(conn); - if (tag == null) { - break; - } - - const data = await readData(conn); - if (data == null) { - break; - } - - const ctx = new Context(tag, data, conn); - await handler(ctx); - - await conn.write(numberToBytes(0)); // tag - await conn.write(numberToBytes(0)); // length - } - - conn.close(); -} diff --git a/cli/serverless/deno/runtime.go b/cli/serverless/deno/runtime.go deleted file mode 100644 index 738b6e6a5..000000000 --- a/cli/serverless/deno/runtime.go +++ /dev/null @@ -1,198 +0,0 @@ -// Package deno provides a js/ts serverless runtime -package deno - -import ( - "encoding/binary" - "errors" - "io" - "log" - "net" - "os" - "os/exec" - "time" - - "github.com/yomorun/yomo" - "github.com/yomorun/yomo/core/frame" - "github.com/yomorun/yomo/pkg/file" - "github.com/yomorun/yomo/serverless" -) - -func listen(path string) (*net.UnixListener, error) { - err := file.Remove(path) - if err != nil { - return nil, err - } - - addr, err := net.ResolveUnixAddr("unix", path) - if err != nil { - return nil, err - } - return net.ListenUnix("unix", addr) -} - -func accept(listener *net.UnixListener) ([]frame.Tag, *net.UnixConn, error) { - defer listener.Close() - - listener.SetUnlinkOnClose(true) - listener.SetDeadline(time.Now().Add(3 * time.Second)) - - conn, err := listener.AcceptUnix() - if err != nil { - return nil, nil, err - } - - conn.SetReadDeadline(time.Now().Add(3 * time.Second)) - var length uint32 - err = binary.Read(conn, binary.LittleEndian, &length) - if err != nil { - conn.Close() - return nil, nil, err - } - - observedBytes := make([]byte, length*4) - _, err = io.ReadFull(conn, observedBytes) - if err != nil { - conn.Close() - return nil, nil, err - } - conn.SetReadDeadline(time.Time{}) - - observed := make([]frame.Tag, length) - for i := 0; i < int(length); i++ { - observed[i] = frame.Tag(binary.LittleEndian.Uint32(observedBytes[i*4 : i*4+4])) - } - - return observed, conn, nil -} - -func runDeno(jsPath string, socketPath string, errCh chan<- error) { - cmd := exec.Command( - "deno", - "run", - "--unstable", - "--allow-read=.,"+socketPath, - "--allow-write=.,"+socketPath, - "--allow-env", - "--allow-net", - jsPath, - socketPath, - ) - cmd.Stdin = os.Stdin - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - err := cmd.Run() - if err != nil { - errCh <- err - } -} - -func startSfn(name string, zipperAddr string, credential string, observed []frame.Tag, conn net.Conn, errCh chan<- error) (yomo.StreamFunction, error) { - sfn := yomo.NewStreamFunction( - name, - zipperAddr, - yomo.WithSfnCredential(credential), - ) - - // init - sfn.Init(func() error { - return nil - }) - - sfn.SetObserveDataTags(observed...) - - sfn.SetHandler( - func(ctx serverless.Context) { - tag := ctx.Tag() - err := binary.Write(conn, binary.LittleEndian, tag) - if err != nil { - errCh <- err - return - } - - data := ctx.Data() - err = binary.Write(conn, binary.LittleEndian, uint32(len(data))) - if err != nil { - errCh <- err - return - } - - _, err = conn.Write(data) - if err != nil { - errCh <- err - return - } - - var length uint32 - for { - err := binary.Read(conn, binary.LittleEndian, &tag) - if err != nil { - errCh <- err - return - } - - err = binary.Read(conn, binary.LittleEndian, &length) - if err != nil { - errCh <- err - return - } - - if tag == 0 && length == 0 { - break - } - - data := make([]byte, length) - _, err = io.ReadFull(conn, data) - if err != nil { - errCh <- err - return - } - - ctx.Write(tag, data) - } - }, - ) - - sfn.SetErrorHandler( - func(err error) { - log.Printf("[deno][%s] error handler: %T %v\n", zipperAddr, err, err) - }, - ) - - err := sfn.Connect() - if err != nil { - return nil, err - } - - return sfn, nil -} - -func run(name string, zipperAddr string, credential string, jsPath string, socketPath string) error { - if _, err := exec.LookPath("deno"); err != nil { - return errors.New("[deno] command was not found. For details, visit https://deno.land") - } - - errCh := make(chan error) - - listener, err := listen(socketPath) - if err != nil { - return err - } - - go runDeno(jsPath, socketPath, errCh) - - observed, conn, err := accept(listener) - if err != nil { - return err - } - defer conn.Close() - - sfn, err := startSfn(name, zipperAddr, credential, observed, conn, errCh) - if err != nil { - return err - } - defer sfn.Close() - - err = <-errCh - return err -} diff --git a/cli/serverless/deno/serverless.go b/cli/serverless/deno/serverless.go deleted file mode 100644 index db9cc7078..000000000 --- a/cli/serverless/deno/serverless.go +++ /dev/null @@ -1,42 +0,0 @@ -// Package deno provides a js/ts serverless runtime -package deno - -import ( - "github.com/yomorun/yomo/cli/serverless" -) - -// denoServerless will start deno program to run serverless functions. -type denoServerless struct { - name string - fileName string - zipperAddr string - credential string -} - -// Init initializes the serverless -func (s *denoServerless) Init(opts *serverless.Options) error { - s.name = opts.Name - s.fileName = opts.Filename - s.zipperAddr = opts.ZipperAddr - s.credential = opts.Credential - return nil -} - -// Build is an empty implementation -func (s *denoServerless) Build(clean bool) error { - return nil -} - -// Run the wasm serverless function -func (s *denoServerless) Run(verbose bool) error { - return run(s.name, s.zipperAddr, s.credential, s.fileName, "./"+s.name+".sock") -} - -// Executable shows whether the program needs to be built -func (s *denoServerless) Executable() bool { - return true -} - -func init() { - serverless.Register(&denoServerless{}, ".js") -}