summaryrefslogtreecommitdiffstats
path: root/sandbox/hyper
diff options
context:
space:
mode:
Diffstat (limited to 'sandbox/hyper')
-rw-r--r--sandbox/hyper/README.md36
-rwxr-xr-xsandbox/hyper/influx/http/index.js57
-rw-r--r--sandbox/hyper/process/Makefile19
-rw-r--r--sandbox/hyper/process/main.go77
-rwxr-xr-xsandbox/hyper/process/spawn103
-rw-r--r--sandbox/hyper/process/src/hyper/process/Makefile11
-rw-r--r--sandbox/hyper/process/src/hyper/process/process.go132
-rw-r--r--sandbox/hyper/process/test/bc.json11
-rw-r--r--sandbox/hyper/sink/index.js13
9 files changed, 459 insertions, 0 deletions
diff --git a/sandbox/hyper/README.md b/sandbox/hyper/README.md
new file mode 100644
index 00000000..d8fe9d67
--- /dev/null
+++ b/sandbox/hyper/README.md
@@ -0,0 +1,36 @@
+# Overview
+
+## start conductor on port 8888 and sink on 1337
+
+ //hyper/process/main
+ //bin/node //hyper/sink
+
+## create bc process and retrieve it's process id (AKA {path})
+
+ url=http://localhost:8888
+ curl -fvsS --data-binary @//hyper/process/test/bc.json $url/proc
+
+## send data for calculation
+
+ echo 9000+2^42 | curl -fvsS --data-binary @- $url/{path}
+
+## spawn process with http influx and local efflux
+
+hint: maybe run each command in some separate terminal.
+
+ id=dummy sh -x //hyper/process/spawn stdbuf -o 0 sed 's/[^0-9 ]//g'
+ port=3 node //hyper/influx/http //proc/dummy/0
+ cat //proc/dummy/1
+ cat //proc/dummy/2
+ date | curl -fvsS --data-binary @- http://localhost:3
+
+## calculate the square of the current year in a little local hyper sewer system
+
+hint: maybe run each command in some separate terminal.
+
+ id=sqr sh -x //hyper/process/spawn stdbuf -o 0 sed 's/[^0-9]//g;s/.*/(&)^2/'
+ id=bc sh -x //hyper/process/spawn bc
+ port=42 node //hyper/influx/http //proc/sqr/0
+ cat //proc/sqr/1 > //proc/bc/0
+ cat //proc/bc/1
+ date +%Y | curl -fvsS --data-binary @- http://localhost:42
diff --git a/sandbox/hyper/influx/http/index.js b/sandbox/hyper/influx/http/index.js
new file mode 100755
index 00000000..346dde3b
--- /dev/null
+++ b/sandbox/hyper/influx/http/index.js
@@ -0,0 +1,57 @@
+#! /usr/bin/env node
+
+name = '//hyper/influx/http'
+port = process.env.port || 1337
+host = process.env.host || '127.0.0.1'
+
+
+console.info(name);
+
+fs = require('fs');
+path = require('path');
+http = require('http');
+
+fifo_path = path.resolve(process.argv[2] || path.join(process.cwd(), '0'));
+
+// check configuration
+try {
+ (function (stat) {
+ if ((stat.mode & 0010000) === 0) {
+ throw { code: 'E_not_fifo', path: fifo_path };
+ };
+ })(fs.statSync(fifo_path));
+} catch (exn) {
+ console.error(exn);
+ process.exit(23);
+};
+
+process.stdin.destroy();
+fifo = fs.createWriteStream(fifo_path);
+fifo.on('open', function (fd) {
+ console.info('fifo open as fd', fd);
+
+ http.createServer(function (req, res) {
+ var rhost = req.connection.remoteAddress;
+ var rport = req.connection.remotePort;
+ var id = rhost + ':' + rport;
+
+ console.info(id, 'request', req.method, req.url);
+
+ req.on('data', function (data) {
+ console.info(id, 'data', data.length);
+ });
+
+ req.on('end', function (data) {
+ console.info(id, 'end');
+ res.writeHead(202, {
+ 'Content-Length': 0,
+ 'Connection': 'close'
+ });
+ res.end();
+ });
+
+ req.pipe(fifo, { end: false });
+ }).listen(port, host, function () {
+ console.info('server running at http://' + host + ':' + port + '/');
+ });
+});
diff --git a/sandbox/hyper/process/Makefile b/sandbox/hyper/process/Makefile
new file mode 100644
index 00000000..bbc1c2fb
--- /dev/null
+++ b/sandbox/hyper/process/Makefile
@@ -0,0 +1,19 @@
+include $(GOROOT)/src/Make.inc
+
+GCIMPORTS = -I pkg/$(GOOS)_$(GOARCH)
+LDIMPORTS = -L pkg/$(GOOS)_$(GOARCH)
+
+TARG=main
+GOFILES=\
+ main.go\
+
+include $(GOROOT)/src/Make.cmd
+
+export GOPATH := $(PWD)
+.PHONY: prepare
+prepare:
+ #goinstall -v github.com/garyburd/twister/server
+ goinstall -v gorilla.googlecode.com/hg/gorilla/mux
+ goinstall -v $(PWD)/src/hyper/process
+
+_go_.$O: prepare
diff --git a/sandbox/hyper/process/main.go b/sandbox/hyper/process/main.go
new file mode 100644
index 00000000..214dade9
--- /dev/null
+++ b/sandbox/hyper/process/main.go
@@ -0,0 +1,77 @@
+package main
+
+import "json"
+import "log"
+import "http"
+import "gorilla.googlecode.com/hg/gorilla/mux"
+import "os"
+import "fmt"
+import "bytes"
+
+import "hyper/process"
+
+var proc = map[string]*hyper.Process{}
+
+// TODO Retrieve Process, Write, Kill [autokill], get exit code
+
+func RespondJSON(res http.ResponseWriter, v interface{}) os.Error {
+ content, err := json.Marshal(v)
+ if err == nil {
+ log.Printf("< %s", content)
+ res.Header().Set("Content-Type", "application/json; charset=\"utf-8\"")
+ res.WriteHeader(http.StatusOK)
+ res.Write(content)
+ } else {
+ log.Printf("%s while json.Marshal(%s)", err, v)
+ }
+ return err
+}
+
+func CreateProcessHandler(res http.ResponseWriter, req *http.Request) {
+ if p, err := hyper.NewProcess(req); err == nil {
+ id := p.Id()
+ proc[id] = p
+ RespondJSON(res, &map[string]string{
+ "path": fmt.Sprintf("/proc/%s", id),
+ })
+ } else {
+ log.Printf("%s", err)
+ res.WriteHeader(http.StatusInternalServerError)
+ }
+}
+
+func RetrieveProcess(res http.ResponseWriter, req *http.Request) {
+ if p := proc[mux.Vars(req)["id"]]; p != nil {
+ RespondJSON(res, p)
+ } else {
+ res.WriteHeader(http.StatusNotFound)
+ }
+}
+
+func FeedProcess(res http.ResponseWriter, req *http.Request) {
+ if p := proc[mux.Vars(req)["id"]]; p != nil {
+ body := make([]byte, 4096)
+ if _, err := req.Body.Read(body); err == nil {
+ body = bytes.TrimRight(body, string([]byte{0}))
+ p.Write(body)
+ //if err := p.Write(body); err == nil {
+ RespondJSON(res, true)
+ //}
+ }
+ } else {
+ res.WriteHeader(http.StatusNotFound)
+ }
+}
+
+func main() {
+
+ // Gorilla
+ mux.HandleFunc("/proc", CreateProcessHandler).Methods("POST")
+ mux.HandleFunc("/proc/{id}", RetrieveProcess).Methods("GET")
+ mux.HandleFunc("/proc/{id}", FeedProcess).Methods("POST")
+
+ err := http.ListenAndServe("0.0.0.0:8888", mux.DefaultRouter)
+ if err != nil {
+ log.Fatal("ListenAndServe: ", err.String())
+ }
+}
diff --git a/sandbox/hyper/process/spawn b/sandbox/hyper/process/spawn
new file mode 100755
index 00000000..65e94d86
--- /dev/null
+++ b/sandbox/hyper/process/spawn
@@ -0,0 +1,103 @@
+#! /bin/sh
+#
+# [sh -x] spawn [command [argument ...]]
+#
+# export id to create&destroy or reuse the working directory //proc/$id/.
+# this feature is for debug only and marked as deprecated, so don't rely
+# on it too hard.
+#
+spawn() {
+ set -euf
+
+ # establish working subdirectory in //proc. we're mking only
+ # transient dirs, i.e. if we mkdir, then we also defer rmdir.
+ if test -n "${id-}"; then
+ : "using id=$id from env"
+ wd=$pd/$id
+ if ! test -d $wd; then
+ : "make transient $wd/"
+ mkdir $wd
+ defer rmdir $wd
+ elif ! test `ls $wd | wc -l` = 0; then
+ : "$wd/ is not empty!"
+ exit 23
+ else
+ : "reuse existing $wd/"
+ fi
+ else
+ id=`cd $pd && mktemp -d XXXXXXXXXXXXXXXX`
+ wd=$pd/$id
+ defer rmdir $wd
+ : "made transient $wd/"
+ fi
+
+ # change to //proc working directory
+ cwd="$PWD"
+ cd $wd
+ defer cd $cwd
+
+ # create named pipes for the child process's stdio
+ mkfifo 0 1 2
+ defer rm 0 1 2
+
+ # spawn child process
+ ( : "in $PWD/ spawn ${*:-nothing}"
+ set +x # disable debug output so we don't clobber 2
+ exec 0>&- 1>&- 2>&- 0<>0 1<>1 2<>2
+ cd "$cwd"
+ exec "$@") &
+ pid=$!
+
+ # setup a trap to kill the child process if this (parent) process dies
+ defer kill $pid
+
+ # store misc. info.
+ ln -snf $cwd cwd
+ echo $id >id
+ echo $$ >ppid
+ echo $pid >pid
+ defer rm cwd id pid ppid
+
+ # wait for the child process's
+ set +e
+ wait $pid
+ code=$?
+ set -e
+
+ # the child is already dead
+ cancel kill $pid
+
+ # return the same way wait did
+ (exit $code)
+}
+
+#
+# defer [command [argument ...]]
+#
+# Defer execution of a command. Deferred commands are executed in LIFO
+# order immediately before the script terminates. See (golang's defer
+# statement for more information how this should work).
+#
+defer() {
+ defer="$*${defer+
+$defer}"
+}
+
+#
+# cancel [command [argument ...]]
+#
+# Cancel a deferred command. The arguments have to match exactly a
+# prior defer call or else chaos and mayhem shall haunt thee and shi-
+#
+cancel() {
+ defer="`echo "$defer" | grep -Fxv "$*"`"
+}
+
+# setup //proc directory
+pd=/tmp/krebs/proc
+mkdir -p $pd
+test -w $pd
+
+# setup deferred execution and spawn command
+trap 'eval "${defer-}"; defer=' EXIT INT TERM
+spawn "$@"
diff --git a/sandbox/hyper/process/src/hyper/process/Makefile b/sandbox/hyper/process/src/hyper/process/Makefile
new file mode 100644
index 00000000..7ecda716
--- /dev/null
+++ b/sandbox/hyper/process/src/hyper/process/Makefile
@@ -0,0 +1,11 @@
+include ${GOROOT}/src/Make.inc
+
+TARG=hyper/process
+
+GOFILES=\
+ process.go\
+
+#DEPS=\
+# gorilla.googlecode.com/hg/gorilla/context\
+
+include ${GOROOT}/src/Make.pkg
diff --git a/sandbox/hyper/process/src/hyper/process/process.go b/sandbox/hyper/process/src/hyper/process/process.go
new file mode 100644
index 00000000..18cf55fb
--- /dev/null
+++ b/sandbox/hyper/process/src/hyper/process/process.go
@@ -0,0 +1,132 @@
+package hyper
+
+import "fmt"
+import "http"
+import "bytes"
+import "json"
+import "os"
+
+type Process struct {
+ Path string `json:"path"`
+ Argv []string `json:"argv"`
+ Envp map[string]string `json:"envp"`
+ //Stdin string `json:"stdin"`
+ Stdout string `json:"stdout"`
+ Stderr string `json:"stderr"`
+ process *os.Process
+ process_stdin *os.File
+ process_stdout *os.File
+ process_stderr *os.File
+ id string
+ client http.Client
+}
+
+func (p *Process) Id() string {
+ return p.id
+}
+
+func NewProcess(req *http.Request) (*Process, os.Error) {
+ body := make([]byte, 4096)
+ _, err := req.Body.Read(body)
+ if err != nil {
+ return nil, err
+ }
+
+ body = bytes.TrimRight(body, string([]byte{0}))
+
+ var p Process
+
+ if err := json.Unmarshal(body, &p); err != nil {
+ return nil, err
+ }
+
+ p.id = gensym()
+
+ if err := p.Start(); err != nil {
+ return nil, err
+ }
+
+ return &p, nil
+}
+
+func (hp *Process) Write(b []byte) {
+ n, err := hp.process_stdin.Write(b)
+ if err != nil {
+ fmt.Printf("Write: %s\n", err)
+ } else {
+ fmt.Printf("Wrote: %d bytes\n", n)
+ }
+}
+
+func (hp *Process) Start() os.Error {
+ var name = hp.Path //os.Args[1] //"/usr/b"
+ var argv = hp.Argv //os.Args[1:] //[]string{ "bc" }
+ //var chroot = false
+ //var dir = "/var/empty"
+ var files [3][2]*os.File
+ var err os.Error
+
+ for i, _ := range files {
+ files[i][0], files[i][1], err = os.Pipe()
+ if err != nil {
+ return err
+ }
+ }
+
+ var env []string
+ for k, v := range hp.Envp {
+ env = append(env, fmt.Sprintf("%s=%s", k, v))
+ }
+
+ var attr = &os.ProcAttr{
+ //Dir: dir,
+ Env: env, //os.Environ(),
+ Files: []*os.File{ files[0][0], files[1][1], files[2][1]},
+ }
+
+ //var foo, _ = json.Marshal(attr)
+ //fmt.Printf("%s\n", foo)
+
+ hp.process, err = os.StartProcess(name, argv, attr)
+ if err != nil {
+ return err
+ }
+
+ hp.process_stdin = files[0][1]
+ hp.process_stdout = files[1][0]
+ hp.process_stderr = files[2][0]
+
+ for _, file := range attr.Files {
+ file.Close()
+ }
+
+ go hp.reader(hp.process_stdout, hp.Stdout)
+ go hp.reader(hp.process_stderr, hp.Stderr)
+ return nil
+}
+
+func (p *Process) reader(file *os.File, url string) {
+ var b []byte = make([]byte, 1024)
+ var err os.Error = nil
+ for err == nil {
+ var n int
+ n, err = file.Read(b)
+ fmt.Printf("data: %d, %s\n", n, b)
+
+ res, err := p.client.Post(url, "application/octet-stream", bytes.NewBuffer(b))
+ res = res
+ if err != nil {
+ fmt.Printf("EE: %s: %s\n", url, err)
+ }
+ }
+}
+
+func gensym() string {
+ f, _ := os.Open("/dev/urandom")
+ b := make([]byte, 16)
+ f.Read(b)
+ f.Close()
+ uuid := fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
+ return uuid
+}
+
diff --git a/sandbox/hyper/process/test/bc.json b/sandbox/hyper/process/test/bc.json
new file mode 100644
index 00000000..5b3b0721
--- /dev/null
+++ b/sandbox/hyper/process/test/bc.json
@@ -0,0 +1,11 @@
+{
+ "path": "/usr/bin/bc",
+ "argv": [
+ "bc"
+ ],
+ "envp": {
+ "was": "geht"
+ },
+ "stdout": "http://127.0.0.1:1337/",
+ "stderr": "http://127.0.0.1:1337/"
+}
diff --git a/sandbox/hyper/sink/index.js b/sandbox/hyper/sink/index.js
new file mode 100644
index 00000000..b556b88d
--- /dev/null
+++ b/sandbox/hyper/sink/index.js
@@ -0,0 +1,13 @@
+require('http').createServer(function (req, res) {
+
+ req.on('data', function (data) {
+ require('util').puts(data);
+ });
+
+ req.on('end', function () {
+ res.writeHead(200, {'Content-Type': 'text/plain', 'Content-Length': 0});
+ res.end();
+ });
+}).listen(1337, '127.0.0.1', function () {
+ console.log('Running HyperSink at http://127.0.0.1:1337/');
+});