Browse Source

Exclude resources if we cannot find their scope

This is aimed at custom resources whose scope is unknown because
their CRD is included in a helm chart Flux won't inspect (since it's
the helm operator's responsibility to do it).

This allows Flux to move forward and create the `HelmRelease` supplying the
CRD. In a subsequent sync, once we know their scope, the custom resources will be
created.
Alfonso Acosta 1 month ago
parent
commit
6f2eba2865

+ 1
- 1
cluster/kubernetes/doc.go View File

@@ -1,6 +1,6 @@
1 1
 /*
2 2
 Package kubernetes provides implementations of `Cluster` and
3
-`Manifests` that interact with the Kubernetes API (using kubectl or
3
+`manifests` that interact with the Kubernetes API (using kubectl or
4 4
 the k8s API client).
5 5
 */
6 6
 package kubernetes

+ 43
- 12
cluster/kubernetes/manifests.go View File

@@ -1,6 +1,10 @@
1 1
 package kubernetes
2 2
 
3 3
 import (
4
+	"fmt"
5
+	"strings"
6
+
7
+	"github.com/go-kit/kit/log"
4 8
 	"gopkg.in/yaml.v2"
5 9
 	"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
6 10
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -24,13 +28,23 @@ type namespacer interface {
24 28
 	EffectiveNamespace(manifest kresource.KubeManifest, knownScopes ResourceScopes) (string, error)
25 29
 }
26 30
 
27
-// Manifests is an implementation of cluster.Manifests, particular to
31
+// manifests is an implementation of cluster.Manifests, particular to
28 32
 // Kubernetes. Aside from loading manifests from files, it does some
29 33
 // "post-processing" to make sure the view of the manifests is what
30 34
 // would be applied; in particular, it fills in the namespace of
31 35
 // manifests that would be given a default namespace when applied.
32
-type Manifests struct {
33
-	Namespacer namespacer
36
+type manifests struct {
37
+	namespacer       namespacer
38
+	logger           log.Logger
39
+	resourceWarnings map[string]struct{}
40
+}
41
+
42
+func NewManifests(ns namespacer, logger log.Logger) *manifests {
43
+	return &manifests{
44
+		namespacer:       ns,
45
+		logger:           logger,
46
+		resourceWarnings: map[string]struct{}{},
47
+	}
34 48
 }
35 49
 
36 50
 func getCRDScopes(manifests map[string]kresource.KubeManifest) ResourceScopes {
@@ -60,31 +74,48 @@ func getCRDScopes(manifests map[string]kresource.KubeManifest) ResourceScopes {
60 74
 	return result
61 75
 }
62 76
 
63
-func setEffectiveNamespaces(manifests map[string]kresource.KubeManifest, nser namespacer) (map[string]resource.Resource, error) {
77
+func (m *manifests) setEffectiveNamespaces(manifests map[string]kresource.KubeManifest) (map[string]resource.Resource, error) {
64 78
 	knownScopes := getCRDScopes(manifests)
65 79
 	result := map[string]resource.Resource{}
66 80
 	for _, km := range manifests {
67
-		if nser != nil {
68
-			ns, err := nser.EffectiveNamespace(km, knownScopes)
69
-			if err != nil {
70
-				return nil, err
81
+		resID := km.ResourceID()
82
+		resIDStr := resID.String()
83
+		ns, err := m.namespacer.EffectiveNamespace(km, knownScopes)
84
+		if err != nil {
85
+			if strings.Contains(err.Error(), "not found") {
86
+				// discard the resource and keep going after making sure we logged about it
87
+				if _, warningLogged := m.resourceWarnings[resIDStr]; !warningLogged {
88
+					_, kind, name := resID.Components()
89
+					partialResIDStr := kind + "/" + name
90
+					m.logger.Log(
91
+						"warn", fmt.Sprintf("cannot find scope of resource %s: %s", partialResIDStr, err),
92
+						"impact", fmt.Sprintf("resource %s will be excluded until its scope is available", partialResIDStr))
93
+					m.resourceWarnings[resIDStr] = struct{}{}
94
+				}
95
+				continue
71 96
 			}
72
-			km.SetNamespace(ns)
97
+			return nil, err
98
+		}
99
+		km.SetNamespace(ns)
100
+		if _, warningLogged := m.resourceWarnings[resIDStr]; warningLogged {
101
+			// indicate that we found the resource's scope and allow logging a warning again
102
+	m.logger.Log("info", fmt.Sprintf("found scope of resource %s, back in business!", km.ResourceID().String()))
103
+			delete(m.resourceWarnings, resIDStr)
73 104
 		}
74 105
 		result[km.ResourceID().String()] = km
75 106
 	}
76 107
 	return result, nil
77 108
 }
78 109
 
79
-func (m *Manifests) LoadManifests(base string, paths []string) (map[string]resource.Resource, error) {
110
+func (m *manifests) LoadManifests(base string, paths []string) (map[string]resource.Resource, error) {
80 111
 	manifests, err := kresource.Load(base, paths)
81 112
 	if err != nil {
82 113
 		return nil, err
83 114
 	}
84
-	return setEffectiveNamespaces(manifests, m.Namespacer)
115
+	return m.setEffectiveNamespaces(manifests)
85 116
 }
86 117
 
87
-func (m *Manifests) UpdateImage(def []byte, id flux.ResourceID, container string, image image.Ref) ([]byte, error) {
118
+func (m *manifests) UpdateImage(def []byte, id flux.ResourceID, container string, image image.Ref) ([]byte, error) {
88 119
 	return updateWorkload(def, id, container, image)
89 120
 }
90 121
 

+ 78
- 10
cluster/kubernetes/manifests_test.go View File

@@ -1,21 +1,25 @@
1 1
 package kubernetes
2 2
 
3 3
 import (
4
+	"bytes"
4 5
 	"io/ioutil"
6
+	"os"
5 7
 	"path/filepath"
6 8
 	"testing"
7 9
 
10
+	"github.com/go-kit/kit/log"
11
+	"github.com/stretchr/testify/assert"
12
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13
+
8 14
 	"github.com/weaveworks/flux/cluster/kubernetes/testfiles"
9 15
 )
10 16
 
11
-func TestKnownCRDScope(t *testing.T) {
17
+func TestLocalCRDScope(t *testing.T) {
12 18
 	coreClient := makeFakeClient()
13 19
 
14 20
 	nser, err := NewNamespacer(coreClient.Discovery())
15
-	if err != nil {
16
-		t.Fatal(err)
17
-	}
18
-	manifests := Manifests{nser}
21
+	assert.NoError(t, err)
22
+	manifests := NewManifests(nser, log.NewLogfmtLogger(os.Stdout))
19 23
 
20 24
 	dir, cleanup := testfiles.TempDir(t)
21 25
 	defer cleanup()
@@ -46,17 +50,81 @@ metadata:
46 50
   namespace: bar
47 51
 `
48 52
 
49
-	if err = ioutil.WriteFile(filepath.Join(dir, "test.yaml"), []byte(defs), 0600); err != nil {
50
-		t.Fatal(err)
51
-	}
53
+	err = ioutil.WriteFile(filepath.Join(dir, "test.yaml"), []byte(defs), 0600)
54
+	assert.NoError(t, err)
52 55
 
53 56
 	resources, err := manifests.LoadManifests(dir, []string{dir})
54 57
 	if err != nil {
55 58
 		t.Fatal(err)
56 59
 	}
57 60
 
58
-	if _, ok := resources["bar:foo/fooinstance"]; !ok {
59
-		t.Fatal("couldn't find crd instance")
61
+	assert.Contains(t, resources, "bar:foo/fooinstance")
62
+}
63
+
64
+func TestUnknownCRDScope(t *testing.T) {
65
+	coreClient := makeFakeClient()
66
+
67
+	nser, err := NewNamespacer(coreClient.Discovery())
68
+	assert.NoError(t, err)
69
+	logBuffer := bytes.NewBuffer(nil)
70
+	manifests := NewManifests(nser, log.NewLogfmtLogger(logBuffer))
71
+
72
+	dir, cleanup := testfiles.TempDir(t)
73
+	defer cleanup()
74
+	const defs = `---
75
+apiVersion: v1
76
+kind: Namespace
77
+metadata:
78
+  name: mynamespace
79
+---
80
+apiVersion: foo.example.com/v1beta1
81
+kind: Foo
82
+metadata:
83
+  name: fooinstance
84
+  namespace: bar
85
+`
86
+
87
+	err = ioutil.WriteFile(filepath.Join(dir, "test.yaml"), []byte(defs), 0600)
88
+	assert.NoError(t, err)
89
+
90
+	resources, err := manifests.LoadManifests(dir, []string{dir})
91
+	assert.NoError(t, err)
92
+
93
+	// can't contain the CRD since we cannot figure out its scope
94
+	assert.NotContains(t, resources, "bar:foo/fooinstance")
95
+
96
+	// however, it should contain the namespace
97
+	assert.Contains(t, resources, "<cluster>:namespace/mynamespace")
98
+
99
+	savedLog := logBuffer.String()
100
+	// and we should had logged a warning about it
101
+	assert.Contains(t, savedLog, "cannot find scope of resource foo/fooinstance")
102
+
103
+	// loading again shouldn't result in more warnings
104
+	resources, err = manifests.LoadManifests(dir, []string{dir})
105
+	assert.NoError(t, err)
106
+	assert.Equal(t, logBuffer.String(), savedLog)
107
+
108
+	// But getting the scope of the CRD should result in a log saying we found the scope
109
+	apiResourcesWithoutFoo := coreClient.Resources
110
+	apiResource := &metav1.APIResourceList{
111
+		GroupVersion: "foo.example.com/v1beta1",
112
+		APIResources: []metav1.APIResource{
113
+			{Name: "foos", SingularName: "foo", Namespaced: true, Kind: "Foo"},
114
+		},
60 115
 	}
116
+	coreClient.Resources = append(coreClient.Resources, apiResource)
117
+
118
+	logBuffer.Reset()
119
+	resources, err = manifests.LoadManifests(dir, []string{dir})
120
+	assert.NoError(t, err)
121
+	assert.Len(t, resources, 2)
122
+	assert.Contains(t, logBuffer.String(), "found scope of resource bar:foo/fooinstance")
61 123
 
124
+	// and missing the scope information again should result in another warning
125
+	coreClient.Resources = apiResourcesWithoutFoo
126
+	logBuffer.Reset()
127
+	resources, err = manifests.LoadManifests(dir, []string{dir})
128
+	assert.NoError(t, err)
129
+	assert.Contains(t, logBuffer.String(), "cannot find scope of resource foo/fooinstance")
62 130
 }

+ 9
- 0
cluster/kubernetes/mock.go View File

@@ -0,0 +1,9 @@
1
+package kubernetes
2
+
3
+import kresource "github.com/weaveworks/flux/cluster/kubernetes/resource"
4
+
5
+type ConstNamespacer string
6
+
7
+func (ns ConstNamespacer) EffectiveNamespace(manifest kresource.KubeManifest, _ ResourceScopes) (string, error) {
8
+	return string(ns), nil
9
+}

+ 1
- 0
cluster/kubernetes/namespacer.go View File

@@ -97,5 +97,6 @@ func (n *namespaceViaDiscovery) lookupNamespacedInCluster(groupVersion, kind str
97 97
 			return resource.Namespaced, nil
98 98
 		}
99 99
 	}
100
+
100 101
 	return false, fmt.Errorf("resource not found for API %s, kind %s", groupVersion, kind)
101 102
 }

+ 3
- 3
cluster/kubernetes/policies.go View File

@@ -11,7 +11,7 @@ import (
11 11
 	"github.com/weaveworks/flux/resource"
12 12
 )
13 13
 
14
-func (m *Manifests) UpdatePolicies(def []byte, id flux.ResourceID, update policy.Update) ([]byte, error) {
14
+func (m *manifests) UpdatePolicies(def []byte, id flux.ResourceID, update policy.Update) ([]byte, error) {
15 15
 	ns, kind, name := id.Components()
16 16
 	add, del := update.Add, update.Remove
17 17
 
@@ -48,7 +48,7 @@ func (m *Manifests) UpdatePolicies(def []byte, id flux.ResourceID, update policy
48 48
 	return (KubeYAML{}).Annotate(def, ns, kind, name, args...)
49 49
 }
50 50
 
51
-func (m *Manifests) extractWorkloadContainers(def []byte, id flux.ResourceID) ([]resource.Container, error) {
51
+func (m *manifests) extractWorkloadContainers(def []byte, id flux.ResourceID) ([]resource.Container, error) {
52 52
 	kresources, err := kresource.ParseMultidoc(def, "stdin")
53 53
 	if err != nil {
54 54
 		return nil, err
@@ -58,7 +58,7 @@ func (m *Manifests) extractWorkloadContainers(def []byte, id flux.ResourceID) ([
58 58
 	// We could get out of our way to fix this (or give a better error) but:
59 59
 	// 1. With the exception of HelmReleases CRD instances are not workloads anyways.
60 60
 	// 2. The problem is eventually fixed by the first successful sync.
61
-	resources, err := setEffectiveNamespaces(kresources, m.Namespacer)
61
+	resources, err := m.setEffectiveNamespaces(kresources)
62 62
 	if err != nil {
63 63
 		return nil, err
64 64
 	}

+ 5
- 9
cluster/kubernetes/policies_test.go View File

@@ -2,22 +2,17 @@ package kubernetes
2 2
 
3 3
 import (
4 4
 	"bytes"
5
+	"os"
5 6
 	"testing"
6 7
 	"text/template"
7 8
 
9
+	"github.com/go-kit/kit/log"
8 10
 	"github.com/stretchr/testify/assert"
9 11
 
10 12
 	"github.com/weaveworks/flux"
11
-	kresource "github.com/weaveworks/flux/cluster/kubernetes/resource"
12 13
 	"github.com/weaveworks/flux/policy"
13 14
 )
14 15
 
15
-type constNamespacer string
16
-
17
-func (ns constNamespacer) EffectiveNamespace(manifest kresource.KubeManifest, _ ResourceScopes) (string, error) {
18
-	return string(ns), nil
19
-}
20
-
21 16
 func TestUpdatePolicies(t *testing.T) {
22 17
 	for _, c := range []struct {
23 18
 		name    string
@@ -186,7 +181,8 @@ func TestUpdatePolicies(t *testing.T) {
186 181
 			caseIn := templToString(t, annotationsTemplate, c.in)
187 182
 			caseOut := templToString(t, annotationsTemplate, c.out)
188 183
 			resourceID := flux.MustParseResourceID("default:deployment/nginx")
189
-			out, err := (&Manifests{constNamespacer("default")}).UpdatePolicies([]byte(caseIn), resourceID, c.update)
184
+			manifests := NewManifests(ConstNamespacer("default"), log.NewLogfmtLogger(os.Stdout))
185
+			out, err := manifests.UpdatePolicies([]byte(caseIn), resourceID, c.update)
190 186
 			assert.Equal(t, c.wantErr, err != nil, "unexpected error value: %s", err)
191 187
 			if !c.wantErr {
192 188
 				assert.Equal(t, string(out), caseOut)
@@ -200,7 +196,7 @@ func TestUpdatePolicies_invalidTagPattern(t *testing.T) {
200 196
 	update := policy.Update{
201 197
 		Add: policy.Set{policy.TagPrefix("nginx"): "semver:invalid"},
202 198
 	}
203
-	_, err := (&Manifests{}).UpdatePolicies(nil, resourceID, update)
199
+	_, err := (&manifests{}).UpdatePolicies(nil, resourceID, update)
204 200
 	assert.Error(t, err)
205 201
 }
206 202
 

+ 3
- 1
cluster/kubernetes/sync_test.go View File

@@ -2,6 +2,7 @@ package kubernetes
2 2
 
3 3
 import (
4 4
 	"fmt"
5
+	"os"
5 6
 	"sort"
6 7
 	"strings"
7 8
 	"testing"
@@ -313,6 +314,7 @@ metadata:
313 314
 		if err != nil {
314 315
 			t.Fatal(err)
315 316
 		}
317
+		manifests := NewManifests(namespacer, log.NewLogfmtLogger(os.Stdout))
316 318
 
317 319
 		resources0, err := kresource.ParseMultidoc([]byte(defs), "before")
318 320
 		if err != nil {
@@ -320,7 +322,7 @@ metadata:
320 322
 		}
321 323
 
322 324
 		// Needed to get from KubeManifest to resource.Resource
323
-		resources, err := setEffectiveNamespaces(resources0, namespacer)
325
+		resources, err := manifests.setEffectiveNamespaces(resources0)
324 326
 		if err != nil {
325 327
 			t.Fatal(err)
326 328
 		}

+ 3
- 4
cmd/fluxd/main.go View File

@@ -271,7 +271,7 @@ func main() {
271 271
 	var clusterVersion string
272 272
 	var sshKeyRing ssh.KeyRing
273 273
 	var k8s cluster.Cluster
274
-	var k8sManifests *kubernetes.Manifests
274
+	var k8sManifests cluster.Manifests
275 275
 	var imageCreds func() registry.ImageCreds
276 276
 	{
277 277
 		restClientConfig, err := rest.InClusterConfig()
@@ -369,13 +369,12 @@ func main() {
369 369
 		imageCreds = k8sInst.ImagesToFetch
370 370
 		// There is only one way we currently interpret a repo of
371 371
 		// files as manifests, and that's as Kubernetes yamels.
372
-		k8sManifests = &kubernetes.Manifests{}
373
-		k8sManifests.Namespacer, err = kubernetes.NewNamespacer(discoClientset)
374
-
372
+		namespacer, err := kubernetes.NewNamespacer(discoClientset)
375 373
 		if err != nil {
376 374
 			logger.Log("err", err)
377 375
 			os.Exit(1)
378 376
 		}
377
+		k8sManifests = kubernetes.NewManifests(namespacer, logger)
379 378
 	}
380 379
 
381 380
 	// Wrap the procedure for collecting images to scan

+ 3
- 1
daemon/daemon_test.go View File

@@ -731,12 +731,14 @@ func mockDaemon(t *testing.T) (*Daemon, func(), func(), *cluster.Mock, *mockEven
731 731
 	// Jobs queue (starts itself)
732 732
 	jobs := job.NewQueue(jshutdown, jwg)
733 733
 
734
+	manifests := kubernetes.NewManifests(alwaysDefault, log.NewLogfmtLogger(os.Stdout))
735
+
734 736
 	// Finally, the daemon
735 737
 	d := &Daemon{
736 738
 		Repo:           repo,
737 739
 		GitConfig:      params,
738 740
 		Cluster:        k8s,
739
-		Manifests:      &kubernetes.Manifests{Namespacer: alwaysDefault},
741
+		Manifests:      manifests,
740 742
 		Registry:       imageRegistry,
741 743
 		V:              testVersion,
742 744
 		Jobs:           jobs,

+ 3
- 1
daemon/loop_test.go View File

@@ -60,10 +60,12 @@ func daemon(t *testing.T) (*Daemon, func()) {
60 60
 		UserEmail: gitEmail,
61 61
 	}
62 62
 
63
+	manifests := kubernetes.NewManifests(alwaysDefault, log.NewLogfmtLogger(os.Stdout))
64
+
63 65
 	jobs := job.NewQueue(shutdown, wg)
64 66
 	d := &Daemon{
65 67
 		Cluster:        k8s,
66
-		Manifests:      &kubernetes.Manifests{Namespacer: alwaysDefault},
68
+		Manifests:      manifests,
67 69
 		Registry:       &registryMock.Registry{},
68 70
 		Repo:           repo,
69 71
 		GitConfig:      gitConfig,

+ 8
- 13
release/releaser_test.go View File

@@ -3,6 +3,7 @@ package release
3 3
 import (
4 4
 	"errors"
5 5
 	"fmt"
6
+	"os"
6 7
 	"reflect"
7 8
 	"testing"
8 9
 	"time"
@@ -13,7 +14,6 @@ import (
13 14
 	"github.com/weaveworks/flux"
14 15
 	"github.com/weaveworks/flux/cluster"
15 16
 	"github.com/weaveworks/flux/cluster/kubernetes"
16
-	kresource "github.com/weaveworks/flux/cluster/kubernetes/resource"
17 17
 	"github.com/weaveworks/flux/git"
18 18
 	"github.com/weaveworks/flux/git/gittest"
19 19
 	"github.com/weaveworks/flux/image"
@@ -22,12 +22,6 @@ import (
22 22
 	"github.com/weaveworks/flux/update"
23 23
 )
24 24
 
25
-type constNamespacer string
26
-
27
-func (ns constNamespacer) EffectiveNamespace(manifest kresource.KubeManifest, _ kubernetes.ResourceScopes) (string, error) {
28
-	return string(ns), nil
29
-}
30
-
31 25
 var (
32 26
 	// This must match the value in cluster/kubernetes/testfiles/data.go
33 27
 	helloContainer   = "greeter"
@@ -142,7 +136,7 @@ var (
142 136
 			},
143 137
 		},
144 138
 	}
145
-	mockManifests = &kubernetes.Manifests{Namespacer: constNamespacer("default")}
139
+	mockManifests = kubernetes.NewManifests(kubernetes.ConstNamespacer("default"), log.NewLogfmtLogger(os.Stdout))
146 140
 )
147 141
 
148 142
 func mockCluster(running ...cluster.Workload) *cluster.Mock {
@@ -1055,9 +1049,9 @@ func testRelease(t *testing.T, ctx *ReleaseContext, spec update.ReleaseImageSpec
1055 1049
 
1056 1050
 // --- test verification
1057 1051
 
1058
-// A Manifests implementation that does updates incorrectly, so they should fail verification.
1052
+// A manifests implementation that does updates incorrectly, so they should fail verification.
1059 1053
 type badManifests struct {
1060
-	kubernetes.Manifests
1054
+	cluster.Manifests
1061 1055
 }
1062 1056
 
1063 1057
 func (m *badManifests) UpdateImage(def []byte, resourceID flux.ResourceID, container string, newImageID image.Ref) ([]byte, error) {
@@ -1075,15 +1069,16 @@ func Test_BadRelease(t *testing.T) {
1075 1069
 	checkout1, cleanup1 := setup(t)
1076 1070
 	defer cleanup1()
1077 1071
 
1072
+	manifests := kubernetes.NewManifests(kubernetes.ConstNamespacer("default"), log.NewLogfmtLogger(os.Stdout))
1078 1073
 	ctx := &ReleaseContext{
1079 1074
 		cluster:   cluster,
1080
-		manifests: &kubernetes.Manifests{},
1075
+		manifests: manifests,
1081 1076
 		repo:      checkout1,
1082 1077
 		registry:  mockRegistry,
1083 1078
 	}
1084 1079
 	_, err := Release(ctx, spec, log.NewNopLogger())
1085 1080
 	if err != nil {
1086
-		t.Fatal("release with 'good' Manifests should succeed, but errored:", err)
1081
+		t.Fatal("release with 'good' manifests should succeed, but errored:", err)
1087 1082
 	}
1088 1083
 
1089 1084
 	checkout2, cleanup2 := setup(t)
@@ -1091,7 +1086,7 @@ func Test_BadRelease(t *testing.T) {
1091 1086
 
1092 1087
 	ctx = &ReleaseContext{
1093 1088
 		cluster:   cluster,
1094
-		manifests: &badManifests{Manifests: kubernetes.Manifests{constNamespacer("default")}},
1089
+		manifests: &badManifests{manifests},
1095 1090
 		repo:      checkout2,
1096 1091
 		registry:  mockRegistry,
1097 1092
 	}

+ 3
- 1
sync/sync_test.go View File

@@ -1,8 +1,10 @@
1 1
 package sync
2 2
 
3 3
 import (
4
+	"os"
4 5
 	"testing"
5 6
 
7
+	"github.com/go-kit/kit/log"
6 8
 	"github.com/stretchr/testify/assert"
7 9
 
8 10
 	"github.com/weaveworks/flux/cluster"
@@ -19,7 +21,7 @@ func TestSync(t *testing.T) {
19 21
 	defer cleanup()
20 22
 
21 23
 	// Start with nothing running. We should be told to apply all the things.
22
-	manifests := &kubernetes.Manifests{}
24
+	manifests := kubernetes.NewManifests(kubernetes.ConstNamespacer("default"), log.NewLogfmtLogger(os.Stdout))
23 25
 	clus := &syncCluster{map[string]string{}}
24 26
 
25 27
 	dirs := checkout.ManifestDirs()

Loading…
Cancel
Save