Browse Source

Huge refactor.

Removed concept of "Remote". Replaced with Client.
Added adapter for heroku registry.
Rationalised registry out into client and clientfactory.
Removed a lot of unnecessary tests.
Fixed introduced bug where we were storing empty data.
Added optimisation to only request new manifests if we need it.
Phil Winder 2 years ago
parent
commit
1b83288b27
11 changed files with 480 additions and 527 deletions
  1. 18
    0
      image.go
  2. 30
    0
      image_test.go
  3. 14
    14
      registry/cache_test.go
  4. 172
    0
      registry/client.go
  5. 124
    0
      registry/client_factory.go
  6. 16
    34
      registry/mock.go
  7. 8
    8
      registry/monitoring.go
  8. 47
    309
      registry/registry.go
  9. 21
    141
      registry/registry_test.go
  10. 21
    8
      registry/warming.go
  11. 9
    13
      registry/warming_test.go

+ 18
- 0
image.go View File

@@ -167,3 +167,21 @@ func ParseImage(s string, createdAt time.Time) (Image, error) {
167 167
 		CreatedAt: createdAt,
168 168
 	}, nil
169 169
 }
170
+
171
+// Sort image by creation date
172
+type ByCreatedDesc []Image
173
+
174
+func (is ByCreatedDesc) Len() int      { return len(is) }
175
+func (is ByCreatedDesc) Swap(i, j int) { is[i], is[j] = is[j], is[i] }
176
+func (is ByCreatedDesc) Less(i, j int) bool {
177
+	switch {
178
+	case is[i].CreatedAt.IsZero():
179
+		return true
180
+	case is[j].CreatedAt.IsZero():
181
+		return false
182
+	case is[i].CreatedAt.Equal(is[j].CreatedAt):
183
+		return is[i].ID.String() < is[j].ID.String()
184
+	default:
185
+		return is[i].CreatedAt.After(is[j].CreatedAt)
186
+	}
187
+}

+ 30
- 0
image_test.go View File

@@ -3,7 +3,16 @@ package flux
3 3
 import (
4 4
 	"encoding/json"
5 5
 	"fmt"
6
+	"sort"
7
+	"strconv"
6 8
 	"testing"
9
+	"time"
10
+)
11
+
12
+const constTime = "2017-01-13T16:22:58.009923189Z"
13
+
14
+var (
15
+	testTime, _ = time.Parse(time.RFC3339Nano, constTime)
7 16
 )
8 17
 
9 18
 func TestImageID_ParseImageID(t *testing.T) {
@@ -97,3 +106,24 @@ func TestImageID_Serialization(t *testing.T) {
97 106
 		}
98 107
 	}
99 108
 }
109
+
110
+func TestImage_OrderByCreationDate(t *testing.T) {
111
+	fmt.Printf("testTime: %s\n", testTime)
112
+	time0 := testTime.Add(time.Second)
113
+	time2 := testTime.Add(-time.Second)
114
+	imA, _ := ParseImage("my/Image:3", testTime)
115
+	imB, _ := ParseImage("my/Image:1", time0)
116
+	imC, _ := ParseImage("my/Image:4", time2)
117
+	imD, _ := ParseImage("my/Image:0", time.Time{}) // test nil
118
+	imE, _ := ParseImage("my/Image:2", testTime)    // test equal
119
+	imgs := []Image{imA, imB, imC, imD, imE}
120
+	sort.Sort(ByCreatedDesc(imgs))
121
+	for i, im := range imgs {
122
+		if strconv.Itoa(i) != im.ID.Tag {
123
+			for j, jim := range imgs {
124
+				t.Logf("%v: %v %s", j, jim.ID.String(), jim.CreatedAt)
125
+			}
126
+			t.Fatalf("Not sorted in expected order: %#v", imgs)
127
+		}
128
+	}
129
+}

+ 14
- 14
registry/cache_test.go View File

@@ -11,7 +11,6 @@ import (
11 11
 
12 12
 	"encoding/json"
13 13
 	"github.com/bradfitz/gomemcache/memcache"
14
-	"github.com/docker/distribution/manifest/schema1"
15 14
 	"github.com/go-kit/kit/log"
16 15
 	registryMemcache "github.com/weaveworks/flux/registry/memcache"
17 16
 )
@@ -47,8 +46,10 @@ func TestCache_Manifests(t *testing.T) {
47 46
 		log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)),
48 47
 	)
49 48
 
50
-	val, _ := json.Marshal([]schema1.History{{`{"test":"json"}`}})
51
-	key := manifestKey(creds.credsFor("").username, "index.docker.io/weaveworks/foorepo", "tag1")
49
+	r, _ := ParseRepository("index.docker.io/weaveworks/foorepo")
50
+	img := r.ToImage("tag1")
51
+	val, _ := json.Marshal(img)
52
+	key := manifestKey(creds.credsFor(r.Host()).username, r.String(), img.ID.Tag)
52 53
 	if err := mc.Set(&memcache.Item{
53 54
 		Key:        key,
54 55
 		Value:      val,
@@ -58,20 +59,17 @@ func TestCache_Manifests(t *testing.T) {
58 59
 	}
59 60
 
60 61
 	// It should fetch stuff
61
-	response, err := c.Manifest("index.docker.io/weaveworks/foorepo", "tag1")
62
+	response, err := c.Manifest(r, img.ID.Tag)
62 63
 	if err != nil {
63 64
 		t.Fatal(err)
64 65
 	}
65
-	if len(response) != 1 {
66
-		t.Fatalf("Expected 1 history item, got %v", response)
67
-	}
68
-	expected := schema1.History{`{"test":"json"}`}
69
-	if response[0] != expected {
70
-		t.Fatalf("Expected  history item: %v, got %v", expected, response[0])
66
+	if response.ID.String() == "" {
67
+		t.Fatal("Should have returned image")
71 68
 	}
72 69
 
70
+	r2, _ := ParseRepository("index.docker.io/weaveworks/another")
73 71
 	// It should miss if not in cache
74
-	_, err = c.Manifest("index.docker.io/weaveworks/another", "tag1")
72
+	_, err = c.Manifest(r2, "tag1")
75 73
 	if err != memcache.ErrCacheMiss {
76 74
 		t.Fatal("Expected cache miss")
77 75
 	}
@@ -88,8 +86,9 @@ func TestCache_Tags(t *testing.T) {
88 86
 		log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)),
89 87
 	)
90 88
 
89
+	r, _ := ParseRepository("index.docker.io/weaveworks/foorepo")
91 90
 	val, _ := json.Marshal([]string{"tag1", "tag2"})
92
-	key := tagKey(creds.credsFor("").username, "index.docker.io/weaveworks/foorepo")
91
+	key := tagKey(creds.credsFor(r.Host()).username, r.String())
93 92
 	if err := mc.Set(&memcache.Item{
94 93
 		Key:        key,
95 94
 		Value:      val,
@@ -99,7 +98,7 @@ func TestCache_Tags(t *testing.T) {
99 98
 	}
100 99
 
101 100
 	// It should fetch stuff
102
-	response, err := c.Tags("index.docker.io/weaveworks/foorepo")
101
+	response, err := c.Tags(r)
103 102
 	if err != nil {
104 103
 		t.Fatal(err)
105 104
 	}
@@ -111,8 +110,9 @@ func TestCache_Tags(t *testing.T) {
111 110
 		t.Fatalf("Expected  history item: %v, got %v", expected, response[0])
112 111
 	}
113 112
 
113
+	r2, _ := ParseRepository("index.docker.io/weaveworks/anotherrepo")
114 114
 	// It should miss if not in cache
115
-	_, err = c.Tags("index.docker.io/weaveworks/anotherrepo")
115
+	_, err = c.Tags(r2)
116 116
 	if err != memcache.ErrCacheMiss {
117 117
 		t.Fatal("Expected cache miss")
118 118
 	}

+ 172
- 0
registry/client.go View File

@@ -0,0 +1,172 @@
1
+package registry
2
+
3
+import (
4
+	"context"
5
+	"encoding/json"
6
+	"fmt"
7
+	officialMemcache "github.com/bradfitz/gomemcache/memcache"
8
+	"github.com/go-kit/kit/log"
9
+	wraperrors "github.com/pkg/errors"
10
+	"github.com/weaveworks/flux"
11
+	"github.com/weaveworks/flux/registry/memcache"
12
+	"strings"
13
+	"time"
14
+)
15
+
16
+// A client represents an entity that returns manifest and tags information.
17
+// It might be a chache, it might be a real registry.
18
+type Client interface {
19
+	Tags(repository Repository) ([]string, error)
20
+	Manifest(repository Repository, tag string) (flux.Image, error)
21
+	Cancel()
22
+}
23
+
24
+// ---
25
+
26
+// An implementation of Client that represents a Remote registry.
27
+// E.g. docker hub.
28
+type Remote struct {
29
+	Registry   HerokuRegistryLibrary
30
+	CancelFunc context.CancelFunc
31
+}
32
+
33
+// Return the tags for this repository.
34
+func (a *Remote) Tags(repository Repository) ([]string, error) {
35
+	return a.Registry.Tags(repository.NamespaceImage())
36
+}
37
+
38
+// We need to do some adapting here to convert from the return values
39
+// from dockerregistry to our domain types.
40
+func (a *Remote) Manifest(repository Repository, tag string) (flux.Image, error) {
41
+	img, err := flux.ParseImage(fmt.Sprintf("%s:%s", repository.String(), tag), time.Time{})
42
+	if err != nil {
43
+		return flux.Image{}, err
44
+	}
45
+
46
+	history, err := a.Registry.Manifest(repository.NamespaceImage(), tag)
47
+	if err != nil || history == nil {
48
+		return flux.Image{}, err
49
+	}
50
+
51
+	// the manifest includes some v1-backwards-compatibility data,
52
+	// oddly called "History", which are layer metadata as JSON
53
+	// strings; these appear most-recent (i.e., topmost layer) first,
54
+	// so happily we can just decode the first entry to get a created
55
+	// time.
56
+	type v1image struct {
57
+		Created time.Time `json:"created"`
58
+	}
59
+	var topmost v1image
60
+	if len(history) > 0 {
61
+		if err = json.Unmarshal([]byte(history[0].V1Compatibility), &topmost); err == nil {
62
+			if !topmost.Created.IsZero() {
63
+				img.CreatedAt = topmost.Created
64
+			}
65
+		}
66
+	}
67
+
68
+	return img, nil
69
+}
70
+
71
+// Cancel the remote request
72
+func (a *Remote) Cancel() {
73
+	a.CancelFunc()
74
+}
75
+
76
+// ---
77
+
78
+// An implementation of Client backed by Memcache
79
+type Cache struct {
80
+	creds  Credentials
81
+	expiry time.Duration
82
+	Client memcache.MemcacheClient
83
+	logger log.Logger
84
+}
85
+
86
+func (*Cache) Cancel() {
87
+	return
88
+}
89
+
90
+func NewCache(creds Credentials, cache memcache.MemcacheClient, expiry time.Duration, logger log.Logger) Client {
91
+	return &Cache{
92
+		creds:  creds,
93
+		expiry: expiry,
94
+		Client: cache,
95
+		logger: logger,
96
+	}
97
+}
98
+
99
+func (c *Cache) Manifest(repository Repository, tag string) (flux.Image, error) {
100
+	img, err := flux.ParseImage(fmt.Sprintf("%s:%s", repository.String(), tag), time.Time{})
101
+	if err != nil {
102
+		return flux.Image{}, err
103
+	}
104
+
105
+	// Try the cache
106
+	creds := c.creds.credsFor(repository.Host())
107
+	key := manifestKey(creds.username, repository.String(), tag)
108
+	cacheItem, err := c.Client.Get(key)
109
+	if err != nil {
110
+		if err != officialMemcache.ErrCacheMiss {
111
+			c.logger.Log("err", wraperrors.Wrap(err, "Fetching tag from memcache"))
112
+		}
113
+		return flux.Image{}, err
114
+	}
115
+	err = json.Unmarshal(cacheItem.Value, &img)
116
+	if err != nil {
117
+		c.logger.Log("err", err.Error)
118
+		return flux.Image{}, err
119
+	}
120
+
121
+	return img, nil
122
+}
123
+
124
+func (c *Cache) Tags(repository Repository) (tags []string, err error) {
125
+	repo, err := ParseRepository(repository.String())
126
+	if err != nil {
127
+		c.logger.Log("err", wraperrors.Wrap(err, "Parsing repository"))
128
+		return
129
+	}
130
+	creds := c.creds.credsFor(repo.Host())
131
+
132
+	// Try the cache
133
+	key := tagKey(creds.username, repo.String())
134
+	cacheItem, err := c.Client.Get(key)
135
+	if err != nil {
136
+		if err != officialMemcache.ErrCacheMiss {
137
+			c.logger.Log("err", wraperrors.Wrap(err, "Fetching tag from memcache"))
138
+		}
139
+		return
140
+	}
141
+
142
+	// Return the cache item
143
+	err = json.Unmarshal(cacheItem.Value, &tags)
144
+	if err != nil {
145
+		c.logger.Log("err", err.Error)
146
+		return
147
+	}
148
+	return
149
+}
150
+
151
+func manifestKey(username, repository, reference string) string {
152
+	return strings.Join([]string{
153
+		"registryhistoryv1", // Just to version in case we need to change format later.
154
+		// Just the username here means we won't invalidate the cache when user
155
+		// changes password, but that should be rare. And, it also means we're not
156
+		// putting user passwords in plaintext into memcache.
157
+		username,
158
+		repository,
159
+		reference,
160
+	}, "|")
161
+}
162
+
163
+func tagKey(username, repository string) string {
164
+	return strings.Join([]string{
165
+		"registrytagsv1", // Just to version in case we need to change format later.
166
+		// Just the username here means we won't invalidate the cache when user
167
+		// changes password, but that should be rare. And, it also means we're not
168
+		// putting user passwords in plaintext into memcache.
169
+		username,
170
+		repository,
171
+	}, "|")
172
+}

+ 124
- 0
registry/client_factory.go View File

@@ -0,0 +1,124 @@
1
+package registry
2
+
3
+import (
4
+	"context"
5
+	"errors"
6
+	"github.com/go-kit/kit/log"
7
+	dockerregistry "github.com/heroku/docker-registry-client/registry"
8
+	"github.com/jonboulle/clockwork"
9
+	"github.com/weaveworks/flux/registry/memcache"
10
+	"github.com/weaveworks/flux/registry/middleware"
11
+	"golang.org/x/net/publicsuffix"
12
+	"net/http"
13
+	"net/http/cookiejar"
14
+	"time"
15
+)
16
+
17
+var (
18
+	ErrNoMemcache = errors.New("no memecache")
19
+)
20
+
21
+// ClientFactory creates a new client for the given host.
22
+// Each request might require a new client. E.g. when retrieving docker
23
+// images from docker hub, then a second from quay.io
24
+type ClientFactory interface {
25
+	ClientFor(host string) (client Client, err error)
26
+}
27
+
28
+// ---
29
+// A new ClientFactory for a Remote.
30
+func NewRemoteClientFactory(c Credentials, l log.Logger, rlc middleware.RateLimiterConfig) ClientFactory {
31
+	for host, creds := range c.m {
32
+		l.Log("host", host, "username", creds.username)
33
+	}
34
+	return &remoteClientFactory{
35
+		creds:  c,
36
+		Logger: l,
37
+		rlConf: rlc,
38
+	}
39
+}
40
+
41
+type remoteClientFactory struct {
42
+	creds  Credentials
43
+	Logger log.Logger
44
+	rlConf middleware.RateLimiterConfig
45
+}
46
+
47
+func (f *remoteClientFactory) ClientFor(host string) (Client, error) {
48
+	httphost := "https://" + host
49
+
50
+	// quay.io wants us to use cookies for authorisation, so we have
51
+	// to construct one (the default client has none). This means a
52
+	// bit more constructing things to be able to make a registry
53
+	// client literal, rather than calling .New()
54
+	jar, err := cookiejar.New(&cookiejar.Options{PublicSuffixList: publicsuffix.List})
55
+	if err != nil {
56
+		return nil, err
57
+	}
58
+	auth := f.creds.credsFor(host)
59
+
60
+	// A context we'll use to cancel requests on error
61
+	ctx, cancel := context.WithCancel(context.Background())
62
+	// Add a timeout to the request
63
+	ctx, cancel = context.WithTimeout(ctx, requestTimeout)
64
+
65
+	// Use the wrapper to fix headers for quay.io, and remember bearer tokens
66
+	var transport http.RoundTripper
67
+	{
68
+		transport = &middleware.WWWAuthenticateFixer{Transport: http.DefaultTransport}
69
+		// Now the auth-handling wrappers that come with the library
70
+		transport = dockerregistry.WrapTransport(transport, httphost, auth.username, auth.password)
71
+		// Add the backoff mechanism so we don't DOS registries
72
+		transport = middleware.BackoffRoundTripper(transport, middleware.InitialBackoff, middleware.MaxBackoff, clockwork.NewRealClock())
73
+		// Add timeout context
74
+		transport = &middleware.ContextRoundTripper{Transport: transport, Ctx: ctx}
75
+		// Rate limit
76
+		transport = middleware.RateLimitedRoundTripper(transport, f.rlConf, host)
77
+	}
78
+
79
+	herokuRegistry := herokuManifestAdaptor{
80
+		&dockerregistry.Registry{
81
+			URL: httphost,
82
+			Client: &http.Client{
83
+				Transport: transport,
84
+				Jar:       jar,
85
+				Timeout:   requestTimeout,
86
+			},
87
+			Logf: dockerregistry.Quiet,
88
+		},
89
+	}
90
+	client := &Remote{
91
+		Registry:   &herokuRegistry,
92
+		CancelFunc: cancel,
93
+	}
94
+	return client, nil
95
+}
96
+
97
+// ---
98
+// A new ClientFactory implementation for a Cache
99
+func NewCacheClientFactory(c Credentials, l log.Logger, mc memcache.MemcacheClient, ce time.Duration) ClientFactory {
100
+	for host, creds := range c.m {
101
+		l.Log("host", host, "username", creds.username)
102
+	}
103
+	return &cacheClientFactory{
104
+		creds:          c,
105
+		Logger:         l,
106
+		MemcacheClient: mc,
107
+		CacheExpiry:    ce,
108
+	}
109
+}
110
+
111
+type cacheClientFactory struct {
112
+	creds          Credentials
113
+	Logger         log.Logger
114
+	MemcacheClient memcache.MemcacheClient
115
+	CacheExpiry    time.Duration
116
+}
117
+
118
+func (f *cacheClientFactory) ClientFor(host string) (Client, error) {
119
+	if f.MemcacheClient == nil {
120
+		return nil, ErrNoMemcache
121
+	}
122
+	client := NewCache(f.creds, f.MemcacheClient, f.CacheExpiry, f.Logger)
123
+	return client, nil
124
+}

+ 16
- 34
registry/mock.go View File

@@ -1,10 +1,8 @@
1 1
 package registry
2 2
 
3 3
 import (
4
-	"github.com/docker/distribution/manifest/schema1"
5 4
 	"github.com/pkg/errors"
6 5
 
7
-	"context"
8 6
 	"github.com/weaveworks/flux"
9 7
 )
10 8
 
@@ -19,62 +17,46 @@ type mockRemote struct {
19 17
 	err  error
20 18
 }
21 19
 
22
-func NewMockRemote(img flux.Image, tags []string, err error) Remote {
23
-	return &mockRemote{
24
-		img:  img,
25
-		tags: tags,
26
-		err:  err,
27
-	}
28
-}
29
-
30
-func (r *mockRemote) Tags(repository Repository) ([]string, error) {
31
-	return r.tags, r.err
32
-}
33
-
34
-func (r *mockRemote) Manifest(repository Repository, tag string) (flux.Image, error) {
35
-	if tag == "error" {
36
-		return flux.Image{}, errors.New("Mock is set to error when tag == error")
37
-	}
38
-	return r.img, r.err
39
-}
40
-
41
-func (r *mockRemote) Cancel() {
42
-}
43
-
20
+type ManifestFunc func(repository Repository, tag string) (flux.Image, error)
21
+type TagsFunc func(repository Repository) ([]string, error)
44 22
 type mockDockerClient struct {
45
-	manifest func(repository, reference string) ([]schema1.History, error)
46
-	tags     func(repository string) ([]string, error)
23
+	manifest ManifestFunc
24
+	tags     TagsFunc
47 25
 }
48 26
 
49
-func NewMockDockerClient(manifest func(repository, reference string) ([]schema1.History, error), tags func(repository string) ([]string, error)) dockerRegistryInterface {
27
+func NewMockClient(manifest ManifestFunc, tags TagsFunc) Client {
50 28
 	return &mockDockerClient{
51 29
 		manifest: manifest,
52 30
 		tags:     tags,
53 31
 	}
54 32
 }
55 33
 
56
-func (m *mockDockerClient) Manifest(repository, reference string) ([]schema1.History, error) {
57
-	return m.manifest(repository, reference)
34
+func (m *mockDockerClient) Manifest(repository Repository, tag string) (flux.Image, error) {
35
+	return m.manifest(repository, tag)
58 36
 }
59 37
 
60
-func (m *mockDockerClient) Tags(repository string) ([]string, error) {
38
+func (m *mockDockerClient) Tags(repository Repository) ([]string, error) {
61 39
 	return m.tags(repository)
62 40
 }
63 41
 
42
+func (*mockDockerClient) Cancel() {
43
+	return
44
+}
45
+
64 46
 type mockRemoteFactory struct {
65
-	c   dockerRegistryInterface
47
+	c   Client
66 48
 	err error
67 49
 }
68 50
 
69
-func NewMockClientFactory(c dockerRegistryInterface, err error) ClientFactory {
51
+func NewMockClientFactory(c Client, err error) ClientFactory {
70 52
 	return &mockRemoteFactory{
71 53
 		c:   c,
72 54
 		err: err,
73 55
 	}
74 56
 }
75 57
 
76
-func (m *mockRemoteFactory) ClientFor(repository string) (dockerRegistryInterface, context.CancelFunc, error) {
77
-	return m.c, func() {}, m.err
58
+func (m *mockRemoteFactory) ClientFor(repository string) (Client, error) {
59
+	return m.c, m.err
78 60
 }
79 61
 
80 62
 type mockRegistry struct {

+ 8
- 8
registry/monitoring.go View File

@@ -63,19 +63,19 @@ func (m *instrumentedRegistry) GetImage(repository Repository, tag string) (res
63 63
 	return
64 64
 }
65 65
 
66
-type InstrumentedRemote Remote
66
+type InstrumentedClient Client
67 67
 
68
-type instrumentedRemote struct {
69
-	next Remote
68
+type instrumentedClient struct {
69
+	next Client
70 70
 }
71 71
 
72
-func NewInstrumentedRemote(next Remote) Remote {
73
-	return &instrumentedRemote{
72
+func NewInstrumentedClient(next Client) Client {
73
+	return &instrumentedClient{
74 74
 		next: next,
75 75
 	}
76 76
 }
77 77
 
78
-func (m *instrumentedRemote) Manifest(repository Repository, tag string) (res flux.Image, err error) {
78
+func (m *instrumentedClient) Manifest(repository Repository, tag string) (res flux.Image, err error) {
79 79
 	start := time.Now()
80 80
 	res, err = m.next.Manifest(repository, tag)
81 81
 	requestDuration.With(
@@ -85,7 +85,7 @@ func (m *instrumentedRemote) Manifest(repository Repository, tag string) (res fl
85 85
 	return
86 86
 }
87 87
 
88
-func (m *instrumentedRemote) Tags(repository Repository) (res []string, err error) {
88
+func (m *instrumentedClient) Tags(repository Repository) (res []string, err error) {
89 89
 	start := time.Now()
90 90
 	res, err = m.next.Tags(repository)
91 91
 	requestDuration.With(
@@ -95,6 +95,6 @@ func (m *instrumentedRemote) Tags(repository Repository) (res []string, err erro
95 95
 	return
96 96
 }
97 97
 
98
-func (m *instrumentedRemote) Cancel() {
98
+func (m *instrumentedClient) Cancel() {
99 99
 	m.next.Cancel()
100 100
 }

+ 47
- 309
registry/registry.go View File

@@ -1,25 +1,20 @@
1 1
 // Package registry provides domain abstractions over container registries.
2
+// The aim is that the user only ever sees the registry information that
3
+// has been cached. A separate process is responsible for ensuring the
4
+// cache is up to date. The benefit of this is that we can rate limit
5
+// the requests to prevent rate limiting on the remote side without
6
+// affecting the UX. To the user, repository information will appear to
7
+// be returned "quickly"
8
+//
9
+// This means that the cache is now a flux requirement.
2 10
 package registry
3 11
 
4 12
 import (
5
-	"context"
6
-	"encoding/json"
7
-	"errors"
8
-	"fmt"
9
-	officialMemcache "github.com/bradfitz/gomemcache/memcache"
10 13
 	"github.com/docker/distribution/manifest/schema1"
11 14
 	"github.com/go-kit/kit/log"
12 15
 	dockerregistry "github.com/heroku/docker-registry-client/registry"
13
-	"github.com/jonboulle/clockwork"
14
-	wraperrors "github.com/pkg/errors"
15 16
 	"github.com/weaveworks/flux"
16
-	"github.com/weaveworks/flux/registry/memcache"
17
-	"github.com/weaveworks/flux/registry/middleware"
18
-	"golang.org/x/net/publicsuffix"
19
-	"net/http"
20
-	"net/http/cookiejar"
21 17
 	"sort"
22
-	"strings"
23 18
 	"time"
24 19
 )
25 20
 
@@ -55,10 +50,10 @@ func NewRegistry(c ClientFactory, l log.Logger) Registry {
55 50
 //   foo/helloworld         -> index.docker.io/foo/helloworld
56 51
 //   quay.io/foo/helloworld -> quay.io/foo/helloworld
57 52
 //
58
-func (reg *registry) GetRepository(img Repository) (_ []flux.Image, err error) {
59
-	rem, err := reg.newRemote(img)
53
+func (reg *registry) GetRepository(img Repository) ([]flux.Image, error) {
54
+	rem, err := reg.newClient(img)
60 55
 	if err != nil {
61
-		return
56
+		return nil, err
62 57
 	}
63 58
 
64 59
 	tags, err := rem.Tags(img)
@@ -77,26 +72,25 @@ func (reg *registry) GetRepository(img Repository) (_ []flux.Image, err error) {
77 72
 
78 73
 // Get a single Image from the registry if it exists
79 74
 func (reg *registry) GetImage(img Repository, tag string) (_ flux.Image, err error) {
80
-	rem, err := reg.newRemote(img)
75
+	rem, err := reg.newClient(img)
81 76
 	if err != nil {
82 77
 		return
83 78
 	}
84 79
 	return rem.Manifest(img, tag)
85 80
 }
86 81
 
87
-func (reg *registry) newRemote(img Repository) (rem Remote, err error) {
88
-	client, cancel, err := reg.factory.ClientFor(img.Host())
82
+func (reg *registry) newClient(img Repository) (Client, error) {
83
+	client, err := reg.factory.ClientFor(img.Host())
89 84
 	if err != nil {
90
-		return
85
+		return nil, err
91 86
 	}
92
-	rem = newRemote(client, cancel)
93
-	rem = NewInstrumentedRemote(rem)
94
-	return
87
+	client = NewInstrumentedClient(client)
88
+	return client, nil
95 89
 }
96 90
 
97
-func (reg *registry) tagsToRepository(remote Remote, repository Repository, tags []string) ([]flux.Image, error) {
91
+func (reg *registry) tagsToRepository(client Client, repository Repository, tags []string) ([]flux.Image, error) {
98 92
 	// one way or another, we'll be finishing all requests
99
-	defer remote.Cancel()
93
+	defer client.Cancel()
100 94
 
101 95
 	type result struct {
102 96
 		image flux.Image
@@ -109,7 +103,7 @@ func (reg *registry) tagsToRepository(remote Remote, repository Repository, tags
109 103
 	for i := 0; i < maxConcurrency; i++ {
110 104
 		go func() {
111 105
 			for tag := range toFetch {
112
-				image, err := remote.Manifest(repository, tag)
106
+				image, err := client.Manifest(repository, tag)
113 107
 				if err != nil {
114 108
 					reg.Logger.Log("registry-metadata-err", err)
115 109
 				}
@@ -129,200 +123,17 @@ func (reg *registry) tagsToRepository(remote Remote, repository Repository, tags
129 123
 			return nil, res.err
130 124
 		}
131 125
 		images[i] = res.image
126
+		reg.Logger.Log("time", res.image.CreatedAt.String())
132 127
 	}
133 128
 
134
-	sort.Sort(byCreatedDesc(images))
129
+	sort.Sort(flux.ByCreatedDesc(images))
135 130
 	return images, nil
136 131
 }
137 132
 
138
-// -----
139
-
140
-type byCreatedDesc []flux.Image
141
-
142
-func (is byCreatedDesc) Len() int      { return len(is) }
143
-func (is byCreatedDesc) Swap(i, j int) { is[i], is[j] = is[j], is[i] }
144
-func (is byCreatedDesc) Less(i, j int) bool {
145
-	switch {
146
-	case is[i].CreatedAt.IsZero():
147
-		return true
148
-	case is[j].CreatedAt.IsZero():
149
-		return false
150
-	case is[i].CreatedAt.Equal(is[j].CreatedAt):
151
-		return is[i].ID.String() < is[j].ID.String()
152
-	default:
153
-		return is[i].CreatedAt.After(is[j].CreatedAt)
154
-	}
155
-}
156
-
157
-type Remote interface {
158
-	Tags(repository Repository) ([]string, error)
159
-	Manifest(repository Repository, tag string) (flux.Image, error)
160
-	Cancel()
161
-}
162
-
163
-type remote struct {
164
-	client dockerRegistryInterface
165
-	cancel context.CancelFunc
166
-}
167
-
168
-func newRemote(client dockerRegistryInterface, cancel context.CancelFunc) Remote {
169
-	return &remote{
170
-		client: client,
171
-		cancel: cancel,
172
-	}
173
-}
174
-
175
-func (rc *remote) Tags(repository Repository) (_ []string, err error) {
176
-	return rc.client.Tags(repository.String())
177
-}
178
-
179
-func (rc *remote) Manifest(repository Repository, tag string) (img flux.Image, err error) {
180
-	img, err = flux.ParseImage(fmt.Sprintf("%s:%s", repository.String(), tag), time.Time{})
181
-	if err != nil {
182
-		return
183
-	}
184
-	history, err := rc.client.Manifest(repository.String(), tag)
185
-	if err != nil {
186
-		return
187
-	}
188
-
189
-	// the manifest includes some v1-backwards-compatibility data,
190
-	// oddly called "History", which are layer metadata as JSON
191
-	// strings; these appear most-recent (i.e., topmost layer) first,
192
-	// so happily we can just decode the first entry to get a created
193
-	// time.
194
-	type v1image struct {
195
-		Created time.Time `json:"created"`
196
-	}
197
-	var topmost v1image
198
-	if len(history) > 0 {
199
-		if err = json.Unmarshal([]byte(history[0].V1Compatibility), &topmost); err == nil {
200
-			if !topmost.Created.IsZero() {
201
-				img.CreatedAt = topmost.Created
202
-			}
203
-		}
204
-	}
205
-
206
-	return
207
-}
208
-
209
-func (rc *remote) Cancel() {
210
-	rc.cancel()
211
-}
212
-
213
-// This is an interface that represents the heroku docker registry library
214
-type dockerRegistryInterface interface {
215
-	Tags(repository string) ([]string, error)
216
-	Manifest(repository, reference string) ([]schema1.History, error)
217
-}
218
-
219
-var (
220
-	ErrNoMemcache = errors.New("no memecache")
221
-)
222
-
223
-type creds struct {
224
-	username, password string
225
-}
226
-
227
-// Credentials to a (Docker) registry.
228
-type Credentials struct {
229
-	m map[string]creds
230
-}
231
-
232
-type ClientFactory interface {
233
-	ClientFor(host string) (client dockerRegistryInterface, cancel context.CancelFunc, err error)
234
-}
235
-
236
-func NewCacheClientFactory(c Credentials, l log.Logger, mc memcache.MemcacheClient, ce time.Duration) ClientFactory {
237
-	for host, creds := range c.m {
238
-		l.Log("host", host, "username", creds.username)
239
-	}
240
-	return &cacheClientFactory{
241
-		creds:          c,
242
-		Logger:         l,
243
-		MemcacheClient: mc,
244
-		CacheExpiry:    ce,
245
-	}
246
-}
247
-
248
-type cacheClientFactory struct {
249
-	creds          Credentials
250
-	Logger         log.Logger
251
-	MemcacheClient memcache.MemcacheClient
252
-	CacheExpiry    time.Duration
253
-}
254
-
255
-func (f *cacheClientFactory) ClientFor(host string) (dockerRegistryInterface, context.CancelFunc, error) {
256
-	if f.MemcacheClient == nil {
257
-		return nil, nil, ErrNoMemcache
258
-	}
259
-	client := NewCache(f.creds, f.MemcacheClient, f.CacheExpiry, f.Logger)
260
-	return client, func() {}, nil
261
-}
262
-
263
-func NewRemoteClientFactory(c Credentials, l log.Logger, rlc middleware.RateLimiterConfig) ClientFactory {
264
-	for host, creds := range c.m {
265
-		l.Log("host", host, "username", creds.username)
266
-	}
267
-	return &remoteClientFactory{
268
-		creds:  c,
269
-		Logger: l,
270
-		rlConf: rlc,
271
-	}
272
-}
273
-
274
-type remoteClientFactory struct {
275
-	creds  Credentials
276
-	Logger log.Logger
277
-	rlConf middleware.RateLimiterConfig
278
-}
279
-
280
-func (f *remoteClientFactory) ClientFor(host string) (dockerRegistryInterface, context.CancelFunc, error) {
281
-	httphost := "https://" + host
282
-
283
-	// quay.io wants us to use cookies for authorisation, so we have
284
-	// to construct one (the default client has none). This means a
285
-	// bit more constructing things to be able to make a registry
286
-	// client literal, rather than calling .New()
287
-	jar, err := cookiejar.New(&cookiejar.Options{PublicSuffixList: publicsuffix.List})
288
-	if err != nil {
289
-		return nil, nil, err
290
-	}
291
-	auth := f.creds.credsFor(host)
292
-
293
-	// A context we'll use to cancel requests on error
294
-	ctx, cancel := context.WithCancel(context.Background())
295
-	// Add a timeout to the request
296
-	ctx, cancel = context.WithTimeout(ctx, requestTimeout)
297
-
298
-	// Use the wrapper to fix headers for quay.io, and remember bearer tokens
299
-	var transport http.RoundTripper
300
-	{
301
-		transport = &middleware.WWWAuthenticateFixer{Transport: http.DefaultTransport}
302
-		// Now the auth-handling wrappers that come with the library
303
-		transport = dockerregistry.WrapTransport(transport, httphost, auth.username, auth.password)
304
-		// Add the backoff mechanism so we don't DOS registries
305
-		transport = middleware.BackoffRoundTripper(transport, middleware.InitialBackoff, middleware.MaxBackoff, clockwork.NewRealClock())
306
-		// Add timeout context
307
-		transport = &middleware.ContextRoundTripper{Transport: transport, Ctx: ctx}
308
-		// Rate limit
309
-		transport = middleware.RateLimitedRoundTripper(transport, f.rlConf, host)
310
-	}
311
-
312
-	client := herokuWrapper{
313
-		&dockerregistry.Registry{
314
-			URL: httphost,
315
-			Client: &http.Client{
316
-				Transport: transport,
317
-				Jar:       jar,
318
-				Timeout:   requestTimeout,
319
-			},
320
-			Logf: dockerregistry.Quiet,
321
-		},
322
-	}
323
-	return client, cancel, nil
324
-}
133
+// ---
325 134
 
135
+// Repository represents a full image address, including host.
136
+// TODO: This could probably be merged into flux.Image.
326 137
 type Repository struct {
327 138
 	img flux.Image // Internally we use an image to store data
328 139
 }
@@ -361,15 +172,24 @@ func (r Repository) ToImage(tag string) flux.Image {
361 172
 	return newImage
362 173
 }
363 174
 
364
-type herokuWrapper struct {
365
-	*dockerregistry.Registry
175
+// ---
176
+
177
+// This is an interface that represents the heroku docker registry library
178
+type HerokuRegistryLibrary interface {
179
+	Tags(repository string) (tags []string, err error)
180
+	Manifest(repository, reference string) ([]schema1.History, error)
366 181
 }
367 182
 
183
+// ---
184
+
368 185
 // Convert between types. dockerregistry returns the *same* type but from a
369
-// vendored library so go freaks out. Eugh.
370
-// TODO: The only thing we care about here for now is history. Frankly it might
371
-// be easier to convert it to JSON and back.
372
-func (h herokuWrapper) Manifest(repository, reference string) ([]schema1.History, error) {
186
+// vendored library. Because golang doesn't like to apply interfaces to a
187
+// vendored type, we have to provide an adaptor to isolate it.
188
+type herokuManifestAdaptor struct {
189
+	*dockerregistry.Registry
190
+}
191
+
192
+func (h herokuManifestAdaptor) Manifest(repository, reference string) ([]schema1.History, error) {
373 193
 	manifest, err := h.Registry.Manifest(repository, reference)
374 194
 	if err != nil || manifest == nil {
375 195
 		return nil, err
@@ -381,95 +201,13 @@ func (h herokuWrapper) Manifest(repository, reference string) ([]schema1.History
381 201
 	return result, err
382 202
 }
383 203
 
384
-type Cache struct {
385
-	creds  Credentials
386
-	expiry time.Duration
387
-	Client memcache.MemcacheClient
388
-	logger log.Logger
389
-}
390
-
391
-func NewCache(creds Credentials, cache memcache.MemcacheClient, expiry time.Duration, logger log.Logger) dockerRegistryInterface {
392
-	return &Cache{
393
-		creds:  creds,
394
-		expiry: expiry,
395
-		Client: cache,
396
-		logger: logger,
397
-	}
398
-}
399
-
400
-func (c *Cache) Manifest(repository, reference string) (history []schema1.History, err error) {
401
-	repo, err := ParseRepository(repository)
402
-	if err != nil {
403
-		c.logger.Log("err", wraperrors.Wrap(err, "Parsing repository"))
404
-		return
405
-	}
406
-	creds := c.creds.credsFor(repo.Host())
407
-
408
-	// Try the cache
409
-	key := manifestKey(creds.username, repo.String(), reference)
410
-	cacheItem, err := c.Client.Get(key)
411
-	if err != nil {
412
-		if err != officialMemcache.ErrCacheMiss {
413
-			c.logger.Log("err", wraperrors.Wrap(err, "Fetching tag from memcache"))
414
-		}
415
-		return
416
-	}
417
-
418
-	// Return the cache item
419
-	err = json.Unmarshal(cacheItem.Value, &history)
420
-	if err != nil {
421
-		c.logger.Log("err", err.Error)
422
-		return
423
-	}
424
-	return
425
-}
426
-
427
-func (c *Cache) Tags(repository string) (tags []string, err error) {
428
-	repo, err := ParseRepository(repository)
429
-	if err != nil {
430
-		c.logger.Log("err", wraperrors.Wrap(err, "Parsing repository"))
431
-		return
432
-	}
433
-	creds := c.creds.credsFor(repo.Host())
434
-
435
-	// Try the cache
436
-	key := tagKey(creds.username, repo.String())
437
-	cacheItem, err := c.Client.Get(key)
438
-	if err != nil {
439
-		if err != officialMemcache.ErrCacheMiss {
440
-			c.logger.Log("err", wraperrors.Wrap(err, "Fetching tag from memcache"))
441
-		}
442
-		return
443
-	}
444
-
445
-	// Return the cache item
446
-	err = json.Unmarshal(cacheItem.Value, &tags)
447
-	if err != nil {
448
-		c.logger.Log("err", err.Error)
449
-		return
450
-	}
451
-	return
452
-}
453
-
454
-func manifestKey(username, repository, reference string) string {
455
-	return strings.Join([]string{
456
-		"registryhistoryv1", // Just to version in case we need to change format later.
457
-		// Just the username here means we won't invalidate the cache when user
458
-		// changes password, but that should be rare. And, it also means we're not
459
-		// putting user passwords in plaintext into memcache.
460
-		username,
461
-		repository,
462
-		reference,
463
-	}, "|")
204
+// ---
205
+// Registry Credentials
206
+type creds struct {
207
+	username, password string
464 208
 }
465 209
 
466
-func tagKey(username, repository string) string {
467
-	return strings.Join([]string{
468
-		"registrytagsv1", // Just to version in case we need to change format later.
469
-		// Just the username here means we won't invalidate the cache when user
470
-		// changes password, but that should be rare. And, it also means we're not
471
-		// putting user passwords in plaintext into memcache.
472
-		username,
473
-		repository,
474
-	}, "|")
210
+// Credentials to a (Docker) registry.
211
+type Credentials struct {
212
+	m map[string]creds
475 213
 }

+ 21
- 141
registry/registry_test.go View File

@@ -2,8 +2,6 @@ package registry
2 2
 
3 3
 import (
4 4
 	"fmt"
5
-	"sort"
6
-	"strconv"
7 5
 	"testing"
8 6
 	"time"
9 7
 
@@ -20,28 +18,16 @@ import (
20 18
 
21 19
 var (
22 20
 	testTags = []string{testTagStr, "anotherTag"}
23
-	mRemote  = NewMockRemote(img, testTags, nil)
24
-	mClient  = NewMockDockerClient(
25
-		func(repository, reference string) ([]schema1.History, error) {
26
-			return []schema1.History{{`{"test":"json"}`}}, nil
21
+	mClient  = NewMockClient(
22
+		func(repository Repository, tag string) (flux.Image, error) {
23
+			return img, nil
27 24
 		},
28
-		func(repository string) ([]string, error) {
25
+		func(repository Repository) ([]string, error) {
29 26
 			return testTags, nil
30 27
 		},
31 28
 	)
32
-	testTime, _ = time.Parse(time.RFC3339Nano, constTime)
33 29
 )
34 30
 
35
-func TestRegistry_GetImage(t *testing.T) {
36
-	newImg, err := mRemote.Manifest(testRepository, img.ID.Tag)
37
-	if err != nil {
38
-		t.Fatal(err)
39
-	}
40
-	if img.ID.String() != newImg.ID.String() {
41
-		t.Fatal("Expected %v, but got %v", img.ID.String(), newImg.ID.String())
42
-	}
43
-}
44
-
45 31
 func TestRegistry_GetRepository(t *testing.T) {
46 32
 	fact := NewMockClientFactory(mClient, nil)
47 33
 	reg := NewRegistry(fact, log.NewNopLogger())
@@ -66,11 +52,11 @@ func TestRegistry_GetRepositoryFactoryError(t *testing.T) {
66 52
 }
67 53
 
68 54
 func TestRegistry_GetRepositoryManifestError(t *testing.T) {
69
-	errClient := NewMockDockerClient(
70
-		func(repository, reference string) ([]schema1.History, error) {
71
-			return nil, errors.New("")
55
+	errClient := NewMockClient(
56
+		func(repository Repository, tag string) (flux.Image, error) {
57
+			return flux.Image{}, errors.New("")
72 58
 		},
73
-		func(repository string) ([]string, error) {
59
+		func(repository Repository) ([]string, error) {
74 60
 			return testTags, nil
75 61
 		},
76 62
 	)
@@ -82,27 +68,6 @@ func TestRegistry_GetRepositoryManifestError(t *testing.T) {
82 68
 	}
83 69
 }
84 70
 
85
-func TestRegistry_OrderByCreationDate(t *testing.T) {
86
-	fmt.Printf("testTime: %s\n", testTime)
87
-	time0 := testTime.Add(time.Second)
88
-	time2 := testTime.Add(-time.Second)
89
-	imA, _ := flux.ParseImage("my/Image:3", testTime)
90
-	imB, _ := flux.ParseImage("my/Image:1", time0)
91
-	imC, _ := flux.ParseImage("my/Image:4", time2)
92
-	imD, _ := flux.ParseImage("my/Image:0", time.Time{}) // test nil
93
-	imE, _ := flux.ParseImage("my/Image:2", testTime)    // test equal
94
-	imgs := []flux.Image{imA, imB, imC, imD, imE}
95
-	sort.Sort(byCreatedDesc(imgs))
96
-	for i, im := range imgs {
97
-		if strconv.Itoa(i) != im.ID.Tag {
98
-			for j, jim := range imgs {
99
-				t.Logf("%v: %v %s", j, jim.ID.String(), jim.CreatedAt)
100
-			}
101
-			t.Fatalf("Not sorted in expected order: %#v", imgs)
102
-		}
103
-	}
104
-}
105
-
106 71
 // Note: This actually goes off to docker hub to find the Image.
107 72
 // It will fail if there is not internet connection
108 73
 func TestRemoteFactory_RawClient(t *testing.T) {
@@ -119,32 +84,35 @@ func TestRemoteFactory_RawClient(t *testing.T) {
119 84
 
120 85
 	// Refresh tags first
121 86
 	var tags []string
122
-	client, cancel, err := fact.ClientFor(testRepository.Host())
87
+	client, err := fact.ClientFor(testRepository.Host())
123 88
 	if err != nil {
124 89
 		t.Fatal(err)
125 90
 	}
126 91
 
127
-	tags, err = client.Tags(testRepository.NamespaceImage())
92
+	tags, err = client.Tags(testRepository)
128 93
 	if err != nil {
129 94
 		t.Fatal(err)
130 95
 	}
131
-	cancel()
96
+	client.Cancel()
132 97
 	if len(tags) == 0 {
133 98
 		t.Fatal("Should have some tags")
134 99
 	}
135 100
 
136
-	client, cancel, err = fact.ClientFor(testRepository.Host())
101
+	client, err = fact.ClientFor(testRepository.Host())
137 102
 	if err != nil {
138 103
 		t.Fatal(err)
139 104
 	}
140
-	history, err := client.Manifest(testRepository.NamespaceImage(), tags[0])
105
+	newImg, err := client.Manifest(testRepository, tags[0])
141 106
 	if err != nil {
142 107
 		t.Fatal(err)
143 108
 	}
144
-	if len(history) == 0 {
145
-		t.Fatal("Should have some history")
109
+	if newImg.ID.String() == "" {
110
+		t.Fatal("Should image ")
111
+	}
112
+	if newImg.CreatedAt.IsZero() {
113
+		t.Fatal("CreatedAt time was 0")
146 114
 	}
147
-	cancel()
115
+	client.Cancel()
148 116
 }
149 117
 
150 118
 func TestRemoteFactory_InvalidHost(t *testing.T) {
@@ -154,12 +122,11 @@ func TestRemoteFactory_InvalidHost(t *testing.T) {
154 122
 		t.Fatal(err)
155 123
 	}
156 124
 	testRepository = RepositoryFromImage(img)
157
-	client, cancel, err := fact.ClientFor(testRepository.Host())
125
+	client, err := fact.ClientFor(testRepository.Host())
158 126
 	if err != nil {
159 127
 		return
160 128
 	}
161
-	r := newRemote(client, cancel)
162
-	_, err = r.Manifest(testRepository, img.ID.Tag)
129
+	_, err = client.Manifest(testRepository, img.ID.Tag)
163 130
 	if err == nil {
164 131
 		t.Fatal("Expected error due to invalid host but got none.")
165 132
 	}
@@ -334,93 +301,6 @@ var (
334 301
 	}
335 302
 )
336 303
 
337
-// Need to create a dummy manifest here
338
-func TestRemoteClient_ParseManifest(t *testing.T) {
339
-	manifestFunc := func(repo, ref string) ([]schema1.History, error) {
340
-		return man.Manifest.History, nil
341
-	}
342
-	c := newRemote(
343
-		NewMockDockerClient(manifestFunc, nil),
344
-		nil,
345
-	)
346
-	testRepository = RepositoryFromImage(img)
347
-	desc, err := c.Manifest(testRepository, img.ID.Tag)
348
-	if err != nil {
349
-		t.Fatal(err.Error())
350
-	}
351
-	if string(desc.ID.FullID()) != testImageStr {
352
-		t.Fatalf("Expecting %q but got %q", testImageStr, string(desc.ID.FullID()))
353
-	}
354
-	if desc.CreatedAt.Format(time.RFC3339Nano) != constTime {
355
-		t.Fatalf("Expecting %q but got %q", constTime, desc.CreatedAt.Format(time.RFC3339Nano))
356
-	}
357
-}
358
-
359
-// Just a simple pass through.
360
-func TestRemoteClient_GetTags(t *testing.T) {
361
-	c := remote{
362
-		client: NewMockDockerClient(nil, func(repository string) ([]string, error) {
363
-			return []string{
364
-				testTagStr,
365
-			}, nil
366
-		}),
367
-	}
368
-	tags, err := c.Tags(testRepository)
369
-	if err != nil {
370
-		t.Fatal(err.Error())
371
-	}
372
-	if tags[0] != testTagStr {
373
-		t.Fatalf("Expecting %q but got %q", testTagStr, tags[0])
374
-	}
375
-}
376
-
377
-func TestRemoteClient_IsCancelCalled(t *testing.T) {
378
-	var didCancel bool
379
-	r := remote{
380
-		cancel: func() { didCancel = true },
381
-	}
382
-	r.Cancel()
383
-	if !didCancel {
384
-		t.Fatal("Expected it to call the cancel func")
385
-	}
386
-}
387
-
388
-func TestRemoteClient_RemoteErrors(t *testing.T) {
389
-	manifestFunc := func(repo, ref string) ([]schema1.History, error) {
390
-		return man.Manifest.History, errors.New("dummy")
391
-	}
392
-	tagsFunc := func(repository string) ([]string, error) {
393
-		return []string{
394
-			testTagStr,
395
-		}, errors.New("dummy")
396
-	}
397
-	c := remote{
398
-		client: NewMockDockerClient(manifestFunc, tagsFunc),
399
-	}
400
-	_, err := c.Tags(testRepository)
401
-	if err == nil {
402
-		t.Fatal("Expected error")
403
-	}
404
-	_, err = c.Manifest(testRepository, img.ID.Tag)
405
-	if err == nil {
406
-		t.Fatal("Expected error")
407
-	}
408
-}
409
-
410
-func TestRemoteClient_TestNew(t *testing.T) {
411
-	r := &herokuWrapper{}
412
-	var flag bool
413
-	f := func() { flag = true }
414
-	c := newRemote(r, f)
415
-	if c.(*remote).client != r { // Test that client was set
416
-		t.Fatal("Client was not set")
417
-	}
418
-	c.(*remote).cancel()
419
-	if !flag { // Test that our cancel function, when called, works
420
-		t.Fatal("Expected it to call the cancel func")
421
-	}
422
-}
423
-
424 304
 func TestRepository_ParseImage(t *testing.T) {
425 305
 	for _, x := range []struct {
426 306
 		test     string

+ 21
- 8
registry/warming.go View File

@@ -42,18 +42,18 @@ func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, warm <-chan Repo
42 42
 }
43 43
 
44 44
 func (w *Warmer) warm(repository Repository) {
45
-	client, cancel, err := w.ClientFactory.ClientFor(repository.Host())
45
+	client, err := w.ClientFactory.ClientFor(repository.Host())
46 46
 	if err != nil {
47 47
 		w.Logger.Log("err", err.Error())
48 48
 		return
49 49
 	}
50
-	defer cancel()
50
+	defer client.Cancel()
51 51
 
52 52
 	username := w.Creds.credsFor(repository.Host()).username
53 53
 
54 54
 	// Refresh tags first
55 55
 	// Only, for example, "library/alpine" because we have the host information in the client above.
56
-	tags, err := client.Tags(repository.NamespaceImage())
56
+	tags, err := client.Tags(repository)
57 57
 	if err != nil {
58 58
 		if !strings.Contains(err.Error(), "status=401") {
59 59
 			w.Logger.Log("err", err.Error())
@@ -70,7 +70,6 @@ func (w *Warmer) warm(repository Repository) {
70 70
 	// Use the full path to image for the memcache key because there
71 71
 	// might be duplicates from other registries
72 72
 	key := tagKey(username, repository.String())
73
-
74 73
 	if err := w.Client.Set(&officialMemcache.Item{
75 74
 		Key:        key,
76 75
 		Value:      val,
@@ -81,9 +80,21 @@ func (w *Warmer) warm(repository Repository) {
81 80
 	}
82 81
 
83 82
 	// Now refresh the manifests for each tag
83
+	var updated bool
84 84
 	for _, tag := range tags {
85
-		// Only, for example, "library/alpine" because we have the host information in the client above.
86
-		history, err := client.Manifest(repository.NamespaceImage(), tag)
85
+		// See if we have the manifest already cached
86
+		// We don't want to re-download a manifest again.
87
+		key := manifestKey(username, repository.String(), tag)
88
+		_, err := w.Client.Get(key)
89
+		if err == nil { // If no error, we've already got it
90
+			continue
91
+		}
92
+
93
+		history, err := client.Manifest(repository, tag)
94
+		if err != nil {
95
+			w.Logger.Log("err", err.Error())
96
+			continue
97
+		}
87 98
 
88 99
 		val, err := json.Marshal(history)
89 100
 		if err != nil {
@@ -91,8 +102,6 @@ func (w *Warmer) warm(repository Repository) {
91 102
 			return
92 103
 		}
93 104
 
94
-		// Full path to image again.
95
-		key := manifestKey(username, repository.String(), tag)
96 105
 		if err := w.Client.Set(&officialMemcache.Item{
97 106
 			Key:        key,
98 107
 			Value:      val,
@@ -101,6 +110,10 @@ func (w *Warmer) warm(repository Repository) {
101 110
 			w.Logger.Log("err", errors.Wrap(err, "storing tags in memcache"))
102 111
 			return
103 112
 		}
113
+		updated = true
114
+	}
115
+	if updated {
116
+		w.Logger.Log("updated", repository.String())
104 117
 	}
105 118
 }
106 119
 

+ 9
- 13
registry/warming_test.go View File

@@ -4,8 +4,8 @@ package registry
4 4
 
5 5
 import (
6 6
 	"encoding/json"
7
-	"github.com/docker/distribution/manifest/schema1"
8 7
 	"github.com/go-kit/kit/log"
8
+	"github.com/weaveworks/flux"
9 9
 	"os"
10 10
 	"sync"
11 11
 	"testing"
@@ -16,11 +16,11 @@ func TestWarmer_CacheNewRepo(t *testing.T) {
16 16
 	mc := Setup(t)
17 17
 	defer mc.Stop()
18 18
 
19
-	dc := NewMockDockerClient(
20
-		func(repository, reference string) ([]schema1.History, error) {
21
-			return []schema1.History{{`{"test":"json"}`}}, nil
19
+	dc := NewMockClient(
20
+		func(repository Repository, tag string) (flux.Image, error) {
21
+			return img, nil
22 22
 		},
23
-		func(repository string) ([]string, error) {
23
+		func(repository Repository) ([]string, error) {
24 24
 			return []string{"tag1"}, nil
25 25
 		},
26 26
 	)
@@ -70,18 +70,14 @@ func TestWarmer_CacheNewRepo(t *testing.T) {
70 70
 	if err != nil {
71 71
 		t.Fatal(err)
72 72
 	}
73
-	var manifests []schema1.History
74
-	err = json.Unmarshal(item.Value, &manifests)
73
+	var i flux.Image
74
+	err = json.Unmarshal(item.Value, &i)
75 75
 	if err != nil {
76 76
 		t.Fatal(err)
77 77
 	}
78 78
 
79
-	if len(manifests) != 1 {
80
-		t.Fatalf("Expected 1 history item, got %v", manifests)
81
-	}
82
-	expectedManifest := schema1.History{`{"test":"json"}`}
83
-	if manifests[0] != expectedManifest {
84
-		t.Fatalf("Expected  history item: %v, got %v", expectedManifest, manifests[0])
79
+	if i.ID.String() != img.ID.String() {
80
+		t.Fatalf("Expected %s, got %s", img.ID.String(), i.ID.String())
85 81
 	}
86 82
 }
87 83
 

Loading…
Cancel
Save