
Parse docker credentials from imagePullSecrets (not the docker config file) (#678)

* Flux should work with registries on non-standard ports.

* Parse docker credentials from imagePullSecrets

This commit moves the method that returns the list of images to warm
into the kubernetes package, because the method is specific to k8s
anyway and the new code that parses the secrets needs access to the
k8s cluster.

ImageCreds is a new type that associates each ImageID with the
credentials required to pull it.

Credentials are now supplied through the client factory for each new
client, because each image may require different credentials.

* Review fixes. Added documentation.
Michael Bridgen committed 2 years ago · commit 2d255f83a5
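
Below, registry/credentials.go gains a `ParseCredentials` function that accepts the secret bytes directly. As a companion illustration, here is a minimal, self-contained Go sketch (not code from this commit; `authEntry` and `parse` are invented names) of the two JSON layouts such a parser has to cope with: docker's config.json, which wraps entries in an "auths" object, and the flat .dockercfg layout stored in kubernetes.io/dockercfg secrets, which is tried as a fallback when the wrapped form yields nothing.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// authEntry mirrors the only field the parser cares about: the
// base64-encoded "user:password" pair.
type authEntry struct {
	Auth string `json:"auth"`
}

// parse accepts either layout: docker's config.json ({"auths": {host: {...}}})
// or the flat .dockercfg layout found in kubernetes.io/dockercfg secrets
// ({host: {...}}), falling back to the latter when the former yields nothing.
func parse(b []byte) (map[string]authEntry, error) {
	var wrapped struct {
		Auths map[string]authEntry `json:"auths"`
	}
	if err := json.Unmarshal(b, &wrapped); err != nil {
		return nil, err
	}
	if len(wrapped.Auths) > 0 {
		return wrapped.Auths, nil
	}
	var flat map[string]authEntry
	if err := json.Unmarshal(b, &flat); err != nil {
		return nil, err
	}
	return flat, nil
}

func main() {
	auth := base64.StdEncoding.EncodeToString([]byte("user:pass"))
	configJSON := []byte(`{"auths": {"quay.io": {"auth": "` + auth + `"}}}`)
	dockercfg := []byte(`{"localhost:5000": {"auth": "` + auth + `"}}`)

	for _, b := range [][]byte{configJSON, dockercfg} {
		entries, err := parse(b)
		if err != nil {
			panic(err)
		}
		for host, e := range entries {
			user, _ := base64.StdEncoding.DecodeString(e.Auth)
			fmt.Printf("%s -> %s\n", host, user)
		}
	}
}
```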

cluster/kubernetes/kubernetes.go  (+122 -3)

@@ -12,16 +12,17 @@ import (
 	"github.com/go-kit/kit/log"
 	"github.com/pkg/errors"
 	"gopkg.in/yaml.v2"
-	discovery "k8s.io/client-go/1.5/discovery"
+	"k8s.io/client-go/1.5/discovery"
 	k8sclient "k8s.io/client-go/1.5/kubernetes"
 	v1core "k8s.io/client-go/1.5/kubernetes/typed/core/v1"
 	v1beta1extensions "k8s.io/client-go/1.5/kubernetes/typed/extensions/v1beta1"
-	api "k8s.io/client-go/1.5/pkg/api"
-	v1 "k8s.io/client-go/1.5/pkg/api/v1"
+	"k8s.io/client-go/1.5/pkg/api"
+	"k8s.io/client-go/1.5/pkg/api/v1"
 	apiext "k8s.io/client-go/1.5/pkg/apis/extensions/v1beta1"

 	"github.com/weaveworks/flux"
 	"github.com/weaveworks/flux/cluster"
+	"github.com/weaveworks/flux/registry"
 	"github.com/weaveworks/flux/ssh"
 )

@@ -287,6 +288,16 @@ type podController struct {
 	Deployment            *apiext.Deployment
 }

+func (p podController) secrets() (rawSecrets []v1.LocalObjectReference) {
+	// If deployment doesn't contain any secrets, just return empty secret
+	if p.Deployment != nil && p.Deployment.Spec.Template.Spec.ImagePullSecrets != nil {
+		rawSecrets = p.Deployment.Spec.Template.Spec.ImagePullSecrets
+	} else if p.ReplicationController != nil && p.ReplicationController.Spec.Template.Spec.ImagePullSecrets != nil {
+		rawSecrets = p.ReplicationController.Spec.Template.Spec.ImagePullSecrets
+	}
+	return
+}
+
 func (p podController) templateContainers() (res []cluster.Container) {
 	var apiContainers []v1.Container
 	if p.Deployment != nil {
@@ -483,6 +494,114 @@ func (c *Cluster) PublicSSHKey(regenerate bool) (ssh.PublicKey, error) {
 	return publicKey, nil
 }

+type servicePod struct {
+	s  v1.Service
+	pc podController
+}
+
+// Internal function to get a collection of service-controller pairs for a given namespace
+func (c *Cluster) allServices(ns string) (serviceControllers []servicePod, _ error) {
+	var namespaces []v1.Namespace
+	if ns == "" {
+		nsList, err := c.client.Namespaces().List(api.ListOptions{})
+		if err != nil {
+			return serviceControllers, errors.Wrap(err, "getting namespaces")
+		}
+		namespaces = nsList.Items
+	} else {
+		nsSingle, err := c.client.Namespaces().Get(ns)
+		if err != nil {
+			return serviceControllers, errors.Wrap(err, "getting namespaces")
+		}
+		namespaces = []v1.Namespace{*nsSingle}
+	}
+
+	// Foreach namespace
+	for _, ns := range namespaces {
+		services, err := c.client.Services(ns.Name).List(api.ListOptions{})
+		if err != nil {
+			return serviceControllers, errors.Wrapf(err, "getting services for namespace %s", ns.Name)
+		}
+
+		controllers, err := c.podControllersInNamespace(ns.Name)
+		if err != nil {
+			return serviceControllers, errors.Wrapf(err, "getting controllers for namespace %s", ns.Name)
+		}
+
+		// Foreach service
+		for _, service := range services.Items {
+			if isAddon(&service) {
+				continue
+			}
+
+			// Find controller for service
+			pc, err := matchController(&service, controllers)
+			if err != nil {
+				c.logger.Log(errors.Wrapf(cluster.ErrNoMatching, "matching controllers to service %s/%s", ns.Name, service.Name))
+				continue
+			}
+			serviceControllers = append(serviceControllers, servicePod{s: service, pc: pc})
+		}
+	}
+	return
+}
+
+// ImagesToFetch is a k8s specific method to get a list of images to update along with their credentials
+func (c *Cluster) ImagesToFetch() (imageCreds registry.ImageCreds) {
+	imageCreds = make(registry.ImageCreds, 0)
+	serviceControllers, err := c.allServices("")
+	if err != nil {
+		c.logger.Log("err", errors.Wrapf(err, "fetching images"))
+		return
+	}
+
+	// Foreach service-controller combo
+	for _, servicePod := range serviceControllers {
+		service := servicePod.s
+		controller := servicePod.pc
+		var rawSecrets = controller.secrets()
+		creds := registry.NoCredentials()
+		// Foreach secret in PodSpec
+		for _, secName := range rawSecrets {
+			// Get secret
+			sec, err := c.client.Secrets(service.Namespace).Get(secName.Name)
+			if err != nil {
+				c.logger.Log("err", errors.Wrapf(err, "getting secret %q from namespace %q", secName.Name, service.Namespace))
+				continue
+			}
+			if sec.Type != v1.SecretTypeDockercfg {
+				continue
+			}
+			decoded, ok := sec.Data[v1.DockerConfigKey]
+			if !ok {
+				c.logger.Log("err", errors.Wrapf(err, "retrieving pod secret %q", secName.Name))
+				continue
+			}
+
+			// Parse secret
+			crd, err := registry.ParseCredentials(decoded)
+			if err != nil {
+				c.logger.Log("err", err.Error())
+				continue
+			}
+
+			// Merge into the credentials for this PodSpec
+			creds.Merge(crd)
+		}
+
+		// Now create the service and attach the credentials
+		for _, ctn := range controller.templateContainers() {
+			r, err := flux.ParseImageID(ctn.Image)
+			if err != nil {
+				c.logger.Log("err", err.Error())
+				continue
+			}
+			imageCreds[r] = creds
+		}
+	}
+	return
+}
+
 // --- end cluster.Cluster

 // A convenience for getting an minimal object from some bytes.

cmd/fluxd/main.go  (+9 -36)

@@ -19,7 +19,6 @@ import (
 	k8sclient "k8s.io/client-go/1.5/kubernetes"
 	"k8s.io/client-go/1.5/rest"

-	//	"github.com/weaveworks/flux"
 	"github.com/weaveworks/flux"
 	"github.com/weaveworks/flux/cluster"
 	"github.com/weaveworks/flux/cluster/kubernetes"
@@ -76,7 +75,6 @@ func main() {
 		gitNotesRef     = fs.String("git-notes-ref", "flux", "ref to use for keeping commit annotations in git notes")
 		gitPollInterval = fs.Duration("git-poll-interval", 5*time.Minute, "period at which to poll git repo for new commits")
 		// registry
-		dockerCredFile       = fs.String("docker-config", "~/.docker/config.json", "Path to config file with credentials for DockerHub, quay.io etc.")
 		memcachedHostname    = fs.String("memcached-hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.")
 		memcachedTimeout     = fs.Duration("memcached-timeout", time.Second, "Maximum time to wait before giving up on memcached requests.")
 		memcachedService     = fs.String("memcached-service", "memcached", "SRV service used to discover memcache servers.")
@@ -118,6 +116,7 @@ func main() {
 	var clusterVersion string
 	var sshKeyRing ssh.KeyRing
 	var k8s cluster.Cluster
+	var image_creds func() registry.ImageCreds
 	var k8sManifests cluster.Manifests
 	{
 		restClientConfig, err := rest.InClusterConfig()
@@ -134,6 +133,7 @@ func main() {
 			logger.Log("err", err)
 			os.Exit(1)
 		}
+
 		serverVersion, err := clientset.ServerVersion()
 		if err != nil {
 			logger.Log("err", err)
@@ -180,19 +180,20 @@ func main() {
 		logger.Log("kubectl", kubectl)

 		kubectlApplier := kubernetes.NewKubectl(kubectl, restClientConfig, os.Stdout, os.Stderr)
-		cluster, err := kubernetes.NewCluster(clientset, kubectlApplier, sshKeyRing, logger)
+		k8s_inst, err := kubernetes.NewCluster(clientset, kubectlApplier, sshKeyRing, logger)
 		if err != nil {
 			logger.Log("err", err)
 			os.Exit(1)
 		}

-		if err := cluster.Ping(); err != nil {
+		if err := k8s_inst.Ping(); err != nil {
 			logger.Log("ping", err)
 		} else {
 			logger.Log("ping", true)
 		}

-		k8s = cluster
+		image_creds = k8s_inst.ImagesToFetch
+		k8s = k8s_inst
 		// There is only one way we currently interpret a repo of
 		// files as manifests, and that's as Kubernetes yamels.
 		k8sManifests = &kubernetes.Manifests{}
@@ -216,13 +217,9 @@ func main() {
 			defer memcacheClient.Stop()
 		}

-		creds, err := registry.CredentialsFromFile(*dockerCredFile)
-		if err != nil {
-			logger.Log("err", err)
-		}
 		cacheLogger := log.NewContext(logger).With("component", "cache")
 		cache = registry.NewRegistry(
-			registry.NewCacheClientFactory(creds, cacheLogger, memcacheClient, *registryCacheExpiry),
+			registry.NewCacheClientFactory(cacheLogger, memcacheClient, *registryCacheExpiry),
 			cacheLogger,
 			*memcachedConnections,
 		)
@@ -230,7 +227,7 @@ func main() {

 		// Remote
 		registryLogger := log.NewContext(logger).With("component", "registry")
-		remoteFactory := registry.NewRemoteClientFactory(creds, registryLogger, registryMiddleware.RateLimiterConfig{
+		remoteFactory := registry.NewRemoteClientFactory(registryLogger, registryMiddleware.RateLimiterConfig{
 			RPS:   *registryRPS,
 			Burst: *registryBurst,
 		})
@@ -240,7 +237,6 @@ func main() {
 		cacheWarmer = registry.Warmer{
 			Logger:        warmerLogger,
 			ClientFactory: remoteFactory,
-			Creds:         creds,
 			Expiry:        *registryCacheExpiry,
 			Reader:        memcacheClient,
 			Writer:        memcacheClient,
@@ -377,7 +373,7 @@ func main() {
 	go daemon.GitPollLoop(shutdown, shutdownWg, log.NewContext(logger).With("component", "sync-loop"))

 	shutdownWg.Add(1)
-	go cacheWarmer.Loop(shutdown, shutdownWg, servicesToRepositories(k8s, cacheWarmer.Logger))
+	go cacheWarmer.Loop(shutdown, shutdownWg, image_creds)

 	// Update daemonRef so that upstream and handlers point to fully working daemon
 	daemonRef.UpdatePlatform(daemon)
@@ -421,26 +417,3 @@ func checkForUpdates(clusterString string, gitString string, logger log.Logger)

 	return checkpoint.CheckInterval(&params, versionCheckPeriod, handleResponse)
 }
-
-func servicesToRepositories(k8s cluster.Cluster, log log.Logger) func() []flux.ImageID {
-	return func() []flux.ImageID {
-		svcs, err := k8s.AllServices("")
-		if err != nil {
-			log.Log("err", err.Error())
-			return []flux.ImageID{}
-		}
-		repos := make([]flux.ImageID, 0)
-		for _, s := range svcs {
-			for _, c := range s.Containers.Containers {
-				r, err := flux.ParseImageID(c.Image)
-				if err != nil {
-					log.Log("err", err.Error())
-					continue
-				}
-				repos = append(repos, r)
-			}
-
-		}
-		return repos
-	}
-}

image.go  (+3 -0)

@@ -41,6 +41,9 @@ func ParseImageID(s string) (ImageID, error) {
 	case 2:
 		img.Tag = parts[1]
 		s = parts[0]
+	case 3: // There might be three parts if there is a host with a custom port
+		img.Tag = parts[2]
+		s = s[:strings.LastIndex(s, ":")]
 	default:
 		return ImageID{}, ErrMalformedImageID
 	}
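
As an aside on the new `case 3` above: an image reference whose registry host carries a port contains two colons, so splitting it on ":" produces three parts. A tiny, hypothetical sketch (assuming, as the surrounding cases suggest, that `parts` comes from a `strings.Split` on ":"):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	ref := "localhost:5000/library/alpine:mytag"

	parts := strings.Split(ref, ":")
	fmt.Println(len(parts), parts) // 3 [localhost 5000/library/alpine mytag]

	// The last piece is the tag; everything before the final colon is the
	// registry host plus repository path.
	tag := parts[len(parts)-1]
	name := ref[:strings.LastIndex(ref, ":")]
	fmt.Println(name, tag) // localhost:5000/library/alpine mytag
}
```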

image_test.go  (+2 -2)

@@ -26,6 +26,8 @@ func TestImageID_ParseImageID(t *testing.T) {
 		{"quay.io/library/alpine", "quay.io/library/alpine:latest"},
 		{"quay.io/library/alpine:latest", "quay.io/library/alpine:latest"},
 		{"quay.io/library/alpine:mytag", "quay.io/library/alpine:mytag"},
+		{"localhost:5000/library/alpine:mytag", "localhost:5000/library/alpine:mytag"},
+		{"kube-registry.kube-system.svc.cluster.local:31000/secret/repo:latest", "kube-registry.kube-system.svc.cluster.local:31000/secret/repo:latest"},
 	} {
 		i, err := ParseImageID(x.test)
 		if err != nil {
@@ -43,8 +45,6 @@ func TestImageID_ParseImageIDErrorCases(t *testing.T) {
 	}{
 		{""},
 		{":tag"},
-		{"alpine::"},
-		{"alpine:invalid:"},
 		{"/too/many/slashes/"},
 	} {
 		_, err := ParseImageID(x.test)

registry/client_factory.go  (+7 -17)

@@ -23,29 +23,24 @@ var (
 // Each request might require a new client. E.g. when retrieving docker
 // images from docker hub, then a second from quay.io
 type ClientFactory interface {
-	ClientFor(host string) (client Client, err error)
+	ClientFor(host string, creds Credentials) (client Client, err error)
 }

 // ---
 // A new ClientFactory for a Remote.
-func NewRemoteClientFactory(c Credentials, l log.Logger, rlc middleware.RateLimiterConfig) ClientFactory {
-	for host, creds := range c.m {
-		l.Log("host", host, "username", creds.username)
-	}
+func NewRemoteClientFactory(l log.Logger, rlc middleware.RateLimiterConfig) ClientFactory {
 	return &remoteClientFactory{
-		creds:  c,
 		Logger: l,
 		rlConf: rlc,
 	}
 }

 type remoteClientFactory struct {
-	creds  Credentials
 	Logger log.Logger
 	rlConf middleware.RateLimiterConfig
 }

-func (f *remoteClientFactory) ClientFor(host string) (Client, error) {
+func (f *remoteClientFactory) ClientFor(host string, creds Credentials) (Client, error) {
 	httphost := "https://" + host

 	// quay.io wants us to use cookies for authorisation, so we have
@@ -56,7 +51,7 @@ func (f *remoteClientFactory) ClientFor(host string) (Client, error) {
 	if err != nil {
 		return nil, err
 	}
-	auth := f.creds.credsFor(host)
+	auth := creds.credsFor(host)

 	// A context we'll use to cancel requests on error
 	ctx, cancel := context.WithCancel(context.Background())
@@ -95,12 +90,8 @@ func (f *remoteClientFactory) ClientFor(host string) (Client, error) {

 // ---
 // A new ClientFactory implementation for a Cache
-func NewCacheClientFactory(c Credentials, l log.Logger, cache cache.Reader, cacheExpiry time.Duration) ClientFactory {
-	for host, creds := range c.m {
-		l.Log("host", host, "username", creds.username)
-	}
+func NewCacheClientFactory(l log.Logger, cache cache.Reader, cacheExpiry time.Duration) ClientFactory {
 	return &cacheClientFactory{
-		creds:       c,
 		Logger:      l,
 		cache:       cache,
 		CacheExpiry: cacheExpiry,
@@ -108,16 +99,15 @@ func NewCacheClientFactory(c Credentials, l log.Logger, cache cache.Reader, cach
 }

 type cacheClientFactory struct {
-	creds       Credentials
 	Logger      log.Logger
 	cache       cache.Reader
 	CacheExpiry time.Duration
 }

-func (f *cacheClientFactory) ClientFor(host string) (Client, error) {
+func (f *cacheClientFactory) ClientFor(host string, creds Credentials) (Client, error) {
 	if f.cache == nil {
 		return nil, ErrNoMemcache
 	}
-	client := NewCache(f.creds, f.cache, f.CacheExpiry, f.Logger)
+	client := NewCache(creds, f.cache, f.CacheExpiry, f.Logger)
 	return client, nil
 }

registry/credentials.go  (+17 -10)

@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/pkg/errors"
-	"io/ioutil"
 	"net/url"
 	"strings"
 )
@@ -27,21 +26,21 @@ func NoCredentials() Credentials {
 	}
 }

-func CredentialsFromFile(path string) (Credentials, error) {
-	configBytes, err := ioutil.ReadFile(path)
-	if err != nil {
-		return Credentials{}, err
-	}
-
+func ParseCredentials(b []byte) (Credentials, error) {
 	var config struct {
 		Auths map[string]struct {
 			Auth string
 		}
 	}
-	if err = json.Unmarshal(configBytes, &config); err != nil {
+	if err := json.Unmarshal(b, &config); err != nil {
 		return Credentials{}, err
 	}
-
+	// If it's in k8s format, it won't have the surrounding "Auth". Try that too.
+	if len(config.Auths) == 0 {
+		if err := json.Unmarshal(b, &config.Auths); err != nil {
+			return Credentials{}, err
+		}
+	}
 	m := map[string]creds{}
 	for host, entry := range config.Auths {
 		decodedAuth, err := base64.StdEncoding.DecodeString(entry.Auth)
@@ -57,11 +56,13 @@ func CredentialsFromFile(path string) (Credentials, error) {
 		// Some users were passing in credentials in the form of
 		// http://docker.io and http://docker.io/v1/, etc.
 		// So strip everything down to it's base host.
+		// Also, the registry might be local and on a different port.
+		// So we need to check for that because url.Parse won't parse the ip:port format very well.
 		u, err := url.Parse(host)
 		if err != nil {
 			return Credentials{}, err
 		}
-		if u.Host == "" && u.Path == "" {
+		if u.Host == "" && u.Path == "" && !strings.Contains(host, ":") || host == "http://" || host == "https://" {
 			return Credentials{}, errors.New("Empty registry auth url")
 		}
 		if u.Host == "" { // If there's no https:// prefix, it won't parse the host.
@@ -105,3 +106,9 @@ func (cs Credentials) Hosts() []string {
 	}
 	return hosts
 }
+
+func (cs Credentials) Merge(c Credentials) {
+	for k, v := range c.m {
+		cs.m[k] = v
+	}
+}

registry/credentials_test.go  (+41 -85)

@@ -3,15 +3,13 @@ package registry
 import (
 	"encoding/base64"
 	"fmt"
-	"io/ioutil"
-	"os"
 	"testing"
+	"time"
 )

 var (
 	user string = "user"
 	pass string = "pass"
-	host string = "host"
 	tmpl string = `
     {
         "auths": {
@@ -21,72 +19,6 @@ var (
 	okCreds string = base64.StdEncoding.EncodeToString([]byte(user + ":" + pass))
 )

-func writeCreds(t *testing.T, creds string) (string, func()) {
-	file, err := ioutil.TempFile("", "testcreds")
-	file.Write([]byte(creds))
-	file.Close()
-	if err != nil {
-		t.Fatal(err)
-	}
-	return file.Name(), func() {
-		os.Remove(file.Name())
-	}
-}
-
-func TestRemoteFactory_CredentialsFromFile(t *testing.T) {
-	file, cleanup := writeCreds(t, fmt.Sprintf(tmpl, host, okCreds))
-	defer cleanup()
-
-	creds, err := CredentialsFromFile(file)
-	if err != nil {
-		t.Fatal(err)
-	}
-	c := creds.credsFor(host)
-	if user != c.username {
-		t.Fatalf("Expected %q, got %q.", user, c.username)
-	}
-	if pass != c.password {
-		t.Fatalf("Expected %q, got %q.", pass, c.password)
-	}
-	if len(creds.Hosts()) != 1 || host != creds.Hosts()[0] {
-		t.Fatalf("Expected %q, got %q.", host, creds.Hosts()[0])
-	}
-}
-
-func TestRemoteFactory_CredentialsFromConfigDecodeError(t *testing.T) {
-	file, cleanup := writeCreds(t, `{
-    "auths": {
-        "host": {"auth": "credentials:notencoded"}
-    }
-}`)
-	defer cleanup()
-	_, err := CredentialsFromFile(file)
-	if err == nil {
-		t.Fatal("Expected error")
-	}
-}
-
-func TestRemoteFactory_CredentialsFromConfigHTTPSHosts(t *testing.T) {
-	httpsHost := fmt.Sprintf("https://%s/v1/", host)
-	file, cleanup := writeCreds(t, fmt.Sprintf(tmpl, httpsHost, okCreds))
-	defer cleanup()
-
-	creds, err := CredentialsFromFile(file)
-	if err != nil {
-		t.Fatal(err)
-	}
-	c := creds.credsFor(host)
-	if user != c.username {
-		t.Fatalf("Expected %q, got %q.", user, c.username)
-	}
-	if pass != c.password {
-		t.Fatalf("Expected %q, got %q.", pass, c.password)
-	}
-	if len(creds.Hosts()) != 1 || host != creds.Hosts()[0] {
-		t.Fatalf("Expected %q, got %q.", httpsHost, creds.Hosts()[0])
-	}
-}
-
 func TestRemoteFactory_ParseHost(t *testing.T) {
 	for _, v := range []struct {
 		host        string
@@ -101,6 +33,18 @@ func TestRemoteFactory_ParseHost(t *testing.T) {
 			host:        "gcr.io",
 			imagePrefix: "gcr.io",
 		},
+		{
+			host:        "localhost:5000/v2/",
+			imagePrefix: "localhost:5000",
+		},
+		{
+			host:        "https://192.168.99.100:5000/v2",
+			imagePrefix: "192.168.99.100:5000",
+		},
+		{
+			host:        "https://my.domain.name:5000/v2",
+			imagePrefix: "my.domain.name:5000",
+		},
 		{
 			host:        "https://gcr.io",
 			imagePrefix: "gcr.io",
@@ -122,30 +66,25 @@ func TestRemoteFactory_ParseHost(t *testing.T) {
 			imagePrefix: "gcr.io",
 		},
 		{
-			host:        "",
-			imagePrefix: "gcr.io",
-			error:       true,
+			host:  "",
+			error: true,
 		},
 		{
-			host:        "https://",
-			imagePrefix: "gcr.io",
-			error:       true,
+			host:  "https://",
+			error: true,
 		},
 		{
-			host:        "^#invalid.io/v1/",
-			imagePrefix: "gcr.io",
-			error:       true,
+			host:  "^#invalid.io/v1/",
+			error: true,
 		},
 		{
-			host:        "/var/user",
-			imagePrefix: "gcr.io",
-			error:       true,
+			host:  "/var/user",
+			error: true,
 		},
 	} {
-
-		file, cleanup := writeCreds(t, fmt.Sprintf(tmpl, v.host, okCreds))
-		defer cleanup()
-		creds, err := CredentialsFromFile(file)
+		stringCreds := fmt.Sprintf(tmpl, v.host, okCreds)
+		creds, err := ParseCredentials([]byte(stringCreds))
+		time.Sleep(100 * time.Millisecond)
 		if (err != nil) != v.error {
 			t.Fatalf("For test %q, expected error = %v but got %v", v.host, v.error, err != nil)
 		}
@@ -157,3 +96,20 @@ func TestRemoteFactory_ParseHost(t *testing.T) {
 		}
 	}
 }
+
+func TestParseCreds_k8s(t *testing.T) {
+	k8sCreds := []byte(`{"localhost:5000":{"username":"testuser","password":"testpassword","email":"foo@bar.com","auth":"dGVzdHVzZXI6dGVzdHBhc3N3b3Jk"}}`)
+	c, err := ParseCredentials(k8sCreds)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(c.Hosts()) != 1 {
+		t.Fatal("Invalid number of hosts", len(c.Hosts()))
+	} else if c.Hosts()[0] != "localhost:5000" {
+		t.Fatal("Host is incorrect: ", c.Hosts()[0])
+	} else if c.credsFor("localhost:5000").username != "testuser" {
+		t.Fatal("Invalid user", c.credsFor("localhost:5000").username)
+	} else if c.credsFor("localhost:5000").password != "testpassword" {
+		t.Fatal("Invalid user", c.credsFor("localhost:5000").password)
+	}
+}

registry/integration_test.go  (+4 -4)

@@ -37,13 +37,11 @@ func TestWarming_WarmerWriteCacheRead(t *testing.T) {
 	logger := log.NewContext(log.NewLogfmtLogger(os.Stderr))

 	remote := NewRemoteClientFactory(
-		NoCredentials(),
 		logger.With("component", "client"),
 		middleware.RateLimiterConfig{200, 10},
 	)

 	cache := NewCacheClientFactory(
-		NoCredentials(),
 		logger.With("component", "cache"),
 		mc,
 		time.Hour,
@@ -73,8 +71,10 @@ func TestWarming_WarmerWriteCacheRead(t *testing.T) {
 	}()

 	shutdownWg.Add(1)
-	go w.Loop(shutdown, shutdownWg, func() []flux.ImageID {
-		return []flux.ImageID{id}
+	go w.Loop(shutdown, shutdownWg, func() ImageCreds {
+		return ImageCreds{
+			id: NoCredentials(),
+		}
 	})

 	timeout := time.NewTicker(10 * time.Second)    // Shouldn't take longer than 10s

registry/mock.go  (+1 -1)

@@ -55,7 +55,7 @@ func NewMockClientFactory(c Client, err error) ClientFactory {
 	}
 }

-func (m *mockRemoteFactory) ClientFor(repository string) (Client, error) {
+func (m *mockRemoteFactory) ClientFor(repository string, creds Credentials) (Client, error) {
 	return m.c, m.err
 }


registry/registry.go  (+3 -3)

@@ -37,7 +37,7 @@ type registry struct {
 	connections int
 }

-// NewClient creates a new registry registry, to use when fetching repositories.
+// NewRegistry creates a new registry, to use when fetching repositories.
 // Behind the scenes the registry will call ClientFactory.ClientFor(...)
 // when requesting an image. This will generate a Client to access the
 // backend.
@@ -58,7 +58,7 @@ func NewRegistry(c ClientFactory, l log.Logger, connections int) Registry {
 //   quay.io/foo/helloworld -> quay.io/foo/helloworld
 //
 func (reg *registry) GetRepository(id flux.ImageID) ([]flux.Image, error) {
-	client, err := reg.factory.ClientFor(id.Host)
+	client, err := reg.factory.ClientFor(id.Host, Credentials{})
 	if err != nil {
 		return nil, err
 	}
@@ -79,7 +79,7 @@ func (reg *registry) GetRepository(id flux.ImageID) ([]flux.Image, error) {

 // Get a single Image from the registry if it exists
 func (reg *registry) GetImage(id flux.ImageID) (flux.Image, error) {
-	client, err := reg.factory.ClientFor(id.Host)
+	client, err := reg.factory.ClientFor(id.Host, Credentials{})
 	if err != nil {
 		return flux.Image{}, err
 	}

registry/registry_test.go  (+5 -5)

@@ -87,14 +87,14 @@ func TestRegistry_GetRepositoryManifestError(t *testing.T) {
 // It will fail if there is not internet connection
 func TestRemoteFactory_RawClient(t *testing.T) {
 	// No credentials required for public Image
-	fact := NewRemoteClientFactory(Credentials{}, log.NewNopLogger(), middleware.RateLimiterConfig{
+	fact := NewRemoteClientFactory(log.NewNopLogger(), middleware.RateLimiterConfig{
 		RPS:   200,
 		Burst: 1,
 	})

 	// Refresh tags first
 	var tags []string
-	client, err := fact.ClientFor(id.Host)
+	client, err := fact.ClientFor(id.Host, Credentials{})
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -108,7 +108,7 @@ func TestRemoteFactory_RawClient(t *testing.T) {
 		t.Fatal("Should have some tags")
 	}

-	client, err = fact.ClientFor(id.Host)
+	client, err = fact.ClientFor(id.Host, Credentials{})
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -127,12 +127,12 @@ func TestRemoteFactory_RawClient(t *testing.T) {
 }

 func TestRemoteFactory_InvalidHost(t *testing.T) {
-	fact := NewRemoteClientFactory(Credentials{}, log.NewNopLogger(), middleware.RateLimiterConfig{})
+	fact := NewRemoteClientFactory(log.NewNopLogger(), middleware.RateLimiterConfig{})
 	invalidId, err := flux.ParseImageID("invalid.host/library/alpine:latest")
 	if err != nil {
 		t.Fatal(err)
 	}
-	client, err := fact.ClientFor(invalidId.Host)
+	client, err := fact.ClientFor(invalidId.Host, Credentials{})
 	if err != nil {
 		return
 	}

registry/warming.go  (+9 -7)

@@ -28,17 +28,19 @@ type Warmer struct {
 	Burst         int
 }

+type ImageCreds map[flux.ImageID]Credentials
+
 // Continuously get the images to populate the cache with, and
 // populate the cache with them.
-func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() []flux.ImageID) {
+func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() ImageCreds) {
 	defer wg.Done()

 	if w.Logger == nil || w.ClientFactory == nil || w.Expiry == 0 || w.Writer == nil || w.Reader == nil {
 		panic("registry.Warmer fields are nil")
 	}

-	for _, r := range imagesToFetchFunc() {
-		w.warm(r)
+	for k, v := range imagesToFetchFunc() {
+		w.warm(k, v)
 	}

 	newImages := time.Tick(askForNewImagesInterval)
@@ -48,15 +50,15 @@ func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFun
 			w.Logger.Log("stopping", "true")
 			return
 		case <-newImages:
-			for _, r := range imagesToFetchFunc() {
-				w.warm(r)
+			for k, v := range imagesToFetchFunc() {
+				w.warm(k, v)
 			}
 		}
 	}
 }

-func (w *Warmer) warm(id flux.ImageID) {
-	client, err := w.ClientFactory.ClientFor(id.Host)
+func (w *Warmer) warm(id flux.ImageID, creds Credentials) {
+	client, err := w.ClientFactory.ClientFor(id.Host, creds)
 	if err != nil {
 		w.Logger.Log("err", err.Error())
 		return

site/faq.md  (+8 -0)

@@ -67,3 +67,11 @@ Now restart fluxd to re-read the k8s secret (if it is running):

 `kubectl delete $(kubectl get pod -o name -l name=flux)`

+### How do I use a private docker registry?
+
+Create a Kubernetes Secret with your docker credentials then add the
+name of this secret to your Pod manifest under the `imagePullSecrets`
+setting. Flux will read this value and parse the Kubernetes secret.
+
+For a guide showing how to do this, see the
+[Kubernetes documentation](https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/).

site/standalone/setup.md  (+0 -6)

@@ -106,9 +106,3 @@ arguments (examples with defaults): `--k8s-secret-name=flux-git-deploy`,
 Using an SSH key allows you to maintain control of the repository. You
 can revoke permission for `flux` to access the repository at any time
 by removing the deploy key.
-
-## Using a Private Registry
-
-Simply mount the registry credentials into the container. The location
-of the credentials can be customised with the argument (example with
-default): `--docker-config=~/.docker/config.json`
