Skip to content
Snippets Groups Projects
Commit 4d058967 authored by Alex Ellis's avatar Alex Ellis
Browse files

Watchdog refurbishments


- Watchdog - allow new methods with and without body.
- Enforce hard-timeout via exec_timeout variable.
- Correct bug in timeouts for read/write of HTTP.
- Documentation for new verbs and hard timeout.

Signed-off-by: default avatarAlex Ellis <alexellis2@gmail.com>
parent 1edee097
No related branches found
No related tags found
No related merge requests found
......@@ -3,7 +3,7 @@ Watchdog
The FaaS watchdog is designed to marshal a HTTP request between your public HTTP URL and a individual function.
Every FaaS function embeds this binary and uses it as its entrypoint.
Every FaaS function should embed this binary and uses it as its entrypoint. It is in effect a tiny web-server or shim that will fork your desired process for every HTTP request.
Creating a function:
......@@ -54,16 +54,67 @@ A number of environmental overrides can be added for additional flexibility and
| Option | Usage |
|------------------------|--------------|
| `fprocess` | The process to invoke for each function call. This must be a UNIX binary and accept input via STDIN and output via STDOUT. |
| `cgi_headers` | HTTP headers from request are made available through environmental variables - `Http_X-Served-By` etc. |
| `cgi_headers` | HTTP headers from request are made available through environmental variables - `Http_X-Served-By` etc. See section: *Handling headers* for more detail. Enabled by default. |
| `marshal_requests` | Instead of re-directing the raw HTTP body into your fprocess, it will first be marshalled into JSON. Use this if you need to work with HTTP headers and do not want to use environmental variables via the `cgi_headers` flag. |
| `content_type` | Force a specific Content-Type response for all responses. |
| `write_timeout` | HTTP timeout for writing a response body from your function |
| `read_timeout` | HTTP timeout for reading the payload from the client caller |
| `content_type` | Force a specific Content-Type response for all responses. |
| `write_timeout` | HTTP timeout for writing a response body from your function (in seconds) |
| `read_timeout` | HTTP timeout for reading the payload from the client caller (in seconds) |
| `suppress_lock` | The watchdog will attempt to write a lockfile to /tmp/ for swarm healthchecks - set this to true to disable behaviour. |
| `exec_timeout` | Hard timeout for process exec'd for each incoming request (in seconds). Disabled if set to 0. |
## Advanced / tuning
**Handling headers**
You can get hold of the HTTP headers by enabling the `cgi_headers` environmental variable.
Here's an example of a POST request with an additional header and a query-string.
```
$ cgi_headers=true fprocess=env ./watchdog &
2017/06/23 17:02:58 Writing lock-file to: /tmp/.lock
$ curl "localhost:8080?q=serverless&page=1" -X POST -H X-Forwarded-By:http://my.vpn.com
```
This is what you'd see if you had set your `fprocess` to `env` on a Linux system:
```
Http_User-Agent=curl/7.43.0
Http_Accept=*/*
Http_X-Forwarded-By=http://my.vpn.com
Http_Method=POST
Http_Query=q=serverless&page=1
```
You can also use the `GET` verb:
```
$ curl "localhost:8080?action=quote&qty=1&productId=105"
```
The output from the watchdog would be:
```
Http_User-Agent=curl/7.43.0
Http_Accept=*/*
Http_Method=GET
Http_Query=action=quote&qty=1&productId=105
```
You can now use HTTP state from within your application to make decisions.
**HTTP methods**
The HTTP methods supported for the watchdog are:
With a body:
* POST, PUT, DELETE, UPDATE
Without a body:
* GET
**Content-Type of request/response**
By default the watchdog will match the response of your function to the "Content-Type" of the client.
......
......@@ -25,6 +25,32 @@ func (e EnvBucket) Getenv(key string) string {
func (e EnvBucket) Setenv(key string, value string) {
e.Items[key] = value
}
func TestRead_CgiHeaders_OverideFalse(t *testing.T) {
defaults := NewEnvBucket()
readConfig := ReadConfig{}
defaults.Setenv("cgi_headers", "false")
config := readConfig.Read(defaults)
if config.cgiHeaders != false {
t.Logf("cgiHeaders should have been false (via env)")
t.Fail()
}
}
func TestRead_CgiHeaders_DefaultIsTrueConfig(t *testing.T) {
defaults := NewEnvBucket()
readConfig := ReadConfig{}
config := readConfig.Read(defaults)
if config.cgiHeaders != true {
t.Logf("cgiHeaders should have been true (unspecified)")
t.Fail()
}
}
func TestRead_WriteDebug_DefaultIsTrueConfig(t *testing.T) {
defaults := NewEnvBucket()
readConfig := ReadConfig{}
......@@ -135,3 +161,17 @@ func TestRead_ReadAndWriteTimeoutConfig(t *testing.T) {
t.Fail()
}
}
func TestRead_ExecTimeoutConfig(t *testing.T) {
defaults := NewEnvBucket()
defaults.Setenv("exec_timeout", "3")
readConfig := ReadConfig{}
config := readConfig.Read(defaults)
want := time.Duration(3) * time.Second
if (config.execTimeout) != want {
t.Logf("readTimeout incorrect, got: %d - want: %s\n", config.execTimeout, want)
t.Fail()
}
}
......@@ -23,8 +23,11 @@ func buildFunctionInput(config *WatchdogConfig, r *http.Request) ([]byte, error)
var requestBytes []byte
var err error
defer r.Body.Close()
requestBytes, _ = ioutil.ReadAll(r.Body)
if r.Body != nil {
defer r.Body.Close()
}
requestBytes, err = ioutil.ReadAll(r.Body)
if config.marshalRequest {
marshalRes, marshalErr := types.MarshalRequest(requestBytes, &r.Header)
err = marshalErr
......@@ -41,23 +44,25 @@ func debugHeaders(source *http.Header, direction string) {
}
}
func pipeRequest(config *WatchdogConfig, w http.ResponseWriter, r *http.Request) {
type requestInfo struct {
headerWritten bool
}
func pipeRequest(config *WatchdogConfig, w http.ResponseWriter, r *http.Request, method string, hasBody bool) {
startTime := time.Now()
parts := strings.Split(config.faasProcess, " ")
ri := &requestInfo{}
if config.debugHeaders {
debugHeaders(&r.Header, "in")
}
targetCmd := exec.Command(parts[0], parts[1:]...)
if config.cgiHeaders {
envs := os.Environ()
for k, v := range r.Header {
kv := fmt.Sprintf("Http_%s=%s", k, v[0])
envs = append(envs, kv)
}
envs := getAdditionalEnvs(config, r, method)
if len(envs) > 0 {
targetCmd.Env = envs
}
......@@ -65,24 +70,59 @@ func pipeRequest(config *WatchdogConfig, w http.ResponseWriter, r *http.Request)
var out []byte
var err error
var res []byte
var requestBody []byte
var wg sync.WaitGroup
wg.Add(2)
res, buildInputErr := buildFunctionInput(config, r)
if buildInputErr != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(buildInputErr.Error()))
return
wgCount := 2
if hasBody == false {
wgCount = 1
}
// Write to pipe in separate go-routine to prevent blocking
go func() {
defer wg.Done()
writer.Write(res)
writer.Close()
}()
if hasBody {
var buildInputErr error
requestBody, buildInputErr = buildFunctionInput(config, r)
if buildInputErr != nil {
ri.headerWritten = true
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(buildInputErr.Error()))
return
}
}
wg.Add(wgCount)
var timer *time.Timer
if config.execTimeout > 0*time.Second {
timer = time.NewTimer(config.execTimeout)
go func() {
<-timer.C
log.Printf("Killing process: %s\n", config.faasProcess)
if targetCmd != nil && targetCmd.Process != nil {
ri.headerWritten = true
w.WriteHeader(http.StatusRequestTimeout)
w.Write([]byte("Killed process.\n"))
val := targetCmd.Process.Kill()
if val != nil {
log.Printf("Killed process: %s - error %s\n", config.faasProcess, val.Error())
}
}
}()
}
// Only write body if this is appropriate for the method.
if hasBody {
// Write to pipe in separate go-routine to prevent blocking
go func() {
defer wg.Done()
writer.Write(requestBody)
writer.Close()
}()
}
go func() {
defer wg.Done()
......@@ -90,16 +130,25 @@ func pipeRequest(config *WatchdogConfig, w http.ResponseWriter, r *http.Request)
}()
wg.Wait()
if timer != nil {
timer.Stop()
}
if err != nil {
if config.writeDebug == true {
log.Println(targetCmd, err)
log.Printf("Success=%t, Error=%s\n", targetCmd.ProcessState.Success(), err.Error())
log.Printf("Out=%s\n", out)
}
if ri.headerWritten == false {
w.WriteHeader(http.StatusInternalServerError)
response := bytes.NewBufferString(err.Error())
w.Write(response.Bytes())
ri.headerWritten = true
}
w.WriteHeader(500)
response := bytes.NewBufferString(err.Error())
w.Write(response.Bytes())
return
}
if config.writeDebug == true {
os.Stdout.Write(out)
}
......@@ -115,11 +164,13 @@ func pipeRequest(config *WatchdogConfig, w http.ResponseWriter, r *http.Request)
}
}
execTime := time.Since(startTime).Seconds()
w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", execTime))
w.WriteHeader(200)
w.Write(out)
if ri.headerWritten == false {
execTime := time.Since(startTime).Seconds()
w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", execTime))
ri.headerWritten = true
w.WriteHeader(200)
w.Write(out)
}
if config.debugHeaders {
header := w.Header()
......@@ -127,12 +178,42 @@ func pipeRequest(config *WatchdogConfig, w http.ResponseWriter, r *http.Request)
}
}
func getAdditionalEnvs(config *WatchdogConfig, r *http.Request, method string) []string {
var envs []string
if config.cgiHeaders {
envs = os.Environ()
for k, v := range r.Header {
kv := fmt.Sprintf("Http_%s=%s", k, v[0])
envs = append(envs, kv)
}
envs = append(envs, fmt.Sprintf("Http_Method=%s", method))
if len(r.URL.RawQuery) > 0 {
envs = append(envs, fmt.Sprintf("Http_Query=%s", r.URL.RawQuery))
}
}
return envs
}
func makeRequestHandler(config *WatchdogConfig) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method == "POST" {
pipeRequest(config, w, r)
} else {
switch r.Method {
case
"POST",
"PUT",
"DELETE",
"UPDATE":
pipeRequest(config, w, r, r.Method, true)
break
case
"GET":
pipeRequest(config, w, r, r.Method, false)
break
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
}
}
......@@ -147,14 +228,14 @@ func main() {
return
}
readTimeout := time.Duration(config.readTimeout) * time.Second
writeTimeout := time.Duration(config.writeTimeout) * time.Second
readTimeout := config.readTimeout
writeTimeout := config.writeTimeout
s := &http.Server{
Addr: ":8080",
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
MaxHeaderBytes: 1 << 20,
MaxHeaderBytes: 1 << 20, // Max header of 1MB
}
http.HandleFunc("/", makeRequestHandler(&config))
......@@ -164,9 +245,8 @@ func main() {
log.Printf("Writing lock-file to: %s\n", path)
writeErr := ioutil.WriteFile(path, []byte{}, 0660)
if writeErr != nil {
log.Panicf("Cannot write %s. Error: %s\n", path, writeErr.Error())
log.Panicf("Cannot write %s. To disable lock-file set env suppress_lock=true.\n Error: %s.\n", path, writeErr.Error())
}
}
log.Fatal(s.ListenAndServe())
}
......@@ -14,6 +14,10 @@ type HasEnv interface {
type ReadConfig struct {
}
func isBoolValueSet(val string) bool {
return len(val) > 0
}
func parseBoolValue(val string) bool {
if val == "true" {
return true
......@@ -37,6 +41,7 @@ func parseIntValue(val string) int {
func (ReadConfig) Read(hasEnv HasEnv) WatchdogConfig {
cfg := WatchdogConfig{
writeDebug: true,
cgiHeaders: true,
}
cfg.faasProcess = hasEnv.Getenv("fprocess")
......@@ -44,6 +49,8 @@ func (ReadConfig) Read(hasEnv HasEnv) WatchdogConfig {
readTimeout := parseIntValue(hasEnv.Getenv("read_timeout"))
writeTimeout := parseIntValue(hasEnv.Getenv("write_timeout"))
cfg.execTimeout = time.Duration(parseIntValue(hasEnv.Getenv("exec_timeout"))) * time.Second
if readTimeout == 0 {
readTimeout = 5
}
......@@ -55,11 +62,15 @@ func (ReadConfig) Read(hasEnv HasEnv) WatchdogConfig {
cfg.readTimeout = time.Duration(readTimeout) * time.Second
cfg.writeTimeout = time.Duration(writeTimeout) * time.Second
if len(hasEnv.Getenv("write_debug")) > 0 {
cfg.writeDebug = parseBoolValue(hasEnv.Getenv("write_debug"))
writeDebugEnv := hasEnv.Getenv("write_debug")
if isBoolValueSet(writeDebugEnv) {
cfg.writeDebug = parseBoolValue(writeDebugEnv)
}
cfg.cgiHeaders = parseBoolValue(hasEnv.Getenv("cgi_headers"))
cgiHeadersEnv := hasEnv.Getenv("cgi_headers")
if isBoolValueSet(cgiHeadersEnv) {
cfg.cgiHeaders = parseBoolValue(cgiHeadersEnv)
}
cfg.marshalRequest = parseBoolValue(hasEnv.Getenv("marshal_request"))
cfg.debugHeaders = parseBoolValue(hasEnv.Getenv("debug_headers"))
......@@ -73,15 +84,23 @@ func (ReadConfig) Read(hasEnv HasEnv) WatchdogConfig {
// WatchdogConfig for the process.
type WatchdogConfig struct {
// HTTP read timeout
readTimeout time.Duration
// HTTP write timeout
writeTimeout time.Duration
// faasProcess is the process to exec
faasProcess string
// duration until the faasProcess will be killed
execTimeout time.Duration
// writeDebug write console stdout statements to the container
writeDebug bool
// marshal header and body via JSON
marshalRequest bool
// cgiHeaders will make environmental variables available with all the HTTP headers.
......
......@@ -10,6 +10,7 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"
)
func TestHandler_make(t *testing.T) {
......@@ -120,35 +121,86 @@ func TestHandler_HasXDurationSecondsHeader(t *testing.T) {
}
}
func TestHandler_StatusOKAllowed_ForPOST(t *testing.T) {
func TestHandler_RequestTimeoutFailsForExceededDuration(t *testing.T) {
rr := httptest.NewRecorder()
body := "hello"
req, err := http.NewRequest("POST", "/", bytes.NewBufferString(body))
verbs := []string{"POST"}
for _, verb := range verbs {
body := "hello"
req, err := http.NewRequest(verb, "/", bytes.NewBufferString(body))
if err != nil {
t.Fatal(err)
}
config := WatchdogConfig{
faasProcess: "sleep 2",
execTimeout: time.Duration(100) * time.Millisecond,
}
handler := makeRequestHandler(&config)
handler(rr, req)
required := http.StatusRequestTimeout
if status := rr.Code; status != required {
t.Errorf("handler returned wrong status code for verb [%s]: got %v, but wanted %v",
verb, status, required)
}
}
}
func TestHandler_StatusOKAllowed_ForWriteableVerbs(t *testing.T) {
rr := httptest.NewRecorder()
verbs := []string{"POST", "PUT", "UPDATE", "DELETE"}
for _, verb := range verbs {
body := "hello"
req, err := http.NewRequest(verb, "/", bytes.NewBufferString(body))
if err != nil {
t.Fatal(err)
}
config := WatchdogConfig{
faasProcess: "cat",
}
handler := makeRequestHandler(&config)
handler(rr, req)
required := http.StatusOK
if status := rr.Code; status != required {
t.Errorf("handler returned wrong status code for verb [%s]: got %v, but wanted %v",
verb, status, required)
}
buf, _ := ioutil.ReadAll(rr.Body)
val := string(buf)
if val != body {
t.Errorf("Exec of cat did not return input value, %s", val)
}
}
}
func TestHandler_StatusMethodNotAllowed_ForUnknown(t *testing.T) {
rr := httptest.NewRecorder()
req, err := http.NewRequest("UNKNOWN", "/", nil)
if err != nil {
t.Fatal(err)
}
config := WatchdogConfig{
faasProcess: "cat",
}
config := WatchdogConfig{}
handler := makeRequestHandler(&config)
handler(rr, req)
required := http.StatusOK
required := http.StatusMethodNotAllowed
if status := rr.Code; status != required {
t.Errorf("handler returned wrong status code: got %v, but wanted %v",
status, required)
}
buf, _ := ioutil.ReadAll(rr.Body)
val := string(buf)
if val != body {
t.Errorf("Exec of cat did not return input value, %s", val)
}
}
func TestHandler_StatusMethodNotAllowed_ForGet(t *testing.T) {
func TestHandler_StatusOKForGETAndNoBody(t *testing.T) {
rr := httptest.NewRecorder()
req, err := http.NewRequest("GET", "/", nil)
......@@ -156,11 +208,15 @@ func TestHandler_StatusMethodNotAllowed_ForGet(t *testing.T) {
t.Fatal(err)
}
config := WatchdogConfig{}
config := WatchdogConfig{
// writeDebug: true,
faasProcess: "date",
}
handler := makeRequestHandler(&config)
handler(rr, req)
required := http.StatusMethodNotAllowed
required := http.StatusOK
if status := rr.Code; status != required {
t.Errorf("handler returned wrong status code: got %v, but wanted %v",
status, required)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment