Browse Source

Consolidated code.

Many separate files were hard to refactor.
This change is purely copy/pasting code into other places and fixing their paths.
Phil Winder 3 years ago
parent
commit
cd557e9419

+ 6
- 4
cmd/fluxd/main.go View File

@@ -30,6 +30,8 @@ import (
30 30
 	daemonhttp "github.com/weaveworks/flux/http/daemon"
31 31
 	"github.com/weaveworks/flux/job"
32 32
 	"github.com/weaveworks/flux/registry"
33
+	registryMemcache "github.com/weaveworks/flux/registry/memcache"
34
+	registryMiddleware "github.com/weaveworks/flux/registry/middleware"
33 35
 	"github.com/weaveworks/flux/remote"
34 36
 	"github.com/weaveworks/flux/ssh"
35 37
 )
@@ -194,16 +196,16 @@ func main() {
194 196
 	var warmerQueue registry.Queue
195 197
 	{
196 198
 		// Cache
197
-		var memcacheClient registry.MemcacheClient
199
+		var memcacheClient registryMemcache.MemcacheClient
198 200
 		if *memcachedHostname != "" {
199
-			memcacheClient = registry.NewMemcacheClient(registry.MemcacheConfig{
201
+			memcacheClient = registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{
200 202
 				Host:           *memcachedHostname,
201 203
 				Service:        *memcachedService,
202 204
 				Timeout:        *memcachedTimeout,
203 205
 				UpdateInterval: 1 * time.Minute,
204 206
 				Logger:         log.NewContext(logger).With("component", "memcached"),
205 207
 			})
206
-			memcacheClient = registry.InstrumentMemcacheClient(memcacheClient)
208
+			memcacheClient = registryMemcache.InstrumentMemcacheClient(memcacheClient)
207 209
 			defer memcacheClient.Stop()
208 210
 		}
209 211
 
@@ -220,7 +222,7 @@ func main() {
220 222
 
221 223
 		// Remote
222 224
 		registryLogger := log.NewContext(logger).With("component", "registry")
223
-		remoteFactory := registry.NewRemoteClientFactory(creds, registryLogger, registry.RateLimiterConfig{
225
+		remoteFactory := registry.NewRemoteClientFactory(creds, registryLogger, registryMiddleware.RateLimiterConfig{
224 226
 			RPS:   *registryRPS,
225 227
 			Burst: *registryBurst,
226 228
 		})

+ 0
- 105
registry/cache.go View File

@@ -1,105 +0,0 @@
1
-package registry
2
-
3
-import (
4
-	"encoding/json"
5
-	"strings"
6
-	"time"
7
-
8
-	"github.com/bradfitz/gomemcache/memcache"
9
-	"github.com/docker/distribution/manifest/schema1"
10
-	"github.com/go-kit/kit/log"
11
-	"github.com/pkg/errors"
12
-)
13
-
14
-type Cache struct {
15
-	creds  Credentials
16
-	expiry time.Duration
17
-	Client MemcacheClient
18
-	logger log.Logger
19
-}
20
-
21
-func NewCache(creds Credentials, cache MemcacheClient, expiry time.Duration, logger log.Logger) dockerRegistryInterface {
22
-	return &Cache{
23
-		creds:  creds,
24
-		expiry: expiry,
25
-		Client: cache,
26
-		logger: logger,
27
-	}
28
-}
29
-
30
-func (c *Cache) Manifest(repository, reference string) (history []schema1.History, err error) {
31
-	repo, err := ParseRepository(repository)
32
-	if err != nil {
33
-		c.logger.Log("err", errors.Wrap(err, "Parsing repository"))
34
-		return
35
-	}
36
-	creds := c.creds.credsFor(repo.Host())
37
-
38
-	// Try the cache
39
-	key := manifestKey(creds.username, repo.String(), reference)
40
-	cacheItem, err := c.Client.Get(key)
41
-	if err != nil {
42
-		if err != memcache.ErrCacheMiss {
43
-			c.logger.Log("err", errors.Wrap(err, "Fetching tag from memcache"))
44
-		}
45
-		return
46
-	}
47
-
48
-	// Return the cache item
49
-	err = json.Unmarshal(cacheItem.Value, &history)
50
-	if err != nil {
51
-		c.logger.Log("err", err.Error)
52
-		return
53
-	}
54
-	return
55
-}
56
-
57
-func (c *Cache) Tags(repository string) (tags []string, err error) {
58
-	repo, err := ParseRepository(repository)
59
-	if err != nil {
60
-		c.logger.Log("err", errors.Wrap(err, "Parsing repository"))
61
-		return
62
-	}
63
-	creds := c.creds.credsFor(repo.Host())
64
-
65
-	// Try the cache
66
-	key := tagKey(creds.username, repo.String())
67
-	cacheItem, err := c.Client.Get(key)
68
-	if err != nil {
69
-		if err != memcache.ErrCacheMiss {
70
-			c.logger.Log("err", errors.Wrap(err, "Fetching tag from memcache"))
71
-		}
72
-		return
73
-	}
74
-
75
-	// Return the cache item
76
-	err = json.Unmarshal(cacheItem.Value, &tags)
77
-	if err != nil {
78
-		c.logger.Log("err", err.Error)
79
-		return
80
-	}
81
-	return
82
-}
83
-
84
-func manifestKey(username, repository, reference string) string {
85
-	return strings.Join([]string{
86
-		"registryhistoryv1", // Just to version in case we need to change format later.
87
-		// Just the username here means we won't invalidate the cache when user
88
-		// changes password, but that should be rare. And, it also means we're not
89
-		// putting user passwords in plaintext into memcache.
90
-		username,
91
-		repository,
92
-		reference,
93
-	}, "|")
94
-}
95
-
96
-func tagKey(username, repository string) string {
97
-	return strings.Join([]string{
98
-		"registrytagsv1", // Just to version in case we need to change format later.
99
-		// Just the username here means we won't invalidate the cache when user
100
-		// changes password, but that should be rare. And, it also means we're not
101
-		// putting user passwords in plaintext into memcache.
102
-		username,
103
-		repository,
104
-	}, "|")
105
-}

+ 2
- 1
registry/cache_test.go View File

@@ -13,6 +13,7 @@ import (
13 13
 	"github.com/bradfitz/gomemcache/memcache"
14 14
 	"github.com/docker/distribution/manifest/schema1"
15 15
 	"github.com/go-kit/kit/log"
16
+	registryMemcache "github.com/weaveworks/flux/registry/memcache"
16 17
 )
17 18
 
18 19
 var (
@@ -26,7 +27,7 @@ type stoppableMemcacheClient struct {
26 27
 func (s *stoppableMemcacheClient) Stop() {}
27 28
 
28 29
 // Setup sets up stuff for testing
29
-func Setup(t *testing.T) MemcacheClient {
30
+func Setup(t *testing.T) registryMemcache.MemcacheClient {
30 31
 	mc := memcache.New(strings.Fields(*memcachedIPs)...)
31 32
 	if err := mc.FlushAll(); err != nil {
32 33
 		t.Fatal(err)

+ 0
- 26
registry/heroku_wrapper.go View File

@@ -1,26 +0,0 @@
1
-package registry
2
-
3
-import (
4
-	"github.com/docker/distribution/manifest/schema1"
5
-	dockerregistry "github.com/heroku/docker-registry-client/registry"
6
-)
7
-
8
-type herokuWrapper struct {
9
-	*dockerregistry.Registry
10
-}
11
-
12
-// Convert between types. dockerregistry returns the *same* type but from a
13
-// vendored library so go freaks out. Eugh.
14
-// TODO: The only thing we care about here for now is history. Frankly it might
15
-// be easier to convert it to JSON and back.
16
-func (h herokuWrapper) Manifest(repository, reference string) ([]schema1.History, error) {
17
-	manifest, err := h.Registry.Manifest(repository, reference)
18
-	if err != nil || manifest == nil {
19
-		return nil, err
20
-	}
21
-	var result []schema1.History
22
-	for _, item := range manifest.History {
23
-		result = append(result, schema1.History{item.V1Compatibility})
24
-	}
25
-	return result, err
26
-}

registry/memcached.go → registry/memcache/memcached.go View File

@@ -1,4 +1,4 @@
1
-package registry
1
+package memcache
2 2
 
3 3
 import (
4 4
 	"fmt"

registry/metrics.go → registry/memcache/monitoring.go View File

@@ -1,4 +1,4 @@
1
-package registry
1
+package memcache
2 2
 
3 3
 import (
4 4
 	"fmt"
@@ -11,27 +11,7 @@ import (
11 11
 	fluxmetrics "github.com/weaveworks/flux/metrics"
12 12
 )
13 13
 
14
-const (
15
-	LabelRequestKind = "kind"
16
-
17
-	RequestKindTags     = "tags"
18
-	RequestKindMetadata = "metadata"
19
-)
20
-
21 14
 var (
22
-	fetchDuration = prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
23
-		Namespace: "flux",
24
-		Subsystem: "registry",
25
-		Name:      "fetch_duration_seconds",
26
-		Help:      "Duration of Image metadata fetches, in seconds.",
27
-		Buckets:   stdprometheus.DefBuckets,
28
-	}, []string{fluxmetrics.LabelSuccess})
29
-	requestDuration = prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
30
-		Namespace: "flux",
31
-		Subsystem: "registry",
32
-		Name:      "request_duration_seconds",
33
-		Help:      "Duration of HTTP requests made in the course of fetching Image metadata",
34
-	}, []string{LabelRequestKind, fluxmetrics.LabelSuccess})
35 15
 	memcacheRequestDuration = prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
36 16
 		Namespace: "flux",
37 17
 		Subsystem: "memcache",

registry/backoff.go → registry/middleware/backoff.go View File

@@ -1,4 +1,4 @@
1
-package registry
1
+package middleware
2 2
 
3 3
 import (
4 4
 	"errors"
@@ -10,8 +10,8 @@ import (
10 10
 )
11 11
 
12 12
 const (
13
-	initialBackoff = 500 * time.Millisecond
14
-	maxBackoff     = 10 * time.Second
13
+	InitialBackoff = 500 * time.Millisecond
14
+	MaxBackoff     = 10 * time.Second
15 15
 )
16 16
 
17 17
 var (

registry/backoff_test.go → registry/middleware/backoff_test.go View File

@@ -1,4 +1,4 @@
1
-package registry
1
+package middleware
2 2
 
3 3
 import (
4 4
 	"errors"

registry/quay.go → registry/middleware/quay.go View File

@@ -1,4 +1,4 @@
1
-package registry
1
+package middleware
2 2
 
3 3
 import (
4 4
 	"net/http"
@@ -11,14 +11,14 @@ import (
11 11
 // token, so once you have authenticated, it can keep using it rather
12 12
 // than authenticating each time.
13 13
 
14
-type wwwAuthenticateFixer struct {
15
-	transport   http.RoundTripper
14
+type WWWAuthenticateFixer struct {
15
+	Transport   http.RoundTripper
16 16
 	tokenHeader string
17 17
 }
18 18
 
19
-func (t *wwwAuthenticateFixer) RoundTrip(req *http.Request) (*http.Response, error) {
19
+func (t *WWWAuthenticateFixer) RoundTrip(req *http.Request) (*http.Response, error) {
20 20
 	t.maybeAddToken(req)
21
-	res, err := t.transport.RoundTrip(req)
21
+	res, err := t.Transport.RoundTrip(req)
22 22
 	if err == nil {
23 23
 		newAuthHeaders := []string{}
24 24
 		for _, h := range res.Header[http.CanonicalHeaderKey("WWW-Authenticate")] {
@@ -46,7 +46,7 @@ func replaceUnquoted(h string) string {
46 46
 // again. BEWARE: this means this transport should only be used when
47 47
 // asking (repeatedly) about a single repository, otherwise we may
48 48
 // leak authorisation.
49
-func (t *wwwAuthenticateFixer) maybeAddToken(req *http.Request) {
49
+func (t *WWWAuthenticateFixer) maybeAddToken(req *http.Request) {
50 50
 	authHeaders := req.Header[http.CanonicalHeaderKey("Authorization")]
51 51
 	for _, h := range authHeaders {
52 52
 		if strings.EqualFold(h[:7], "bearer ") {

registry/rate_limiter.go → registry/middleware/rate_limiter.go View File

@@ -1,6 +1,7 @@
1
-package registry
1
+package middleware
2 2
 
3 3
 import (
4
+	"context"
4 5
 	"github.com/pkg/errors"
5 6
 	"golang.org/x/time/rate"
6 7
 	"net/http"
@@ -38,3 +39,12 @@ func (rl *RoundTripRateLimiter) RoundTrip(r *http.Request) (*http.Response, erro
38 39
 	}
39 40
 	return rl.Transport.RoundTrip(r)
40 41
 }
42
+
43
+type ContextRoundTripper struct {
44
+	Transport http.RoundTripper
45
+	Ctx       context.Context
46
+}
47
+
48
+func (rt *ContextRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
49
+	return rt.Transport.RoundTrip(r.WithContext(rt.Ctx))
50
+}

registry/rate_limiter_test.go → registry/middleware/rate_limiter_test.go View File

@@ -1,4 +1,4 @@
1
-package registry
1
+package middleware
2 2
 
3 3
 import (
4 4
 	"context"
@@ -10,6 +10,8 @@ import (
10 10
 	"time"
11 11
 )
12 12
 
13
+const requestTimeout = 10 * time.Second
14
+
13 15
 func TestRemoteFactory_RateLimit(t *testing.T) {
14 16
 	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
15 17
 		fmt.Fprintln(w, "Hello, client")

+ 24
- 0
registry/monitoring.go View File

@@ -5,10 +5,34 @@ import (
5 5
 	"strconv"
6 6
 	"time"
7 7
 
8
+	"github.com/go-kit/kit/metrics/prometheus"
9
+	stdprometheus "github.com/prometheus/client_golang/prometheus"
8 10
 	"github.com/weaveworks/flux"
9 11
 	fluxmetrics "github.com/weaveworks/flux/metrics"
10 12
 )
11 13
 
14
+const (
15
+	LabelRequestKind    = "kind"
16
+	RequestKindTags     = "tags"
17
+	RequestKindMetadata = "metadata"
18
+)
19
+
20
+var (
21
+	fetchDuration = prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
22
+		Namespace: "flux",
23
+		Subsystem: "registry",
24
+		Name:      "fetch_duration_seconds",
25
+		Help:      "Duration of Image metadata fetches, in seconds.",
26
+		Buckets:   stdprometheus.DefBuckets,
27
+	}, []string{fluxmetrics.LabelSuccess})
28
+	requestDuration = prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
29
+		Namespace: "flux",
30
+		Subsystem: "registry",
31
+		Name:      "request_duration_seconds",
32
+		Help:      "Duration of HTTP requests made in the course of fetching Image metadata",
33
+	}, []string{LabelRequestKind, fluxmetrics.LabelSuccess})
34
+)
35
+
12 36
 type InstrumentedRegistry Registry
13 37
 
14 38
 type instrumentedRegistry struct {

+ 0
- 62
registry/queue.go View File

@@ -1,62 +0,0 @@
1
-package registry
2
-
3
-import (
4
-	"github.com/go-kit/kit/log"
5
-	"math/rand"
6
-	"sync"
7
-	"time"
8
-)
9
-
10
-// Queue provides an updating repository queue for the warmer.
11
-// If no items are added to the queue this will randomly add a new
12
-// registry to warm
13
-type Queue struct {
14
-	RunningContainers    func() []Repository
15
-	Logger               log.Logger
16
-	RegistryPollInterval time.Duration
17
-	warmQueue            chan Repository
18
-	queueLock            sync.Mutex
19
-}
20
-
21
-func NewQueue(runningContainersFunc func() []Repository, l log.Logger, emptyQueueTick time.Duration) Queue {
22
-	return Queue{
23
-		RunningContainers:    runningContainersFunc,
24
-		Logger:               l,
25
-		RegistryPollInterval: emptyQueueTick,
26
-		warmQueue:            make(chan Repository, 100), // Don't close this. It will be GC'ed when this instance is destroyed.
27
-	}
28
-}
29
-
30
-// Queue loop to maintain the queue and periodically add a random
31
-// repository that is running in the cluster.
32
-func (w *Queue) Loop(stop chan struct{}, wg *sync.WaitGroup) {
33
-	defer wg.Done()
34
-
35
-	if w.RunningContainers == nil || w.Logger == nil || w.RegistryPollInterval == 0 {
36
-		panic("registry.Queue fields are nil")
37
-	}
38
-
39
-	pollImages := time.Tick(w.RegistryPollInterval)
40
-
41
-	for {
42
-		select {
43
-		case <-stop:
44
-			w.Logger.Log("stopping", "true")
45
-			return
46
-		case <-pollImages:
47
-			c := w.RunningContainers()
48
-			if len(c) > 0 { // Only add random registry if there are running containers
49
-				i := rand.Intn(len(c)) // Pick random registry
50
-				w.queueLock.Lock()
51
-				w.warmQueue <- c[i] // Add registry to queue
52
-				w.queueLock.Unlock()
53
-			}
54
-		}
55
-	}
56
-}
57
-
58
-func (w *Queue) Queue() chan Repository {
59
-	w.queueLock.Lock()
60
-	defer w.queueLock.Unlock()
61
-	return w.warmQueue
62
-}

+ 0
- 59
registry/queue_test.go View File

@@ -1,59 +0,0 @@
1
-package registry
2
-
3
-import (
4
-	"github.com/go-kit/kit/log"
5
-	"os"
6
-	"sync"
7
-	"testing"
8
-	"time"
9
-)
10
-
11
-func TestQueue_Usage(t *testing.T) {
12
-
13
-	queue := NewQueue(
14
-		func() []Repository {
15
-			r, _ := ParseRepository("test/image")
16
-			return []Repository{r}
17
-		},
18
-		log.NewLogfmtLogger(os.Stderr),
19
-		1*time.Millisecond,
20
-	)
21
-
22
-	shutdown := make(chan struct{})
23
-	shutdownWg := &sync.WaitGroup{}
24
-	shutdownWg.Add(1)
25
-	go queue.Loop(shutdown, shutdownWg)
26
-	defer func() {
27
-		shutdown <- struct{}{}
28
-		shutdownWg.Wait()
29
-	}()
30
-
31
-	time.Sleep(10 * time.Millisecond)
32
-	if len(queue.Queue()) == 0 {
33
-		t.Fatal("Should have randomly added containers to queue")
34
-	}
35
-}
36
-
37
-func TestQueue_NoContainers(t *testing.T) {
38
-	queue := NewQueue(
39
-		func() []Repository {
40
-			return []Repository{}
41
-		},
42
-		log.NewLogfmtLogger(os.Stderr),
43
-		1*time.Millisecond,
44
-	)
45
-
46
-	shutdown := make(chan struct{})
47
-	shutdownWg := &sync.WaitGroup{}
48
-	shutdownWg.Add(1)
49
-	go queue.Loop(shutdown, shutdownWg)
50
-	defer func() {
51
-		shutdown <- struct{}{}
52
-		shutdownWg.Wait()
53
-	}()
54
-
55
-	time.Sleep(10 * time.Millisecond)
56
-	if len(queue.Queue()) != 0 {
57
-		t.Fatal("There were no containers, so there should be no repositories in the queue")
58
-	}
59
-}

+ 337
- 4
registry/registry.go View File

@@ -2,12 +2,25 @@
2 2
 package registry
3 3
 
4 4
 import (
5
-	"sort"
6
-	"time"
7
-
5
+	"context"
6
+	"encoding/json"
7
+	"errors"
8
+	"fmt"
9
+	officialMemcache "github.com/bradfitz/gomemcache/memcache"
10
+	"github.com/docker/distribution/manifest/schema1"
8 11
 	"github.com/go-kit/kit/log"
9
-
12
+	dockerregistry "github.com/heroku/docker-registry-client/registry"
13
+	"github.com/jonboulle/clockwork"
14
+	wraperrors "github.com/pkg/errors"
10 15
 	"github.com/weaveworks/flux"
16
+	"github.com/weaveworks/flux/registry/memcache"
17
+	"github.com/weaveworks/flux/registry/middleware"
18
+	"golang.org/x/net/publicsuffix"
19
+	"net/http"
20
+	"net/http/cookiejar"
21
+	"sort"
22
+	"strings"
23
+	"time"
11 24
 )
12 25
 
13 26
 const (
@@ -140,3 +153,323 @@ func (is byCreatedDesc) Less(i, j int) bool {
140 153
 		return is[i].CreatedAt.After(is[j].CreatedAt)
141 154
 	}
142 155
 }
156
+
157
+type Remote interface {
158
+	Tags(repository Repository) ([]string, error)
159
+	Manifest(repository Repository, tag string) (flux.Image, error)
160
+	Cancel()
161
+}
162
+
163
+type remote struct {
164
+	client dockerRegistryInterface
165
+	cancel context.CancelFunc
166
+}
167
+
168
+func newRemote(client dockerRegistryInterface, cancel context.CancelFunc) Remote {
169
+	return &remote{
170
+		client: client,
171
+		cancel: cancel,
172
+	}
173
+}
174
+
175
+func (rc *remote) Tags(repository Repository) (_ []string, err error) {
176
+	return rc.client.Tags(repository.String())
177
+}
178
+
179
+func (rc *remote) Manifest(repository Repository, tag string) (img flux.Image, err error) {
180
+	img, err = flux.ParseImage(fmt.Sprintf("%s:%s", repository.String(), tag), time.Time{})
181
+	if err != nil {
182
+		return
183
+	}
184
+	history, err := rc.client.Manifest(repository.String(), tag)
185
+	if err != nil {
186
+		return
187
+	}
188
+
189
+	// the manifest includes some v1-backwards-compatibility data,
190
+	// oddly called "History", which are layer metadata as JSON
191
+	// strings; these appear most-recent (i.e., topmost layer) first,
192
+	// so happily we can just decode the first entry to get a created
193
+	// time.
194
+	type v1image struct {
195
+		Created time.Time `json:"created"`
196
+	}
197
+	var topmost v1image
198
+	if len(history) > 0 {
199
+		if err = json.Unmarshal([]byte(history[0].V1Compatibility), &topmost); err == nil {
200
+			if !topmost.Created.IsZero() {
201
+				img.CreatedAt = topmost.Created
202
+			}
203
+		}
204
+	}
205
+
206
+	return
207
+}
208
+
209
+func (rc *remote) Cancel() {
210
+	rc.cancel()
211
+}
212
+
213
+// This is an interface that represents the heroku docker registry library
214
+type dockerRegistryInterface interface {
215
+	Tags(repository string) ([]string, error)
216
+	Manifest(repository, reference string) ([]schema1.History, error)
217
+}
218
+
219
+var (
220
+	ErrNoMemcache = errors.New("no memecache")
221
+)
222
+
223
+type creds struct {
224
+	username, password string
225
+}
226
+
227
+// Credentials to a (Docker) registry.
228
+type Credentials struct {
229
+	m map[string]creds
230
+}
231
+
232
+type ClientFactory interface {
233
+	ClientFor(host string) (client dockerRegistryInterface, cancel context.CancelFunc, err error)
234
+}
235
+
236
+func NewCacheClientFactory(c Credentials, l log.Logger, mc memcache.MemcacheClient, ce time.Duration) ClientFactory {
237
+	for host, creds := range c.m {
238
+		l.Log("host", host, "username", creds.username)
239
+	}
240
+	return &cacheClientFactory{
241
+		creds:          c,
242
+		Logger:         l,
243
+		MemcacheClient: mc,
244
+		CacheExpiry:    ce,
245
+	}
246
+}
247
+
248
+type cacheClientFactory struct {
249
+	creds          Credentials
250
+	Logger         log.Logger
251
+	MemcacheClient memcache.MemcacheClient
252
+	CacheExpiry    time.Duration
253
+}
254
+
255
+func (f *cacheClientFactory) ClientFor(host string) (dockerRegistryInterface, context.CancelFunc, error) {
256
+	if f.MemcacheClient == nil {
257
+		return nil, nil, ErrNoMemcache
258
+	}
259
+	client := NewCache(f.creds, f.MemcacheClient, f.CacheExpiry, f.Logger)
260
+	return client, func() {}, nil
261
+}
262
+
263
+func NewRemoteClientFactory(c Credentials, l log.Logger, rlc middleware.RateLimiterConfig) ClientFactory {
264
+	for host, creds := range c.m {
265
+		l.Log("host", host, "username", creds.username)
266
+	}
267
+	return &remoteClientFactory{
268
+		creds:  c,
269
+		Logger: l,
270
+		rlConf: rlc,
271
+	}
272
+}
273
+
274
+type remoteClientFactory struct {
275
+	creds  Credentials
276
+	Logger log.Logger
277
+	rlConf middleware.RateLimiterConfig
278
+}
279
+
280
+func (f *remoteClientFactory) ClientFor(host string) (dockerRegistryInterface, context.CancelFunc, error) {
281
+	httphost := "https://" + host
282
+
283
+	// quay.io wants us to use cookies for authorisation, so we have
284
+	// to construct one (the default client has none). This means a
285
+	// bit more constructing things to be able to make a registry
286
+	// client literal, rather than calling .New()
287
+	jar, err := cookiejar.New(&cookiejar.Options{PublicSuffixList: publicsuffix.List})
288
+	if err != nil {
289
+		return nil, nil, err
290
+	}
291
+	auth := f.creds.credsFor(host)
292
+
293
+	// A context we'll use to cancel requests on error
294
+	ctx, cancel := context.WithCancel(context.Background())
295
+	// Add a timeout to the request
296
+	ctx, cancel = context.WithTimeout(ctx, requestTimeout)
297
+
298
+	// Use the wrapper to fix headers for quay.io, and remember bearer tokens
299
+	var transport http.RoundTripper
300
+	{
301
+		transport = &middleware.WWWAuthenticateFixer{Transport: http.DefaultTransport}
302
+		// Now the auth-handling wrappers that come with the library
303
+		transport = dockerregistry.WrapTransport(transport, httphost, auth.username, auth.password)
304
+		// Add the backoff mechanism so we don't DOS registries
305
+		transport = middleware.BackoffRoundTripper(transport, middleware.InitialBackoff, middleware.MaxBackoff, clockwork.NewRealClock())
306
+		// Add timeout context
307
+		transport = &middleware.ContextRoundTripper{Transport: transport, Ctx: ctx}
308
+		// Rate limit
309
+		transport = middleware.RateLimitedRoundTripper(transport, f.rlConf, host)
310
+	}
311
+
312
+	client := herokuWrapper{
313
+		&dockerregistry.Registry{
314
+			URL: httphost,
315
+			Client: &http.Client{
316
+				Transport: transport,
317
+				Jar:       jar,
318
+				Timeout:   requestTimeout,
319
+			},
320
+			Logf: dockerregistry.Quiet,
321
+		},
322
+	}
323
+	return client, cancel, nil
324
+}
325
+
326
+type Repository struct {
327
+	img flux.Image // Internally we use an image to store data
328
+}
329
+
330
+func RepositoryFromImage(img flux.Image) Repository {
331
+	return Repository{
332
+		img: img,
333
+	}
334
+}
335
+
336
+func ParseRepository(imgStr string) (Repository, error) {
337
+	i, err := flux.ParseImage(imgStr, time.Time{})
338
+	if err != nil {
339
+		return Repository{}, err
340
+	}
341
+	return Repository{
342
+		img: i,
343
+	}, nil
344
+}
345
+
346
+func (r Repository) NamespaceImage() string {
347
+	return r.img.ID.NamespaceImage()
348
+}
349
+
350
+func (r Repository) Host() string {
351
+	return r.img.ID.Host
352
+}
353
+
354
+func (r Repository) String() string {
355
+	return r.img.ID.HostNamespaceImage()
356
+}
357
+
358
+func (r Repository) ToImage(tag string) flux.Image {
359
+	newImage := r.img
360
+	newImage.ID.Tag = tag
361
+	return newImage
362
+}
363
+
364
+type herokuWrapper struct {
365
+	*dockerregistry.Registry
366
+}
367
+
368
+// Convert between types. dockerregistry returns the *same* type but from a
369
+// vendored library so go freaks out. Eugh.
370
+// TODO: The only thing we care about here for now is history. Frankly it might
371
+// be easier to convert it to JSON and back.
372
+func (h herokuWrapper) Manifest(repository, reference string) ([]schema1.History, error) {
373
+	manifest, err := h.Registry.Manifest(repository, reference)
374
+	if err != nil || manifest == nil {
375
+		return nil, err
376
+	}
377
+	var result []schema1.History
378
+	for _, item := range manifest.History {
379
+		result = append(result, schema1.History{item.V1Compatibility})
380
+	}
381
+	return result, err
382
+}
383
+
384
+type Cache struct {
385
+	creds  Credentials
386
+	expiry time.Duration
387
+	Client memcache.MemcacheClient
388
+	logger log.Logger
389
+}
390
+
391
+func NewCache(creds Credentials, cache memcache.MemcacheClient, expiry time.Duration, logger log.Logger) dockerRegistryInterface {
392
+	return &Cache{
393
+		creds:  creds,
394
+		expiry: expiry,
395
+		Client: cache,
396
+		logger: logger,
397
+	}
398
+}
399
+
400
+func (c *Cache) Manifest(repository, reference string) (history []schema1.History, err error) {
401
+	repo, err := ParseRepository(repository)
402
+	if err != nil {
403
+		c.logger.Log("err", wraperrors.Wrap(err, "Parsing repository"))
404
+		return
405
+	}
406
+	creds := c.creds.credsFor(repo.Host())
407
+
408
+	// Try the cache
409
+	key := manifestKey(creds.username, repo.String(), reference)
410
+	cacheItem, err := c.Client.Get(key)
411
+	if err != nil {
412
+		if err != officialMemcache.ErrCacheMiss {
413
+			c.logger.Log("err", wraperrors.Wrap(err, "Fetching tag from memcache"))
414
+		}
415
+		return
416
+	}
417
+
418
+	// Return the cache item
419
+	err = json.Unmarshal(cacheItem.Value, &history)
420
+	if err != nil {
421
+		c.logger.Log("err", err.Error)
422
+		return
423
+	}
424
+	return
425
+}
426
+
427
+func (c *Cache) Tags(repository string) (tags []string, err error) {
428
+	repo, err := ParseRepository(repository)
429
+	if err != nil {
430
+		c.logger.Log("err", wraperrors.Wrap(err, "Parsing repository"))
431
+		return
432
+	}
433
+	creds := c.creds.credsFor(repo.Host())
434
+
435
+	// Try the cache
436
+	key := tagKey(creds.username, repo.String())
437
+	cacheItem, err := c.Client.Get(key)
438
+	if err != nil {
439
+		if err != officialMemcache.ErrCacheMiss {
440
+			c.logger.Log("err", wraperrors.Wrap(err, "Fetching tag from memcache"))
441
+		}
442
+		return
443
+	}
444
+
445
+	// Return the cache item
446
+	err = json.Unmarshal(cacheItem.Value, &tags)
447
+	if err != nil {
448
+		c.logger.Log("err", err.Error)
449
+		return
450
+	}
451
+	return
452
+}
453
+
454
+func manifestKey(username, repository, reference string) string {
455
+	return strings.Join([]string{
456
+		"registryhistoryv1", // Just to version in case we need to change format later.
457
+		// Just the username here means we won't invalidate the cache when user
458
+		// changes password, but that should be rare. And, it also means we're not
459
+		// putting user passwords in plaintext into memcache.
460
+		username,
461
+		repository,
462
+		reference,
463
+	}, "|")
464
+}
465
+
466
+func tagKey(username, repository string) string {
467
+	return strings.Join([]string{
468
+		"registrytagsv1", // Just to version in case we need to change format later.
469
+		// Just the username here means we won't invalidate the cache when user
470
+		// changes password, but that should be rare. And, it also means we're not
471
+		// putting user passwords in plaintext into memcache.
472
+		username,
473
+		repository,
474
+	}, "|")
475
+}

+ 344
- 0
registry/registry_test.go View File

@@ -10,8 +10,12 @@ import (
10 10
 	"github.com/go-kit/kit/log"
11 11
 	"github.com/pkg/errors"
12 12
 
13
+	"encoding/base64"
13 14
 	"github.com/docker/distribution/manifest/schema1"
14 15
 	"github.com/weaveworks/flux"
16
+	"github.com/weaveworks/flux/registry/middleware"
17
+	"io/ioutil"
18
+	"os"
15 19
 )
16 20
 
17 21
 var (
@@ -98,3 +102,343 @@ func TestRegistry_OrderByCreationDate(t *testing.T) {
98 102
 		}
99 103
 	}
100 104
 }
105
+
106
+// Note: This actually goes off to docker hub to find the Image.
107
+// It will fail if there is not internet connection
108
+func TestRemoteFactory_RawClient(t *testing.T) {
109
+	// No credentials required for public Image
110
+	fact := NewRemoteClientFactory(Credentials{}, log.NewNopLogger(), middleware.RateLimiterConfig{
111
+		RPS:   200,
112
+		Burst: 1,
113
+	})
114
+	img, err := flux.ParseImage("alpine:latest", time.Time{})
115
+	if err != nil {
116
+		t.Fatal(err)
117
+	}
118
+	testRepository = RepositoryFromImage(img)
119
+
120
+	// Refresh tags first
121
+	var tags []string
122
+	client, cancel, err := fact.ClientFor(testRepository.Host())
123
+	if err != nil {
124
+		t.Fatal(err)
125
+	}
126
+
127
+	tags, err = client.Tags(testRepository.NamespaceImage())
128
+	if err != nil {
129
+		t.Fatal(err)
130
+	}
131
+	cancel()
132
+	if len(tags) == 0 {
133
+		t.Fatal("Should have some tags")
134
+	}
135
+
136
+	client, cancel, err = fact.ClientFor(testRepository.Host())
137
+	if err != nil {
138
+		t.Fatal(err)
139
+	}
140
+	history, err := client.Manifest(testRepository.NamespaceImage(), tags[0])
141
+	if err != nil {
142
+		t.Fatal(err)
143
+	}
144
+	if len(history) == 0 {
145
+		t.Fatal("Should have some history")
146
+	}
147
+	cancel()
148
+}
149
+
150
+func TestRemoteFactory_InvalidHost(t *testing.T) {
151
+	fact := NewRemoteClientFactory(Credentials{}, log.NewNopLogger(), middleware.RateLimiterConfig{})
152
+	img, err := flux.ParseImage("invalid.host/library/alpine:latest", time.Time{})
153
+	if err != nil {
154
+		t.Fatal(err)
155
+	}
156
+	testRepository = RepositoryFromImage(img)
157
+	client, cancel, err := fact.ClientFor(testRepository.Host())
158
+	if err != nil {
159
+		return
160
+	}
161
+	r := newRemote(client, cancel)
162
+	_, err = r.Manifest(testRepository, img.ID.Tag)
163
+	if err == nil {
164
+		t.Fatal("Expected error due to invalid host but got none.")
165
+	}
166
+}
167
+
168
+var (
169
+	user string = "user"
170
+	pass string = "pass"
171
+	host string = "host"
172
+	tmpl string = `
173
+    {
174
+        "auths": {
175
+            %q: {"auth": %q}
176
+        }
177
+    }`
178
+	okCreds string = base64.StdEncoding.EncodeToString([]byte(user + ":" + pass))
179
+)
180
+
181
+func writeCreds(t *testing.T, creds string) (string, func()) {
182
+	file, err := ioutil.TempFile("", "testcreds")
183
+	file.Write([]byte(creds))
184
+	file.Close()
185
+	if err != nil {
186
+		t.Fatal(err)
187
+	}
188
+	return file.Name(), func() {
189
+		os.Remove(file.Name())
190
+	}
191
+}
192
+
193
+func TestRemoteFactory_CredentialsFromFile(t *testing.T) {
194
+	file, cleanup := writeCreds(t, fmt.Sprintf(tmpl, host, okCreds))
195
+	defer cleanup()
196
+
197
+	creds, err := CredentialsFromFile(file)
198
+	if err != nil {
199
+		t.Fatal(err)
200
+	}
201
+	c := creds.credsFor(host)
202
+	if user != c.username {
203
+		t.Fatalf("Expected %q, got %q.", user, c.username)
204
+	}
205
+	if pass != c.password {
206
+		t.Fatalf("Expected %q, got %q.", pass, c.password)
207
+	}
208
+	if len(creds.Hosts()) != 1 || host != creds.Hosts()[0] {
209
+		t.Fatalf("Expected %q, got %q.", host, creds.Hosts()[0])
210
+	}
211
+}
212
+
213
+func TestRemoteFactory_CredentialsFromConfigDecodeError(t *testing.T) {
214
+	file, cleanup := writeCreds(t, `{
215
+    "auths": {
216
+        "host": {"auth": "credentials:notencoded"}
217
+    }
218
+}`)
219
+	defer cleanup()
220
+	_, err := CredentialsFromFile(file)
221
+	if err == nil {
222
+		t.Fatal("Expected error")
223
+	}
224
+}
225
+
226
+func TestRemoteFactory_CredentialsFromConfigHTTPSHosts(t *testing.T) {
227
+	httpsHost := fmt.Sprintf("https://%s/v1/", host)
228
+	file, cleanup := writeCreds(t, fmt.Sprintf(tmpl, httpsHost, okCreds))
229
+	defer cleanup()
230
+
231
+	creds, err := CredentialsFromFile(file)
232
+	if err != nil {
233
+		t.Fatal(err)
234
+	}
235
+	c := creds.credsFor(host)
236
+	if user != c.username {
237
+		t.Fatalf("Expected %q, got %q.", user, c.username)
238
+	}
239
+	if pass != c.password {
240
+		t.Fatalf("Expected %q, got %q.", pass, c.password)
241
+	}
242
+	if len(creds.Hosts()) != 1 || host != creds.Hosts()[0] {
243
+		t.Fatalf("Expected %q, got %q.", httpsHost, creds.Hosts()[0])
244
+	}
245
+}
246
+
247
+func TestRemoteFactory_ParseHost(t *testing.T) {
248
+	for _, v := range []struct {
249
+		host        string
250
+		imagePrefix string
251
+		error       bool
252
+	}{
253
+		{
254
+			host:        "host",
255
+			imagePrefix: "host",
256
+		},
257
+		{
258
+			host:        "gcr.io",
259
+			imagePrefix: "gcr.io",
260
+		},
261
+		{
262
+			host:        "https://gcr.io",
263
+			imagePrefix: "gcr.io",
264
+		},
265
+		{
266
+			host:        "https://gcr.io/v1",
267
+			imagePrefix: "gcr.io",
268
+		},
269
+		{
270
+			host:        "https://gcr.io/v1/",
271
+			imagePrefix: "gcr.io",
272
+		},
273
+		{
274
+			host:        "gcr.io/v1",
275
+			imagePrefix: "gcr.io",
276
+		},
277
+		{
278
+			host:        "telnet://gcr.io/v1",
279
+			imagePrefix: "gcr.io",
280
+		},
281
+		{
282
+			host:        "",
283
+			imagePrefix: "gcr.io",
284
+			error:       true,
285
+		},
286
+		{
287
+			host:        "https://",
288
+			imagePrefix: "gcr.io",
289
+			error:       true,
290
+		},
291
+		{
292
+			host:        "^#invalid.io/v1/",
293
+			imagePrefix: "gcr.io",
294
+			error:       true,
295
+		},
296
+		{
297
+			host:        "/var/user",
298
+			imagePrefix: "gcr.io",
299
+			error:       true,
300
+		},
301
+	} {
302
+
303
+		file, cleanup := writeCreds(t, fmt.Sprintf(tmpl, v.host, okCreds))
304
+		defer cleanup()
305
+		creds, err := CredentialsFromFile(file)
306
+		if (err != nil) != v.error {
307
+			t.Fatalf("For test %q, expected error = %v but got %v", v.host, v.error, err != nil)
308
+		}
309
+		if v.error {
310
+			continue
311
+		}
312
+		if u := creds.credsFor(v.imagePrefix).username; u != user {
313
+			t.Fatalf("For test %q, expected %q but got %v", v.host, user, u)
314
+		}
315
+	}
316
+}
317
+
318
+const testTagStr = "tag"
319
+const testImageStr = "index.docker.io/test/Image:" + testTagStr
320
+const constTime = "2017-01-13T16:22:58.009923189Z"
321
+
322
+var (
323
+	img, _         = flux.ParseImage(testImageStr, time.Time{})
324
+	testRepository = RepositoryFromImage(img)
325
+
326
+	man = schema1.SignedManifest{
327
+		Manifest: schema1.Manifest{
328
+			History: []schema1.History{
329
+				{
330
+					V1Compatibility: `{"created":"` + constTime + `"}`,
331
+				},
332
+			},
333
+		},
334
+	}
335
+)
336
+
337
+// Need to create a dummy manifest here
338
+func TestRemoteClient_ParseManifest(t *testing.T) {
339
+	manifestFunc := func(repo, ref string) ([]schema1.History, error) {
340
+		return man.Manifest.History, nil
341
+	}
342
+	c := newRemote(
343
+		NewMockDockerClient(manifestFunc, nil),
344
+		nil,
345
+	)
346
+	testRepository = RepositoryFromImage(img)
347
+	desc, err := c.Manifest(testRepository, img.ID.Tag)
348
+	if err != nil {
349
+		t.Fatal(err.Error())
350
+	}
351
+	if string(desc.ID.FullID()) != testImageStr {
352
+		t.Fatalf("Expecting %q but got %q", testImageStr, string(desc.ID.FullID()))
353
+	}
354
+	if desc.CreatedAt.Format(time.RFC3339Nano) != constTime {
355
+		t.Fatalf("Expecting %q but got %q", constTime, desc.CreatedAt.Format(time.RFC3339Nano))
356
+	}
357
+}
358
+
359
+// Just a simple pass through.
360
+func TestRemoteClient_GetTags(t *testing.T) {
361
+	c := remote{
362
+		client: NewMockDockerClient(nil, func(repository string) ([]string, error) {
363
+			return []string{
364
+				testTagStr,
365
+			}, nil
366
+		}),
367
+	}
368
+	tags, err := c.Tags(testRepository)
369
+	if err != nil {
370
+		t.Fatal(err.Error())
371
+	}
372
+	if tags[0] != testTagStr {
373
+		t.Fatalf("Expecting %q but got %q", testTagStr, tags[0])
374
+	}
375
+}
376
+
377
+func TestRemoteClient_IsCancelCalled(t *testing.T) {
378
+	var didCancel bool
379
+	r := remote{
380
+		cancel: func() { didCancel = true },
381
+	}
382
+	r.Cancel()
383
+	if !didCancel {
384
+		t.Fatal("Expected it to call the cancel func")
385
+	}
386
+}
387
+
388
+func TestRemoteClient_RemoteErrors(t *testing.T) {
389
+	manifestFunc := func(repo, ref string) ([]schema1.History, error) {
390
+		return man.Manifest.History, errors.New("dummy")
391
+	}
392
+	tagsFunc := func(repository string) ([]string, error) {
393
+		return []string{
394
+			testTagStr,
395
+		}, errors.New("dummy")
396
+	}
397
+	c := remote{
398
+		client: NewMockDockerClient(manifestFunc, tagsFunc),
399
+	}
400
+	_, err := c.Tags(testRepository)
401
+	if err == nil {
402
+		t.Fatal("Expected error")
403
+	}
404
+	_, err = c.Manifest(testRepository, img.ID.Tag)
405
+	if err == nil {
406
+		t.Fatal("Expected error")
407
+	}
408
+}
409
+
410
+func TestRemoteClient_TestNew(t *testing.T) {
411
+	r := &herokuWrapper{}
412
+	var flag bool
413
+	f := func() { flag = true }
414
+	c := newRemote(r, f)
415
+	if c.(*remote).client != r { // Test that client was set
416
+		t.Fatal("Client was not set")
417
+	}
418
+	c.(*remote).cancel()
419
+	if !flag { // Test that our cancel function, when called, works
420
+		t.Fatal("Expected it to call the cancel func")
421
+	}
422
+}
423
+
424
+func TestRepository_ParseImage(t *testing.T) {
425
+	for _, x := range []struct {
426
+		test     string
427
+		expected string
428
+	}{
429
+		{"alpine", "index.docker.io/library/alpine"},
430
+		{"library/alpine", "index.docker.io/library/alpine"},
431
+		{"alpine:mytag", "index.docker.io/library/alpine"},
432
+		{"quay.io/library/alpine", "quay.io/library/alpine"},
433
+		{"quay.io/library/alpine:latest", "quay.io/library/alpine"},
434
+		{"quay.io/library/alpine:mytag", "quay.io/library/alpine"},
435
+	} {
436
+		i, err := ParseRepository(x.test)
437
+		if err != nil {
438
+			t.Fatalf("Failed parsing %q, expected %q", x.test, x.expected)
439
+		}
440
+		if i.String() != x.expected {
441
+			t.Fatalf("%q does not match expected %q", i.String(), x.expected)
442
+		}
443
+	}
444
+}

+ 0
- 74
registry/remote.go View File

@@ -1,74 +0,0 @@
1
-package registry
2
-
3
-import (
4
-	"context"
5
-	"encoding/json"
6
-	"time"
7
-
8
-	"github.com/docker/distribution/manifest/schema1"
9
-
10
-	"fmt"
11
-	"github.com/weaveworks/flux"
12
-)
13
-
14
-type Remote interface {
15
-	Tags(repository Repository) ([]string, error)
16
-	Manifest(repository Repository, tag string) (flux.Image, error)
17
-	Cancel()
18
-}
19
-
20
-type remote struct {
21
-	client dockerRegistryInterface
22
-	cancel context.CancelFunc
23
-}
24
-
25
-func newRemote(client dockerRegistryInterface, cancel context.CancelFunc) Remote {
26
-	return &remote{
27
-		client: client,
28
-		cancel: cancel,
29
-	}
30
-}
31
-
32
-func (rc *remote) Tags(repository Repository) (_ []string, err error) {
33
-	return rc.client.Tags(repository.String())
34
-}
35
-
36
-func (rc *remote) Manifest(repository Repository, tag string) (img flux.Image, err error) {
37
-	img, err = flux.ParseImage(fmt.Sprintf("%s:%s", repository.String(), tag), time.Time{})
38
-	if err != nil {
39
-		return
40
-	}
41
-	history, err := rc.client.Manifest(repository.String(), tag)
42
-	if err != nil {
43
-		return
44
-	}
45
-
46
-	// the manifest includes some v1-backwards-compatibility data,
47
-	// oddly called "History", which are layer metadata as JSON
48
-	// strings; these appear most-recent (i.e., topmost layer) first,
49
-	// so happily we can just decode the first entry to get a created
50
-	// time.
51
-	type v1image struct {
52
-		Created time.Time `json:"created"`
53
-	}
54
-	var topmost v1image
55
-	if len(history) > 0 {
56
-		if err = json.Unmarshal([]byte(history[0].V1Compatibility), &topmost); err == nil {
57
-			if !topmost.Created.IsZero() {
58
-				img.CreatedAt = topmost.Created
59
-			}
60
-		}
61
-	}
62
-
63
-	return
64
-}
65
-
66
-func (rc *remote) Cancel() {
67
-	rc.cancel()
68
-}
69
-
70
-// This is an interface that represents the heroku docker registry library
71
-type dockerRegistryInterface interface {
72
-	Tags(repository string) ([]string, error)
73
-	Manifest(repository, reference string) ([]schema1.History, error)
74
-}

+ 0
- 132
registry/remote_factory.go View File

@@ -1,132 +0,0 @@
1
-// We can't inject a remote client directly because each repository request might be to a different
2
-// registry provider. E.g. both docker hub and quay containers. So a new remote client must be
3
-// created for each new Image. This factory provides that and can be mocked out.
4
-package registry
5
-
6
-import (
7
-	"context"
8
-	"errors"
9
-	"github.com/go-kit/kit/log"
10
-	dockerregistry "github.com/heroku/docker-registry-client/registry"
11
-	"github.com/jonboulle/clockwork"
12
-	"golang.org/x/net/publicsuffix"
13
-	"net/http"
14
-	"net/http/cookiejar"
15
-	"time"
16
-)
17
-
18
-var (
19
-	ErrNoMemcache = errors.New("no memecache")
20
-)
21
-
22
-type creds struct {
23
-	username, password string
24
-}
25
-
26
-// Credentials to a (Docker) registry.
27
-type Credentials struct {
28
-	m map[string]creds
29
-}
30
-
31
-type ClientFactory interface {
32
-	ClientFor(host string) (client dockerRegistryInterface, cancel context.CancelFunc, err error)
33
-}
34
-
35
-func NewCacheClientFactory(c Credentials, l log.Logger, mc MemcacheClient, ce time.Duration) ClientFactory {
36
-	for host, creds := range c.m {
37
-		l.Log("host", host, "username", creds.username)
38
-	}
39
-	return &cacheClientFactory{
40
-		creds:          c,
41
-		Logger:         l,
42
-		MemcacheClient: mc,
43
-		CacheExpiry:    ce,
44
-	}
45
-}
46
-
47
-type cacheClientFactory struct {
48
-	creds          Credentials
49
-	Logger         log.Logger
50
-	MemcacheClient MemcacheClient
51
-	CacheExpiry    time.Duration
52
-}
53
-
54
-func (f *cacheClientFactory) ClientFor(host string) (dockerRegistryInterface, context.CancelFunc, error) {
55
-	if f.MemcacheClient == nil {
56
-		return nil, nil, ErrNoMemcache
57
-	}
58
-	client := NewCache(f.creds, f.MemcacheClient, f.CacheExpiry, f.Logger)
59
-	return client, func() {}, nil
60
-}
61
-
62
-func NewRemoteClientFactory(c Credentials, l log.Logger, rlc RateLimiterConfig) ClientFactory {
63
-	for host, creds := range c.m {
64
-		l.Log("host", host, "username", creds.username)
65
-	}
66
-	return &remoteClientFactory{
67
-		creds:  c,
68
-		Logger: l,
69
-		rlConf: rlc,
70
-	}
71
-}
72
-
73
-type remoteClientFactory struct {
74
-	creds  Credentials
75
-	Logger log.Logger
76
-	rlConf RateLimiterConfig
77
-}
78
-
79
-func (f *remoteClientFactory) ClientFor(host string) (dockerRegistryInterface, context.CancelFunc, error) {
80
-	httphost := "https://" + host
81
-
82
-	// quay.io wants us to use cookies for authorisation, so we have
83
-	// to construct one (the default client has none). This means a
84
-	// bit more constructing things to be able to make a registry
85
-	// client literal, rather than calling .New()
86
-	jar, err := cookiejar.New(&cookiejar.Options{PublicSuffixList: publicsuffix.List})
87
-	if err != nil {
88
-		return nil, nil, err
89
-	}
90
-	auth := f.creds.credsFor(host)
91
-
92
-	// A context we'll use to cancel requests on error
93
-	ctx, cancel := context.WithCancel(context.Background())
94
-	// Add a timeout to the request
95
-	ctx, cancel = context.WithTimeout(ctx, requestTimeout)
96
-
97
-	// Use the wrapper to fix headers for quay.io, and remember bearer tokens
98
-	var transport http.RoundTripper
99
-	{
100
-		transport = &wwwAuthenticateFixer{transport: http.DefaultTransport}
101
-		// Now the auth-handling wrappers that come with the library
102
-		transport = dockerregistry.WrapTransport(transport, httphost, auth.username, auth.password)
103
-		// Add the backoff mechanism so we don't DOS registries
104
-		transport = BackoffRoundTripper(transport, initialBackoff, maxBackoff, clockwork.NewRealClock())
105
-		// Add timeout context
106
-		transport = &ContextRoundTripper{Transport: transport, Ctx: ctx}
107
-		// Rate limit
108
-		transport = RateLimitedRoundTripper(transport, f.rlConf, host)
109
-	}
110
-
111
-	client := herokuWrapper{
112
-		&dockerregistry.Registry{
113
-			URL: httphost,
114
-			Client: &http.Client{
115
-				Transport: transport,
116
-				Jar:       jar,
117
-				Timeout:   requestTimeout,
118
-			},
119
-			Logf: dockerregistry.Quiet,
120
-		},
121
-	}
122
-	return client, cancel, nil
123
-}
124
-
125
-type ContextRoundTripper struct {
126
-	Transport http.RoundTripper
127
-	Ctx       context.Context
128
-}
129
-
130
-func (rt *ContextRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
131
-	return rt.Transport.RoundTrip(r.WithContext(rt.Ctx))
132
-}

+ 0
- 226
registry/remote_factory_test.go View File

@@ -1,226 +0,0 @@
1
-package registry
2
-
3
-import (
4
-	"encoding/base64"
5
-	"fmt"
6
-	"io/ioutil"
7
-	"os"
8
-	"testing"
9
-	"time"
10
-
11
-	"github.com/go-kit/kit/log"
12
-
13
-	"github.com/weaveworks/flux"
14
-)
15
-
16
-// Note: This actually goes off to docker hub to find the Image.
17
-// It will fail if there is not internet connection
18
-func TestRemoteFactory_RawClient(t *testing.T) {
19
-	// No credentials required for public Image
20
-	fact := NewRemoteClientFactory(Credentials{}, log.NewNopLogger(), RateLimiterConfig{
21
-		RPS:   200,
22
-		Burst: 1,
23
-	})
24
-	img, err := flux.ParseImage("alpine:latest", time.Time{})
25
-	if err != nil {
26
-		t.Fatal(err)
27
-	}
28
-	testRepository = RepositoryFromImage(img)
29
-
30
-	// Refresh tags first
31
-	var tags []string
32
-	client, cancel, err := fact.ClientFor(testRepository.Host())
33
-	if err != nil {
34
-		t.Fatal(err)
35
-	}
36
-
37
-	tags, err = client.Tags(testRepository.NamespaceImage())
38
-	if err != nil {
39
-		t.Fatal(err)
40
-	}
41
-	cancel()
42
-	if len(tags) == 0 {
43
-		t.Fatal("Should have some tags")
44
-	}
45
-
46
-	client, cancel, err = fact.ClientFor(testRepository.Host())
47
-	if err != nil {
48
-		t.Fatal(err)
49
-	}
50
-	history, err := client.Manifest(testRepository.NamespaceImage(), tags[0])
51
-	if err != nil {
52
-		t.Fatal(err)
53
-	}
54
-	if len(history) == 0 {
55
-		t.Fatal("Should have some history")
56
-	}
57
-	cancel()
58
-}
59
-
60
-func TestRemoteFactory_InvalidHost(t *testing.T) {
61
-	fact := NewRemoteClientFactory(Credentials{}, log.NewNopLogger(), RateLimiterConfig{})
62
-	img, err := flux.ParseImage("invalid.host/library/alpine:latest", time.Time{})
63
-	if err != nil {
64
-		t.Fatal(err)
65
-	}
66
-	testRepository = RepositoryFromImage(img)
67
-	client, cancel, err := fact.ClientFor(testRepository.Host())
68
-	if err != nil {
69
-		return
70
-	}
71
-	r := newRemote(client, cancel)
72
-	_, err = r.Manifest(testRepository, img.ID.Tag)
73
-	if err == nil {
74
-		t.Fatal("Expected error due to invalid host but got none.")
75
-	}
76
-}
77
-
78
-var (
79
-	user string = "user"
80
-	pass string = "pass"
81
-	host string = "host"
82
-	tmpl string = `
83
-    {
84
-        "auths": {
85
-            %q: {"auth": %q}
86
-        }
87
-    }`
88
-	okCreds string = base64.StdEncoding.EncodeToString([]byte(user + ":" + pass))
89
-)
90
-
91
-func writeCreds(t *testing.T, creds string) (string, func()) {
92
-	file, err := ioutil.TempFile("", "testcreds")
93
-	file.Write([]byte(creds))
94
-	file.Close()
95
-	if err != nil {
96
-		t.Fatal(err)
97
-	}
98
-	return file.Name(), func() {
99
-		os.Remove(file.Name())
100
-	}
101
-}
102
-
103
-func TestRemoteFactory_CredentialsFromFile(t *testing.T) {
104
-	file, cleanup := writeCreds(t, fmt.Sprintf(tmpl, host, okCreds))
105
-	defer cleanup()
106
-
107
-	creds, err := CredentialsFromFile(file)
108
-	if err != nil {
109
-		t.Fatal(err)
110
-	}
111
-	c := creds.credsFor(host)
112
-	if user != c.username {
113
-		t.Fatalf("Expected %q, got %q.", user, c.username)
114
-	}
115
-	if pass != c.password {
116
-		t.Fatalf("Expected %q, got %q.", pass, c.password)
117
-	}
118
-	if len(creds.Hosts()) != 1 || host != creds.Hosts()[0] {
119
-		t.Fatalf("Expected %q, got %q.", host, creds.Hosts()[0])
120
-	}
121
-}
122
-
123
-func TestRemoteFactory_CredentialsFromConfigDecodeError(t *testing.T) {
124
-	file, cleanup := writeCreds(t, `{
125
-    "auths": {
126
-        "host": {"auth": "credentials:notencoded"}
127
-    }
128
-}`)
129
-	defer cleanup()
130
-	_, err := CredentialsFromFile(file)
131
-	if err == nil {
132
-		t.Fatal("Expected error")
133
-	}
134
-}
135
-
136
-func TestRemoteFactory_CredentialsFromConfigHTTPSHosts(t *testing.T) {
137
-	httpsHost := fmt.Sprintf("https://%s/v1/", host)
138
-	file, cleanup := writeCreds(t, fmt.Sprintf(tmpl, httpsHost, okCreds))
139
-	defer cleanup()
140
-
141
-	creds, err := CredentialsFromFile(file)
142
-	if err != nil {
143
-		t.Fatal(err)
144
-	}
145
-	c := creds.credsFor(host)
146
-	if user != c.username {
147
-		t.Fatalf("Expected %q, got %q.", user, c.username)
148
-	}
149
-	if pass != c.password {
150
-		t.Fatalf("Expected %q, got %q.", pass, c.password)
151
-	}
152
-	if len(creds.Hosts()) != 1 || host != creds.Hosts()[0] {
153
-		t.Fatalf("Expected %q, got %q.", httpsHost, creds.Hosts()[0])
154
-	}
155
-}
156
-
157
-func TestRemoteFactory_ParseHost(t *testing.T) {
158
-	for _, v := range []struct {
159
-		host        string
160
-		imagePrefix string
161
-		error       bool
162
-	}{
163
-		{
164
-			host:        "host",
165
-			imagePrefix: "host",
166
-		},
167
-		{
168
-			host:        "gcr.io",
169
-			imagePrefix: "gcr.io",
170
-		},
171
-		{
172
-			host:        "https://gcr.io",
173
-			imagePrefix: "gcr.io",
174
-		},
175
-		{
176
-			host:        "https://gcr.io/v1",
177
-			imagePrefix: "gcr.io",
178
-		},
179
-		{
180
-			host:        "https://gcr.io/v1/",
181
-			imagePrefix: "gcr.io",
182
-		},
183
-		{
184
-			host:        "gcr.io/v1",
185
-			imagePrefix: "gcr.io",
186
-		},
187
-		{
188
-			host:        "telnet://gcr.io/v1",
189
-			imagePrefix: "gcr.io",
190
-		},
191
-		{
192
-			host:        "",
193
-			imagePrefix: "gcr.io",
194
-			error:       true,
195
-		},
196
-		{
197
-			host:        "https://",
198
-			imagePrefix: "gcr.io",
199
-			error:       true,
200
-		},
201
-		{
202
-			host:        "^#invalid.io/v1/",
203
-			imagePrefix: "gcr.io",
204
-			error:       true,
205
-		},
206
-		{
207
-			host:        "/var/user",
208
-			imagePrefix: "gcr.io",
209
-			error:       true,
210
-		},
211
-	} {
212
-
213
-		file, cleanup := writeCreds(t, fmt.Sprintf(tmpl, v.host, okCreds))
214
-		defer cleanup()
215
-		creds, err := CredentialsFromFile(file)
216
-		if (err != nil) != v.error {
217
-			t.Fatalf("For test %q, expected error = %v but got %v", v.host, v.error, err != nil)
218
-		}
219
-		if v.error {
220
-			continue
221
-		}
222
-		if u := creds.credsFor(v.imagePrefix).username; u != user {
223
-			t.Fatalf("For test %q, expected %q but got %v", v.host, user, u)
224
-		}
225
-	}
226
-}

+ 0
- 117
registry/remote_test.go View File

@@ -1,117 +0,0 @@
1
-package registry
2
-
3
-import (
4
-	"testing"
5
-	"time"
6
-
7
-	"github.com/docker/distribution/manifest/schema1"
8
-	"github.com/pkg/errors"
9
-
10
-	"github.com/weaveworks/flux"
11
-)
12
-
13
-const testTagStr = "tag"
14
-const testImageStr = "index.docker.io/test/Image:" + testTagStr
15
-const constTime = "2017-01-13T16:22:58.009923189Z"
16
-
17
-var (
18
-	img, _         = flux.ParseImage(testImageStr, time.Time{})
19
-	testRepository = RepositoryFromImage(img)
20
-
21
-	man = schema1.SignedManifest{
22
-		Manifest: schema1.Manifest{
23
-			History: []schema1.History{
24
-				{
25
-					V1Compatibility: `{"created":"` + constTime + `"}`,
26
-				},
27
-			},
28
-		},
29
-	}
30
-)
31
-
32
-// Need to create a dummy manifest here
33
-func TestRemoteClient_ParseManifest(t *testing.T) {
34
-	manifestFunc := func(repo, ref string) ([]schema1.History, error) {
35
-		return man.Manifest.History, nil
36
-	}
37
-	c := newRemote(
38
-		NewMockDockerClient(manifestFunc, nil),
39
-		nil,
40
-	)
41
-	testRepository = RepositoryFromImage(img)
42
-	desc, err := c.Manifest(testRepository, img.ID.Tag)
43
-	if err != nil {
44
-		t.Fatal(err.Error())
45
-	}
46
-	if string(desc.ID.FullID()) != testImageStr {
47
-		t.Fatalf("Expecting %q but got %q", testImageStr, string(desc.ID.FullID()))
48
-	}
49
-	if desc.CreatedAt.Format(time.RFC3339Nano) != constTime {
50
-		t.Fatalf("Expecting %q but got %q", constTime, desc.CreatedAt.Format(time.RFC3339Nano))
51
-	}
52
-}
53
-
54
-// Just a simple pass through.
55
-func TestRemoteClient_GetTags(t *testing.T) {
56
-	c := remote{
57
-		client: NewMockDockerClient(nil, func(repository string) ([]string, error) {
58
-			return []string{
59
-				testTagStr,
60
-			}, nil
61
-		}),
62
-	}
63
-	tags, err := c.Tags(testRepository)
64
-	if err != nil {
65
-		t.Fatal(err.Error())
66
-	}
67
-	if tags[0] != testTagStr {
68
-		t.Fatalf("Expecting %q but got %q", testTagStr, tags[0])
69
-	}
70
-}
71
-
72
-func TestRemoteClient_IsCancelCalled(t *testing.T) {
73
-	var didCancel bool
74
-	r := remote{
75
-		cancel: func() { didCancel = true },
76
-	}
77
-	r.Cancel()
78
-	if !didCancel {
79
-		t.Fatal("Expected it to call the cancel func")
80
-	}
81
-}
82
-
83
-func TestRemoteClient_RemoteErrors(t *testing.T) {
84
-	manifestFunc := func(repo, ref string) ([]schema1.History, error) {
85
-		return man.Manifest.History, errors.New("dummy")
86
-	}
87
-	tagsFunc := func(repository string) ([]string, error) {
88
-		return []string{
89
-			testTagStr,
90
-		}, errors.New("dummy")
91
-	}
92
-	c := remote{
93
-		client: NewMockDockerClient(manifestFunc, tagsFunc),
94
-	}
95
-	_, err := c.Tags(testRepository)
96
-	if err == nil {
97
-		t.Fatal("Expected error")
98
-	}
99
-	_, err = c.Manifest(testRepository, img.ID.Tag)
100
-	if err == nil {
101
-		t.Fatal("Expected error")
102
-	}
103
-}
104
-
105
-func TestRemoteClient_TestNew(t *testing.T) {
106
-	r := &herokuWrapper{}
107
-	var flag bool
108
-	f := func() { flag = true }
109
-	c := newRemote(r, f)
110
-	if c.(*remote).client != r { // Test that client was set
111
-		t.Fatal("Client was not set")
112
-	}
113
-	c.(*remote).cancel()
114
-	if !flag { // Test that our cancel function, when called, works
115
-		t.Fatal("Expected it to call the cancel func")
116
-	}
117
-}

+ 0
- 45
registry/repository.go View File

@@ -1,45 +0,0 @@
1
-package registry
2
-
3
-import (
4
-	"time"
5
-
6
-	"github.com/weaveworks/flux"
7
-)
8
-
9
-type Repository struct {
10
-	img flux.Image // Internally we use an image to store data
11
-}
12
-
13
-func RepositoryFromImage(img flux.Image) Repository {
14
-	return Repository{
15
-		img: img,
16
-	}
17
-}
18
-
19
-func ParseRepository(imgStr string) (Repository, error) {
20
-	i, err := flux.ParseImage(imgStr, time.Time{})
21
-	if err != nil {
22
-		return Repository{}, err
23
-	}
24
-	return Repository{
25
-		img: i,
26
-	}, nil
27
-}
28
-
29
-func (r Repository) NamespaceImage() string {
30
-	return r.img.ID.NamespaceImage()
31
-}
32
-
33
-func (r Repository) Host() string {
34
-	return r.img.ID.Host
35
-}
36
-
37
-func (r Repository) String() string {
38
-	return r.img.ID.HostNamespaceImage()
39
-}
40
-
41
-func (r Repository) ToImage(tag string) flux.Image {
42
-	newImage := r.img
43
-	newImage.ID.Tag = tag
44
-	return newImage
45
-}

+ 0
- 25
registry/repository_test.go View File

@@ -1,25 +0,0 @@
1
-package registry
2
-
3
-import "testing"
4
-
5
-func TestRepository_ParseImage(t *testing.T) {
6
-	for _, x := range []struct {
7
-		test     string
8
-		expected string
9
-	}{
10
-		{"alpine", "index.docker.io/library/alpine"},
11
-		{"library/alpine", "index.docker.io/library/alpine"},
12
-		{"alpine:mytag", "index.docker.io/library/alpine"},
13
-		{"quay.io/library/alpine", "quay.io/library/alpine"},
14
-		{"quay.io/library/alpine:latest", "quay.io/library/alpine"},
15
-		{"quay.io/library/alpine:mytag", "quay.io/library/alpine"},
16
-	} {
17
-		i, err := ParseRepository(x.test)
18
-		if err != nil {
19
-			t.Fatalf("Failed parsing %q, expected %q", x.test, x.expected)
20
-		}
21
-		if i.String() != x.expected {
22
-			t.Fatalf("%q does not match expected %q", i.String(), x.expected)
23
-		}
24
-	}
25
-}

registry/warmer.go → registry/warming.go View File

@@ -6,9 +6,11 @@ import (
6 6
 	"sync"
7 7
 	"time"
8 8
 
9
-	"github.com/bradfitz/gomemcache/memcache"
9
+	officialMemcache "github.com/bradfitz/gomemcache/memcache"
10 10
 	"github.com/go-kit/kit/log"
11 11
 	"github.com/pkg/errors"
12
+	"github.com/weaveworks/flux/registry/memcache"
13
+	"math/rand"
12 14
 	"strings"
13 15
 )
14 16
 
@@ -17,7 +19,7 @@ type Warmer struct {
17 19
 	ClientFactory ClientFactory
18 20
 	Creds         Credentials
19 21
 	Expiry        time.Duration
20
-	Client        MemcacheClient
22
+	Client        memcache.MemcacheClient
21 23
 }
22 24
 
23 25
 // Continuously wait for a new repository to warm
@@ -69,7 +71,7 @@ func (w *Warmer) warm(repository Repository) {
69 71
 	// might be duplicates from other registries
70 72
 	key := tagKey(username, repository.String())
71 73
 
72
-	if err := w.Client.Set(&memcache.Item{
74
+	if err := w.Client.Set(&officialMemcache.Item{
73 75
 		Key:        key,
74 76
 		Value:      val,
75 77
 		Expiration: int32(w.Expiry.Seconds()),
@@ -91,7 +93,7 @@ func (w *Warmer) warm(repository Repository) {
91 93
 
92 94
 		// Full path to image again.
93 95
 		key := manifestKey(username, repository.String(), tag)
94
-		if err := w.Client.Set(&memcache.Item{
96
+		if err := w.Client.Set(&officialMemcache.Item{
95 97
 			Key:        key,
96 98
 			Value:      val,
97 99
 			Expiration: int32(w.Expiry.Seconds()),
@@ -101,3 +103,57 @@ func (w *Warmer) warm(repository Repository) {
101 103
 		}
102 104
 	}
103 105
 }
106
+
107
+// Queue provides an updating repository queue for the warmer.
108
+// If no items are added to the queue this will randomly add a new
109
+// registry to warm
110
+type Queue struct {
111
+	RunningContainers    func() []Repository
112
+	Logger               log.Logger
113
+	RegistryPollInterval time.Duration
114
+	warmQueue            chan Repository
115
+	queueLock            sync.Mutex
116
+}
117
+
118
+func NewQueue(runningContainersFunc func() []Repository, l log.Logger, emptyQueueTick time.Duration) Queue {
119
+	return Queue{
120
+		RunningContainers:    runningContainersFunc,
121
+		Logger:               l,
122
+		RegistryPollInterval: emptyQueueTick,
123
+		warmQueue:            make(chan Repository, 100), // Don't close this. It will be GC'ed when this instance is destroyed.
124
+	}
125
+}
126
+
127
+// Queue loop to maintain the queue and periodically add a random
128
+// repository that is running in the cluster.
129
+func (w *Queue) Loop(stop chan struct{}, wg *sync.WaitGroup) {
130
+	defer wg.Done()
131
+
132
+	if w.RunningContainers == nil || w.Logger == nil || w.RegistryPollInterval == 0 {
133
+		panic("registry.Queue fields are nil")
134
+	}
135
+
136
+	pollImages := time.Tick(w.RegistryPollInterval)
137
+
138
+	for {
139
+		select {
140
+		case <-stop:
141
+			w.Logger.Log("stopping", "true")
142
+			return
143
+		case <-pollImages:
144
+			c := w.RunningContainers()
145
+			if len(c) > 0 { // Only add random registry if there are running containers
146
+				i := rand.Intn(len(c)) // Pick random registry
147
+				w.queueLock.Lock()
148
+				w.warmQueue <- c[i] // Add registry to queue
149
+				w.queueLock.Unlock()
150
+			}
151
+		}
152
+	}
153
+}
154
+
155
+func (w *Queue) Queue() chan Repository {
156
+	w.queueLock.Lock()
157
+	defer w.queueLock.Unlock()
158
+	return w.warmQueue
159
+}

registry/warmer_test.go → registry/warming_test.go View File

@@ -84,3 +84,53 @@ func TestWarmer_CacheNewRepo(t *testing.T) {
84 84
 		t.Fatalf("Expected  history item: %v, got %v", expectedManifest, manifests[0])
85 85
 	}
86 86
 }
87
+
88
+func TestQueue_Usage(t *testing.T) {
89
+
90
+	queue := NewQueue(
91
+		func() []Repository {
92
+			r, _ := ParseRepository("test/image")
93
+			return []Repository{r}
94
+		},
95
+		log.NewLogfmtLogger(os.Stderr),
96
+		1*time.Millisecond,
97
+	)
98
+
99
+	shutdown := make(chan struct{})
100
+	shutdownWg := &sync.WaitGroup{}
101
+	shutdownWg.Add(1)
102
+	go queue.Loop(shutdown, shutdownWg)
103
+	defer func() {
104
+		shutdown <- struct{}{}
105
+		shutdownWg.Wait()
106
+	}()
107
+
108
+	time.Sleep(10 * time.Millisecond)
109
+	if len(queue.Queue()) == 0 {
110
+		t.Fatal("Should have randomly added containers to queue")
111
+	}
112
+}
113
+
114
+func TestQueue_NoContainers(t *testing.T) {
115
+	queue := NewQueue(
116
+		func() []Repository {
117
+			return []Repository{}
118
+		},
119
+		log.NewLogfmtLogger(os.Stderr),
120
+		1*time.Millisecond,
121
+	)
122
+
123
+	shutdown := make(chan struct{})
124
+	shutdownWg := &sync.WaitGroup{}
125
+	shutdownWg.Add(1)
126
+	go queue.Loop(shutdown, shutdownWg)
127
+	defer func() {
128
+		shutdown <- struct{}{}
129
+		shutdownWg.Wait()
130
+	}()
131
+
132
+	time.Sleep(10 * time.Millisecond)
133
+	if len(queue.Queue()) != 0 {
134
+		t.Fatal("There were no containers, so there should be no repositories in the queue")
135
+	}
136
+}

Loading…
Cancel
Save