Skip to content
Snippets Groups Projects
Commit b019f6ca authored by Sargun Dhillon's avatar Sargun Dhillon Committed by Alex Ellis
Browse files

Add Concurrency Limiter

This enables limiting concurrency. It is a naive approach which
will reject requests as soon as they exceed the maximum number
of in-flight requests.

It is a port of the following PR from the new watchdog code:
https://github.com/openfaas-incubator/of-watchdog/pull/54



Signed-off-by: default avatarSargun Dhillon <sargun@sargun.me>
Signed-off-by: default avatarLucas Roesler <roesler.lucas@gmail.com>
parent 45cf4db4
No related branches found
No related tags found
No related merge requests found
......@@ -25,6 +25,14 @@
revision = "c12348ce28de40eed0136aa2b644d0ee0650e56c"
version = "v1.0.1"
[[projects]]
branch = "master"
digest = "1:cc8e0046e1076991a51c3aa9ab4827b8bf1bf4d7715cd45a95888663c2d851e3"
name = "github.com/openfaas/faas-middleware"
packages = ["concurrency-limiter"]
pruneopts = "UT"
revision = "6a78c3a94beb2a99d6aa443bca21331653840a23"
[[projects]]
digest = "1:ef03fb1dae4d010196652653f00a8002e94c19bcabdc8ca5100a804ffef63a47"
name = "github.com/prometheus/client_golang"
......@@ -70,6 +78,7 @@
analyzer-name = "dep"
analyzer-version = 1
input-imports = [
"github.com/openfaas/faas-middleware/concurrency-limiter",
"github.com/prometheus/client_golang/prometheus",
"github.com/prometheus/client_golang/prometheus/promauto",
"github.com/prometheus/client_golang/prometheus/promhttp",
......
......@@ -102,6 +102,7 @@ The watchdog can be configured through environmental variables. You must always
| `exec_timeout` | Hard timeout for process exec'd for each incoming request (in seconds). Disabled if set to 0 |
| `write_debug` | Write all output, error messages, and additional information to the logs. Default is false |
| `combine_output` | True by default - combines stdout/stderr in function response, when set to false `stderr` is written to the container logs and stdout is used for function response |
| `max_inflight` | Limit the maximum number of requests in flight |
## Advanced / tuning
......
......@@ -17,6 +17,8 @@ import (
"sync/atomic"
"time"
limiter "github.com/openfaas/faas-middleware/concurrency-limiter"
"github.com/openfaas/faas/watchdog/types"
)
......@@ -294,8 +296,8 @@ func makeHealthHandler() func(http.ResponseWriter, *http.Request) {
}
}
func makeRequestHandler(config *WatchdogConfig) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
func makeRequestHandler(config *WatchdogConfig) http.HandlerFunc {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case
http.MethodPost,
......@@ -309,5 +311,6 @@ func makeRequestHandler(config *WatchdogConfig) func(http.ResponseWriter, *http.
w.WriteHeader(http.StatusMethodNotAllowed)
}
}
})
return limiter.NewConcurrencyLimiter(handler, config.maxInflight).ServeHTTP
}
......@@ -59,7 +59,7 @@ func (m *MetricsServer) Serve(cancel chan bool) {
// InstrumentHandler returns a handler which records HTTP requests
// as they are made
func InstrumentHandler(next http.HandlerFunc, _http Http) http.HandlerFunc {
func InstrumentHandler(next http.Handler, _http Http) http.HandlerFunc {
return promhttp.InstrumentHandlerCounter(_http.RequestsTotal,
promhttp.InstrumentHandlerDuration(_http.RequestDurationHistogram, next))
}
......@@ -92,6 +92,7 @@ func (ReadConfig) Read(hasEnv HasEnv) WatchdogConfig {
}
cfg.metricsPort = 8081
cfg.maxInflight = parseIntValue(hasEnv.Getenv("max_inflight"), 0)
return cfg
}
......@@ -137,4 +138,10 @@ type WatchdogConfig struct {
// metricsPort is the HTTP port to serve metrics on
metricsPort int
// maxInflight limits the number of simultaneous
// requests that the watchdog allows concurrently.
// Any request which exceeds this limit will
// have an immediate response of 429.
maxInflight int
}
MIT License
Copyright (c) 2019 OpenFaaS
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
package limiter
import (
"fmt"
"net/http"
"sync/atomic"
)
type ConcurrencyLimiter struct {
backendHTTPHandler http.Handler
/*
We keep two counters here in order to make it so that we can know when a request has gone to completed
in the tests. We could wrap these up in a condvar, so there's no need to spinlock, but that seems overkill
for testing.
This is effectively a very fancy semaphore built for optimistic concurrency only, and with spinlocks. If
you want to add timeouts here / pessimistic concurrency, signaling needs to be added and/or a condvar esque
sorta thing needs to be done to wake up waiters who are waiting post-spin.
Otherwise, there's all sorts of futzing in order to make sure that the concurrency limiter handler
has completed
The math works on overflow:
var x, y uint64
x = (1 << 64 - 1)
y = (1 << 64 - 1)
x++
fmt.Println(x)
fmt.Println(y)
fmt.Println(x - y)
Prints:
0
18446744073709551615
1
*/
requestsStarted uint64
requestsCompleted uint64
maxInflightRequests uint64
}
func (cl *ConcurrencyLimiter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
requestsStarted := atomic.AddUint64(&cl.requestsStarted, 1)
completedRequested := atomic.LoadUint64(&cl.requestsCompleted)
if requestsStarted-completedRequested > cl.maxInflightRequests {
// This is a failure pathway, and we do not want to block on the write to finish
atomic.AddUint64(&cl.requestsCompleted, 1)
w.WriteHeader(http.StatusTooManyRequests)
fmt.Fprintf(w, "Concurrent request limit exceeded. Max concurrent requests: %d\n", cl.maxInflightRequests)
return
}
cl.backendHTTPHandler.ServeHTTP(w, r)
atomic.AddUint64(&cl.requestsCompleted, 1)
}
// NewConcurrencyLimiter creates a handler which limits the active number of active, concurrent
// requests. If the concurrency limit is less than, or equal to 0, then it will just return the handler
// passed to it.
func NewConcurrencyLimiter(handler http.Handler, concurrencyLimit int) http.Handler {
if concurrencyLimit <= 0 {
return handler
}
return &ConcurrencyLimiter{
backendHTTPHandler: handler,
maxInflightRequests: uint64(concurrencyLimit),
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment