diff options
Diffstat (limited to 'sandbox/hyper')
-rw-r--r-- | sandbox/hyper/README.md | 36 | ||||
-rwxr-xr-x | sandbox/hyper/influx/http/index.js | 57 | ||||
-rw-r--r-- | sandbox/hyper/process/Makefile | 19 | ||||
-rw-r--r-- | sandbox/hyper/process/main.go | 77 | ||||
-rwxr-xr-x | sandbox/hyper/process/spawn | 103 | ||||
-rw-r--r-- | sandbox/hyper/process/src/hyper/process/Makefile | 11 | ||||
-rw-r--r-- | sandbox/hyper/process/src/hyper/process/process.go | 132 | ||||
-rw-r--r-- | sandbox/hyper/process/test/bc.json | 11 | ||||
-rw-r--r-- | sandbox/hyper/sink/index.js | 13 |
9 files changed, 459 insertions, 0 deletions
diff --git a/sandbox/hyper/README.md b/sandbox/hyper/README.md new file mode 100644 index 00000000..d8fe9d67 --- /dev/null +++ b/sandbox/hyper/README.md @@ -0,0 +1,36 @@ +# Overview + +## start conductor on port 8888 and sink on 1337 + + //hyper/process/main + //bin/node //hyper/sink + +## create bc process and retrieve it's process id (AKA {path}) + + url=http://localhost:8888 + curl -fvsS --data-binary @//hyper/process/test/bc.json $url/proc + +## send data for calculation + + echo 9000+2^42 | curl -fvsS --data-binary @- $url/{path} + +## spawn process with http influx and local efflux + +hint: maybe run each command in some separate terminal. + + id=dummy sh -x //hyper/process/spawn stdbuf -o 0 sed 's/[^0-9 ]//g' + port=3 node //hyper/influx/http //proc/dummy/0 + cat //proc/dummy/1 + cat //proc/dummy/2 + date | curl -fvsS --data-binary @- http://localhost:3 + +## calculate the square of the current year in a little local hyper sewer system + +hint: maybe run each command in some separate terminal. + + id=sqr sh -x //hyper/process/spawn stdbuf -o 0 sed 's/[^0-9]//g;s/.*/(&)^2/' + id=bc sh -x //hyper/process/spawn bc + port=42 node //hyper/influx/http //proc/sqr/0 + cat //proc/sqr/1 > //proc/bc/0 + cat //proc/bc/1 + date +%Y | curl -fvsS --data-binary @- http://localhost:42 diff --git a/sandbox/hyper/influx/http/index.js b/sandbox/hyper/influx/http/index.js new file mode 100755 index 00000000..346dde3b --- /dev/null +++ b/sandbox/hyper/influx/http/index.js @@ -0,0 +1,57 @@ +#! /usr/bin/env node + +name = '//hyper/influx/http' +port = process.env.port || 1337 +host = process.env.host || '127.0.0.1' + + +console.info(name); + +fs = require('fs'); +path = require('path'); +http = require('http'); + +fifo_path = path.resolve(process.argv[2] || path.join(process.cwd(), '0')); + +// check configuration +try { + (function (stat) { + if ((stat.mode & 0010000) === 0) { + throw { code: 'E_not_fifo', path: fifo_path }; + }; + })(fs.statSync(fifo_path)); +} catch (exn) { + console.error(exn); + process.exit(23); +}; + +process.stdin.destroy(); +fifo = fs.createWriteStream(fifo_path); +fifo.on('open', function (fd) { + console.info('fifo open as fd', fd); + + http.createServer(function (req, res) { + var rhost = req.connection.remoteAddress; + var rport = req.connection.remotePort; + var id = rhost + ':' + rport; + + console.info(id, 'request', req.method, req.url); + + req.on('data', function (data) { + console.info(id, 'data', data.length); + }); + + req.on('end', function (data) { + console.info(id, 'end'); + res.writeHead(202, { + 'Content-Length': 0, + 'Connection': 'close' + }); + res.end(); + }); + + req.pipe(fifo, { end: false }); + }).listen(port, host, function () { + console.info('server running at http://' + host + ':' + port + '/'); + }); +}); diff --git a/sandbox/hyper/process/Makefile b/sandbox/hyper/process/Makefile new file mode 100644 index 00000000..bbc1c2fb --- /dev/null +++ b/sandbox/hyper/process/Makefile @@ -0,0 +1,19 @@ +include $(GOROOT)/src/Make.inc + +GCIMPORTS = -I pkg/$(GOOS)_$(GOARCH) +LDIMPORTS = -L pkg/$(GOOS)_$(GOARCH) + +TARG=main +GOFILES=\ + main.go\ + +include $(GOROOT)/src/Make.cmd + +export GOPATH := $(PWD) +.PHONY: prepare +prepare: + #goinstall -v github.com/garyburd/twister/server + goinstall -v gorilla.googlecode.com/hg/gorilla/mux + goinstall -v $(PWD)/src/hyper/process + +_go_.$O: prepare diff --git a/sandbox/hyper/process/main.go b/sandbox/hyper/process/main.go new file mode 100644 index 00000000..214dade9 --- /dev/null +++ b/sandbox/hyper/process/main.go @@ -0,0 +1,77 @@ +package main + +import "json" +import "log" +import "http" +import "gorilla.googlecode.com/hg/gorilla/mux" +import "os" +import "fmt" +import "bytes" + +import "hyper/process" + +var proc = map[string]*hyper.Process{} + +// TODO Retrieve Process, Write, Kill [autokill], get exit code + +func RespondJSON(res http.ResponseWriter, v interface{}) os.Error { + content, err := json.Marshal(v) + if err == nil { + log.Printf("< %s", content) + res.Header().Set("Content-Type", "application/json; charset=\"utf-8\"") + res.WriteHeader(http.StatusOK) + res.Write(content) + } else { + log.Printf("%s while json.Marshal(%s)", err, v) + } + return err +} + +func CreateProcessHandler(res http.ResponseWriter, req *http.Request) { + if p, err := hyper.NewProcess(req); err == nil { + id := p.Id() + proc[id] = p + RespondJSON(res, &map[string]string{ + "path": fmt.Sprintf("/proc/%s", id), + }) + } else { + log.Printf("%s", err) + res.WriteHeader(http.StatusInternalServerError) + } +} + +func RetrieveProcess(res http.ResponseWriter, req *http.Request) { + if p := proc[mux.Vars(req)["id"]]; p != nil { + RespondJSON(res, p) + } else { + res.WriteHeader(http.StatusNotFound) + } +} + +func FeedProcess(res http.ResponseWriter, req *http.Request) { + if p := proc[mux.Vars(req)["id"]]; p != nil { + body := make([]byte, 4096) + if _, err := req.Body.Read(body); err == nil { + body = bytes.TrimRight(body, string([]byte{0})) + p.Write(body) + //if err := p.Write(body); err == nil { + RespondJSON(res, true) + //} + } + } else { + res.WriteHeader(http.StatusNotFound) + } +} + +func main() { + + // Gorilla + mux.HandleFunc("/proc", CreateProcessHandler).Methods("POST") + mux.HandleFunc("/proc/{id}", RetrieveProcess).Methods("GET") + mux.HandleFunc("/proc/{id}", FeedProcess).Methods("POST") + + err := http.ListenAndServe("0.0.0.0:8888", mux.DefaultRouter) + if err != nil { + log.Fatal("ListenAndServe: ", err.String()) + } +} diff --git a/sandbox/hyper/process/spawn b/sandbox/hyper/process/spawn new file mode 100755 index 00000000..65e94d86 --- /dev/null +++ b/sandbox/hyper/process/spawn @@ -0,0 +1,103 @@ +#! /bin/sh +# +# [sh -x] spawn [command [argument ...]] +# +# export id to create&destroy or reuse the working directory //proc/$id/. +# this feature is for debug only and marked as deprecated, so don't rely +# on it too hard. +# +spawn() { + set -euf + + # establish working subdirectory in //proc. we're mking only + # transient dirs, i.e. if we mkdir, then we also defer rmdir. + if test -n "${id-}"; then + : "using id=[32;1m$id[m from env" + wd=$pd/$id + if ! test -d $wd; then + : "make transient [32;1m$wd/[m" + mkdir $wd + defer rmdir $wd + elif ! test `ls $wd | wc -l` = 0; then + : "[31;1m$wd/[;31m is not empty![m" + exit 23 + else + : "reuse existing [32;1m$wd/[m" + fi + else + id=`cd $pd && mktemp -d XXXXXXXXXXXXXXXX` + wd=$pd/$id + defer rmdir $wd + : "made transient [32;1m$wd/[m" + fi + + # change to //proc working directory + cwd="$PWD" + cd $wd + defer cd $cwd + + # create named pipes for the child process's stdio + mkfifo 0 1 2 + defer rm 0 1 2 + + # spawn child process + ( : "in [32;1m$PWD/[m spawn [32m${*:-[35;1mnothing}[m" + set +x # disable debug output so we don't clobber 2 + exec 0>&- 1>&- 2>&- 0<>0 1<>1 2<>2 + cd "$cwd" + exec "$@") & + pid=$! + + # setup a trap to kill the child process if this (parent) process dies + defer kill $pid + + # store misc. info. + ln -snf $cwd cwd + echo $id >id + echo $$ >ppid + echo $pid >pid + defer rm cwd id pid ppid + + # wait for the child process's + set +e + wait $pid + code=$? + set -e + + # the child is already dead + cancel kill $pid + + # return the same way wait did + (exit $code) +} + +# +# defer [command [argument ...]] +# +# Defer execution of a command. Deferred commands are executed in LIFO +# order immediately before the script terminates. See (golang's defer +# statement for more information how this should work). +# +defer() { + defer="$*${defer+ +$defer}" +} + +# +# cancel [command [argument ...]] +# +# Cancel a deferred command. The arguments have to match exactly a +# prior defer call or else chaos and mayhem shall haunt thee and shi- +# +cancel() { + defer="`echo "$defer" | grep -Fxv "$*"`" +} + +# setup //proc directory +pd=/tmp/krebs/proc +mkdir -p $pd +test -w $pd + +# setup deferred execution and spawn command +trap 'eval "${defer-}"; defer=' EXIT INT TERM +spawn "$@" diff --git a/sandbox/hyper/process/src/hyper/process/Makefile b/sandbox/hyper/process/src/hyper/process/Makefile new file mode 100644 index 00000000..7ecda716 --- /dev/null +++ b/sandbox/hyper/process/src/hyper/process/Makefile @@ -0,0 +1,11 @@ +include ${GOROOT}/src/Make.inc + +TARG=hyper/process + +GOFILES=\ + process.go\ + +#DEPS=\ +# gorilla.googlecode.com/hg/gorilla/context\ + +include ${GOROOT}/src/Make.pkg diff --git a/sandbox/hyper/process/src/hyper/process/process.go b/sandbox/hyper/process/src/hyper/process/process.go new file mode 100644 index 00000000..18cf55fb --- /dev/null +++ b/sandbox/hyper/process/src/hyper/process/process.go @@ -0,0 +1,132 @@ +package hyper + +import "fmt" +import "http" +import "bytes" +import "json" +import "os" + +type Process struct { + Path string `json:"path"` + Argv []string `json:"argv"` + Envp map[string]string `json:"envp"` + //Stdin string `json:"stdin"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + process *os.Process + process_stdin *os.File + process_stdout *os.File + process_stderr *os.File + id string + client http.Client +} + +func (p *Process) Id() string { + return p.id +} + +func NewProcess(req *http.Request) (*Process, os.Error) { + body := make([]byte, 4096) + _, err := req.Body.Read(body) + if err != nil { + return nil, err + } + + body = bytes.TrimRight(body, string([]byte{0})) + + var p Process + + if err := json.Unmarshal(body, &p); err != nil { + return nil, err + } + + p.id = gensym() + + if err := p.Start(); err != nil { + return nil, err + } + + return &p, nil +} + +func (hp *Process) Write(b []byte) { + n, err := hp.process_stdin.Write(b) + if err != nil { + fmt.Printf("Write: %s\n", err) + } else { + fmt.Printf("Wrote: %d bytes\n", n) + } +} + +func (hp *Process) Start() os.Error { + var name = hp.Path //os.Args[1] //"/usr/b" + var argv = hp.Argv //os.Args[1:] //[]string{ "bc" } + //var chroot = false + //var dir = "/var/empty" + var files [3][2]*os.File + var err os.Error + + for i, _ := range files { + files[i][0], files[i][1], err = os.Pipe() + if err != nil { + return err + } + } + + var env []string + for k, v := range hp.Envp { + env = append(env, fmt.Sprintf("%s=%s", k, v)) + } + + var attr = &os.ProcAttr{ + //Dir: dir, + Env: env, //os.Environ(), + Files: []*os.File{ files[0][0], files[1][1], files[2][1]}, + } + + //var foo, _ = json.Marshal(attr) + //fmt.Printf("%s\n", foo) + + hp.process, err = os.StartProcess(name, argv, attr) + if err != nil { + return err + } + + hp.process_stdin = files[0][1] + hp.process_stdout = files[1][0] + hp.process_stderr = files[2][0] + + for _, file := range attr.Files { + file.Close() + } + + go hp.reader(hp.process_stdout, hp.Stdout) + go hp.reader(hp.process_stderr, hp.Stderr) + return nil +} + +func (p *Process) reader(file *os.File, url string) { + var b []byte = make([]byte, 1024) + var err os.Error = nil + for err == nil { + var n int + n, err = file.Read(b) + fmt.Printf("data: %d, %s\n", n, b) + + res, err := p.client.Post(url, "application/octet-stream", bytes.NewBuffer(b)) + res = res + if err != nil { + fmt.Printf("EE: %s: %s\n", url, err) + } + } +} + +func gensym() string { + f, _ := os.Open("/dev/urandom") + b := make([]byte, 16) + f.Read(b) + f.Close() + uuid := fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:]) + return uuid +} + diff --git a/sandbox/hyper/process/test/bc.json b/sandbox/hyper/process/test/bc.json new file mode 100644 index 00000000..5b3b0721 --- /dev/null +++ b/sandbox/hyper/process/test/bc.json @@ -0,0 +1,11 @@ +{ + "path": "/usr/bin/bc", + "argv": [ + "bc" + ], + "envp": { + "was": "geht" + }, + "stdout": "http://127.0.0.1:1337/", + "stderr": "http://127.0.0.1:1337/" +} diff --git a/sandbox/hyper/sink/index.js b/sandbox/hyper/sink/index.js new file mode 100644 index 00000000..b556b88d --- /dev/null +++ b/sandbox/hyper/sink/index.js @@ -0,0 +1,13 @@ +require('http').createServer(function (req, res) { + + req.on('data', function (data) { + require('util').puts(data); + }); + + req.on('end', function () { + res.writeHead(200, {'Content-Type': 'text/plain', 'Content-Length': 0}); + res.end(); + }); +}).listen(1337, '127.0.0.1', function () { + console.log('Running HyperSink at http://127.0.0.1:1337/'); +}); |