Browse Source

Require contexts in remote client methods

Replace the `context.TODO()`s given to with a context passed in.

Also: the values returned by the docker distribution registry client
will be one of the types in
github.com/docker/distribution/manifest/{schema1,schema2,manifestlist},
so instead of dispatching on the media type and doing the
deserialisation ourselves, just dispatch on the value's type.
Michael Bridgen 2 years ago
parent
commit
1752710cb1
5 changed files with 62 additions and 75 deletions
  1. 17
    6
      registry/cache/warming.go
  2. 35
    49
      registry/client.go
  3. 4
    2
      registry/mock.go
  4. 6
    9
      registry/monitoring.go
  5. 0
    9
      registry/registry.go

+ 17
- 6
registry/cache/warming.go View File

@@ -48,6 +48,11 @@ func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFun
48 48
 	imageCreds := imagesToFetchFunc()
49 49
 	backlog := imageCredsToBacklog(imageCreds)
50 50
 
51
+	// We have some fine control over how long to spend on each fetch
52
+	// operation, since they are given a `context`. For now though,
53
+	// just rattle through them one by one, however long they take.
54
+	ctx := context.Background()
55
+
51 56
 	// This loop acts keeps a kind of priority queue, whereby image
52 57
 	// names coming in on the `Priority` channel are looked up first.
53 58
 	// If there are none, images used in the cluster are refreshed;
@@ -65,7 +70,7 @@ func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFun
65 70
 			// image has to have been running the last time we
66 71
 			// requested the credentials.
67 72
 			if creds, ok := imageCreds[name]; ok {
68
-				w.warm(name, creds)
73
+				w.warm(ctx, name, creds)
69 74
 			} else {
70 75
 				w.Logger.Log("priority", name.String(), "err", "no creds available")
71 76
 			}
@@ -76,7 +81,7 @@ func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFun
76 81
 		if len(backlog) > 0 {
77 82
 			im := backlog[0]
78 83
 			backlog = backlog[1:]
79
-			w.warm(im.Name, im.Credentials)
84
+			w.warm(ctx, im.Name, im.Credentials)
80 85
 		} else {
81 86
 			select {
82 87
 			case <-refresh:
@@ -98,7 +103,7 @@ func imageCredsToBacklog(imageCreds registry.ImageCreds) []backlogItem {
98 103
 	return backlog
99 104
 }
100 105
 
101
-func (w *Warmer) warm(id image.Name, creds registry.Credentials) {
106
+func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credentials) {
102 107
 	client, err := w.ClientFactory.ClientFor(id.CanonicalName(), creds)
103 108
 	if err != nil {
104 109
 		w.Logger.Log("err", err.Error())
@@ -136,7 +141,7 @@ func (w *Warmer) warm(id image.Name, creds registry.Credentials) {
136 141
 		}
137 142
 	}()
138 143
 
139
-	tags, err := client.Tags()
144
+	tags, err := client.Tags(ctx)
140 145
 	if err != nil {
141 146
 		if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) && !strings.Contains(err.Error(), "net/http: request canceled") {
142 147
 			w.Logger.Log("err", errors.Wrap(err, "requesting tags"))
@@ -182,13 +187,19 @@ func (w *Warmer) warm(id image.Name, creds registry.Credentials) {
182 187
 		// w.Burst, so limit the number of fetching goroutines to that.
183 188
 		fetchers := make(chan struct{}, w.Burst)
184 189
 		awaitFetchers := &sync.WaitGroup{}
190
+	updates:
185 191
 		for _, imID := range toUpdate {
192
+			select {
193
+			case <-ctx.Done():
194
+				break updates
195
+			case fetchers <- struct{}{}:
196
+			}
197
+
186 198
 			awaitFetchers.Add(1)
187
-			fetchers <- struct{}{}
188 199
 			go func(imageID image.Ref) {
189 200
 				defer func() { awaitFetchers.Done(); <-fetchers }()
190 201
 				// Get the image from the remote
191
-				img, err := client.Manifest(imageID.Tag)
202
+				img, err := client.Manifest(ctx, imageID.Tag)
192 203
 				if err != nil {
193 204
 					if err, ok := errors.Cause(err).(net.Error); ok && err.Timeout() {
194 205
 						// This was due to a context timeout, don't bother logging

+ 35
- 49
registry/client.go View File

@@ -5,6 +5,7 @@ import (
5 5
 	"encoding/json"
6 6
 	"errors"
7 7
 	"net/http"
8
+	"reflect"
8 9
 	"time"
9 10
 
10 11
 	"github.com/docker/distribution"
@@ -17,27 +18,35 @@ import (
17 18
 	"github.com/weaveworks/flux/image"
18 19
 )
19 20
 
21
+// Client is a remote registry client for a particular image
22
+// repository (e.g., for quay.io/weaveworks/flux). It is an interface
23
+// so we can wrap it in instrumentation, write fake implementations,
24
+// and so on.
25
+type Client interface {
26
+	Tags(context.Context) ([]string, error)
27
+	Manifest(ctx context.Context, ref string) (image.Info, error)
28
+}
29
+
20 30
 type Remote struct {
21 31
 	transport http.RoundTripper
22 32
 	repo      image.CanonicalName
23 33
 }
24 34
 
25
-// Adapt to docker distribution reference.Named
35
+// Adapt to docker distribution `reference.Named`.
26 36
 type named struct {
27 37
 	image.CanonicalName
28 38
 }
29 39
 
40
+// Name returns the name of the repository. These values are used to
41
+// build API URLs, and (it turns out) are _not_ expected to include a
42
+// domain (e.g., quay.io). Hence, the implementation here just returns
43
+// the path.
30 44
 func (n named) Name() string {
31 45
 	return n.Image
32 46
 }
33 47
 
34
-func (n named) String() string {
35
-	return n.String()
36
-}
37
-
38 48
 // Return the tags for this repository.
39
-func (a *Remote) Tags() ([]string, error) {
40
-	ctx := context.TODO()
49
+func (a *Remote) Tags(ctx context.Context) ([]string, error) {
41 50
 	repository, err := client.NewRepository(named{a.repo}, "https://"+a.repo.Domain, a.transport)
42 51
 	if err != nil {
43 52
 		return nil, err
@@ -47,8 +56,7 @@ func (a *Remote) Tags() ([]string, error) {
47 56
 
48 57
 // Manifest fetches the metadata for an image reference; currently
49 58
 // assumed to be in the same repo as that provided to `NewRemote(...)`
50
-func (a *Remote) Manifest(ref string) (image.Info, error) {
51
-	ctx := context.TODO()
59
+func (a *Remote) Manifest(ctx context.Context, ref string) (image.Info, error) {
52 60
 	repository, err := client.NewRepository(named{a.repo}, "https://"+a.repo.Domain, a.transport)
53 61
 	if err != nil {
54 62
 		return image.Info{}, err
@@ -64,49 +72,27 @@ interpret:
64 72
 		return image.Info{}, err
65 73
 	}
66 74
 
67
-	mt, bytes, err := manifest.Payload()
68
-	if err != nil {
69
-		return image.Info{}, err
70
-	}
71
-
72 75
 	info := image.Info{ID: a.repo.ToRef(ref)}
73 76
 
74
-	// for decoding the v1-compatibility entry in schema1 manifests
75
-	var v1 struct {
76
-		ID      string    `json:"id"`
77
-		Created time.Time `json:"created"`
78
-		OS      string    `json:"os"`
79
-		Arch    string    `json:"architecture"`
80
-	}
81
-
82 77
 	// TODO(michael): can we type switch? Not sure how dependable the
83 78
 	// underlying types are.
84
-	switch mt {
85
-	case schema1.MediaTypeManifest:
86
-		// TODO: can this be fallthrough? Find something to check on...
87
-		var man schema1.Manifest
88
-		if err = json.Unmarshal(bytes, &man); err != nil {
89
-			return image.Info{}, err
90
-		}
91
-		if err = json.Unmarshal([]byte(man.History[0].V1Compatibility), &v1); err != nil {
92
-			return image.Info{}, err
93
-		}
94
-		info.CreatedAt = v1.Created
95
-	case schema1.MediaTypeSignedManifest:
96
-		var man schema1.SignedManifest
97
-		if err = json.Unmarshal(bytes, &man); err != nil {
98
-			return image.Info{}, err
79
+	switch deserialised := manifest.(type) {
80
+	case *schema1.SignedManifest:
81
+		var man schema1.Manifest = deserialised.Manifest
82
+		// for decoding the v1-compatibility entry in schema1 manifests
83
+		var v1 struct {
84
+			ID      string    `json:"id"`
85
+			Created time.Time `json:"created"`
86
+			OS      string    `json:"os"`
87
+			Arch    string    `json:"architecture"`
99 88
 		}
89
+
100 90
 		if err = json.Unmarshal([]byte(man.History[0].V1Compatibility), &v1); err != nil {
101 91
 			return image.Info{}, err
102 92
 		}
103 93
 		info.CreatedAt = v1.Created
104
-	case schema2.MediaTypeManifest:
105
-		var man schema2.Manifest
106
-		if err = json.Unmarshal(bytes, &man); err != nil {
107
-			return image.Info{}, err
108
-		}
109
-
94
+	case *schema2.DeserializedManifest:
95
+		var man schema2.Manifest = deserialised.Manifest
110 96
 		configBytes, err := repository.Blobs(ctx).Get(ctx, man.Config.Digest)
111 97
 		if err != nil {
112 98
 			return image.Info{}, err
@@ -121,12 +107,9 @@ interpret:
121 107
 			return image.Info{}, err
122 108
 		}
123 109
 		info.CreatedAt = config.Created
124
-	case manifestlist.MediaTypeManifestList:
125
-		var list manifestlist.ManifestList
126
-		if err = json.Unmarshal(bytes, &list); err != nil {
127
-			return image.Info{}, err
128
-		}
129
-		// TODO(michael): can we just pick the first one that matches?
110
+	case *manifestlist.DeserializedManifestList:
111
+		var list manifestlist.ManifestList = deserialised.ManifestList
112
+		// TODO(michael): is it valid to just pick the first one that matches?
130 113
 		for _, m := range list.Manifests {
131 114
 			if m.Platform.OS == "linux" && m.Platform.Architecture == "amd64" {
132 115
 				manifest, fetchErr = manifests.Get(ctx, m.Digest)
@@ -134,6 +117,9 @@ interpret:
134 117
 			}
135 118
 		}
136 119
 		return image.Info{}, errors.New("no suitable manifest (linux amd64) in manifestlist")
120
+	default:
121
+		t := reflect.TypeOf(manifest)
122
+		return image.Info{}, errors.New("unknown manifest type: " + t.String())
137 123
 	}
138 124
 	return info, nil
139 125
 }

+ 4
- 2
registry/mock.go View File

@@ -1,6 +1,8 @@
1 1
 package registry
2 2
 
3 3
 import (
4
+	"context"
5
+
4 6
 	"github.com/pkg/errors"
5 7
 
6 8
 	"github.com/weaveworks/flux/image"
@@ -31,11 +33,11 @@ func NewMockClient(manifest ManifestFunc, tags TagsFunc) Client {
31 33
 	}
32 34
 }
33 35
 
34
-func (m *mockDockerClient) Manifest(tag string) (image.Info, error) {
36
+func (m *mockDockerClient) Manifest(ctx context.Context, tag string) (image.Info, error) {
35 37
 	return m.manifest(tag)
36 38
 }
37 39
 
38
-func (m *mockDockerClient) Tags() ([]string, error) {
40
+func (m *mockDockerClient) Tags(context.Context) ([]string, error) {
39 41
 	return m.tags()
40 42
 }
41 43
 

+ 6
- 9
registry/monitoring.go View File

@@ -3,6 +3,7 @@ package registry
3 3
 // Monitoring middlewares for registry interfaces
4 4
 
5 5
 import (
6
+	"context"
6 7
 	"strconv"
7 8
 	"time"
8 9
 
@@ -35,13 +36,11 @@ var (
35 36
 	}, []string{LabelRequestKind, fluxmetrics.LabelSuccess})
36 37
 )
37 38
 
38
-type InstrumentedRegistry Registry
39
-
40 39
 type instrumentedRegistry struct {
41 40
 	next Registry
42 41
 }
43 42
 
44
-func NewInstrumentedRegistry(next Registry) InstrumentedRegistry {
43
+func NewInstrumentedRegistry(next Registry) Registry {
45 44
 	return &instrumentedRegistry{
46 45
 		next: next,
47 46
 	}
@@ -65,8 +64,6 @@ func (m *instrumentedRegistry) GetImage(id image.Ref) (res image.Info, err error
65 64
 	return
66 65
 }
67 66
 
68
-type InstrumentedClient Client
69
-
70 67
 type instrumentedClient struct {
71 68
 	next Client
72 69
 }
@@ -77,9 +74,9 @@ func NewInstrumentedClient(next Client) Client {
77 74
 	}
78 75
 }
79 76
 
80
-func (m *instrumentedClient) Manifest(ref string) (res image.Info, err error) {
77
+func (m *instrumentedClient) Manifest(ctx context.Context, ref string) (res image.Info, err error) {
81 78
 	start := time.Now()
82
-	res, err = m.next.Manifest(ref)
79
+	res, err = m.next.Manifest(ctx, ref)
83 80
 	remoteDuration.With(
84 81
 		LabelRequestKind, RequestKindMetadata,
85 82
 		fluxmetrics.LabelSuccess, strconv.FormatBool(err == nil),
@@ -87,9 +84,9 @@ func (m *instrumentedClient) Manifest(ref string) (res image.Info, err error) {
87 84
 	return
88 85
 }
89 86
 
90
-func (m *instrumentedClient) Tags() (res []string, err error) {
87
+func (m *instrumentedClient) Tags(ctx context.Context) (res []string, err error) {
91 88
 	start := time.Now()
92
-	res, err = m.next.Tags()
89
+	res, err = m.next.Tags(ctx)
93 90
 	remoteDuration.With(
94 91
 		LabelRequestKind, RequestKindTags,
95 92
 		fluxmetrics.LabelSuccess, strconv.FormatBool(err == nil),

+ 0
- 9
registry/registry.go View File

@@ -16,15 +16,6 @@ type Registry interface {
16 16
 	GetImage(image.Ref) (image.Info, error)
17 17
 }
18 18
 
19
-// Client is a remote registry client for a particular image
20
-// repository (e.g., for quay.io/weaveworks/flux). It is an interface
21
-// so we can wrap it in instrumentation, write fake implementations,
22
-// and so on.
23
-type Client interface {
24
-	Tags() ([]string, error)
25
-	Manifest(ref string) (image.Info, error)
26
-}
27
-
28 19
 // ImageCreds is a record of which images need which credentials,
29 20
 // which is supplied to us (probably by interrogating the cluster)
30 21
 type ImageCreds map[image.Name]Credentials

Loading…
Cancel
Save