Browse Source

Report sync errors including source file

Include an error report in the sync notification, with resources
v. errors (with the latter coming from `kubectl` most likely).

The most useful bit of information when a resource fails to sync --
more useful than the error from kubectl, even -- is the file that had
a problem. Include that in the notification.

Secondarily: to avoid having a long, tmpfile path in messages, make
the source of resources relative to the repo.
Michael Bridgen 2 years ago
parent
commit
4c92f1d2f0

+ 4
- 3
cluster/kubernetes/files.go View File

@@ -1,10 +1,11 @@
1 1
 package kubernetes
2 2
 
3 3
 import (
4
+	"path/filepath"
5
+
4 6
 	"github.com/pkg/errors"
5 7
 
6 8
 	"github.com/weaveworks/flux"
7
-
8 9
 	"github.com/weaveworks/flux/cluster/kubernetes/resource"
9 10
 )
10 11
 
@@ -13,7 +14,7 @@ import (
13 14
 // specified namespace and name) to the paths of resource definition
14 15
 // files.
15 16
 func (c *Manifests) FindDefinedServices(path string) (map[flux.ResourceID][]string, error) {
16
-	objects, err := resource.Load(path)
17
+	objects, err := resource.Load(path, path)
17 18
 	if err != nil {
18 19
 		return nil, errors.Wrap(err, "loading resources")
19 20
 	}
@@ -23,7 +24,7 @@ func (c *Manifests) FindDefinedServices(path string) (map[flux.ResourceID][]stri
23 24
 		id := obj.ResourceID()
24 25
 		_, kind, _ := id.Components()
25 26
 		if _, ok := resourceKinds[kind]; ok {
26
-			result[id] = append(result[id], obj.Source())
27
+			result[id] = append(result[id], filepath.Join(path, obj.Source()))
27 28
 		}
28 29
 	}
29 30
 	return result, nil

+ 3
- 3
cluster/kubernetes/kubernetes.go View File

@@ -114,7 +114,7 @@ func (c *changeSet) stage(cmd string, o *apiObject) {
114 114
 }
115 115
 
116 116
 type Applier interface {
117
-	apply(log.Logger, changeSet) cluster.SyncErrors
117
+	apply(log.Logger, changeSet) cluster.SyncError
118 118
 }
119 119
 
120 120
 // Cluster is a handle to a Kubernetes API server.
@@ -221,7 +221,7 @@ func (c *Cluster) Sync(spec cluster.SyncDef) error {
221 221
 	logger := log.With(c.logger, "method", "Sync")
222 222
 
223 223
 	cs := makeChangeSet()
224
-	var errs cluster.SyncErrors
224
+	var errs cluster.SyncError
225 225
 	for _, action := range spec.Actions {
226 226
 		stages := []struct {
227 227
 			res resource.Resource
@@ -239,7 +239,7 @@ func (c *Cluster) Sync(spec cluster.SyncDef) error {
239 239
 				obj.Resource = stage.res
240 240
 				cs.stage(stage.cmd, obj)
241 241
 			} else {
242
-				errs = append(errs, cluster.SyncError{Resource: stage.res, Error: err})
242
+				errs = append(errs, cluster.ResourceError{Resource: stage.res, Error: err})
243 243
 				break
244 244
 			}
245 245
 		}

+ 1
- 1
cluster/kubernetes/kubernetes_test.go View File

@@ -14,7 +14,7 @@ type mockApplier struct {
14 14
 	commandRun bool
15 15
 }
16 16
 
17
-func (m *mockApplier) apply(_ log.Logger, c changeSet) cluster.SyncErrors {
17
+func (m *mockApplier) apply(_ log.Logger, c changeSet) cluster.SyncError {
18 18
 	if len(c.nsObjs) != 0 || len(c.noNsObjs) != 0 {
19 19
 		m.commandRun = true
20 20
 	}

+ 2
- 2
cluster/kubernetes/manifests.go View File

@@ -11,8 +11,8 @@ type Manifests struct {
11 11
 
12 12
 // FindDefinedServices implementation in files.go
13 13
 
14
-func (c *Manifests) LoadManifests(paths ...string) (map[string]resource.Resource, error) {
15
-	return kresource.Load(paths...)
14
+func (c *Manifests) LoadManifests(base, first string, rest ...string) (map[string]resource.Resource, error) {
15
+	return kresource.Load(base, first, rest...)
16 16
 }
17 17
 
18 18
 func (c *Manifests) ParseManifests(allDefs []byte) (map[string]resource.Resource, error) {

+ 2
- 2
cluster/kubernetes/release.go View File

@@ -52,7 +52,7 @@ func (c *Kubectl) connectArgs() []string {
52 52
 	return args
53 53
 }
54 54
 
55
-func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncErrors) {
55
+func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncError) {
56 56
 	f := func(m map[string][]*apiObject, cmd string, args ...string) {
57 57
 		objs := m[cmd]
58 58
 		if len(objs) == 0 {
@@ -64,7 +64,7 @@ func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncError
64 64
 			for _, obj := range objs {
65 65
 				r := bytes.NewReader(obj.Bytes())
66 66
 				if err := c.doCommand(logger, r, args...); err != nil {
67
-					errs = append(errs, cluster.SyncError{obj.Resource, err})
67
+					errs = append(errs, cluster.ResourceError{obj.Resource, err})
68 68
 				}
69 69
 			}
70 70
 		}

+ 8
- 3
cluster/kubernetes/resource/load.go View File

@@ -15,7 +15,8 @@ import (
15 15
 // Load takes paths to directories or files, and creates an object set
16 16
 // based on the file(s) therein. Resources are named according to the
17 17
 // file content, rather than the file name of directory structure.
18
-func Load(roots ...string) (map[string]resource.Resource, error) {
18
+func Load(base, atLeastOne string, more ...string) (map[string]resource.Resource, error) {
19
+	roots := append([]string{atLeastOne}, more...)
19 20
 	objs := map[string]resource.Resource{}
20 21
 	for _, root := range roots {
21 22
 		err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
@@ -32,13 +33,17 @@ func Load(roots ...string) (map[string]resource.Resource, error) {
32 33
 				if err != nil {
33 34
 					return errors.Wrapf(err, "reading file at %q", path)
34 35
 				}
35
-				docsInFile, err := ParseMultidoc(bytes, path)
36
+				source, err := filepath.Rel(base, path)
37
+				if err != nil {
38
+					return errors.Wrapf(err, "finding relative path for %q", path)
39
+				}
40
+				docsInFile, err := ParseMultidoc(bytes, source)
36 41
 				if err != nil {
37 42
 					return errors.Wrapf(err, "parsing file at %q", path)
38 43
 				}
39 44
 				for id, obj := range docsInFile {
40 45
 					if alreadyDefined, ok := objs[id]; ok {
41
-						return fmt.Errorf(`resource '%s' defined more than once (in %s and %s)`, id, alreadyDefined.Source(), path)
46
+						return fmt.Errorf(`resource '%s' defined more than once (in %s and %s)`, id, alreadyDefined.Source(), source)
42 47
 					}
43 48
 					objs[id] = obj
44 49
 				}

+ 1
- 1
cluster/kubernetes/resource/load_test.go View File

@@ -132,7 +132,7 @@ func TestLoadSome(t *testing.T) {
132 132
 	if err := testfiles.WriteTestFiles(dir); err != nil {
133 133
 		t.Fatal(err)
134 134
 	}
135
-	objs, err := Load(dir)
135
+	objs, err := Load(dir, dir)
136 136
 	if err != nil {
137 137
 		t.Error(err)
138 138
 	}

+ 5
- 2
cluster/manifests.go View File

@@ -20,8 +20,11 @@ type Manifests interface {
20 20
 	// Update the definitions in a manifests bytes according to the
21 21
 	// spec given.
22 22
 	UpdateDefinition(def []byte, container string, newImageID image.Ref) ([]byte, error)
23
-	// Load all the resource manifests under the path given
24
-	LoadManifests(paths ...string) (map[string]resource.Resource, error)
23
+	// Load all the resource manifests under the path given. `baseDir`
24
+	// is used to relativise the paths, which are supplied as absolute
25
+	// paths to directories or files; at least one path must be
26
+	// supplied.
27
+	LoadManifests(baseDir, first string, rest ...string) (map[string]resource.Resource, error)
25 28
 	// Parse the manifests given in an exported blob
26 29
 	ParseManifests([]byte) (map[string]resource.Resource, error)
27 30
 	// UpdatePolicies modifies a manifest to apply the policy update specified

+ 3
- 3
cluster/mock.go View File

@@ -18,7 +18,7 @@ type Mock struct {
18 18
 	PublicSSHKeyFunc         func(regenerate bool) (ssh.PublicKey, error)
19 19
 	FindDefinedServicesFunc  func(path string) (map[flux.ResourceID][]string, error)
20 20
 	UpdateDefinitionFunc     func(def []byte, container string, newImageID image.Ref) ([]byte, error)
21
-	LoadManifestsFunc        func(paths ...string) (map[string]resource.Resource, error)
21
+	LoadManifestsFunc        func(base, first string, rest ...string) (map[string]resource.Resource, error)
22 22
 	ParseManifestsFunc       func([]byte) (map[string]resource.Resource, error)
23 23
 	UpdateManifestFunc       func(path, resourceID string, f func(def []byte) ([]byte, error)) error
24 24
 	UpdatePoliciesFunc       func([]byte, policy.Update) ([]byte, error)
@@ -57,8 +57,8 @@ func (m *Mock) UpdateDefinition(def []byte, container string, newImageID image.R
57 57
 	return m.UpdateDefinitionFunc(def, container, newImageID)
58 58
 }
59 59
 
60
-func (m *Mock) LoadManifests(paths ...string) (map[string]resource.Resource, error) {
61
-	return m.LoadManifestsFunc(paths...)
60
+func (m *Mock) LoadManifests(base, first string, rest ...string) (map[string]resource.Resource, error) {
61
+	return m.LoadManifestsFunc(base, first, rest...)
62 62
 }
63 63
 
64 64
 func (m *Mock) ParseManifests(def []byte) (map[string]resource.Resource, error) {

+ 5
- 5
cluster/sync.go View File

@@ -11,8 +11,8 @@ import (
11 11
 // SyncAction represents either the deletion or application (create or
12 12
 // update) of a resource.
13 13
 type SyncAction struct {
14
-	Delete     resource.Resource // ) one of these
15
-	Apply      resource.Resource // )
14
+	Delete resource.Resource // ) one of these
15
+	Apply  resource.Resource // )
16 16
 }
17 17
 
18 18
 type SyncDef struct {
@@ -20,14 +20,14 @@ type SyncDef struct {
20 20
 	Actions []SyncAction
21 21
 }
22 22
 
23
-type SyncError struct {
23
+type ResourceError struct {
24 24
 	resource.Resource
25 25
 	Error error
26 26
 }
27 27
 
28
-type SyncErrors []SyncError
28
+type SyncError []ResourceError
29 29
 
30
-func (err SyncErrors) Error() string {
30
+func (err SyncError) Error() string {
31 31
 	var errs []string
32 32
 	for _, e := range err {
33 33
 		errs = append(errs, e.ResourceID().String()+": "+e.Error.Error())

+ 1
- 1
daemon/daemon_test.go View File

@@ -291,7 +291,7 @@ func TestDaemon_PolicyUpdate(t *testing.T) {
291 291
 			return false
292 292
 		}
293 293
 		defer co.Clean()
294
-		m, err := d.Manifests.LoadManifests(co.ManifestDir())
294
+		m, err := d.Manifests.LoadManifests(co.Dir(), co.ManifestDir())
295 295
 		if err != nil {
296 296
 			t.Fatalf("Error: %s", err.Error())
297 297
 		}

+ 15
- 10
daemon/loop.go View File

@@ -13,6 +13,7 @@ import (
13 13
 	"context"
14 14
 
15 15
 	"github.com/weaveworks/flux"
16
+	"github.com/weaveworks/flux/cluster"
16 17
 	"github.com/weaveworks/flux/event"
17 18
 	"github.com/weaveworks/flux/git"
18 19
 	fluxmetrics "github.com/weaveworks/flux/metrics"
@@ -187,21 +188,24 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
187 188
 	}
188 189
 
189 190
 	// Get a map of all resources defined in the repo
190
-	allResources, err := d.Manifests.LoadManifests(working.ManifestDir())
191
+	allResources, err := d.Manifests.LoadManifests(working.Dir(), working.ManifestDir())
191 192
 	if err != nil {
192 193
 		return errors.Wrap(err, "loading resources from repo")
193 194
 	}
194 195
 
196
+	var syncErrors map[string]string
195 197
 	// TODO supply deletes argument from somewhere (command-line?)
196 198
 	if err := fluxsync.Sync(d.Manifests, allResources, d.Cluster, false, logger); err != nil {
197 199
 		logger.Log("err", err)
198
-		// TODO(michael): we should distinguish between "fully mostly
199
-		// succeeded" and "failed utterly", since we want to abandon
200
-		// this and not move the tag (and send a SyncFail event
201
-		// upstream?), if the latter. For now, it's presumed that any
202
-		// error returned is at worst a minor, partial failure (e.g.,
203
-		// a small number of resources failed to sync, for unimportant
204
-		// reasons)
200
+		switch syncerr := err.(type) {
201
+		case cluster.SyncError:
202
+			syncErrors = map[string]string{}
203
+			for _, e := range syncerr {
204
+				syncErrors[fmt.Sprintf("%s (%s)", e.ResourceID(), e.Source())] = e.Error.Error()
205
+			}
206
+		default:
207
+			return err
208
+		}
205 209
 	}
206 210
 
207 211
 	// update notes and emit events for applied commits
@@ -232,9 +236,9 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
232 236
 	} else {
233 237
 		ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
234 238
 		changedFiles, err := working.ChangedFiles(ctx, oldTagRev)
235
-		if err == nil {
239
+		if err == nil && len(changedFiles) > 0 {
236 240
 			// We had some changed files, we're syncing a diff
237
-			changedResources, err = d.Manifests.LoadManifests(changedFiles...)
241
+			changedResources, err = d.Manifests.LoadManifests(working.Dir(), changedFiles[0], changedFiles[1:]...)
238 242
 		}
239 243
 		cancel()
240 244
 		if err != nil {
@@ -361,6 +365,7 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
361 365
 				Commits:     cs,
362 366
 				InitialSync: initialSync,
363 367
 				Includes:    includes,
368
+				Errors:      syncErrors,
364 369
 			},
365 370
 		}); err != nil {
366 371
 			logger.Log("err", err)

+ 2
- 0
event/event.go View File

@@ -204,6 +204,8 @@ type SyncEventMetadata struct {
204 204
 	// policy changes, and "other" (meaning things we didn't commit
205 205
 	// ourselves)
206 206
 	Includes map[string]bool `json:"includes,omitempty"`
207
+	// Per-resource errors
208
+	Errors map[string]string `json:"errors,omitempty"`
207 209
 	// `true` if we have no record of having synced before
208 210
 	InitialSync bool `json:"initialSync,omitempty"`
209 211
 }

+ 6
- 1
git/working.go View File

@@ -84,7 +84,12 @@ func (c *Checkout) Clean() {
84 84
 	}
85 85
 }
86 86
 
87
-// ManifestDir returns a path to where the files are
87
+// Dir returns the path to the repo
88
+func (c *Checkout) Dir() string {
89
+	return c.dir
90
+}
91
+
92
+// ManifestDir returns the path to the manifests files
88 93
 func (c *Checkout) ManifestDir() string {
89 94
 	return filepath.Join(c.dir, c.config.Path)
90 95
 }

+ 3
- 3
sync/sync_test.go View File

@@ -34,7 +34,7 @@ func TestSync(t *testing.T) {
34 34
 	manifests := &kubernetes.Manifests{}
35 35
 	var clus cluster.Cluster = &syncCluster{mockCluster, map[string][]byte{}}
36 36
 
37
-	resources, err := manifests.LoadManifests(checkout.ManifestDir())
37
+	resources, err := manifests.LoadManifests(checkout.Dir(), checkout.ManifestDir())
38 38
 	if err != nil {
39 39
 		t.Fatal(err)
40 40
 	}
@@ -55,7 +55,7 @@ func TestSync(t *testing.T) {
55 55
 		break
56 56
 	}
57 57
 
58
-	resources, err = manifests.LoadManifests(checkout.ManifestDir())
58
+	resources, err = manifests.LoadManifests(checkout.Dir(), checkout.ManifestDir())
59 59
 	if err != nil {
60 60
 		t.Fatal(err)
61 61
 	}
@@ -244,7 +244,7 @@ func checkClusterMatchesFiles(t *testing.T, m cluster.Manifests, c cluster.Clust
244 244
 	if err != nil {
245 245
 		t.Fatal(err)
246 246
 	}
247
-	files, err := m.LoadManifests(dir)
247
+	files, err := m.LoadManifests(dir, dir)
248 248
 	if err != nil {
249 249
 		t.Fatal(err)
250 250
 	}

Loading…
Cancel
Save