Browse Source

React to HTTP 429 Too Many Requests

Image registries tend to have rate limiting, and it is hard to know in
advance what the practical limits are -- and they will often
change. Nonetheless, we have to back off when we get a HTTP 429,
otherwise the registry could, e.g., decide to blacklist our IP.

Therefore:

 - decrease the rate limit for a registry domain if we get HTTP 429
   - we only reliably see this in the round-tripper, but that's where
     the rate limiting is anyway
   - this goes through the RateLimiters struct so that we can log it

 - we also want to edge back towards the configured rate when we have
   succeeded
   - but we only know if we succeeded in the calling code, so it has
     to be wired through
Michael Bridgen 1 year ago
parent
commit
e3bc6d0901

+ 9
- 7
cmd/fluxd/main.go View File

@@ -44,10 +44,11 @@ var version = "unversioned"
44 44
 const (
45 45
 	product = "weave-flux"
46 46
 
47
-	// The number of connections chosen for memcache and remote GETs should match for best performance (hence the single hardcoded value)
48
-	// Value chosen through performance tests on sock-shop. I was unable to get higher performance than this.
49
-	defaultRemoteConnections   = 125 // Chosen performance tests on sock-shop. Unable to get higher performance than this.
50
-	defaultMemcacheConnections = 10  // This doesn't need to be high. The user is only requesting one tag/image at a time.
47
+	// This is used as the "burst" value for rate limiting, and
48
+	// therefore also as the limit to the number of concurrent fetches
49
+	// and memcached connections, since these in general can't do any
50
+	// more work than is allowed by the burst amount.
51
+	defaultRemoteConnections = 10
51 52
 
52 53
 	// There are running systems that assume these defaults (by not
53 54
 	// supplying a value for one or both). Don't change them.
@@ -99,7 +100,7 @@ func main() {
99 100
 		memcachedTimeout     = fs.Duration("memcached-timeout", time.Second, "Maximum time to wait before giving up on memcached requests.")
100 101
 		memcachedService     = fs.String("memcached-service", "memcached", "SRV service used to discover memcache servers.")
101 102
 		registryPollInterval = fs.Duration("registry-poll-interval", 5*time.Minute, "period at which to check for updated images")
102
-		registryRPS          = fs.Int("registry-rps", 200, "maximum registry requests per second per host")
103
+		registryRPS          = fs.Float64("registry-rps", 50, "maximum registry requests per second per host")
103 104
 		registryBurst        = fs.Int("registry-burst", defaultRemoteConnections, "maximum number of warmer connections to remote and memcache")
104 105
 		registryTrace        = fs.Bool("registry-trace", false, "output trace of image registry requests to log")
105 106
 		registryInsecure     = fs.StringSlice("registry-insecure-host", []string{}, "use HTTP for this image registry domain (e.g., registry.cluster.local), instead of HTTPS")
@@ -299,8 +300,9 @@ func main() {
299 300
 		// Remote client, for warmer to refresh entries
300 301
 		registryLogger := log.With(logger, "component", "registry")
301 302
 		registryLimits := &registryMiddleware.RateLimiters{
302
-			RPS:   *registryRPS,
303
-			Burst: *registryBurst,
303
+			RPS:    *registryRPS,
304
+			Burst:  *registryBurst,
305
+			Logger: log.With(logger, "component", "ratelimiter"),
304 306
 		}
305 307
 		remoteFactory := &registry.RemoteClientFactory{
306 308
 			Logger:        registryLogger,

+ 4
- 0
registry/cache/warming.go View File

@@ -334,6 +334,10 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id
334 334
 			LastUpdate: time.Now(),
335 335
 			Images:     newImages,
336 336
 		}
337
+		// If we got through all that without bumping into `HTTP 429
338
+		// Too Many Requests` (or other problems), we can potentially
339
+		// creep the rate limit up
340
+		w.clientFactory.Succeed(id.CanonicalName())
337 341
 	}
338 342
 
339 343
 	if w.Notify != nil {

+ 1
- 0
registry/client.go View File

@@ -69,6 +69,7 @@ type Client interface {
69 69
 // implementations.
70 70
 type ClientFactory interface {
71 71
 	ClientFor(image.CanonicalName, Credentials) (Client, error)
72
+	Succeed(image.CanonicalName)
72 73
 }
73 74
 
74 75
 type Remote struct {

+ 7
- 0
registry/client_factory.go View File

@@ -108,6 +108,13 @@ func (f *RemoteClientFactory) ClientFor(repo image.CanonicalName, creds Credenti
108 108
 	return NewInstrumentedClient(client), nil
109 109
 }
110 110
 
111
+// Succeed exists merely so that the user of the ClientFactory can
112
+// bump rate limits up if a repo's metadata has successfully been
113
+// fetched.
114
+func (f *RemoteClientFactory) Succeed(repo image.CanonicalName) {
115
+	f.Limiters.Recover(repo.Domain)
116
+}
117
+
111 118
 // store adapts a set of pre-selected creds to be an
112 119
 // auth.CredentialsStore
113 120
 type store struct {

+ 96
- 7
registry/middleware/rate_limiter.go View File

@@ -2,16 +2,93 @@ package middleware
2 2
 
3 3
 import (
4 4
 	"net/http"
5
+	"strconv"
5 6
 	"sync"
6 7
 
8
+	"github.com/go-kit/kit/log"
7 9
 	"github.com/pkg/errors"
8 10
 	"golang.org/x/time/rate"
9 11
 )
10 12
 
13
const (
	// minLimit is the floor, in requests per second, below which a
	// per-host rate limit is never pushed.
	minLimit = 0.1
	// backOffBy divides the current limit each time a host answers
	// with HTTP 429 Too Many Requests.
	backOffBy = 2.0
	// recoverBy multiplies the current limit after a successful
	// operation, creeping back towards the configured RPS.
	recoverBy = 1.5
)
18
+
19
+// RateLimiters keeps track of per-host rate limiting for an arbitrary
20
+// set of hosts.
21
+
22
+// Use `*RateLimiter.RoundTripper(host)` to obtain a rate limited HTTP
23
+// transport for an operation. The RoundTripper will react to a `HTTP
24
+// 429 Too many requests` response by reducing the limit for that
25
+// host. It will only do so once, so that concurrent requests don't
26
+// *also* reduce the limit.
27
+//
28
+// Call `*RateLimiter.Recover(host)` when an operation has succeeded
29
+// without incident, which will increase the rate limit modestly back
30
+// towards the given ideal.
11 31
 type RateLimiters struct {
12
-	RPS, Burst int
13
-	perHost    map[string]*rate.Limiter
14
-	mu         sync.Mutex
32
+	RPS     float64
33
+	Burst   int
34
+	Logger  log.Logger
35
+	perHost map[string]*rate.Limiter
36
+	mu      sync.Mutex
37
+}
38
+
39
+func (limiters *RateLimiters) clip(limit float64) float64 {
40
+	if limit < minLimit {
41
+		return minLimit
42
+	}
43
+	if limit > limiters.RPS {
44
+		return limiters.RPS
45
+	}
46
+	return limit
47
+}
48
+
49
+// BackOff can be called to explicitly reduce the limit for a
50
+// particular host. Usually this isn't necessary since a RoundTripper
51
+// obtained for a host will respond to `HTTP 429` by doing this for
52
+// you.
53
+func (limiters *RateLimiters) BackOff(host string) {
54
+	limiters.mu.Lock()
55
+	defer limiters.mu.Unlock()
56
+
57
+	var limiter *rate.Limiter
58
+	if limiters.perHost == nil {
59
+		limiters.perHost = map[string]*rate.Limiter{}
60
+	}
61
+	if rl, ok := limiters.perHost[host]; ok {
62
+		limiter = rl
63
+	} else {
64
+		limiter = rate.NewLimiter(rate.Limit(limiters.RPS), limiters.Burst)
65
+		limiters.perHost[host] = limiter
66
+	}
67
+
68
+	oldLimit := float64(limiter.Limit())
69
+	newLimit := limiters.clip(oldLimit / backOffBy)
70
+	if oldLimit != newLimit && limiters.Logger != nil {
71
+		limiters.Logger.Log("info", "reducing rate limit", "host", host, "limit", strconv.FormatFloat(newLimit, 'f', 2, 64))
72
+	}
73
+	limiter.SetLimit(rate.Limit(newLimit))
74
+}
75
+
76
+// Recover should be called when a use of a RoundTripper has
77
+// succeeded, to bump the limit back up again.
78
+func (limiters *RateLimiters) Recover(host string) {
79
+	limiters.mu.Lock()
80
+	defer limiters.mu.Unlock()
81
+	if limiters.perHost == nil {
82
+		return
83
+	}
84
+	if limiter, ok := limiters.perHost[host]; ok {
85
+		oldLimit := float64(limiter.Limit())
86
+		newLimit := limiters.clip(oldLimit * recoverBy)
87
+		if newLimit != oldLimit && limiters.Logger != nil {
88
+			limiters.Logger.Log("info", "increasing rate limit", "host", host, "limit", strconv.FormatFloat(newLimit, 'f', 2, 64))
89
+		}
90
+		limiter.SetLimit(rate.Limit(newLimit))
91
+	}
15 92
 }
16 93
 
17 94
 // Limit returns a RoundTripper for a particular host. We expect to do
@@ -27,23 +104,35 @@ func (limiters *RateLimiters) RoundTripper(rt http.RoundTripper, host string) ht
27 104
 		rl := rate.NewLimiter(rate.Limit(limiters.RPS), limiters.Burst)
28 105
 		limiters.perHost[host] = rl
29 106
 	}
107
+	var reduceOnce sync.Once
30 108
 	return &RoundTripRateLimiter{
31 109
 		rl: limiters.perHost[host],
32 110
 		tx: rt,
111
+		slowDown: func() {
112
+			reduceOnce.Do(func() { limiters.BackOff(host) })
113
+		},
33 114
 	}
34 115
 }
35 116
 
36 117
 type RoundTripRateLimiter struct {
37
-	rl *rate.Limiter
38
-	tx http.RoundTripper
118
+	rl       *rate.Limiter
119
+	tx       http.RoundTripper
120
+	slowDown func()
39 121
 }
40 122
 
41 123
 func (t *RoundTripRateLimiter) RoundTrip(r *http.Request) (*http.Response, error) {
42 124
 	// Wait errors out if the request cannot be processed within
43
-	// the deadline. This is preemptive, instead of waiting the
125
+	// the deadline. This is pre-emptive, instead of waiting the
44 126
 	// entire duration.
45 127
 	if err := t.rl.Wait(r.Context()); err != nil {
46 128
 		return nil, errors.Wrap(err, "rate limited")
47 129
 	}
48
-	return t.tx.RoundTrip(r)
130
+	resp, err := t.tx.RoundTrip(r)
131
+	if err != nil {
132
+		return nil, err
133
+	}
134
+	if resp.StatusCode == http.StatusTooManyRequests {
135
+		t.slowDown()
136
+	}
137
+	return resp, err
49 138
 }

Loading…
Cancel
Save