Browse Source

Add timeouts to all registry requests

Alfonso Acosta 1 month ago
parent
commit
d1d3147da7
3 changed files with 114 additions and 25 deletions
  1. 52
    20
      registry/cache/repocachemanager.go
  2. 59
    0
      registry/cache/repocachemanager_test.go
  3. 3
    5
      registry/cache/warming.go

+ 52
- 20
registry/cache/repocachemanager.go View File

@@ -24,25 +24,35 @@ type imageToUpdate struct {
24 24
 
25 25
 // repoCacheManager handles cache operations for a container image repository
26 26
 type repoCacheManager struct {
27
-	now         time.Time
28
-	repoID      image.Name
29
-	burst       int
30
-	trace       bool
31
-	logger      log.Logger
32
-	cacheClient Client
27
+	now           time.Time
28
+	repoID        image.Name
29
+	client        registry.Client
30
+	clientTimeout time.Duration
31
+	burst         int
32
+	trace         bool
33
+	logger        log.Logger
34
+	cacheClient   Client
33 35
 	sync.Mutex
34 36
 }
35 37
 
36
-func newRepoCacheManager(now time.Time, repoId image.Name, burst int, trace bool, logger log.Logger,
37
-	cacheClient Client) *repoCacheManager {
38
-	return &repoCacheManager{
39
-		now:         now,
40
-		repoID:      repoId,
41
-		burst:       burst,
42
-		trace:       trace,
43
-		logger:      logger,
44
-		cacheClient: cacheClient,
38
+func newRepoCacheManager(now time.Time,
39
+	repoID image.Name, clientFactory registry.ClientFactory, creds registry.Credentials, repoClientTimeout time.Duration,
40
+	burst int, trace bool, logger log.Logger, cacheClient Client) (*repoCacheManager, error) {
41
+	client, err := clientFactory.ClientFor(repoID.CanonicalName(), creds)
42
+	if err != nil {
43
+		return nil, err
44
+	}
45
+	manager := &repoCacheManager{
46
+		now:           now,
47
+		repoID:        repoID,
48
+		client:        client,
49
+		clientTimeout: repoClientTimeout,
50
+		burst:         burst,
51
+		trace:         trace,
52
+		logger:        logger,
53
+		cacheClient:   cacheClient,
45 54
 	}
55
+	return manager, nil
46 56
 }
47 57
 
48 58
 // fetchRepository fetches the repository from the cache
@@ -59,6 +69,17 @@ func (c *repoCacheManager) fetchRepository() (ImageRepository, error) {
59 69
 	return result, nil
60 70
 }
61 71
 
72
+// getTags gets the tags from the repository
73
+func (c *repoCacheManager) getTags(ctx context.Context) ([]string, error) {
74
+	ctx, cancel := context.WithTimeout(ctx, c.clientTimeout)
75
+	defer cancel()
76
+	tags, err := c.client.Tags(ctx)
77
+	if ctx.Err() == context.DeadlineExceeded {
78
+		return nil, c.clientTimeoutError()
79
+	}
80
+	return tags, err
81
+}
82
+
62 83
 // storeRepository stores the repository from the cache
63 84
 func (c *repoCacheManager) storeRepository(repo ImageRepository) error {
64 85
 	repoKey := NewRepositoryKey(c.repoID.CanonicalName())
@@ -154,7 +175,7 @@ func (c *repoCacheManager) fetchImages(tags []string) (fetchImagesResult, error)
154 175
 // updateImages, refreshes the cache entries for the images passed. It may not succeed for all images.
155 176
 // It returns the values stored in cache, the number of images it succeeded for and the number
156 177
 // of images whose manifest wasn't found in the registry.
157
-func (c *repoCacheManager) updateImages(ctx context.Context, registryClient registry.Client, images []imageToUpdate) (map[string]image.Info, int, int) {
178
+func (c *repoCacheManager) updateImages(ctx context.Context, images []imageToUpdate) (map[string]image.Info, int, int) {
158 179
 	// The upper bound for concurrent fetches against a single host is
159 180
 	// w.Burst, so limit the number of fetching goroutines to that.
160 181
 	fetchers := make(chan struct{}, c.burst)
@@ -179,9 +200,11 @@ updates:
179 200
 		awaitFetchers.Add(1)
180 201
 		go func() {
181 202
 			defer func() { awaitFetchers.Done(); <-fetchers }()
182
-			entry, err := c.updateImage(ctxc, registryClient, upCopy)
203
+			ctxcc, cancel := context.WithTimeout(ctxc, c.clientTimeout)
204
+			defer cancel()
205
+			entry, err := c.updateImage(ctxcc, upCopy)
183 206
 			if err != nil {
184
-				if err, ok := errors.Cause(err).(net.Error); ok && err.Timeout() {
207
+				if err, ok := errors.Cause(err).(net.Error); (ok && err.Timeout()) || ctxcc.Err() == context.DeadlineExceeded {
185 208
 					// This was due to a context timeout, don't bother logging
186 209
 					return
187 210
 				}
@@ -216,16 +239,21 @@ updates:
216 239
 	return result, successCount, manifestUnknownCount
217 240
 }
218 241
 
219
-func (c *repoCacheManager) updateImage(ctx context.Context, registryClient registry.Client, update imageToUpdate) (registry.ImageEntry, error) {
242
+func (c *repoCacheManager) updateImage(ctx context.Context, update imageToUpdate) (registry.ImageEntry, error) {
220 243
 	imageID := update.ref
221 244
 
222 245
 	if c.trace {
223 246
 		c.logger.Log("trace", "refreshing manifest", "ref", imageID, "previous_refresh", update.previousRefresh.String())
224 247
 	}
225 248
 
249
+	ctx, cancel := context.WithTimeout(ctx, c.clientTimeout)
250
+	defer cancel()
226 251
 	// Get the image from the remote
227
-	entry, err := registryClient.Manifest(ctx, imageID.Tag)
252
+	entry, err := c.client.Manifest(ctx, imageID.Tag)
228 253
 	if err != nil {
254
+		if ctx.Err() == context.DeadlineExceeded {
255
+			return registry.ImageEntry{}, c.clientTimeoutError()
256
+		}
229 257
 		return registry.ImageEntry{}, err
230 258
 	}
231 259
 
@@ -266,3 +294,7 @@ func (c *repoCacheManager) updateImage(ctx context.Context, registryClient regis
266 294
 	}
267 295
 	return entry, nil
268 296
 }
297
+
298
+func (r *repoCacheManager) clientTimeoutError() error {
299
+	return fmt.Errorf("client timeout (%s) exceeded", r.clientTimeout)
300
+}

+ 59
- 0
registry/cache/repocachemanager_test.go View File

@@ -0,0 +1,59 @@
1
+package cache
2
+
3
+import (
4
+	"context"
5
+	"net/http"
6
+	"net/http/httptest"
7
+	"net/url"
8
+	"os"
9
+	"testing"
10
+	"time"
11
+
12
+	"github.com/go-kit/kit/log"
13
+	"github.com/stretchr/testify/assert"
14
+
15
+	"github.com/weaveworks/flux/image"
16
+	"github.com/weaveworks/flux/registry"
17
+	"github.com/weaveworks/flux/registry/middleware"
18
+)
19
+
20
+func Test_ClientTimeouts(t *testing.T) {
21
+	timeout := 2 * time.Millisecond
22
+	server := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
23
+		// make sure we exceed the timeout
24
+		time.Sleep(timeout * 10)
25
+	}))
26
+	defer server.Close()
27
+	url, err := url.Parse(server.URL)
28
+	assert.NoError(t, err)
29
+	logger := log.NewLogfmtLogger(os.Stdout)
30
+	cf := &registry.RemoteClientFactory{
31
+		Logger: log.NewLogfmtLogger(os.Stdout),
32
+		Limiters: &middleware.RateLimiters{
33
+			RPS:    100,
34
+			Burst:  100,
35
+			Logger: logger,
36
+		},
37
+		Trace:         false,
38
+		InsecureHosts: []string{url.Host},
39
+	}
40
+	name := image.Name{
41
+		Domain: url.Host,
42
+		Image:  "foo/bar",
43
+	}
44
+	rcm, err := newRepoCacheManager(
45
+		time.Now(),
46
+		name,
47
+		cf,
48
+		registry.NoCredentials(),
49
+		timeout,
50
+		100,
51
+		false,
52
+		logger,
53
+		nil,
54
+	)
55
+	assert.NoError(t, err)
56
+	_, err = rcm.getTags(context.Background())
57
+	assert.Error(t, err)
58
+	assert.Equal(t, "client timeout (2ms) exceeded", err.Error())
59
+}

+ 3
- 5
registry/cache/warming.go View File

@@ -149,14 +149,12 @@ func imageCredsToBacklog(imageCreds registry.ImageCreds) []backlogItem {
149 149
 func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id image.Name, creds registry.Credentials) {
150 150
 	errorLogger := log.With(logger, "canonical_name", id.CanonicalName(), "auth", creds)
151 151
 
152
-	client, err := w.clientFactory.ClientFor(id.CanonicalName(), creds)
152
+	cacheManager, err := newRepoCacheManager(now, id, w.clientFactory, creds, time.Minute, w.burst, w.Trace, errorLogger, w.cache)
153 153
 	if err != nil {
154 154
 		errorLogger.Log("err", err.Error())
155 155
 		return
156 156
 	}
157 157
 
158
-	cacheManager := newRepoCacheManager(now, id, w.burst, w.Trace, errorLogger, w.cache)
159
-
160 158
 	// This is what we're going to write back to the cache
161 159
 	var repo ImageRepository
162 160
 	repo, err = cacheManager.fetchRepository()
@@ -176,7 +174,7 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id
176 174
 		}
177 175
 	}()
178 176
 
179
-	tags, err := client.Tags(ctx)
177
+	tags, err := cacheManager.getTags(ctx)
180 178
 	if err != nil {
181 179
 		if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) && !strings.Contains(err.Error(), "net/http: request canceled") {
182 180
 			errorLogger.Log("err", errors.Wrap(err, "requesting tags"))
@@ -201,7 +199,7 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id
201 199
 			"to_update", len(fetchResult.imagesToUpdate),
202 200
 			"of_which_refresh", fetchResult.imagesToUpdateRefreshCount, "of_which_missing", fetchResult.imagesToUpdateMissingCount)
203 201
 		var images map[string]image.Info
204
-		images, successCount, manifestUnknownCount = cacheManager.updateImages(ctx, client, fetchResult.imagesToUpdate)
202
+		images, successCount, manifestUnknownCount = cacheManager.updateImages(ctx, fetchResult.imagesToUpdate)
205 203
 		for k, v := range images {
206 204
 			newImages[k] = v
207 205
 		}

Loading…
Cancel
Save