Browse Source

Move cache implementation to its own package

This moves the "cache" (looking less like a cache every day) into its
own package, with the memcached implementation a subpackage therein.
Michael Bridgen 2 years ago
parent
commit
6048af5e87

+ 16
- 28
cmd/fluxd/main.go View File

@@ -31,7 +31,8 @@ import (
31 31
 	"github.com/weaveworks/flux/image"
32 32
 	"github.com/weaveworks/flux/job"
33 33
 	"github.com/weaveworks/flux/registry"
34
-	registryMemcache "github.com/weaveworks/flux/registry/cache"
34
+	"github.com/weaveworks/flux/registry/cache"
35
+	registryMemcache "github.com/weaveworks/flux/registry/cache/memcached"
35 36
 	registryMiddleware "github.com/weaveworks/flux/registry/middleware"
36 37
 	"github.com/weaveworks/flux/remote"
37 38
 	"github.com/weaveworks/flux/ssh"
@@ -229,26 +230,13 @@ func main() {
229 230
 	}
230 231
 
231 232
 	// Registry components
232
-	var cache registry.Registry
233
-	var cacheWarmer registry.Warmer
233
+	var cacheRegistry registry.Registry
234
+	var cacheWarmer *cache.Warmer
234 235
 	{
235
-		// Cache
236
-		var memcacheRegistry registryMemcache.Client
236
+		// Cache client, for use by registry and cache warmer
237
+		var cacheClient cache.Client
237 238
 		if *memcachedHostname != "" {
238
-			memcacheRegistry = registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{
239
-				Host:           *memcachedHostname,
240
-				Service:        *memcachedService,
241
-				Timeout:        *memcachedTimeout,
242
-				UpdateInterval: 1 * time.Minute,
243
-				Logger:         log.With(logger, "component", "memcached"),
244
-				MaxIdleConns:   defaultMemcacheConnections,
245
-			})
246
-			memcacheRegistry = registryMemcache.InstrumentMemcacheClient(memcacheRegistry)
247
-			defer memcacheRegistry.Stop()
248
-		}
249
-		var memcacheWarmer registryMemcache.Client
250
-		if *memcachedHostname != "" {
251
-			memcacheWarmer = registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{
239
+			cacheClient = registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{
252 240
 				Host:           *memcachedHostname,
253 241
 				Service:        *memcachedService,
254 242
 				Timeout:        *memcachedTimeout,
@@ -256,16 +244,16 @@ func main() {
256 244
 				Logger:         log.With(logger, "component", "memcached"),
257 245
 				MaxIdleConns:   *registryBurst,
258 246
 			})
259
-			memcacheWarmer = registryMemcache.InstrumentMemcacheClient(memcacheWarmer)
260
-			defer memcacheWarmer.Stop()
247
+			cacheClient = cache.InstrumentClient(cacheClient)
248
+			defer cacheClient.Stop()
261 249
 		}
262 250
 
263
-		cache = &registry.Cache{
264
-			Reader: memcacheRegistry,
251
+		cacheRegistry = &cache.Cache{
252
+			Reader: cacheClient,
265 253
 		}
266
-		cache = registry.NewInstrumentedRegistry(cache)
254
+		cacheRegistry = registry.NewInstrumentedRegistry(cacheRegistry)
267 255
 
268
-		// Remote
256
+		// Remote client, for warmer to refresh entries
269 257
 		registryLogger := log.With(logger, "component", "registry")
270 258
 		registryLimits := &registryMiddleware.RateLimiters{
271 259
 			RPS:   *registryRPS,
@@ -278,11 +266,11 @@ func main() {
278 266
 
279 267
 		// Warmer
280 268
 		warmerLogger := log.With(logger, "component", "warmer")
281
-		cacheWarmer = registry.Warmer{
269
+		cacheWarmer = &cache.Warmer{
282 270
 			Logger:        warmerLogger,
283 271
 			ClientFactory: remoteFactory,
284 272
 			Expiry:        *registryCacheExpiry,
285
-			Cache:         memcacheWarmer,
273
+			Cache:         cacheClient,
286 274
 			Burst:         *registryBurst,
287 275
 		}
288 276
 	}
@@ -439,7 +427,7 @@ func main() {
439 427
 		V:            version,
440 428
 		Cluster:      k8s,
441 429
 		Manifests:    k8sManifests,
442
-		Registry:     cache,
430
+		Registry:     cacheRegistry,
443 431
 		ImageRefresh: make(chan image.Name, 100), // size chosen by fair dice roll
444 432
 		Repo:         repo, Checkout: checkout,
445 433
 		Jobs:           jobs,

+ 78
- 0
registry/cache/cache.go View File

@@ -0,0 +1,78 @@
1
+package cache
2
+
3
+import (
4
+	"strings"
5
+	"time"
6
+
7
+	"github.com/pkg/errors"
8
+
9
+	fluxerr "github.com/weaveworks/flux/errors"
10
+	"github.com/weaveworks/flux/image"
11
+)
12
+
13
+type Reader interface {
14
+	GetKey(k Keyer) ([]byte, time.Time, error)
15
+}
16
+
17
+type Writer interface {
18
+	SetKey(k Keyer, v []byte) error
19
+}
20
+
21
+type Client interface {
22
+	Reader
23
+	Writer
24
+	Stop()
25
+}
26
+
27
+// An interface to provide the key under which to store the data
28
+// Use the full path to image for the memcache key because there
29
+// might be duplicates from other registries
30
+type Keyer interface {
31
+	Key() string
32
+}
33
+
34
+type manifestKey struct {
35
+	fullRepositoryPath, reference string
36
+}
37
+
38
+func NewManifestKey(image image.CanonicalRef) Keyer {
39
+	return &manifestKey{image.CanonicalName().String(), image.Tag}
40
+}
41
+
42
+func (k *manifestKey) Key() string {
43
+	return strings.Join([]string{
44
+		"registryhistoryv3", // Just to version in case we need to change format later.
45
+		k.fullRepositoryPath,
46
+		k.reference,
47
+	}, "|")
48
+}
49
+
50
+type tagKey struct {
51
+	fullRepositoryPath string
52
+}
53
+
54
+func NewTagKey(id image.CanonicalName) Keyer {
55
+	return &tagKey{id.String()}
56
+}
57
+
58
+func (k *tagKey) Key() string {
59
+	return strings.Join([]string{
60
+		"registrytagsv3", // Just to version in case we need to change format later.
61
+		k.fullRepositoryPath,
62
+	}, "|")
63
+}
64
+
65
+type repoKey struct {
66
+	fullRepositoryPath string
67
+}
68
+
69
+func NewRepositoryKey(repo image.CanonicalName) Keyer {
70
+	return &repoKey{repo.String()}
71
+}
72
+
73
+func (k *repoKey) Key() string {
74
+	return strings.Join([]string{
75
+		"registryrepov3",
76
+		k.fullRepositoryPath,
77
+	}, "|")
78
+}

+ 13
- 0
registry/cache/doc.go View File

@@ -0,0 +1,13 @@
1
+/*
2
+
3
+This package implements an image metadata cache given a backing k-v
4
+store.
5
+
6
+The interface `Client` stands in the k-v store; `Cache` implements
7
+registry.Registry given a `Client`.
8
+
9
+The `Warmer` is for continually refreshing the cache by fetching new
10
+metadata from the original image registries.
11
+
12
+*/
13
+package cache

registry/cache/memcached.go → registry/cache/memcached/memcached.go View File

@@ -1,11 +1,10 @@
1
-package cache
1
+package memcached
2 2
 
3 3
 import (
4 4
 	"encoding/binary"
5 5
 	"fmt"
6 6
 	"net"
7 7
 	"sort"
8
-	"strings"
9 8
 	"sync"
10 9
 	"time"
11 10
 
@@ -13,43 +12,13 @@ import (
13 12
 	"github.com/go-kit/kit/log"
14 13
 	"github.com/pkg/errors"
15 14
 
16
-	fluxerr "github.com/weaveworks/flux/errors"
17
-	"github.com/weaveworks/flux/image"
15
+	"github.com/weaveworks/flux/registry/cache"
18 16
 )
19 17
 
20 18
 const (
21 19
 	expiry = time.Hour
22 20
 )
23 21
 
24
-var (
25
-	ErrNotCached = &fluxerr.Error{
26
-		Type: fluxerr.Missing,
27
-		Err:  memcache.ErrCacheMiss,
28
-		Help: `Image not yet cached
29
-
30
-It takes time to initially cache all the images. Please wait.
31
-
32
-If you have waited for a long time, check the flux logs. Potential
33
-reasons for the error are: no internet, no cache, error with the remote
34
-repository.
35
-`,
36
-	}
37
-)
38
-
39
-type Reader interface {
40
-	GetKey(k Keyer) ([]byte, time.Time, error)
41
-}
42
-
43
-type Writer interface {
44
-	SetKey(k Keyer, v []byte) error
45
-}
46
-
47
-type Client interface {
48
-	Reader
49
-	Writer
50
-	Stop()
51
-}
52
-
53 22
 // MemcacheClient is a memcache client that gets its server list from SRV
54 23
 // records, and periodically updates that ServerList.
55 24
 type memcacheClient struct {
@@ -73,7 +42,7 @@ type MemcacheConfig struct {
73 42
 	MaxIdleConns   int
74 43
 }
75 44
 
76
-func NewMemcacheClient(config MemcacheConfig) Client {
45
+func NewMemcacheClient(config MemcacheConfig) cache.Client {
77 46
 	var servers memcache.ServerList
78 47
 	client := memcache.NewFromSelector(&servers)
79 48
 	client.Timeout = config.Timeout
@@ -99,7 +68,7 @@ func NewMemcacheClient(config MemcacheConfig) Client {
99 68
 }
100 69
 
101 70
 // Does not use DNS, accepts static list of servers.
102
-func NewFixedServerMemcacheClient(config MemcacheConfig, addresses ...string) Client {
71
+func NewFixedServerMemcacheClient(config MemcacheConfig, addresses ...string) cache.Client {
103 72
 	var servers memcache.ServerList
104 73
 	servers.SetServers(addresses...)
105 74
 	client := memcache.NewFromSelector(&servers)
@@ -114,16 +83,15 @@ func NewFixedServerMemcacheClient(config MemcacheConfig, addresses ...string) Cl
114 83
 		quit:       make(chan struct{}),
115 84
 	}
116 85
 
117
-	client.FlushAll()
118 86
 	return newClient
119 87
 }
120 88
 
121
-func (c *memcacheClient) GetKey(k Keyer) ([]byte, time.Time, error) {
89
+func (c *memcacheClient) GetKey(k cache.Keyer) ([]byte, time.Time, error) {
122 90
 	cacheItem, err := c.Get(k.Key())
123 91
 	if err != nil {
124 92
 		if err == memcache.ErrCacheMiss {
125 93
 			// Don't log on cache miss
126
-			return []byte{}, time.Time{}, ErrNotCached
94
+			return []byte{}, time.Time{}, cache.ErrNotCached
127 95
 		} else {
128 96
 			c.logger.Log("err", errors.Wrap(err, "Fetching tag from memcache"))
129 97
 			return []byte{}, time.Time{}, err
@@ -133,7 +101,7 @@ func (c *memcacheClient) GetKey(k Keyer) ([]byte, time.Time, error) {
133 101
 	return cacheItem.Value[4:], time.Unix(int64(expiry), 0), nil
134 102
 }
135 103
 
136
-func (c *memcacheClient) SetKey(k Keyer, v []byte) error {
104
+func (c *memcacheClient) SetKey(k cache.Keyer, v []byte) error {
137 105
 	expiry := time.Now().Add(expiry).Unix()
138 106
 	expiryBytes := make([]byte, 4, 4)
139 107
 	binary.BigEndian.PutUint32(expiryBytes, uint32(expiry))
@@ -187,56 +155,3 @@ func (c *memcacheClient) updateMemcacheServers() error {
187 155
 	sort.Strings(servers)
188 156
 	return c.serverList.SetServers(servers...)
189 157
 }
190
-
191
-// An interface to provide the key under which to store the data
192
-// Use the full path to image for the memcache key because there
193
-// might be duplicates from other registries
194
-type Keyer interface {
195
-	Key() string
196
-}
197
-
198
-type manifestKey struct {
199
-	fullRepositoryPath, reference string
200
-}
201
-
202
-func NewManifestKey(image image.CanonicalRef) Keyer {
203
-	return &manifestKey{image.CanonicalName().String(), image.Tag}
204
-}
205
-
206
-func (k *manifestKey) Key() string {
207
-	return strings.Join([]string{
208
-		"registryhistoryv3", // Just to version in case we need to change format later.
209
-		k.fullRepositoryPath,
210
-		k.reference,
211
-	}, "|")
212
-}
213
-
214
-type tagKey struct {
215
-	fullRepositoryPath string
216
-}
217
-
218
-func NewTagKey(id image.CanonicalName) Keyer {
219
-	return &tagKey{id.String()}
220
-}
221
-
222
-func (k *tagKey) Key() string {
223
-	return strings.Join([]string{
224
-		"registrytagsv3", // Just to version in case we need to change format later.
225
-		k.fullRepositoryPath,
226
-	}, "|")
227
-}
228
-
229
-type repoKey struct {
230
-	fullRepositoryPath string
231
-}
232
-
233
-func NewRepositoryKey(repo image.CanonicalName) Keyer {
234
-	return &repoKey{repo.String()}
235
-}
236
-
237
-func (k *repoKey) Key() string {
238
-	return strings.Join([]string{
239
-		"registryrepov3",
240
-		k.fullRepositoryPath,
241
-	}, "|")
242
-}

registry/cache/memcached_test.go → registry/cache/memcached/memcached_test.go View File

@@ -1,6 +1,5 @@
1 1
 // +build integration
2
-
3
-package cache
2
+package memcached
4 3
 
5 4
 import (
6 5
 	"flag"

+ 12
- 12
registry/cache/monitoring.go View File

@@ -11,28 +11,28 @@ import (
11 11
 )
12 12
 
13 13
 var (
14
-	memcacheRequestDuration = prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
14
+	cacheRequestDuration = prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
15 15
 		Namespace: "flux",
16
-		Subsystem: "memcache",
16
+		Subsystem: "cache",
17 17
 		Name:      "request_duration_seconds",
18
-		Help:      "Duration of memcache requests, in seconds.",
18
+		Help:      "Duration of cache requests, in seconds.",
19 19
 		Buckets:   stdprometheus.DefBuckets,
20 20
 	}, []string{fluxmetrics.LabelMethod, fluxmetrics.LabelSuccess})
21 21
 )
22 22
 
23
-type instrumentedMemcacheClient struct {
23
+type instrumentedClient struct {
24 24
 	next Client
25 25
 }
26 26
 
27
-func InstrumentMemcacheClient(c Client) Client {
28
-	return &instrumentedMemcacheClient{
27
+func InstrumentClient(c Client) Client {
28
+	return &instrumentedClient{
29 29
 		next: c,
30 30
 	}
31 31
 }
32 32
 
33
-func (i *instrumentedMemcacheClient) GetKey(k Keyer) (_ []byte, ex time.Time, err error) {
33
+func (i *instrumentedClient) GetKey(k Keyer) (_ []byte, ex time.Time, err error) {
34 34
 	defer func(begin time.Time) {
35
-		memcacheRequestDuration.With(
35
+		cacheRequestDuration.With(
36 36
 			fluxmetrics.LabelMethod, "GetKey",
37 37
 			fluxmetrics.LabelSuccess, fmt.Sprint(err == nil),
38 38
 		).Observe(time.Since(begin).Seconds())
@@ -40,9 +40,9 @@ func (i *instrumentedMemcacheClient) GetKey(k Keyer) (_ []byte, ex time.Time, er
40 40
 	return i.next.GetKey(k)
41 41
 }
42 42
 
43
-func (i *instrumentedMemcacheClient) SetKey(k Keyer, v []byte) (err error) {
43
+func (i *instrumentedClient) SetKey(k Keyer, v []byte) (err error) {
44 44
 	defer func(begin time.Time) {
45
-		memcacheRequestDuration.With(
45
+		cacheRequestDuration.With(
46 46
 			fluxmetrics.LabelMethod, "SetKey",
47 47
 			fluxmetrics.LabelSuccess, fmt.Sprint(err == nil),
48 48
 		).Observe(time.Since(begin).Seconds())
@@ -50,9 +50,9 @@ func (i *instrumentedMemcacheClient) SetKey(k Keyer, v []byte) (err error) {
50 50
 	return i.next.SetKey(k, v)
51 51
 }
52 52
 
53
-func (i *instrumentedMemcacheClient) Stop() {
53
+func (i *instrumentedClient) Stop() {
54 54
 	defer func(begin time.Time) {
55
-		memcacheRequestDuration.With(
55
+		cacheRequestDuration.With(
56 56
 			fluxmetrics.LabelMethod, "Stop",
57 57
 			fluxmetrics.LabelSuccess, "true",
58 58
 		).Observe(time.Since(begin).Seconds())

+ 101
- 0
registry/cache/registry.go View File

@@ -0,0 +1,101 @@
1
+package cache
2
+
3
+import (
4
+	"encoding/json"
5
+	"sort"
6
+
7
+	"github.com/pkg/errors"
8
+
9
+	"github.com/weaveworks/flux/image"
10
+)
11
+
12
+var (
13
+	ErrNotCached = &fluxerr.Error{
14
+		Type: fluxerr.Missing,
15
+		Err:  errors.New("item not in cache"),
16
+		Help: `Image not yet cached
17
+
18
+It takes time to initially cache all the images. Please wait.
19
+
20
+If you have waited for a long time, check the flux logs. Potential
21
+reasons for the error are: no internet, no cache, error with the remote
22
+repository.
23
+`,
24
+	}
25
+)
26
+
27
+// Cache is a local cache of image metadata.
28
+type Cache struct {
29
+	Reader Reader
30
+}
31
+
32
+// GetRepository returns the list of image manifests in an image
33
+// repository (e.g,. at "quay.io/weaveworks/flux")
34
+func (c *Cache) GetRepository(id image.Name) ([]image.Info, error) {
35
+	repoKey := NewRepositoryKey(id.CanonicalName())
36
+	bytes, _, err := c.Reader.GetKey(repoKey)
37
+	if err != nil {
38
+		return nil, err
39
+	}
40
+	var repo ImageRepository
41
+	if err = json.Unmarshal(bytes, &repo); err != nil {
42
+		return nil, err
43
+	}
44
+
45
+	// We only care about the error if we've never successfully
46
+	// updated the result.
47
+	if repo.LastUpdate.IsZero() {
48
+		if repo.LastError != "" {
49
+			return nil, errors.New(repo.LastError)
50
+		}
51
+		return nil, ErNotCached
52
+	}
53
+
54
+	images := make([]image.Info, len(repo.Images))
55
+	var i int
56
+	for _, im := range repo.Images {
57
+		images[i] = im
58
+		i++
59
+	}
60
+	sort.Sort(image.ByCreatedDesc(images))
61
+	return images, nil
62
+}
63
+
64
+// GetImage gets the manifest of a specific image ref, from its
65
+// registry.
66
+func (c *Cache) GetImage(id image.Ref) (image.Info, error) {
67
+	key := NewManifestKey(id.CanonicalRef())
68
+
69
+	val, _, err := c.Reader.GetKey(key)
70
+	if err != nil {
71
+		return image.Info{}, err
72
+	}
73
+	var img image.Info
74
+	err = json.Unmarshal(val, &img)
75
+	if err != nil {
76
+		return image.Info{}, err
77
+	}
78
+	return img, nil
79
+}
80
+
81
+// ImageRepository holds the last good information on an image
82
+// repository.
83
+//
84
+// Whenever we successfully fetch a full set of image info,
85
+// `LastUpdate` and `Images` shall each be assigned a value, and
86
+// `LastError` will be cleared.
87
+//
88
+// If we cannot for any reason obtain a full set of image info,
89
+// `LastError` shall be assigned a value, and the other fields left
90
+// alone.
91
+//
92
+// It's possible to have all fields populated: this means at some
93
+// point it was successfully fetched, but since then, there's been an
94
+// error. It's then up to the caller to decide what to do with the
95
+// value (show the images, but also indicate there's a problem, for
96
+// example).
97
+type ImageRepository struct {
98
+	LastError  string
99
+	LastUpdate time.Time
100
+	Images     map[string]image.Info
101
+}

registry/warming.go → registry/cache/warming.go View File

@@ -1,5 +1,4 @@
1
-// Runs a daemon to continuously warm the registry cache.
2
-package registry
1
+package cache
3 2
 
4 3
 import (
5 4
 	"context"
@@ -12,34 +11,33 @@ import (
12 11
 	"github.com/go-kit/kit/log"
13 12
 	"github.com/pkg/errors"
14 13
 	"github.com/weaveworks/flux/image"
15
-	"github.com/weaveworks/flux/registry/cache"
14
+	"github.com/weaveworks/flux/registry"
16 15
 )
17 16
 
18 17
 const refreshWhenExpiryWithin = time.Minute
19 18
 const askForNewImagesInterval = time.Minute
20 19
 
20
+// Warmer refreshes the information kept in the cache from remote
21
+// registries.
21 22
 type Warmer struct {
22 23
 	Logger        log.Logger
23
-	ClientFactory *RemoteClientFactory
24
+	ClientFactory *registry.RemoteClientFactory
24 25
 	Expiry        time.Duration
25
-	Cache         cache.Client
26
+	Cache         Client
26 27
 	Burst         int
27 28
 	Priority      chan image.Name
28 29
 	Notify        func()
29 30
 }
30 31
 
31
-// This is what we get from the callback handed to us
32
-type ImageCreds map[image.Name]Credentials
33
-
34 32
 // .. and this is what we keep in the backlog
35 33
 type backlogItem struct {
36 34
 	image.Name
37
-	Credentials
35
+	registry.Credentials
38 36
 }
39 37
 
40 38
 // Continuously get the images to populate the cache with, and
41 39
 // populate the cache with them.
42
-func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() ImageCreds) {
40
+func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() registry.ImageCreds) {
43 41
 	defer wg.Done()
44 42
 
45 43
 	if w.Logger == nil || w.ClientFactory == nil || w.Expiry == 0 || w.Cache == nil {
@@ -90,7 +88,7 @@ func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFun
90 88
 	}
91 89
 }
92 90
 
93
-func imageCredsToBacklog(imageCreds ImageCreds) []backlogItem {
91
+func imageCredsToBacklog(imageCreds registry.ImageCreds) []backlogItem {
94 92
 	backlog := make([]backlogItem, len(imageCreds))
95 93
 	var i int
96 94
 	for name, cred := range imageCreds {
@@ -100,29 +98,7 @@ func imageCredsToBacklog(imageCreds ImageCreds) []backlogItem {
100 98
 	return backlog
101 99
 }
102 100
 
103
-// ImageRepository holds the last good information on an image
104
-// repository.
105
-//
106
-// Whenever we successfully fetch a full set of image info,
107
-// `LastUpdate` and `Images` shall each be assigned a value, and
108
-// `LastError` will be cleared.
109
-//
110
-// If we cannot for any reason obtain a full set of image info,
111
-// `LastError` shall be assigned a value, and the other fields left
112
-// alone.
113
-//
114
-// It's possible to have all fields populated: this means at some
115
-// point it was successfully fetched, but since then, there's been an
116
-// error. It's then up to the caller to decide what to do with the
117
-// value (show the images, but also indicate there's a problem, for
118
-// example).
119
-type ImageRepository struct {
120
-	LastError  string
121
-	LastUpdate time.Time
122
-	Images     map[string]image.Info
123
-}
124
-
125
-func (w *Warmer) warm(id image.Name, creds Credentials) {
101
+func (w *Warmer) warm(id image.Name, creds registry.Credentials) {
126 102
 	client, err := w.ClientFactory.ClientFor(id.Registry(), creds)
127 103
 	if err != nil {
128 104
 		w.Logger.Log("err", err.Error())
@@ -131,11 +107,11 @@ func (w *Warmer) warm(id image.Name, creds Credentials) {
131 107
 
132 108
 	// This is what we're going to write back to the cache
133 109
 	var repo ImageRepository
134
-	repoKey := cache.NewRepositoryKey(id.CanonicalName())
110
+	repoKey := NewRepositoryKey(id.CanonicalName())
135 111
 	bytes, _, err := w.Cache.GetKey(repoKey)
136 112
 	if err == nil {
137 113
 		err = json.Unmarshal(bytes, &repo)
138
-	} else if err == cache.ErrNotCached {
114
+	} else if err == ErrNotCached {
139 115
 		err = nil
140 116
 	}
141 117
 
@@ -177,11 +153,7 @@ func (w *Warmer) warm(id image.Name, creds Credentials) {
177 153
 	for _, tag := range tags {
178 154
 		// See if we have the manifest already cached
179 155
 		newID := id.ToRef(tag)
180
-		key := cache.NewManifestKey(newID.CanonicalRef())
181
-		if err != nil {
182
-			w.Logger.Log("err", errors.Wrap(err, "creating key for memcache"))
183
-			continue
184
-		}
156
+		key := NewManifestKey(newID.CanonicalRef())
185 157
 		bytes, expiry, err := w.Cache.GetKey(key)
186 158
 		// If err, then we don't have it yet. Update.
187 159
 		switch {
@@ -226,7 +198,7 @@ func (w *Warmer) warm(id image.Name, creds Credentials) {
226 198
 					return
227 199
 				}
228 200
 
229
-				key := cache.NewManifestKey(img.ID.CanonicalRef())
201
+				key := NewManifestKey(img.ID.CanonicalRef())
230 202
 				// Write back to memcached
231 203
 				val, err := json.Marshal(img)
232 204
 				if err != nil {

registry/warming_test.go → registry/cache/warming_test.go View File

@@ -1,4 +1,4 @@
1
-package registry
1
+package cache
2 2
 
3 3
 import (
4 4
 	"context"

+ 0
- 67
registry/client.go View File

@@ -5,25 +5,14 @@ import (
5 5
 	"fmt"
6 6
 	"net/http"
7 7
 	"net/url"
8
-	"sort"
9 8
 	"time"
10 9
 
11 10
 	dockerregistry "github.com/heroku/docker-registry-client/registry"
12 11
 	"github.com/pkg/errors"
13 12
 
14 13
 	"github.com/weaveworks/flux/image"
15
-	"github.com/weaveworks/flux/registry/cache"
16 14
 )
17 15
 
18
-// ---
19
-
20
-// Client is a registry client. It is an interface so we can wrap it
21
-// in instrumentation, write fake implementations, and so on.
22
-type Client interface {
23
-	Tags(name image.Name) ([]string, error)
24
-	Manifest(name image.Ref) (image.Info, error)
25
-}
26
-
27 16
 // An implementation of Client that represents a Remote registry.
28 17
 // E.g. docker hub.
29 18
 type Remote struct {
@@ -109,59 +98,3 @@ func (a *Remote) ManifestFromV1(id image.Ref) (image.Info, error) {
109 98
 
110 99
 	return img, nil
111 100
 }
112
-
113
-// ---
114
-
115
-// Cache is a local cache of image metadata.
116
-type Cache struct {
117
-	Reader cache.Reader
118
-}
119
-
120
-// GetRepository returns the list of image manifests in an image
121
-// repository (e.g,. at "quay.io/weaveworks/flux")
122
-func (c *Cache) GetRepository(id image.Name) ([]image.Info, error) {
123
-	repoKey := cache.NewRepositoryKey(id.CanonicalName())
124
-	bytes, _, err := c.Reader.GetKey(repoKey)
125
-	if err != nil {
126
-		return nil, err
127
-	}
128
-	var repo ImageRepository
129
-	if err = json.Unmarshal(bytes, &repo); err != nil {
130
-		return nil, err
131
-	}
132
-
133
-	// We only care about the error if we've never successfully
134
-	// updated the result.
135
-	if repo.LastUpdate.IsZero() {
136
-		if repo.LastError != "" {
137
-			return nil, errors.New(repo.LastError)
138
-		}
139
-		return nil, errors.New("image metadata not fetched yet")
140
-	}
141
-
142
-	images := make([]image.Info, len(repo.Images))
143
-	var i int
144
-	for _, im := range repo.Images {
145
-		images[i] = im
146
-		i++
147
-	}
148
-	sort.Sort(image.ByCreatedDesc(images))
149
-	return images, nil
150
-}
151
-
152
-// GetImage gets the manifest of a specific image ref, from its
153
-// registry.
154
-func (c *Cache) GetImage(id image.Ref) (image.Info, error) {
155
-	key := cache.NewManifestKey(id.CanonicalRef())
156
-
157
-	val, _, err := c.Reader.GetKey(key)
158
-	if err != nil {
159
-		return image.Info{}, err
160
-	}
161
-	var img image.Info
162
-	err = json.Unmarshal(val, &img)
163
-	if err != nil {
164
-		return image.Info{}, err
165
-	}
166
-	return img, nil
167
-}

+ 5
- 0
registry/doc.go View File

@@ -0,0 +1,5 @@
1
+/*
2
+This package has types for dealing with image registries (e.g.,
3
+quay.io, DockerHub, Google Container Registry, ..).
4
+*/
5
+package registry

+ 12
- 0
registry/registry.go View File

@@ -10,7 +10,19 @@ const (
10 10
 	requestTimeout = 10 * time.Second
11 11
 )
12 12
 
13
+// Registry is a store of image metadata.
13 14
 type Registry interface {
14 15
 	GetRepository(image.Name) ([]image.Info, error)
15 16
 	GetImage(image.Ref) (image.Info, error)
16 17
 }
18
+
19
+// Client is a remote registry client. It is an interface so we can
20
+// wrap it in instrumentation, write fake implementations, and so on.
21
+type Client interface {
22
+	Tags(name image.Name) ([]string, error)
23
+	Manifest(name image.Ref) (image.Info, error)
24
+}
25
+
26
+// ImageCreds is a record of which images need which credentials,
27
+// which is supplied to us (probably by interrogating the cluster)
28
+type ImageCreds map[image.Name]Credentials

Loading…
Cancel
Save