Browse Source

Pass along context to methods making API calls

Hidde Beydals 7 months ago
parent
commit
bfdfe314fe

+ 4
- 3
cluster/cluster.go View File

@@ -1,6 +1,7 @@
1 1
 package cluster
2 2
 
3 3
 import (
4
+	"context"
4 5
 	"errors"
5 6
 
6 7
 	"github.com/weaveworks/flux"
@@ -25,11 +26,11 @@ const (
25 26
 // are distinct interfaces.
26 27
 type Cluster interface {
27 28
 	// Get all of the services (optionally, from a specific namespace), excluding those
28
-	AllWorkloads(maybeNamespace string) ([]Workload, error)
29
-	SomeWorkloads([]flux.ResourceID) ([]Workload, error)
29
+	AllWorkloads(ctx context.Context, maybeNamespace string) ([]Workload, error)
30
+	SomeWorkloads(ctx context.Context, ids []flux.ResourceID) ([]Workload, error)
30 31
 	IsAllowedResource(flux.ResourceID) bool
31 32
 	Ping() error
32
-	Export() ([]byte, error)
33
+	Export(ctx context.Context) ([]byte, error)
33 34
 	Sync(SyncSet) error
34 35
 	PublicSSHKey(regenerate bool) (ssh.PublicKey, error)
35 36
 }

+ 4
- 2
cluster/kubernetes/images.go View File

@@ -1,6 +1,7 @@
1 1
 package kubernetes
2 2
 
3 3
 import (
4
+	"context"
4 5
 	"fmt"
5 6
 
6 7
 	"github.com/go-kit/kit/log"
@@ -122,8 +123,9 @@ func mergeCredentials(log func(...interface{}) error,
122 123
 // ImagesToFetch is a k8s specific method to get a list of images to update along with their credentials
123 124
 func (c *Cluster) ImagesToFetch() registry.ImageCreds {
124 125
 	allImageCreds := make(registry.ImageCreds)
126
+	ctx := context.Background()
125 127
 
126
-	namespaces, err := c.getAllowedAndExistingNamespaces()
128
+	namespaces, err := c.getAllowedAndExistingNamespaces(ctx)
127 129
 	if err != nil {
128 130
 		c.logger.Log("err", errors.Wrap(err, "getting namespaces"))
129 131
 		return allImageCreds
@@ -132,7 +134,7 @@ func (c *Cluster) ImagesToFetch() registry.ImageCreds {
132 134
 	for _, ns := range namespaces {
133 135
 		seenCreds := make(map[string]registry.Credentials)
134 136
 		for kind, resourceKind := range resourceKinds {
135
-			workloads, err := resourceKind.getWorkloads(c, ns.Name)
137
+			workloads, err := resourceKind.getWorkloads(ctx, c, ns.Name)
136 138
 			if err != nil {
137 139
 				if apierrors.IsNotFound(err) || apierrors.IsForbidden(err) {
138 140
 					// Skip unsupported or forbidden resource kinds

+ 16
- 9
cluster/kubernetes/kubernetes.go View File

@@ -2,6 +2,7 @@ package kubernetes
2 2
 
3 3
 import (
4 4
 	"bytes"
5
+	"context"
5 6
 	"encoding/json"
6 7
 	"fmt"
7 8
 	"sync"
@@ -127,7 +128,7 @@ func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing,
127 128
 // SomeWorkloads returns the workloads named, missing out any that don't
128 129
 // exist in the cluster or aren't in an allowed namespace.
129 130
 // They do not necessarily have to be returned in the order requested.
130
-func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload, err error) {
131
+func (c *Cluster) SomeWorkloads(ctx context.Context, ids []flux.ResourceID) (res []cluster.Workload, err error) {
131 132
 	var workloads []cluster.Workload
132 133
 	for _, id := range ids {
133 134
 		if !c.IsAllowedResource(id) {
@@ -141,7 +142,7 @@ func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload,
141 142
 			continue
142 143
 		}
143 144
 
144
-		workload, err := resourceKind.getWorkload(c, ns, name)
145
+		workload, err := resourceKind.getWorkload(ctx, c, ns, name)
145 146
 		if err != nil {
146 147
 			if apierrors.IsForbidden(err) || apierrors.IsNotFound(err) {
147 148
 				continue
@@ -161,8 +162,8 @@ func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload,
161 162
 
162 163
 // AllWorkloads returns all workloads in allowed namespaces matching the criteria; that is, in
163 164
 // the namespace (or any namespace if that argument is empty)
164
-func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err error) {
165
-	namespaces, err := c.getAllowedAndExistingNamespaces()
165
+func (c *Cluster) AllWorkloads(ctx context.Context, namespace string) (res []cluster.Workload, err error) {
166
+	namespaces, err := c.getAllowedAndExistingNamespaces(ctx)
166 167
 	if err != nil {
167 168
 		return nil, errors.Wrap(err, "getting namespaces")
168 169
 	}
@@ -174,7 +175,7 @@ func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err er
174 175
 		}
175 176
 
176 177
 		for kind, resourceKind := range resourceKinds {
177
-			workloads, err := resourceKind.getWorkloads(c, ns.Name)
178
+			workloads, err := resourceKind.getWorkloads(ctx, c, ns.Name)
178 179
 			if err != nil {
179 180
 				switch {
180 181
 				case apierrors.IsNotFound(err):
@@ -219,10 +220,10 @@ func (c *Cluster) Ping() error {
219 220
 }
220 221
 
221 222
 // Export exports cluster resources
222
-func (c *Cluster) Export() ([]byte, error) {
223
+func (c *Cluster) Export(ctx context.Context) ([]byte, error) {
223 224
 	var config bytes.Buffer
224 225
 
225
-	namespaces, err := c.getAllowedAndExistingNamespaces()
226
+	namespaces, err := c.getAllowedAndExistingNamespaces(ctx)
226 227
 	if err != nil {
227 228
 		return nil, errors.Wrap(err, "getting namespaces")
228 229
 	}
@@ -240,7 +241,7 @@ func (c *Cluster) Export() ([]byte, error) {
240 241
 		}
241 242
 
242 243
 		for _, resourceKind := range resourceKinds {
243
-			workloads, err := resourceKind.getWorkloads(c, ns.Name)
244
+			workloads, err := resourceKind.getWorkloads(ctx, c, ns.Name)
244 245
 			if err != nil {
245 246
 				switch {
246 247
 				case apierrors.IsNotFound(err):
@@ -281,10 +282,13 @@ func (c *Cluster) PublicSSHKey(regenerate bool) (ssh.PublicKey, error) {
281 282
 // the Flux instance is expected to have access to and can look for resources inside of.
282 283
 // It returns a list of all namespaces unless an explicit list of allowed namespaces
283 284
 // has been set on the Cluster instance.
284
-func (c *Cluster) getAllowedAndExistingNamespaces() ([]apiv1.Namespace, error) {
285
+func (c *Cluster) getAllowedAndExistingNamespaces(ctx context.Context) ([]apiv1.Namespace, error) {
285 286
 	if len(c.allowedNamespaces) > 0 {
286 287
 		nsList := []apiv1.Namespace{}
287 288
 		for _, name := range c.allowedNamespaces {
289
+			if err := ctx.Err(); err != nil {
290
+				return nil, err
291
+			}
288 292
 			ns, err := c.client.CoreV1().Namespaces().Get(name, meta_v1.GetOptions{})
289 293
 			switch {
290 294
 			case err == nil:
@@ -303,6 +307,9 @@ func (c *Cluster) getAllowedAndExistingNamespaces() ([]apiv1.Namespace, error) {
303 307
 		return nsList, nil
304 308
 	}
305 309
 
310
+	if err := ctx.Err(); err != nil {
311
+		return nil, err
312
+	}
306 313
 	namespaces, err := c.client.CoreV1().Namespaces().List(meta_v1.ListOptions{})
307 314
 	if err != nil {
308 315
 		return nil, err

+ 2
- 1
cluster/kubernetes/kubernetes_test.go View File

@@ -1,6 +1,7 @@
1 1
 package kubernetes
2 2
 
3 3
 import (
4
+	"context"
4 5
 	"reflect"
5 6
 	"testing"
6 7
 
@@ -28,7 +29,7 @@ func testGetAllowedNamespaces(t *testing.T, namespace []string, expected []strin
28 29
 	client := ExtendedClient{coreClient: clientset}
29 30
 	c := NewCluster(client, nil, nil, log.NewNopLogger(), namespace, []string{})
30 31
 
31
-	namespaces, err := c.getAllowedAndExistingNamespaces()
32
+	namespaces, err := c.getAllowedAndExistingNamespaces(context.Background())
32 33
 	if err != nil {
33 34
 		t.Errorf("The error should be nil, not: %s", err)
34 35
 	}

+ 51
- 14
cluster/kubernetes/resourcekinds.go View File

@@ -1,6 +1,7 @@
1 1
 package kubernetes
2 2
 
3 3
 import (
4
+	"context"
4 5
 	"strings"
5 6
 
6 7
 	apiapps "k8s.io/api/apps/v1"
@@ -30,8 +31,8 @@ const AntecedentAnnotation = "flux.weave.works/antecedent"
30 31
 // Kind registry
31 32
 
32 33
 type resourceKind interface {
33
-	getWorkload(c *Cluster, namespace, name string) (workload, error)
34
-	getWorkloads(c *Cluster, namespace string) ([]workload, error)
34
+	getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error)
35
+	getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error)
35 36
 }
36 37
 
37 38
 var (
@@ -114,7 +115,10 @@ func (w workload) toClusterWorkload(resourceID flux.ResourceID) cluster.Workload
114 115
 
115 116
 type deploymentKind struct{}
116 117
 
117
-func (dk *deploymentKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
118
+func (dk *deploymentKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
119
+	if err := ctx.Err(); err != nil {
120
+		return workload{}, err
121
+	}
118 122
 	deployment, err := c.client.AppsV1().Deployments(namespace).Get(name, meta_v1.GetOptions{})
119 123
 	if err != nil {
120 124
 		return workload{}, err
@@ -123,7 +127,10 @@ func (dk *deploymentKind) getWorkload(c *Cluster, namespace, name string) (workl
123 127
 	return makeDeploymentWorkload(deployment), nil
124 128
 }
125 129
 
126
-func (dk *deploymentKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
130
+func (dk *deploymentKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
131
+	if err := ctx.Err(); err != nil {
132
+		return nil, err
133
+	}
127 134
 	deployments, err := c.client.AppsV1().Deployments(namespace).List(meta_v1.ListOptions{})
128 135
 	if err != nil {
129 136
 		return nil, err
@@ -191,7 +198,10 @@ func makeDeploymentWorkload(deployment *apiapps.Deployment) workload {
191 198
 
192 199
 type daemonSetKind struct{}
193 200
 
194
-func (dk *daemonSetKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
201
+func (dk *daemonSetKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
202
+	if err := ctx.Err(); err != nil {
203
+		return workload{}, err
204
+	}
195 205
 	daemonSet, err := c.client.AppsV1().DaemonSets(namespace).Get(name, meta_v1.GetOptions{})
196 206
 	if err != nil {
197 207
 		return workload{}, err
@@ -200,7 +210,10 @@ func (dk *daemonSetKind) getWorkload(c *Cluster, namespace, name string) (worklo
200 210
 	return makeDaemonSetWorkload(daemonSet), nil
201 211
 }
202 212
 
203
-func (dk *daemonSetKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
213
+func (dk *daemonSetKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
214
+	if err := ctx.Err(); err != nil {
215
+		return nil, err
216
+	}
204 217
 	daemonSets, err := c.client.AppsV1().DaemonSets(namespace).List(meta_v1.ListOptions{})
205 218
 	if err != nil {
206 219
 		return nil, err
@@ -252,7 +265,10 @@ func makeDaemonSetWorkload(daemonSet *apiapps.DaemonSet) workload {
252 265
 
253 266
 type statefulSetKind struct{}
254 267
 
255
-func (dk *statefulSetKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
268
+func (dk *statefulSetKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
269
+	if err := ctx.Err(); err != nil {
270
+		return workload{}, err
271
+	}
256 272
 	statefulSet, err := c.client.AppsV1().StatefulSets(namespace).Get(name, meta_v1.GetOptions{})
257 273
 	if err != nil {
258 274
 		return workload{}, err
@@ -261,7 +277,10 @@ func (dk *statefulSetKind) getWorkload(c *Cluster, namespace, name string) (work
261 277
 	return makeStatefulSetWorkload(statefulSet), nil
262 278
 }
263 279
 
264
-func (dk *statefulSetKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
280
+func (dk *statefulSetKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
281
+	if err := ctx.Err(); err != nil {
282
+		return nil, err
283
+	}
265 284
 	statefulSets, err := c.client.AppsV1().StatefulSets(namespace).List(meta_v1.ListOptions{})
266 285
 	if err != nil {
267 286
 		return nil, err
@@ -345,7 +364,10 @@ func makeStatefulSetWorkload(statefulSet *apiapps.StatefulSet) workload {
345 364
 
346 365
 type cronJobKind struct{}
347 366
 
348
-func (dk *cronJobKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
367
+func (dk *cronJobKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
368
+	if err := ctx.Err(); err != nil {
369
+		return workload{}, err
370
+	}
349 371
 	cronJob, err := c.client.BatchV1beta1().CronJobs(namespace).Get(name, meta_v1.GetOptions{})
350 372
 	if err != nil {
351 373
 		return workload{}, err
@@ -354,7 +376,10 @@ func (dk *cronJobKind) getWorkload(c *Cluster, namespace, name string) (workload
354 376
 	return makeCronJobWorkload(cronJob), nil
355 377
 }
356 378
 
357
-func (dk *cronJobKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
379
+func (dk *cronJobKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
380
+	if err := ctx.Err(); err != nil {
381
+		return nil, err
382
+	}
358 383
 	cronJobs, err := c.client.BatchV1beta1().CronJobs(namespace).List(meta_v1.ListOptions{})
359 384
 	if err != nil {
360 385
 		return nil, err
@@ -382,7 +407,10 @@ func makeCronJobWorkload(cronJob *apibatch.CronJob) workload {
382 407
 
383 408
 type fluxHelmReleaseKind struct{}
384 409
 
385
-func (fhr *fluxHelmReleaseKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
410
+func (fhr *fluxHelmReleaseKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
411
+	if err := ctx.Err(); err != nil {
412
+		return workload{}, err
413
+	}
386 414
 	fluxHelmRelease, err := c.client.HelmV1alpha2().FluxHelmReleases(namespace).Get(name, meta_v1.GetOptions{})
387 415
 	if err != nil {
388 416
 		return workload{}, err
@@ -390,7 +418,10 @@ func (fhr *fluxHelmReleaseKind) getWorkload(c *Cluster, namespace, name string)
390 418
 	return makeFluxHelmReleaseWorkload(fluxHelmRelease), nil
391 419
 }
392 420
 
393
-func (fhr *fluxHelmReleaseKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
421
+func (fhr *fluxHelmReleaseKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
422
+	if err := ctx.Err(); err != nil {
423
+		return nil, err
424
+	}
394 425
 	fluxHelmReleases, err := c.client.HelmV1alpha2().FluxHelmReleases(namespace).List(meta_v1.ListOptions{})
395 426
 	if err != nil {
396 427
 		return nil, err
@@ -444,7 +475,10 @@ func createK8sFHRContainers(values map[string]interface{}) []apiv1.Container {
444 475
 
445 476
 type helmReleaseKind struct{}
446 477
 
447
-func (hr *helmReleaseKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
478
+func (hr *helmReleaseKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
479
+	if err := ctx.Err(); err != nil {
480
+		return workload{}, err
481
+	}
448 482
 	helmRelease, err := c.client.FluxV1beta1().HelmReleases(namespace).Get(name, meta_v1.GetOptions{})
449 483
 	if err != nil {
450 484
 		return workload{}, err
@@ -452,7 +486,10 @@ func (hr *helmReleaseKind) getWorkload(c *Cluster, namespace, name string) (work
452 486
 	return makeHelmReleaseWorkload(helmRelease), nil
453 487
 }
454 488
 
455
-func (hr *helmReleaseKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
489
+func (hr *helmReleaseKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
490
+	if err := ctx.Err(); err != nil {
491
+		return nil, err
492
+	}
456 493
 	helmReleases, err := c.client.FluxV1beta1().HelmReleases(namespace).List(meta_v1.ListOptions{})
457 494
 	if err != nil {
458 495
 		return nil, err

+ 2
- 1
cluster/kubernetes/sync.go View File

@@ -2,6 +2,7 @@ package kubernetes
2 2
 
3 3
 import (
4 4
 	"bytes"
5
+	"context"
5 6
 	"crypto/sha1"
6 7
 	"crypto/sha256"
7 8
 	"encoding/base64"
@@ -292,7 +293,7 @@ func (c *Cluster) listAllowedResources(
292 293
 	}
293 294
 
294 295
 	// List resources only from the allowed namespaces
295
-	namespaces, err := c.getAllowedAndExistingNamespaces()
296
+	namespaces, err := c.getAllowedAndExistingNamespaces(context.Background())
296 297
 	if err != nil {
297 298
 		return nil, err
298 299
 	}

+ 10
- 9
cluster/mock/mock.go View File

@@ -2,6 +2,7 @@ package mock
2 2
 
3 3
 import (
4 4
 	"bytes"
5
+	"context"
5 6
 
6 7
 	"github.com/weaveworks/flux"
7 8
 	"github.com/weaveworks/flux/cluster"
@@ -14,11 +15,11 @@ import (
14 15
 
15 16
 // Doubles as a cluster.Cluster and cluster.Manifests implementation
16 17
 type Mock struct {
17
-	AllWorkloadsFunc              func(maybeNamespace string) ([]cluster.Workload, error)
18
-	SomeWorkloadsFunc             func([]flux.ResourceID) ([]cluster.Workload, error)
18
+	AllWorkloadsFunc              func(ctx context.Context, maybeNamespace string) ([]cluster.Workload, error)
19
+	SomeWorkloadsFunc             func(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error)
19 20
 	IsAllowedResourceFunc         func(flux.ResourceID) bool
20 21
 	PingFunc                      func() error
21
-	ExportFunc                    func() ([]byte, error)
22
+	ExportFunc                    func(ctx context.Context) ([]byte, error)
22 23
 	SyncFunc                      func(cluster.SyncSet) error
23 24
 	PublicSSHKeyFunc              func(regenerate bool) (ssh.PublicKey, error)
24 25
 	SetWorkloadContainerImageFunc func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error)
@@ -33,12 +34,12 @@ type Mock struct {
33 34
 var _ cluster.Cluster = &Mock{}
34 35
 var _ manifests.Manifests = &Mock{}
35 36
 
36
-func (m *Mock) AllWorkloads(maybeNamespace string) ([]cluster.Workload, error) {
37
-	return m.AllWorkloadsFunc(maybeNamespace)
37
+func (m *Mock) AllWorkloads(ctx context.Context, maybeNamespace string) ([]cluster.Workload, error) {
38
+	return m.AllWorkloadsFunc(ctx, maybeNamespace)
38 39
 }
39 40
 
40
-func (m *Mock) SomeWorkloads(s []flux.ResourceID) ([]cluster.Workload, error) {
41
-	return m.SomeWorkloadsFunc(s)
41
+func (m *Mock) SomeWorkloads(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error) {
42
+	return m.SomeWorkloadsFunc(ctx, ids)
42 43
 }
43 44
 
44 45
 func (m *Mock) IsAllowedResource(id flux.ResourceID) bool {
@@ -49,8 +50,8 @@ func (m *Mock) Ping() error {
49 50
 	return m.PingFunc()
50 51
 }
51 52
 
52
-func (m *Mock) Export() ([]byte, error) {
53
-	return m.ExportFunc()
53
+func (m *Mock) Export(ctx context.Context) ([]byte, error) {
54
+	return m.ExportFunc(ctx)
54 55
 }
55 56
 
56 57
 func (m *Mock) Sync(c cluster.SyncSet) error {

+ 5
- 5
daemon/daemon.go View File

@@ -71,7 +71,7 @@ func (d *Daemon) Ping(ctx context.Context) error {
71 71
 }
72 72
 
73 73
 func (d *Daemon) Export(ctx context.Context) ([]byte, error) {
74
-	return d.Cluster.Export()
74
+	return d.Cluster.Export(ctx)
75 75
 }
76 76
 
77 77
 func (d *Daemon) getManifestStore(checkout *git.Checkout) (manifests.Store, error) {
@@ -122,9 +122,9 @@ func (d *Daemon) ListServicesWithOptions(ctx context.Context, opts v11.ListServi
122 122
 	var clusterWorkloads []cluster.Workload
123 123
 	var err error
124 124
 	if len(opts.Services) > 0 {
125
-		clusterWorkloads, err = d.Cluster.SomeWorkloads(opts.Services)
125
+		clusterWorkloads, err = d.Cluster.SomeWorkloads(ctx, opts.Services)
126 126
 	} else {
127
-		clusterWorkloads, err = d.Cluster.AllWorkloads(opts.Namespace)
127
+		clusterWorkloads, err = d.Cluster.AllWorkloads(ctx, opts.Namespace)
128 128
 	}
129 129
 	if err != nil {
130 130
 		return nil, errors.Wrap(err, "getting workloads from cluster")
@@ -199,12 +199,12 @@ func (d *Daemon) ListImagesWithOptions(ctx context.Context, opts v10.ListImagesO
199 199
 		if err != nil {
200 200
 			return nil, errors.Wrap(err, "treating workload spec as ID")
201 201
 		}
202
-		workloads, err = d.Cluster.SomeWorkloads([]flux.ResourceID{id})
202
+		workloads, err = d.Cluster.SomeWorkloads(ctx, []flux.ResourceID{id})
203 203
 		if err != nil {
204 204
 			return nil, errors.Wrap(err, "getting some workloads")
205 205
 		}
206 206
 	} else {
207
-		workloads, err = d.Cluster.AllWorkloads(opts.Namespace)
207
+		workloads, err = d.Cluster.AllWorkloads(ctx, opts.Namespace)
208 208
 		if err != nil {
209 209
 			return nil, errors.Wrap(err, "getting all workloads")
210 210
 		}

+ 5
- 5
daemon/daemon_test.go View File

@@ -587,7 +587,7 @@ func TestDaemon_Automated(t *testing.T) {
587 587
 			},
588 588
 		},
589 589
 	}
590
-	k8s.SomeWorkloadsFunc = func([]flux.ResourceID) ([]cluster.Workload, error) {
590
+	k8s.SomeWorkloadsFunc = func(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error) {
591 591
 		return []cluster.Workload{workload}, nil
592 592
 	}
593 593
 	start()
@@ -613,7 +613,7 @@ func TestDaemon_Automated_semver(t *testing.T) {
613 613
 			},
614 614
 		},
615 615
 	}
616
-	k8s.SomeWorkloadsFunc = func([]flux.ResourceID) ([]cluster.Workload, error) {
616
+	k8s.SomeWorkloadsFunc = func(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error) {
617 617
 		return []cluster.Workload{workload}, nil
618 618
 	}
619 619
 	start()
@@ -675,7 +675,7 @@ func mockDaemon(t *testing.T) (*Daemon, func(), func(), *mock.Mock, *mockEventWr
675 675
 	var k8s *mock.Mock
676 676
 	{
677 677
 		k8s = &mock.Mock{}
678
-		k8s.AllWorkloadsFunc = func(maybeNamespace string) ([]cluster.Workload, error) {
678
+		k8s.AllWorkloadsFunc = func(ctx context.Context, maybeNamespace string) ([]cluster.Workload, error) {
679 679
 			if maybeNamespace == ns {
680 680
 				return []cluster.Workload{
681 681
 					singleService,
@@ -686,9 +686,9 @@ func mockDaemon(t *testing.T) (*Daemon, func(), func(), *mock.Mock, *mockEventWr
686 686
 			return []cluster.Workload{}, nil
687 687
 		}
688 688
 		k8s.IsAllowedResourceFunc = func(flux.ResourceID) bool { return true }
689
-		k8s.ExportFunc = func() ([]byte, error) { return testBytes, nil }
689
+		k8s.ExportFunc = func(ctx context.Context) ([]byte, error) { return testBytes, nil }
690 690
 		k8s.PingFunc = func() error { return nil }
691
-		k8s.SomeWorkloadsFunc = func([]flux.ResourceID) ([]cluster.Workload, error) {
691
+		k8s.SomeWorkloadsFunc = func(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error) {
692 692
 			return []cluster.Workload{
693 693
 				singleService,
694 694
 			}, nil

+ 1
- 1
daemon/images.go View File

@@ -29,7 +29,7 @@ func (d *Daemon) pollForNewImages(logger log.Logger) {
29 29
 		return
30 30
 	}
31 31
 	// Find images to check
32
-	workloads, err := d.Cluster.SomeWorkloads(candidateWorkloads.IDs())
32
+	workloads, err := d.Cluster.SomeWorkloads(ctx, candidateWorkloads.IDs())
33 33
 	if err != nil {
34 34
 		logger.Log("error", errors.Wrap(err, "checking workloads for new images"))
35 35
 		return

+ 1
- 1
daemon/sync_test.go View File

@@ -44,7 +44,7 @@ func daemon(t *testing.T) (*Daemon, func()) {
44 44
 	repo, repoCleanup := gittest.Repo(t)
45 45
 
46 46
 	k8s = &mock.Mock{}
47
-	k8s.ExportFunc = func() ([]byte, error) { return nil, nil }
47
+	k8s.ExportFunc = func(ctx context.Context) ([]byte, error) { return nil, nil }
48 48
 
49 49
 	events = &mockEventWriter{}
50 50
 

+ 1
- 1
release/context.go View File

@@ -85,7 +85,7 @@ func (rc *ReleaseContext) SelectWorkloads(ctx context.Context, results update.Re
85 85
 	}
86 86
 
87 87
 	// Ask the cluster about those that we're still interested in
88
-	definedAndRunning, err := rc.cluster.SomeWorkloads(toAskClusterAbout)
88
+	definedAndRunning, err := rc.cluster.SomeWorkloads(ctx, toAskClusterAbout)
89 89
 	if err != nil {
90 90
 		return nil, err
91 91
 	}

+ 2
- 2
release/releaser_test.go View File

@@ -145,10 +145,10 @@ var (
145 145
 
146 146
 func mockCluster(running ...cluster.Workload) *mock.Mock {
147 147
 	return &mock.Mock{
148
-		AllWorkloadsFunc: func(string) ([]cluster.Workload, error) {
148
+		AllWorkloadsFunc: func(ctx context.Context, maybeNamespace string) ([]cluster.Workload, error) {
149 149
 			return running, nil
150 150
 		},
151
-		SomeWorkloadsFunc: func(ids []flux.ResourceID) ([]cluster.Workload, error) {
151
+		SomeWorkloadsFunc: func(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error) {
152 152
 			var res []cluster.Workload
153 153
 			for _, id := range ids {
154 154
 				for _, svc := range running {

Loading…
Cancel
Save