From dcb9737ae58fea03b714675eb4cc8d7b6f52ccf4 Mon Sep 17 00:00:00 2001 From: tv Date: Wed, 14 Sep 2011 03:06:50 +0200 Subject: //hyper/process main.go: first steps --- hyper/process/main.go | 52 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 hyper/process/main.go (limited to 'hyper') diff --git a/hyper/process/main.go b/hyper/process/main.go new file mode 100644 index 00000000..297be2cf --- /dev/null +++ b/hyper/process/main.go @@ -0,0 +1,52 @@ +package main + +import "fmt" +import "os" + + +func reader(file *os.File) { + var b []byte = make([]byte, 1024) + var err os.Error = nil + for err == nil { + var n int + n, err = file.Read(b) + fmt.Printf("data: %d, %s\n", n, b) + } +} + +func main() { + var name = "/usr/bin/bc" + var argv = []string{ "bc" } + var envv = []string{ "FOO=23" } + //var chroot = false + var dir = "/var/empty" + var files [3][2]*os.File + var err os.Error + + for i, _ := range files { + files[i][0], files[i][1], err = os.Pipe() + err = err + } + + var attr = &os.ProcAttr{ + Dir: dir, + Env: envv, + Files: []*os.File{ /*files[0][0] */ os.Stdin, files[1][1], files[2][1]}, + } + + var p *os.Process + + p, err = os.StartProcess(name, argv, attr) + + for _, file := range attr.Files { + file.Close() + } + + p=p + + go reader(files[1][0]) + reader(files[2][0]) + + fmt.Printf("hello, world\n") + +} -- cgit v1.2.3 From 6cb808fa910cb32a9ec786616be7b821a500e8d8 Mon Sep 17 00:00:00 2001 From: tv Date: Wed, 14 Sep 2011 03:13:20 +0200 Subject: //hyper/process Makefile: initial commit --- hyper/process/Makefile | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 hyper/process/Makefile (limited to 'hyper') diff --git a/hyper/process/Makefile b/hyper/process/Makefile new file mode 100644 index 00000000..7d61b28d --- /dev/null +++ b/hyper/process/Makefile @@ -0,0 +1,14 @@ + +A := 8 + +.PHONY: all clean +all: main + +clean: + rm -f main *.$A + +%.$A: %.go + $Ag $< + +%: %.$A + $Al -o $@ $< -- cgit v1.2.3 From 1e9f32bf57ab45ff1d3a61294e323b9c0cd3309b Mon Sep 17 00:00:00 2001 From: tv Date: Wed, 14 Sep 2011 21:47:41 +0200 Subject: //hyper/process: Emergency Commit #2 --- hyper/process/Makefile | 23 +++--- hyper/process/main.go | 80 +++++++++--------- hyper/process/src/hyper/process/Makefile | 11 +++ hyper/process/src/hyper/process/process.go | 125 +++++++++++++++++++++++++++++ 4 files changed, 194 insertions(+), 45 deletions(-) create mode 100644 hyper/process/src/hyper/process/Makefile create mode 100644 hyper/process/src/hyper/process/process.go (limited to 'hyper') diff --git a/hyper/process/Makefile b/hyper/process/Makefile index 7d61b28d..bbc1c2fb 100644 --- a/hyper/process/Makefile +++ b/hyper/process/Makefile @@ -1,14 +1,19 @@ +include $(GOROOT)/src/Make.inc -A := 8 +GCIMPORTS = -I pkg/$(GOOS)_$(GOARCH) +LDIMPORTS = -L pkg/$(GOOS)_$(GOARCH) -.PHONY: all clean -all: main +TARG=main +GOFILES=\ + main.go\ -clean: - rm -f main *.$A +include $(GOROOT)/src/Make.cmd -%.$A: %.go - $Ag $< +export GOPATH := $(PWD) +.PHONY: prepare +prepare: + #goinstall -v github.com/garyburd/twister/server + goinstall -v gorilla.googlecode.com/hg/gorilla/mux + goinstall -v $(PWD)/src/hyper/process -%: %.$A - $Al -o $@ $< +_go_.$O: prepare diff --git a/hyper/process/main.go b/hyper/process/main.go index 297be2cf..ebeeb6d6 100644 --- a/hyper/process/main.go +++ b/hyper/process/main.go @@ -1,52 +1,60 @@ package main -import "fmt" +import "json" +import "log" +import "http" +import "gorilla.googlecode.com/hg/gorilla/mux" import "os" +import "fmt" + +import "hyper/process" +var proc = map[string] *hyper.Process{} -func reader(file *os.File) { - var b []byte = make([]byte, 1024) - var err os.Error = nil - for err == nil { - var n int - n, err = file.Read(b) - fmt.Printf("data: %d, %s\n", n, b) +// TODO Retrieve Process, Write, Kill [autokill], get exit code + +func RespondJSON(res http.ResponseWriter, v interface{}) os.Error { + content, err := json.Marshal(v) + if err == nil { + log.Printf("< %s", content) + res.Header().Set("Content-Type", "application/json; charset=\"utf-8\"") + res.WriteHeader(http.StatusOK) + res.Write(content) + } else { + log.Printf("%s while json.Marshal(%s)", err, v) } + return err } -func main() { - var name = "/usr/bin/bc" - var argv = []string{ "bc" } - var envv = []string{ "FOO=23" } - //var chroot = false - var dir = "/var/empty" - var files [3][2]*os.File - var err os.Error - - for i, _ := range files { - files[i][0], files[i][1], err = os.Pipe() - err = err +func CreateProcessHandler(res http.ResponseWriter, req *http.Request) { + if p, err := hyper.NewProcess(req); err == nil { + id := p.Id() + proc[id] = p + RespondJSON(res, &map[string]string{ + "path": fmt.Sprintf("/proc/%s", id), + }) + } else { + log.Printf("%s", err) + res.WriteHeader(http.StatusInternalServerError) } +} - var attr = &os.ProcAttr{ - Dir: dir, - Env: envv, - Files: []*os.File{ /*files[0][0] */ os.Stdin, files[1][1], files[2][1]}, +func RetrieveProcess(res http.ResponseWriter, req *http.Request) { + if p := proc[mux.Vars(req)["id"]]; p != nil { + RespondJSON(res, p) + } else { + res.WriteHeader(http.StatusNotFound) } +} - var p *os.Process +func main() { - p, err = os.StartProcess(name, argv, attr) + // Gorilla + mux.HandleFunc("/proc", CreateProcessHandler).Methods("POST") + mux.HandleFunc("/proc/{id}", RetrieveProcess).Methods("GET") - for _, file := range attr.Files { - file.Close() + err := http.ListenAndServe(":8888", mux.DefaultRouter) + if err != nil { + log.Fatal("ListenAndServe: ", err.String()) } - - p=p - - go reader(files[1][0]) - reader(files[2][0]) - - fmt.Printf("hello, world\n") - } diff --git a/hyper/process/src/hyper/process/Makefile b/hyper/process/src/hyper/process/Makefile new file mode 100644 index 00000000..7ecda716 --- /dev/null +++ b/hyper/process/src/hyper/process/Makefile @@ -0,0 +1,11 @@ +include ${GOROOT}/src/Make.inc + +TARG=hyper/process + +GOFILES=\ + process.go\ + +#DEPS=\ +# gorilla.googlecode.com/hg/gorilla/context\ + +include ${GOROOT}/src/Make.pkg diff --git a/hyper/process/src/hyper/process/process.go b/hyper/process/src/hyper/process/process.go new file mode 100644 index 00000000..a52197f0 --- /dev/null +++ b/hyper/process/src/hyper/process/process.go @@ -0,0 +1,125 @@ +package hyper + +import "fmt" +import "http" +import "bytes" +import "json" +import "os" + +type Process struct { + Path string `json:"path"` + Argv []string `json:"argv"` + Envp map[string]string `json:"envp"` + //Stdin string `json:"stdin"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + process *os.Process + process_stdin *os.File + process_stdout *os.File + process_stderr *os.File + id string +} + +func (p *Process) Id() string { + return p.id +} + +func NewProcess(req *http.Request) (*Process, os.Error) { + body := make([]byte, 4096) + _, err := req.Body.Read(body) + if err != nil { + return nil, err + } + + body = bytes.TrimRight(body, string([]byte{0})) + + var p Process + + if err := json.Unmarshal(body, &p); err != nil { + return nil, err + } + + p.id = gensym() + + if err := p.Start(); err != nil { + return nil, err + } + + return &p, nil +} + +func (hp *Process) Write(b []byte) { + n, err := hp.process_stdin.Write(b) + if err != nil { + fmt.Printf("Write: %s\n", err) + } else { + fmt.Printf("Wrote: %d bytes\n", n) + } +} + +func (hp *Process) Start() os.Error { + var name = hp.Path //os.Args[1] //"/usr/b" + var argv = hp.Argv //os.Args[1:] //[]string{ "bc" } + //var chroot = false + //var dir = "/var/empty" + var files [3][2]*os.File + var err os.Error + + for i, _ := range files { + files[i][0], files[i][1], err = os.Pipe() + if err != nil { + return err + } + } + + var env []string + for k, v := range hp.Envp { + env = append(env, fmt.Sprintf("%s=%s", k, v)) + } + + var attr = &os.ProcAttr{ + //Dir: dir, + Env: env, //os.Environ(), + Files: []*os.File{ files[0][0], files[1][1], files[2][1]}, + } + + //var foo, _ = json.Marshal(attr) + //fmt.Printf("%s\n", foo) + + hp.process, err = os.StartProcess(name, argv, attr) + if err != nil { + return err + } + + hp.process_stdin = files[0][1] + hp.process_stdout = files[1][0] + hp.process_stderr = files[2][0] + + for _, file := range attr.Files { + file.Close() + } + + go reader(hp.process_stdout) + go reader(hp.process_stderr) + return nil +} + +func reader(file *os.File) { + var b []byte = make([]byte, 1024) + var err os.Error = nil + for err == nil { + var n int + n, err = file.Read(b) + fmt.Printf("data: %d, %s\n", n, b) + } +} + +func gensym() string { + f, _ := os.Open("/dev/urandom") + b := make([]byte, 16) + f.Read(b) + f.Close() + uuid := fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:]) + return uuid +} + -- cgit v1.2.3 From b172e0b1314d1eaec8f72710e6b44ef3acdd859a Mon Sep 17 00:00:00 2001 From: tv Date: Thu, 15 Sep 2011 00:14:41 +0200 Subject: //hyper/sink: initial commit --- hyper/sink/index.js | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 hyper/sink/index.js (limited to 'hyper') diff --git a/hyper/sink/index.js b/hyper/sink/index.js new file mode 100644 index 00000000..b556b88d --- /dev/null +++ b/hyper/sink/index.js @@ -0,0 +1,13 @@ +require('http').createServer(function (req, res) { + + req.on('data', function (data) { + require('util').puts(data); + }); + + req.on('end', function () { + res.writeHead(200, {'Content-Type': 'text/plain', 'Content-Length': 0}); + res.end(); + }); +}).listen(1337, '127.0.0.1', function () { + console.log('Running HyperSink at http://127.0.0.1:1337/'); +}); -- cgit v1.2.3 From 668ca493744636a5fe090d431f9d36e97be7907b Mon Sep 17 00:00:00 2001 From: tv Date: Thu, 15 Sep 2011 00:15:36 +0200 Subject: //hyper/process: route POST /proc/{id} to stdin --- hyper/process/main.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) (limited to 'hyper') diff --git a/hyper/process/main.go b/hyper/process/main.go index ebeeb6d6..5420f681 100644 --- a/hyper/process/main.go +++ b/hyper/process/main.go @@ -6,6 +6,7 @@ import "http" import "gorilla.googlecode.com/hg/gorilla/mux" import "os" import "fmt" +import "bytes" import "hyper/process" @@ -47,13 +48,29 @@ func RetrieveProcess(res http.ResponseWriter, req *http.Request) { } } +func FeedProcess(res http.ResponseWriter, req *http.Request) { + if p := proc[mux.Vars(req)["id"]]; p != nil { + body := make([]byte, 4096) + if _, err := req.Body.Read(body); err == nil { + body = bytes.TrimRight(body, string([]byte{0})) + p.Write(body) + //if err := p.Write(body); err == nil { + RespondJSON(res, true) + //} + } + } else { + res.WriteHeader(http.StatusNotFound) + } +} + func main() { // Gorilla mux.HandleFunc("/proc", CreateProcessHandler).Methods("POST") mux.HandleFunc("/proc/{id}", RetrieveProcess).Methods("GET") + mux.HandleFunc("/proc/{id}", FeedProcess).Methods("POST") - err := http.ListenAndServe(":8888", mux.DefaultRouter) + err := http.ListenAndServe("0.0.0.0:8888", mux.DefaultRouter) if err != nil { log.Fatal("ListenAndServe: ", err.String()) } -- cgit v1.2.3 From 0db3ab13b036796dc9cd5cf045635484c34b9ae0 Mon Sep 17 00:00:00 2001 From: tv Date: Thu, 15 Sep 2011 00:16:48 +0200 Subject: //hyper/process: connect outgoing pipes --- hyper/process/src/hyper/process/process.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) (limited to 'hyper') diff --git a/hyper/process/src/hyper/process/process.go b/hyper/process/src/hyper/process/process.go index a52197f0..18cf55fb 100644 --- a/hyper/process/src/hyper/process/process.go +++ b/hyper/process/src/hyper/process/process.go @@ -18,6 +18,7 @@ type Process struct { process_stdout *os.File process_stderr *os.File id string + client http.Client } func (p *Process) Id() string { @@ -99,18 +100,24 @@ func (hp *Process) Start() os.Error { file.Close() } - go reader(hp.process_stdout) - go reader(hp.process_stderr) + go hp.reader(hp.process_stdout, hp.Stdout) + go hp.reader(hp.process_stderr, hp.Stderr) return nil } -func reader(file *os.File) { +func (p *Process) reader(file *os.File, url string) { var b []byte = make([]byte, 1024) var err os.Error = nil for err == nil { var n int n, err = file.Read(b) fmt.Printf("data: %d, %s\n", n, b) + + res, err := p.client.Post(url, "application/octet-stream", bytes.NewBuffer(b)) + res = res + if err != nil { + fmt.Printf("EE: %s: %s\n", url, err) + } } } -- cgit v1.2.3 From 90be8b346c43da2b36c0b4872fcf1190cfaeaa85 Mon Sep 17 00:00:00 2001 From: tv Date: Thu, 15 Sep 2011 00:29:46 +0200 Subject: //hyper README: initial commit --- hyper/README.md | 15 +++++++++++++++ hyper/process/test/bc.json | 11 +++++++++++ 2 files changed, 26 insertions(+) create mode 100644 hyper/README.md create mode 100644 hyper/process/test/bc.json (limited to 'hyper') diff --git a/hyper/README.md b/hyper/README.md new file mode 100644 index 00000000..07fa1de5 --- /dev/null +++ b/hyper/README.md @@ -0,0 +1,15 @@ +# Overview + +## start conductor on port 8888 and sink on 1337 + + //hyper/process/main + //bin/node //hyper/sink + +## create bc process and retrieve it's process id (AKA {path}) + + url=http://localhost:8888 + curl -fvsS --data-binary @//hyper/process/test/bc.json $url/proc + +## send data for calculation + + echo 9000+2^42 | curl -fvsS --data-binary @- $url/{path} diff --git a/hyper/process/test/bc.json b/hyper/process/test/bc.json new file mode 100644 index 00000000..5b3b0721 --- /dev/null +++ b/hyper/process/test/bc.json @@ -0,0 +1,11 @@ +{ + "path": "/usr/bin/bc", + "argv": [ + "bc" + ], + "envp": { + "was": "geht" + }, + "stdout": "http://127.0.0.1:1337/", + "stderr": "http://127.0.0.1:1337/" +} -- cgit v1.2.3