Browse Source

Separate out refresh loop for git mirroring

This rewrites the git package so that there are two distinct
abstractions: a self-syncing `Repo` and a working directory
`Checkout`.

`Repo` maintains a git mirror of an upstream git repository. It does
both the initial clone and regular fetches, notifying on a channel
when it's done so. The advantage of it being a mirror is that it will
keep track of all refs (including notes), so does not need to be told
to fetch particular branches.

Since the daemon often needs to push new commits, then wait for them
to be mirrored, there is also a synchronous method to fetch from
upstream.

Since it's just a mirror (i.e., has no working files), if the daemon
needs to look at files it must clone the repo. There's now a helper
method for this -- `*Daemon.WithClone` -- which makes sure things are
tidied up afterward.
Michael Bridgen 2 years ago
parent
commit
c3ddea5d98
13 changed files with 773 additions and 499 deletions
  1. 43
    51
      cmd/fluxd/main.go
  2. 74
    49
      daemon/daemon.go
  3. 54
    31
      daemon/daemon_test.go
  4. 9
    4
      daemon/images.go
  5. 69
    60
      daemon/loop.go
  6. 77
    39
      daemon/loop_test.go
  7. 0
    11
      flux.go
  8. 33
    5
      git/gittest/repo.go
  9. 21
    20
      git/gittest/repo_test.go
  10. 18
    14
      git/operations.go
  11. 203
    201
      git/repo.go
  12. 171
    0
      git/working.go
  13. 1
    14
      sync/sync_test.go

+ 43
- 51
cmd/fluxd/main.go View File

@@ -19,8 +19,6 @@ import (
19 19
 	k8sclient "k8s.io/client-go/kubernetes"
20 20
 	"k8s.io/client-go/rest"
21 21
 
22
-	"context"
23
-
24 22
 	"github.com/weaveworks/flux"
25 23
 	"github.com/weaveworks/flux/cluster"
26 24
 	"github.com/weaveworks/flux/cluster/kubernetes"
@@ -146,6 +144,11 @@ func main() {
146 144
 		}
147 145
 	}
148 146
 
147
+	if len(*gitPath) > 0 && (*gitPath)[0] == '/' {
148
+		logger.Log("err", "git subdirectory (--git-path) should not have leading forward slash")
149
+		os.Exit(1)
150
+	}
151
+
149 152
 	// Cluster component.
150 153
 	var clusterVersion string
151 154
 	var sshKeyRing ssh.KeyRing
@@ -274,13 +277,12 @@ func main() {
274 277
 		}
275 278
 	}
276 279
 
277
-	gitRemoteConfig, err := flux.NewGitRemoteConfig(*gitURL, *gitBranch, *gitPath)
278
-	if err != nil {
279
-		logger.Log("err", err)
280
-		os.Exit(1)
281
-	}
282 280
 	// Indirect reference to a daemon, initially of the NotReady variety
283
-	notReadyDaemon := daemon.NewNotReadyDaemon(version, k8s, gitRemoteConfig)
281
+	notReadyDaemon := daemon.NewNotReadyDaemon(version, k8s, flux.GitRemoteConfig{
282
+		URL:    *gitURL,
283
+		Branch: *gitBranch,
284
+		Path:   *gitPath,
285
+	})
284 286
 	daemonRef := daemon.NewRef(notReadyDaemon)
285 287
 
286 288
 	var eventWriter event.EventWriter
@@ -352,60 +354,49 @@ func main() {
352 354
 	var checker *checkpoint.Checker
353 355
 	updateCheckLogger := log.With(logger, "component", "checkpoint")
354 356
 
355
-	var repo git.Repo
356
-	var checkout *git.Checkout
357
+	gitRemote := git.Remote{URL: *gitURL}
358
+	gitConfig := git.Config{
359
+		Path:      *gitPath,
360
+		Branch:    *gitBranch,
361
+		SyncTag:   *gitSyncTag,
362
+		NotesRef:  *gitNotesRef,
363
+		UserName:  *gitUser,
364
+		UserEmail: *gitEmail,
365
+		SetAuthor: *gitSetAuthor,
366
+	}
367
+
368
+	repo := git.NewRepo(gitRemote)
357 369
 	{
358
-		repo = git.Repo{
359
-			GitRemoteConfig: gitRemoteConfig,
360
-		}
361
-		// TODO: should not need to supply all of this -- maybe just
362
-		// on clone?
363
-		gitConfig := git.Config{
364
-			SyncTag:   *gitSyncTag,
365
-			NotesRef:  *gitNotesRef,
366
-			UserName:  *gitUser,
367
-			UserEmail: *gitEmail,
368
-			SetAuthor: *gitSetAuthor,
369
-		}
370 370
 
371 371
 		// If there's no URL here, we will not be able to do anything else.
372
-		if gitRemoteConfig.URL == "" {
372
+		if gitRemote.URL == "" {
373 373
 			checker = checkForUpdates(clusterVersion, "false", updateCheckLogger)
374 374
 			return
375 375
 		}
376 376
 
377
-		for checkout == nil {
378
-			var stage flux.GitRepoStatus = flux.RepoNew
379
-			ctx, cancel := context.WithTimeout(context.Background(), git.DefaultCloneTimeout)
380
-			working, err := repo.Clone(ctx, gitConfig)
381
-			cancel()
382
-			if err == nil {
383
-				stage = flux.RepoCloned
384
-				ctx, cancel = context.WithTimeout(context.Background(), git.DefaultCloneTimeout)
385
-				err = working.CheckOriginWritable(ctx)
386
-				cancel()
387
-			}
388
-			if err == nil {
389
-				notReadyDaemon.UpdateStatus(flux.RepoReady, nil)
390
-				if checker != nil {
391
-					checker.Stop()
392
-				}
377
+		shutdownWg.Add(1)
378
+		go func() {
379
+			errc <- repo.Start(shutdown, shutdownWg)
380
+		}()
381
+		for {
382
+			status, err := repo.Status()
383
+			logger.Log("repo", repo.Origin().URL, "status", status, "err", err)
384
+			notReadyDaemon.UpdateStatus(status, err)
385
+
386
+			if status == flux.RepoReady {
393 387
 				checker = checkForUpdates(clusterVersion, "true", updateCheckLogger)
394
-				logger.Log("working-dir", working.Dir,
388
+				logger.Log("working-dir", repo.Dir(),
395 389
 					"user", *gitUser,
396 390
 					"email", *gitEmail,
397 391
 					"sync-tag", *gitSyncTag,
398 392
 					"notes-ref", *gitNotesRef,
399 393
 					"set-author", *gitSetAuthor)
400
-				checkout = working
401 394
 				break
402 395
 			}
403 396
 
404
-			notReadyDaemon.UpdateStatus(stage, err)
405 397
 			if checker == nil {
406 398
 				checker = checkForUpdates(clusterVersion, "false", updateCheckLogger)
407 399
 			}
408
-			logger.Log("component", "git", "err", err.Error())
409 400
 
410 401
 			tryAgain := time.NewTimer(10 * time.Second)
411 402
 			select {
@@ -424,24 +415,25 @@ func main() {
424 415
 	}
425 416
 
426 417
 	daemon := &daemon.Daemon{
427
-		V:            version,
428
-		Cluster:      k8s,
429
-		Manifests:    k8sManifests,
430
-		Registry:     cacheRegistry,
431
-		ImageRefresh: make(chan image.Name, 100), // size chosen by fair dice roll
432
-		Repo:         repo, Checkout: checkout,
418
+		V:              version,
419
+		Cluster:        k8s,
420
+		Manifests:      k8sManifests,
421
+		Registry:       cacheRegistry,
422
+		ImageRefresh:   make(chan image.Name, 100), // size chosen by fair dice roll
423
+		Repo:           repo,
424
+		GitConfig:      gitConfig,
433 425
 		Jobs:           jobs,
434 426
 		JobStatusCache: &job.StatusCache{Size: 100},
435 427
 
436 428
 		EventWriter: eventWriter,
437 429
 		Logger:      log.With(logger, "component", "daemon"), LoopVars: &daemon.LoopVars{
438
-			GitPollInterval:      *gitPollInterval,
430
+			SyncInterval:         *gitPollInterval,
439 431
 			RegistryPollInterval: *registryPollInterval,
440 432
 		},
441 433
 	}
442 434
 
443 435
 	shutdownWg.Add(1)
444
-	go daemon.GitPollLoop(shutdown, shutdownWg, log.With(logger, "component", "sync-loop"))
436
+	go daemon.Loop(shutdown, shutdownWg, log.With(logger, "component", "sync-loop"))
445 437
 
446 438
 	cacheWarmer.Notify = daemon.AskForImagePoll
447 439
 	cacheWarmer.Priority = daemon.ImageRefresh

+ 74
- 49
daemon/daemon.go View File

@@ -42,8 +42,8 @@ type Daemon struct {
42 42
 	Manifests      cluster.Manifests
43 43
 	Registry       registry.Registry
44 44
 	ImageRefresh   chan image.Name
45
-	Repo           git.Repo
46
-	Checkout       *git.Checkout
45
+	Repo           *git.Repo
46
+	GitConfig      git.Config
47 47
 	Jobs           *job.Queue
48 48
 	JobStatusCache *job.StatusCache
49 49
 	EventWriter    event.EventWriter
@@ -73,10 +73,12 @@ func (d *Daemon) ListServices(ctx context.Context, namespace string) ([]flux.Con
73 73
 		return nil, errors.Wrap(err, "getting services from cluster")
74 74
 	}
75 75
 
76
-	d.Checkout.RLock()
77
-	defer d.Checkout.RUnlock()
78
-
79
-	services, err := d.Manifests.ServicesWithPolicies(d.Checkout.ManifestDir())
76
+	var services policy.ResourceMap
77
+	err = d.WithClone(ctx, func(checkout *git.Checkout) error {
78
+		var err error
79
+		services, err = d.Manifests.ServicesWithPolicies(checkout.ManifestDir())
80
+		return err
81
+	})
80 82
 	if err != nil {
81 83
 		return nil, errors.Wrap(err, "getting service policies")
82 84
 	}
@@ -141,17 +143,19 @@ func (d *Daemon) executeJob(id job.ID, do DaemonJobFunc, logger log.Logger) (*ev
141 143
 	d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusRunning})
142 144
 	// make a working clone so we don't mess with files we
143 145
 	// will be reading from elsewhere
144
-	working, err := d.Checkout.WorkingClone(ctx)
146
+	var metadata *event.CommitEventMetadata
147
+	err := d.WithClone(ctx, func(working *git.Checkout) error {
148
+		var err error
149
+		metadata, err = do(ctx, id, working, logger)
150
+		if err != nil {
151
+			return err
152
+		}
153
+		return nil
154
+	})
145 155
 	if err != nil {
146 156
 		d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusFailed, Err: err.Error()})
147 157
 		return nil, err
148 158
 	}
149
-	defer working.Clean()
150
-	metadata, err := do(ctx, id, working, logger)
151
-	if err != nil {
152
-		d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusFailed, Err: err.Error()})
153
-		return metadata, err
154
-	}
155 159
 	d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusSucceeded, Result: *metadata})
156 160
 	return metadata, nil
157 161
 }
@@ -272,7 +276,7 @@ func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) DaemonJo
272 276
 		}
273 277
 
274 278
 		commitAuthor := ""
275
-		if d.Checkout.Config.SetAuthor {
279
+		if d.GitConfig.SetAuthor {
276 280
 			commitAuthor = spec.Cause.User
277 281
 		}
278 282
 		commitAction := &git.CommitAction{Author: commitAuthor, Message: policyCommitMessage(updates, spec.Cause)}
@@ -311,15 +315,16 @@ func (d *Daemon) release(spec update.Spec, c release.Changes) DaemonJobFunc {
311 315
 				commitMsg = c.CommitMessage()
312 316
 			}
313 317
 			commitAuthor := ""
314
-			if d.Checkout.Config.SetAuthor {
318
+			if d.GitConfig.SetAuthor {
315 319
 				commitAuthor = spec.Cause.User
316 320
 			}
317 321
 			commitAction := &git.CommitAction{Author: commitAuthor, Message: commitMsg}
318 322
 			if err := working.CommitAndPush(ctx, commitAction, &git.Note{JobID: jobID, Spec: spec, Result: result}); err != nil {
319 323
 				// On the chance pushing failed because it was not
320
-				// possible to fast-forward, ask for a sync so the
321
-				// next attempt is more likely to succeed.
322
-				d.AskForSync()
324
+				// possible to fast-forward, ask the repo to fetch
325
+				// from upstream ASAP, so the next attempt is more
326
+				// likely to succeed.
327
+				d.Repo.Notify()
323 328
 				return nil, err
324 329
 			}
325 330
 			revision, err = working.HeadRevision(ctx)
@@ -343,7 +348,7 @@ func (d *Daemon) NotifyChange(ctx context.Context, change api.Change) error {
343 348
 	switch change.Kind {
344 349
 	case api.GitChange:
345 350
 		gitUpdate := change.Source.(api.GitUpdate)
346
-		if gitUpdate.URL != d.Repo.URL && gitUpdate.Branch != d.Repo.Branch {
351
+		if gitUpdate.URL != d.Repo.Origin().URL && gitUpdate.Branch != d.GitConfig.Branch {
347 352
 			// It isn't strictly an _error_ to be notified about a repo/branch pair
348 353
 			// that isn't ours, but it's worth logging anyway for debugging.
349 354
 			d.Logger.Log("msg", "notified about unrelated change",
@@ -351,7 +356,7 @@ func (d *Daemon) NotifyChange(ctx context.Context, change api.Change) error {
351 356
 				"branch", gitUpdate.Branch)
352 357
 			break
353 358
 		}
354
-		d.AskForSync()
359
+		d.Repo.Notify()
355 360
 	case api.ImageChange:
356 361
 		imageUpdate := change.Source.(api.ImageUpdate)
357 362
 		d.ImageRefresh <- imageUpdate.Name
@@ -371,41 +376,45 @@ func (d *Daemon) JobStatus(ctx context.Context, jobID job.ID) (job.Status, error
371 376
 	// Look through the commits for a note referencing this job.  This
372 377
 	// means that even if fluxd restarts, we will at least remember
373 378
 	// jobs which have pushed a commit.
374
-	notes, err := d.Checkout.NoteRevList(ctx)
375
-	if err != nil {
376
-		return job.Status{}, errors.Wrap(err, "enumerating commit notes")
377
-	}
378
-	commits, err := d.Checkout.CommitsBefore(ctx, "HEAD")
379
-	if err != nil {
380
-		return job.Status{}, errors.Wrap(err, "checking revisions for status")
381
-	}
379
+	// FIXME(michael): consider looking at the repo for this, since read op
380
+	err := d.WithClone(ctx, func(working *git.Checkout) error {
381
+		notes, err := working.NoteRevList(ctx)
382
+		if err != nil {
383
+			return errors.Wrap(err, "enumerating commit notes")
384
+		}
385
+		commits, err := d.Repo.CommitsBefore(ctx, "HEAD", d.GitConfig.Path)
386
+		if err != nil {
387
+			return errors.Wrap(err, "checking revisions for status")
388
+		}
382 389
 
383
-	for _, commit := range commits {
384
-		if _, ok := notes[commit.Revision]; ok {
385
-			note, _ := d.Checkout.GetNote(ctx, commit.Revision)
386
-			if note != nil && note.JobID == jobID {
387
-				return job.Status{
388
-					StatusString: job.StatusSucceeded,
389
-					Result: event.CommitEventMetadata{
390
-						Revision: commit.Revision,
391
-						Spec:     &note.Spec,
392
-						Result:   note.Result,
393
-					},
394
-				}, nil
390
+		for _, commit := range commits {
391
+			if _, ok := notes[commit.Revision]; ok {
392
+				note, _ := working.GetNote(ctx, commit.Revision)
393
+				if note != nil && note.JobID == jobID {
394
+					status = job.Status{
395
+						StatusString: job.StatusSucceeded,
396
+						Result: event.CommitEventMetadata{
397
+							Revision: commit.Revision,
398
+							Spec:     &note.Spec,
399
+							Result:   note.Result,
400
+						},
401
+					}
402
+					return nil
403
+				}
395 404
 			}
396 405
 		}
397
-	}
398
-
399
-	return job.Status{}, unknownJobError(jobID)
406
+		return unknownJobError(jobID)
407
+	})
408
+	return status, err
400 409
 }
401 410
 
402 411
 // Ask the daemon how far it's got applying things; in particular, is it
403
-// past the supplied release? Return the list of commits between where
404
-// we have applied and the ref given, inclusive. E.g., if you send HEAD,
412
+// past the given commit? Return the list of commits between where
413
+// we have applied (the sync tag) and the ref given, inclusive. E.g., if you send HEAD,
405 414
 // you'll get all the commits yet to be applied. If you send a hash
406
-// and it's applied _past_ it, you'll get an empty list.
415
+// and it's applied at or _past_ it, you'll get an empty list.
407 416
 func (d *Daemon) SyncStatus(ctx context.Context, commitRef string) ([]string, error) {
408
-	commits, err := d.Checkout.CommitsBetween(ctx, d.Checkout.SyncTag, commitRef)
417
+	commits, err := d.Repo.CommitsBetween(ctx, d.GitConfig.SyncTag, commitRef, d.GitConfig.Path)
409 418
 	if err != nil {
410 419
 		return nil, err
411 420
 	}
@@ -423,15 +432,31 @@ func (d *Daemon) GitRepoConfig(ctx context.Context, regenerate bool) (flux.GitCo
423 432
 	if err != nil {
424 433
 		return flux.GitConfig{}, err
425 434
 	}
435
+
436
+	origin := d.Repo.Origin()
437
+	status, _ := d.Repo.Status()
426 438
 	return flux.GitConfig{
427
-		Remote:       d.Repo.GitRemoteConfig,
439
+		Remote: flux.GitRemoteConfig{
440
+			URL:    origin.URL,
441
+			Branch: d.GitConfig.Branch,
442
+			Path:   d.GitConfig.Path,
443
+		},
428 444
 		PublicSSHKey: publicSSHKey,
429
-		Status:       flux.RepoReady,
445
+		Status:       status,
430 446
 	}, nil
431 447
 }
432 448
 
433 449
 // Non-api.Server methods
434 450
 
451
+func (d *Daemon) WithClone(ctx context.Context, fn func(*git.Checkout) error) error {
452
+	co, err := d.Repo.Clone(ctx, d.GitConfig)
453
+	if err != nil {
454
+		return err
455
+	}
456
+	defer co.Clean()
457
+	return fn(co)
458
+}
459
+
435 460
 func unknownJobError(id job.ID) error {
436 461
 	return &fluxerr.Error{
437 462
 		Type: fluxerr.Missing,

+ 54
- 31
daemon/daemon_test.go View File

@@ -50,7 +50,8 @@ var (
50 50
 
51 51
 // When I ping, I should get a response
52 52
 func TestDaemon_Ping(t *testing.T) {
53
-	d, clean, _, _ := mockDaemon(t)
53
+	d, start, clean, _, _ := mockDaemon(t)
54
+	start()
54 55
 	defer clean()
55 56
 	ctx := context.Background()
56 57
 	if d.Ping(ctx) != nil {
@@ -60,7 +61,8 @@ func TestDaemon_Ping(t *testing.T) {
60 61
 
61 62
 // When I ask a version, I should get a version
62 63
 func TestDaemon_Version(t *testing.T) {
63
-	d, clean, _, _ := mockDaemon(t)
64
+	d, start, clean, _, _ := mockDaemon(t)
65
+	start()
64 66
 	defer clean()
65 67
 
66 68
 	ctx := context.Background()
@@ -75,7 +77,8 @@ func TestDaemon_Version(t *testing.T) {
75 77
 
76 78
 // When I export it should export the current (mocked) k8s cluster
77 79
 func TestDaemon_Export(t *testing.T) {
78
-	d, clean, _, _ := mockDaemon(t)
80
+	d, start, clean, _, _ := mockDaemon(t)
81
+	start()
79 82
 	defer clean()
80 83
 
81 84
 	ctx := context.Background()
@@ -91,7 +94,8 @@ func TestDaemon_Export(t *testing.T) {
91 94
 
92 95
 // When I call list services, it should list all the services
93 96
 func TestDaemon_ListServices(t *testing.T) {
94
-	d, clean, _, _ := mockDaemon(t)
97
+	d, start, clean, _, _ := mockDaemon(t)
98
+	start()
95 99
 	defer clean()
96 100
 
97 101
 	ctx := context.Background()
@@ -126,7 +130,8 @@ func TestDaemon_ListServices(t *testing.T) {
126 130
 
127 131
 // When I call list images for a service, it should return images
128 132
 func TestDaemon_ListImages(t *testing.T) {
129
-	d, clean, _, _ := mockDaemon(t)
133
+	d, start, clean, _, _ := mockDaemon(t)
134
+	start()
130 135
 	defer clean()
131 136
 
132 137
 	ctx := context.Background()
@@ -156,10 +161,9 @@ func TestDaemon_ListImages(t *testing.T) {
156 161
 
157 162
 // When I call notify, it should cause a sync
158 163
 func TestDaemon_NotifyChange(t *testing.T) {
159
-	d, clean, mockK8s, events := mockDaemon(t)
160
-	defer clean()
161
-	w := newWait(t)
164
+	d, start, clean, mockK8s, events := mockDaemon(t)
162 165
 
166
+	w := newWait(t)
163 167
 	ctx := context.Background()
164 168
 
165 169
 	var syncCalled int
@@ -173,6 +177,9 @@ func TestDaemon_NotifyChange(t *testing.T) {
173 177
 		return nil
174 178
 	}
175 179
 
180
+	start()
181
+	defer clean()
182
+
176 183
 	d.NotifyChange(ctx, api.Change{Kind: api.GitChange, Source: api.GitUpdate{}})
177 184
 	w.Eventually(func() bool {
178 185
 		syncMu.Lock()
@@ -208,7 +215,8 @@ func TestDaemon_NotifyChange(t *testing.T) {
208 215
 // When I ask about a Job, it should tell me about a job
209 216
 // When I perform a release, it should update the git repo
210 217
 func TestDaemon_Release(t *testing.T) {
211
-	d, clean, _, _ := mockDaemon(t)
218
+	d, start, clean, _, _ := mockDaemon(t)
219
+	start()
212 220
 	defer clean()
213 221
 	w := newWait(t)
214 222
 
@@ -232,8 +240,13 @@ func TestDaemon_Release(t *testing.T) {
232 240
 
233 241
 	// Wait and check that the git manifest has been altered
234 242
 	w.Eventually(func() bool {
243
+		co, err := d.Repo.Clone(ctx, d.GitConfig)
244
+		if err != nil {
245
+			return false
246
+		}
247
+		defer co.Clean()
235 248
 		// open a file
236
-		if file, err := os.Open(filepath.Join(d.Checkout.ManifestDir(), "helloworld-deploy.yaml")); err == nil {
249
+		if file, err := os.Open(filepath.Join(co.ManifestDir(), "helloworld-deploy.yaml")); err == nil {
237 250
 
238 251
 			// make sure it gets closed
239 252
 			defer file.Close()
@@ -257,7 +270,8 @@ func TestDaemon_Release(t *testing.T) {
257 270
 // When I update a policy, I expect it to add to the queue
258 271
 // When I update a policy, it should add an annotation to the manifest
259 272
 func TestDaemon_PolicyUpdate(t *testing.T) {
260
-	d, clean, _, _ := mockDaemon(t)
273
+	d, start, clean, _, _ := mockDaemon(t)
274
+	start()
261 275
 	defer clean()
262 276
 	w := newWait(t)
263 277
 
@@ -270,12 +284,16 @@ func TestDaemon_PolicyUpdate(t *testing.T) {
270 284
 
271 285
 	// Wait and check for new annotation
272 286
 	w.Eventually(func() bool {
273
-		d.Checkout.Lock()
274
-		m, err := d.Manifests.LoadManifests(d.Checkout.ManifestDir())
287
+		co, err := d.Repo.Clone(ctx, d.GitConfig)
288
+		if err != nil {
289
+			t.Error(err)
290
+			return false
291
+		}
292
+		defer co.Clean()
293
+		m, err := d.Manifests.LoadManifests(co.ManifestDir())
275 294
 		if err != nil {
276 295
 			t.Fatalf("Error: %s", err.Error())
277 296
 		}
278
-		d.Checkout.Unlock()
279 297
 		return len(m[svc].Policy()) > 0
280 298
 	}, "Waiting for new annotation")
281 299
 }
@@ -284,7 +302,8 @@ func TestDaemon_PolicyUpdate(t *testing.T) {
284 302
 // that is about to take place. Then it should return empty once it is
285 303
 // complete
286 304
 func TestDaemon_SyncStatus(t *testing.T) {
287
-	d, clean, _, _ := mockDaemon(t)
305
+	d, start, clean, _, _ := mockDaemon(t)
306
+	start()
288 307
 	defer clean()
289 308
 	w := newWait(t)
290 309
 
@@ -304,7 +323,8 @@ func TestDaemon_SyncStatus(t *testing.T) {
304 323
 
305 324
 // When I restart fluxd, there won't be any jobs in the cache
306 325
 func TestDaemon_JobStatusWithNoCache(t *testing.T) {
307
-	d, clean, _, _ := mockDaemon(t)
326
+	d, start, clean, _, _ := mockDaemon(t)
327
+	start()
308 328
 	defer clean()
309 329
 	w := newWait(t)
310 330
 
@@ -327,7 +347,7 @@ func makeImageInfo(ref string, t time.Time) image.Info {
327 347
 	return image.Info{ID: r, CreatedAt: t}
328 348
 }
329 349
 
330
-func mockDaemon(t *testing.T) (*Daemon, func(), *cluster.Mock, *mockEventWriter) {
350
+func mockDaemon(t *testing.T) (*Daemon, func(), func(), *cluster.Mock, *mockEventWriter) {
331 351
 	logger := log.NewNopLogger()
332 352
 
333 353
 	singleService := cluster.Controller{
@@ -358,18 +378,13 @@ func mockDaemon(t *testing.T) (*Daemon, func(), *cluster.Mock, *mockEventWriter)
358 378
 
359 379
 	repo, repoCleanup := gittest.Repo(t)
360 380
 	params := git.Config{
381
+		Branch:    "master",
361 382
 		UserName:  "example",
362 383
 		UserEmail: "example@example.com",
363 384
 		SyncTag:   "flux-test",
364 385
 		NotesRef:  "fluxtest",
365 386
 	}
366 387
 
367
-	ctx := context.Background()
368
-	checkout, err := repo.Clone(ctx, params)
369
-	if err != nil {
370
-		t.Fatal(err)
371
-	}
372
-
373 388
 	var k8s *cluster.Mock
374 389
 	{
375 390
 		k8s = &cluster.Mock{}
@@ -417,16 +432,17 @@ func mockDaemon(t *testing.T) (*Daemon, func(), *cluster.Mock, *mockEventWriter)
417 432
 
418 433
 	events := &mockEventWriter{}
419 434
 
420
-	// Shutdown chans and waitgroups
435
+	// Shutdown chan and waitgroups
421 436
 	shutdown := make(chan struct{})
422 437
 	wg := &sync.WaitGroup{}
423 438
 
424
-	// Jobs queue
439
+	// Jobs queue (starts itself)
425 440
 	jobs := job.NewQueue(shutdown, wg)
426 441
 
427 442
 	// Finally, the daemon
428 443
 	d := &Daemon{
429
-		Checkout:       checkout,
444
+		Repo:           repo,
445
+		GitConfig:      params,
430 446
 		Cluster:        k8s,
431 447
 		Manifests:      &kubernetes.Manifests{},
432 448
 		Registry:       imageRegistry,
@@ -438,15 +454,22 @@ func mockDaemon(t *testing.T) (*Daemon, func(), *cluster.Mock, *mockEventWriter)
438 454
 		LoopVars:       &LoopVars{},
439 455
 	}
440 456
 
441
-	wg.Add(1)
442
-	go d.GitPollLoop(shutdown, wg, logger)
457
+	start := func() {
458
+		wg.Add(1)
459
+		go repo.Start(shutdown, wg)
460
+		gittest.WaitForRepoReady(repo, t)
461
+
462
+		wg.Add(1)
463
+		go d.Loop(shutdown, wg, logger)
464
+	}
443 465
 
444
-	return d, func() {
466
+	stop := func() {
445 467
 		// Close daemon first so we don't get errors if the queue closes before the daemon
446 468
 		close(shutdown)
447
-		wg.Wait() // Wait for it to close, it might take a while
469
+		wg.Wait()
448 470
 		repoCleanup()
449
-	}, k8s, events
471
+	}
472
+	return d, start, stop, k8s, events
450 473
 }
451 474
 
452 475
 type mockEventWriter struct {

+ 9
- 4
daemon/images.go View File

@@ -8,6 +8,7 @@ import (
8 8
 	"github.com/pkg/errors"
9 9
 
10 10
 	"github.com/weaveworks/flux"
11
+	"github.com/weaveworks/flux/git"
11 12
 	"github.com/weaveworks/flux/image"
12 13
 	"github.com/weaveworks/flux/policy"
13 14
 	"github.com/weaveworks/flux/update"
@@ -16,10 +17,9 @@ import (
16 17
 func (d *Daemon) pollForNewImages(logger log.Logger) {
17 18
 	logger.Log("msg", "polling images")
18 19
 
19
-	// One day we may use this for operations other than the call at the end
20 20
 	ctx := context.Background()
21 21
 
22
-	candidateServices, err := d.unlockedAutomatedServices()
22
+	candidateServices, err := d.unlockedAutomatedServices(ctx)
23 23
 	if err != nil {
24 24
 		logger.Log("error", errors.Wrap(err, "getting unlocked automated services"))
25 25
 		return
@@ -77,8 +77,13 @@ func getTagPattern(services policy.ResourceMap, service flux.ResourceID, contain
77 77
 	return "*"
78 78
 }
79 79
 
80
-func (d *Daemon) unlockedAutomatedServices() (policy.ResourceMap, error) {
81
-	services, err := d.Manifests.ServicesWithPolicies(d.Checkout.ManifestDir())
80
+func (d *Daemon) unlockedAutomatedServices(ctx context.Context) (policy.ResourceMap, error) {
81
+	var services policy.ResourceMap
82
+	err := d.WithClone(ctx, func(checkout *git.Checkout) error {
83
+		var err error
84
+		services, err = d.Manifests.ServicesWithPolicies(checkout.ManifestDir())
85
+		return err
86
+	})
82 87
 	if err != nil {
83 88
 		return nil, err
84 89
 	}

+ 69
- 60
daemon/loop.go View File

@@ -27,11 +27,12 @@ const (
27 27
 )
28 28
 
29 29
 type LoopVars struct {
30
-	GitPollInterval      time.Duration
30
+	SyncInterval         time.Duration
31 31
 	RegistryPollInterval time.Duration
32
-	syncSoon             chan struct{}
33
-	pollImagesSoon       chan struct{}
34
-	initOnce             sync.Once
32
+
33
+	initOnce       sync.Once
34
+	syncSoon       chan struct{}
35
+	pollImagesSoon chan struct{}
35 36
 }
36 37
 
37 38
 func (loop *LoopVars) ensureInit() {
@@ -41,30 +42,22 @@ func (loop *LoopVars) ensureInit() {
41 42
 	})
42 43
 }
43 44
 
44
-func (d *Daemon) GitPollLoop(stop chan struct{}, wg *sync.WaitGroup, logger log.Logger) {
45
+func (d *Daemon) Loop(stop chan struct{}, wg *sync.WaitGroup, logger log.Logger) {
45 46
 	defer wg.Done()
46
-	// We want to pull the repo and sync at least every
47
-	// `GitPollInterval`. Being told to sync, or completing a job, may
48
-	// intervene (in which case, reschedule the next pull-and-sync)
49
-	gitPollTimer := time.NewTimer(d.GitPollInterval)
50
-	pullThen := func(k func(logger log.Logger) error) {
51
-		defer func() {
52
-			gitPollTimer.Stop()
53
-			gitPollTimer = time.NewTimer(d.GitPollInterval)
54
-		}()
55
-		ctx, cancel := context.WithTimeout(context.Background(), gitOpTimeout)
56
-		defer cancel()
57
-		if err := d.Checkout.Pull(ctx); err != nil {
58
-			logger.Log("operation", "pull", "err", err)
59
-			return
60
-		}
61
-		if err := k(logger); err != nil {
62
-			logger.Log("operation", "after-pull", "err", err)
63
-		}
64
-	}
65 47
 
48
+	// We want to sync at least every `SyncInterval`. Being told to
49
+	// sync, or completing a job, may intervene (in which case,
50
+	// reschedule the next sync).
51
+	syncTimer := time.NewTimer(d.SyncInterval)
52
+	// Similarly checking to see if any controllers have new images
53
+	// available.
66 54
 	imagePollTimer := time.NewTimer(d.RegistryPollInterval)
67 55
 
56
+	// Keep track of current HEAD, so we can know when to treat a repo
57
+	// mirror notification as a change. Otherwise, we'll just sync
58
+	// every timer tick as well as every mirror refresh.
59
+	syncHead := ""
60
+
68 61
 	// Ask for a sync, and to poll images, straight away
69 62
 	d.AskForSync()
70 63
 	d.AskForImagePoll()
@@ -74,17 +67,40 @@ func (d *Daemon) GitPollLoop(stop chan struct{}, wg *sync.WaitGroup, logger log.
74 67
 			logger.Log("stopping", "true")
75 68
 			return
76 69
 		case <-d.pollImagesSoon:
70
+			if !imagePollTimer.Stop() {
71
+				select {
72
+				case <-imagePollTimer.C:
73
+				default:
74
+				}
75
+			}
77 76
 			d.pollForNewImages(logger)
78
-			imagePollTimer.Stop()
79
-			imagePollTimer = time.NewTimer(d.RegistryPollInterval)
77
+			imagePollTimer.Reset(d.RegistryPollInterval)
80 78
 		case <-imagePollTimer.C:
81 79
 			d.AskForImagePoll()
82 80
 		case <-d.syncSoon:
83
-			pullThen(d.doSync)
84
-		case <-gitPollTimer.C:
85
-			// Time to poll for new commits (unless we're already
86
-			// about to do that)
81
+			if !syncTimer.Stop() {
82
+				select {
83
+				case <-syncTimer.C:
84
+				default:
85
+				}
86
+			}
87
+			d.doSync(logger)
88
+			syncTimer.Reset(d.SyncInterval)
89
+		case <-syncTimer.C:
87 90
 			d.AskForSync()
91
+		case <-d.Repo.C:
92
+			ctx, cancel := context.WithTimeout(context.Background(), gitOpTimeout)
93
+			newSyncHead, err := d.Repo.Revision(ctx, d.GitConfig.Branch)
94
+			cancel()
95
+			if err != nil {
96
+				logger.Log("url", d.Repo.Origin().URL, "err", err)
97
+				continue
98
+			}
99
+			logger.Log("event", "refreshed", "url", d.Repo.Origin().URL, "branch", d.GitConfig.Branch, "HEAD", newSyncHead)
100
+			if newSyncHead != syncHead {
101
+				syncHead = newSyncHead
102
+				d.AskForSync()
103
+			}
88 104
 		case job := <-d.Jobs.Ready():
89 105
 			queueLength.Set(float64(d.Jobs.Len()))
90 106
 			jobLogger := log.With(logger, "jobID", job.ID)
@@ -94,15 +110,20 @@ func (d *Daemon) GitPollLoop(stop chan struct{}, wg *sync.WaitGroup, logger log.
94 110
 			// pull from there and sync the cluster afterwards.
95 111
 			start := time.Now()
96 112
 			err := job.Do(jobLogger)
113
+			jobDuration.With(
114
+				fluxmetrics.LabelSuccess, fmt.Sprint(err == nil),
115
+			).Observe(time.Since(start).Seconds())
97 116
 			if err != nil {
98 117
 				jobLogger.Log("state", "done", "success", "false", "err", err)
99 118
 			} else {
100 119
 				jobLogger.Log("state", "done", "success", "true")
120
+				ctx, cancel := context.WithTimeout(context.Background(), gitOpTimeout)
121
+				err := d.Repo.Refresh(ctx)
122
+				if err != nil {
123
+					logger.Log("err", err)
124
+				}
125
+				cancel()
101 126
 			}
102
-			jobDuration.With(
103
-				fluxmetrics.LabelSuccess, fmt.Sprint(err == nil),
104
-			).Observe(time.Since(start).Seconds())
105
-			pullThen(d.doSync)
106 127
 		}
107 128
 	}
108 129
 }
@@ -145,14 +166,19 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
145 166
 		var err error
146 167
 		ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
147 168
 		defer cancel()
148
-		working, err = d.Checkout.WorkingClone(ctx)
169
+		working, err = d.Repo.Clone(ctx, d.GitConfig)
149 170
 		if err != nil {
150 171
 			return err
151 172
 		}
152 173
 		defer working.Clean()
153 174
 	}
154 175
 
155
-	// TODO logging, metrics?
176
+	// For comparison later.
177
+	oldTagRev, err := working.TagRevision(ctx, working.Config.SyncTag)
178
+	if err != nil && !isUnknownRevision(err) {
179
+		return err
180
+	}
181
+
156 182
 	// Get a map of all resources defined in the repo
157 183
 	allResources, err := d.Manifests.LoadManifests(working.ManifestDir())
158 184
 	if err != nil {
@@ -178,11 +204,11 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
178 204
 	{
179 205
 		var err error
180 206
 		ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
181
-		commits, err = working.CommitsBetween(ctx, working.SyncTag, "HEAD")
207
+		commits, err = d.Repo.CommitsBetween(ctx, working.SyncTag, "HEAD", d.GitConfig.Path)
182 208
 		if isUnknownRevision(err) {
183 209
 			// No sync tag, grab all revisions
184 210
 			initialSync = true
185
-			commits, err = working.CommitsBefore(ctx, "HEAD")
211
+			commits, err = d.Repo.CommitsBefore(ctx, "HEAD", d.GitConfig.Path)
186 212
 		}
187 213
 		cancel()
188 214
 		if err != nil {
@@ -350,33 +376,16 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
350 376
 		}
351 377
 	}
352 378
 
353
-	// Pull the tag if it has changed
354
-	{
355
-		ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
356
-		if err := d.pullIfTagMoved(ctx, working, logger); err != nil {
357
-			logger.Log("err", errors.Wrap(err, "updating tag"))
358
-		}
359
-		cancel()
360
-	}
361
-
362
-	return nil
363
-}
364
-
365
-func (d *Daemon) pullIfTagMoved(ctx context.Context, working *git.Checkout, logger log.Logger) error {
366
-	oldTagRev, err := d.Checkout.TagRevision(ctx, d.Checkout.SyncTag)
367
-	if err != nil && !strings.Contains(err.Error(), "unknown revision or path not in the working tree") {
368
-		return err
369
-	}
370 379
 	newTagRev, err := working.TagRevision(ctx, working.SyncTag)
371 380
 	if err != nil {
372 381
 		return err
373 382
 	}
374
-
375 383
 	if oldTagRev != newTagRev {
376
-		logger.Log("tag", d.Checkout.SyncTag, "old", oldTagRev, "new", newTagRev)
377
-		if err := d.Checkout.Pull(ctx); err != nil {
378
-			return err
379
-		}
384
+		logger.Log("tag", working.SyncTag, "old", oldTagRev, "new", newTagRev)
385
+		ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
386
+		err := d.Repo.Refresh(ctx)
387
+		cancel()
388
+		return err
380 389
 	}
381 390
 
382 391
 	return nil

+ 77
- 39
daemon/loop_test.go View File

@@ -26,6 +26,7 @@ import (
26 26
 )
27 27
 
28 28
 const (
29
+	gitPath     = ""
29 30
 	gitSyncTag  = "flux-sync"
30 31
 	gitNotesRef = "flux"
31 32
 	gitUser     = "Weave Flux"
@@ -39,15 +40,6 @@ var (
39 40
 
40 41
 func daemon(t *testing.T) (*Daemon, func()) {
41 42
 	repo, repoCleanup := gittest.Repo(t)
42
-	working, err := repo.Clone(context.Background(), git.Config{
43
-		SyncTag:   gitSyncTag,
44
-		NotesRef:  gitNotesRef,
45
-		UserName:  gitUser,
46
-		UserEmail: gitEmail,
47
-	})
48
-	if err != nil {
49
-		t.Fatal(err)
50
-	}
51 43
 
52 44
 	k8s = &cluster.Mock{}
53 45
 	k8s.LoadManifestsFunc = kresource.Load
@@ -62,12 +54,26 @@ func daemon(t *testing.T) (*Daemon, func()) {
62 54
 
63 55
 	wg := &sync.WaitGroup{}
64 56
 	shutdown := make(chan struct{})
57
+
58
+	wg.Add(1)
59
+	go repo.Start(shutdown, wg)
60
+	gittest.WaitForRepoReady(repo, t)
61
+
62
+	gitConfig := git.Config{
63
+		Branch:    "master",
64
+		SyncTag:   gitSyncTag,
65
+		NotesRef:  gitNotesRef,
66
+		UserName:  gitUser,
67
+		UserEmail: gitEmail,
68
+	}
69
+
65 70
 	jobs := job.NewQueue(shutdown, wg)
66 71
 	d := &Daemon{
67 72
 		Cluster:        k8s,
68 73
 		Manifests:      k8s,
69 74
 		Registry:       &registryMock.Registry{},
70
-		Checkout:       working,
75
+		Repo:           repo,
76
+		GitConfig:      gitConfig,
71 77
 		Jobs:           jobs,
72 78
 		JobStatusCache: &job.StatusCache{Size: 100},
73 79
 		EventWriter:    events,
@@ -129,9 +135,9 @@ func TestPullAndSync_InitialSync(t *testing.T) {
129 135
 		}
130 136
 	}
131 137
 	// It creates the tag at HEAD
132
-	if err := d.Checkout.Pull(context.Background()); err != nil {
138
+	if err := d.Repo.Refresh(context.Background()); err != nil {
133 139
 		t.Errorf("pulling sync tag: %v", err)
134
-	} else if revs, err := d.Checkout.CommitsBefore(context.Background(), gitSyncTag); err != nil {
140
+	} else if revs, err := d.Repo.CommitsBefore(context.Background(), gitSyncTag, gitPath); err != nil {
135 141
 		t.Errorf("finding revisions before sync tag: %v", err)
136 142
 	} else if len(revs) <= 0 {
137 143
 		t.Errorf("Found no revisions before the sync tag")
@@ -139,13 +145,25 @@ func TestPullAndSync_InitialSync(t *testing.T) {
139 145
 }
140 146
 
141 147
 func TestDoSync_NoNewCommits(t *testing.T) {
142
-	// Tag exists
143 148
 	d, cleanup := daemon(t)
144 149
 	defer cleanup()
145
-	if err := d.Checkout.MoveTagAndPush(context.Background(), "HEAD", "Sync pointer"); err != nil {
150
+
151
+	ctx := context.Background()
152
+	err := d.WithClone(ctx, func(co *git.Checkout) error {
153
+		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
154
+		defer cancel()
155
+		return co.MoveTagAndPush(ctx, "HEAD", "Sync pointer")
156
+	})
157
+	if err != nil {
146 158
 		t.Fatal(err)
147 159
 	}
148 160
 
161
+	// NB this would usually trigger a sync in a running loop; but we
162
+	// have not run the loop.
163
+	if err = d.Repo.Refresh(ctx); err != nil {
164
+		t.Error(err)
165
+	}
166
+
149 167
 	syncCalled := 0
150 168
 	var syncDef *cluster.SyncDef
151 169
 	expectedServiceIDs := flux.ResourceIDs{
@@ -159,7 +177,9 @@ func TestDoSync_NoNewCommits(t *testing.T) {
159 177
 		return nil
160 178
 	}
161 179
 
162
-	d.doSync(log.NewLogfmtLogger(ioutil.Discard))
180
+	if err := d.doSync(log.NewLogfmtLogger(ioutil.Discard)); err != nil {
181
+		t.Error(err)
182
+	}
163 183
 
164 184
 	// It applies everything
165 185
 	if syncCalled != 1 {
@@ -179,13 +199,12 @@ func TestDoSync_NoNewCommits(t *testing.T) {
179 199
 	}
180 200
 
181 201
 	// It doesn't move the tag
182
-	oldRevs, err := d.Checkout.CommitsBefore(context.Background(), gitSyncTag)
202
+	oldRevs, err := d.Repo.CommitsBefore(ctx, gitSyncTag, gitPath)
183 203
 	if err != nil {
184 204
 		t.Fatal(err)
185 205
 	}
186
-	if err := d.Checkout.Pull(context.Background()); err != nil {
187
-		t.Errorf("pulling sync tag: %v", err)
188
-	} else if revs, err := d.Checkout.CommitsBefore(context.Background(), gitSyncTag); err != nil {
206
+
207
+	if revs, err := d.Repo.CommitsBefore(ctx, gitSyncTag, gitPath); err != nil {
189 208
 		t.Errorf("finding revisions before sync tag: %v", err)
190 209
 	} else if !reflect.DeepEqual(revs, oldRevs) {
191 210
 		t.Errorf("Should have kept the sync tag at HEAD")
@@ -193,32 +212,49 @@ func TestDoSync_NoNewCommits(t *testing.T) {
193 212
 }
194 213
 
195 214
 func TestDoSync_WithNewCommit(t *testing.T) {
196
-	// Tag exists
197 215
 	d, cleanup := daemon(t)
198 216
 	defer cleanup()
217
+
218
+	ctx := context.Background()
199 219
 	// Set the sync tag to head
200
-	if err := d.Checkout.MoveTagAndPush(context.Background(), "HEAD", "Sync pointer"); err != nil {
201
-		t.Fatal(err)
202
-	}
203
-	oldRevision, err := d.Checkout.HeadRevision(context.Background())
220
+	var oldRevision, newRevision string
221
+	err := d.WithClone(ctx, func(checkout *git.Checkout) error {
222
+		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
223
+		defer cancel()
224
+
225
+		var err error
226
+		err = checkout.MoveTagAndPush(ctx, "HEAD", "Sync pointer")
227
+		if err != nil {
228
+			return err
229
+		}
230
+		oldRevision, err = checkout.HeadRevision(ctx)
231
+		if err != nil {
232
+			return err
233
+		}
234
+		// Push some new changes
235
+		err = cluster.UpdateManifest(k8s, checkout.ManifestDir(), flux.MustParseResourceID("default:deployment/helloworld"), func(def []byte) ([]byte, error) {
236
+			// A simple modification so we have changes to push
237
+			return []byte(strings.Replace(string(def), "replicas: 5", "replicas: 4", -1)), nil
238
+		})
239
+		if err != nil {
240
+			return err
241
+		}
242
+
243
+		commitAction := &git.CommitAction{Author: "", Message: "test commit"}
244
+		err = checkout.CommitAndPush(ctx, commitAction, nil)
245
+		if err != nil {
246
+			return err
247
+		}
248
+		newRevision, err = checkout.HeadRevision(ctx)
249
+		return err
250
+	})
204 251
 	if err != nil {
205 252
 		t.Fatal(err)
206 253
 	}
207
-	// Push some new changes
208
-	if err := cluster.UpdateManifest(k8s, d.Checkout.ManifestDir(), flux.MustParseResourceID("default:deployment/helloworld"), func(def []byte) ([]byte, error) {
209
-		// A simple modification so we have changes to push
210
-		return []byte(strings.Replace(string(def), "replicas: 5", "replicas: 4", -1)), nil
211
-	}); err != nil {
212
-		t.Fatal(err)
213
-	}
214 254
 
215
-	commitAction := &git.CommitAction{Author: "", Message: "test commit"}
216
-	if err := d.Checkout.CommitAndPush(context.Background(), commitAction, nil); err != nil {
217
-		t.Fatal(err)
218
-	}
219
-	newRevision, err := d.Checkout.HeadRevision(context.Background())
255
+	err = d.Repo.Refresh(ctx)
220 256
 	if err != nil {
221
-		t.Fatal(err)
257
+		t.Error(err)
222 258
 	}
223 259
 
224 260
 	syncCalled := 0
@@ -262,9 +298,11 @@ func TestDoSync_WithNewCommit(t *testing.T) {
262 298
 		}
263 299
 	}
264 300
 	// It moves the tag
265
-	if err := d.Checkout.Pull(context.Background()); err != nil {
301
+	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
302
+	defer cancel()
303
+	if err := d.Repo.Refresh(ctx); err != nil {
266 304
 		t.Errorf("pulling sync tag: %v", err)
267
-	} else if revs, err := d.Checkout.CommitsBetween(context.Background(), oldRevision, gitSyncTag); err != nil {
305
+	} else if revs, err := d.Repo.CommitsBetween(ctx, oldRevision, gitSyncTag, gitPath); err != nil {
268 306
 		t.Errorf("finding revisions before sync tag: %v", err)
269 307
 	} else if len(revs) <= 0 {
270 308
 		t.Errorf("Should have moved sync tag forward")

+ 0
- 11
flux.go View File

@@ -271,17 +271,6 @@ type Container struct {
271 271
 
272 272
 // --- config types
273 273
 
274
-func NewGitRemoteConfig(url, branch, path string) (GitRemoteConfig, error) {
275
-	if len(path) > 0 && path[0] == '/' {
276
-		return GitRemoteConfig{}, errors.New("git subdirectory (--git-path) should not have leading forward slash")
277
-	}
278
-	return GitRemoteConfig{
279
-		URL:    url,
280
-		Branch: branch,
281
-		Path:   path,
282
-	}, nil
283
-}
284
-
285 274
 type GitRemoteConfig struct {
286 275
 	URL    string `json:"url"`
287 276
 	Branch string `json:"branch"`

+ 33
- 5
git/gittest/repo.go View File

@@ -4,7 +4,9 @@ import (
4 4
 	"io/ioutil"
5 5
 	"os/exec"
6 6
 	"path/filepath"
7
+	"sync"
7 8
 	"testing"
9
+	"time"
8 10
 
9 11
 	"context"
10 12
 	"github.com/weaveworks/flux"
@@ -14,7 +16,7 @@ import (
14 16
 
15 17
 // Repo creates a new clone-able git repo, pre-populated with some kubernetes
16 18
 // files and a few commits. Also returns a cleanup func to clean up after.
17
-func Repo(t *testing.T) (git.Repo, func()) {
19
+func Repo(t *testing.T) (*git.Repo, func()) {
18 20
 	newDir, cleanup := testfiles.TempDir(t)
19 21
 
20 22
 	filesDir := filepath.Join(newDir, "files")
@@ -53,15 +55,20 @@ func Repo(t *testing.T) (git.Repo, func()) {
53 55
 		t.Fatal(err)
54 56
 	}
55 57
 
56
-	conf, _ := flux.NewGitRemoteConfig(gitDir, "master", "")
57
-	return git.Repo{
58
-		GitRemoteConfig: conf,
59
-	}, cleanup
58
+	return git.NewRepo(git.Remote{
59
+		URL: "file://" + gitDir,
60
+	}), cleanup
60 61
 }
61 62
 
62 63
 func Checkout(t *testing.T) (*git.Checkout, func()) {
63 64
 	repo, cleanup := Repo(t)
65
+	shutdown, wg := make(chan struct{}), &sync.WaitGroup{}
66
+	wg.Add(1)
67
+	go repo.Start(shutdown, wg)
68
+	WaitForRepoReady(repo, t)
69
+
64 70
 	config := git.Config{
71
+		Branch:    "master",
65 72
 		UserName:  "example",
66 73
 		UserEmail: "example@example.com",
67 74
 		SyncTag:   "flux-test",
@@ -69,10 +76,12 @@ func Checkout(t *testing.T) (*git.Checkout, func()) {
69 76
 	}
70 77
 	co, err := repo.Clone(context.Background(), config)
71 78
 	if err != nil {
79
+		close(shutdown)
72 80
 		cleanup()
73 81
 		t.Fatal(err)
74 82
 	}
75 83
 	return co, func() {
84
+		close(shutdown)
76 85
 		co.Clean()
77 86
 		cleanup()
78 87
 	}
@@ -84,3 +93,22 @@ func execCommand(cmd string, args ...string) error {
84 93
 	c.Stdout = ioutil.Discard
85 94
 	return c.Run()
86 95
 }
96
+
97
+func WaitForRepoReady(r *git.Repo, t *testing.T) {
98
+	retries := 5
99
+	for {
100
+		s, err := r.Status()
101
+		if err != nil {
102
+			t.Fatal(err)
103
+		}
104
+		if s == flux.RepoReady {
105
+			return
106
+		}
107
+		if retries == 0 {
108
+			t.Fatalf("repo was not ready after 5 seconds")
109
+			return
110
+		}
111
+		retries--
112
+		time.Sleep(100 * time.Millisecond)
113
+	}
114
+}

+ 21
- 20
git/gittest/repo_test.go View File

@@ -4,6 +4,7 @@ import (
4 4
 	"io/ioutil"
5 5
 	"path/filepath"
6 6
 	"reflect"
7
+	"sync"
7 8
 	"testing"
8 9
 
9 10
 	"context"
@@ -19,9 +20,16 @@ func TestCheckout(t *testing.T) {
19 20
 	repo, cleanup := Repo(t)
20 21
 	defer cleanup()
21 22
 
23
+	sd, sg := make(chan struct{}), &sync.WaitGroup{}
24
+
25
+	sg.Add(1)
26
+	go repo.Start(sd, sg)
27
+	WaitForRepoReady(repo, t)
28
+
22 29
 	ctx := context.Background()
23 30
 
24 31
 	params := git.Config{
32
+		Branch:    "master",
25 33
 		UserName:  "example",
26 34
 		UserEmail: "example@example.com",
27 35
 		SyncTag:   "flux-test",
@@ -47,17 +55,9 @@ func TestCheckout(t *testing.T) {
47 55
 		t.Errorf("Expected no note on head revision; got %#v", note)
48 56
 	}
49 57
 
50
-	// Make a working clone and push changes back; then make sure they
51
-	// are visible in the original repo
52
-	working, err := checkout.WorkingClone(ctx)
53
-	if err != nil {
54
-		t.Fatal(err)
55
-	}
56
-	defer working.Clean()
57
-
58 58
 	changedFile := ""
59 59
 	for file, _ := range testfiles.Files {
60
-		path := filepath.Join(working.ManifestDir(), file)
60
+		path := filepath.Join(checkout.ManifestDir(), file)
61 61
 		if err := ioutil.WriteFile(path, []byte("FIRST CHANGE"), 0666); err != nil {
62 62
 			t.Fatal(err)
63 63
 		}
@@ -65,11 +65,11 @@ func TestCheckout(t *testing.T) {
65 65
 		break
66 66
 	}
67 67
 	commitAction := &git.CommitAction{Author: "", Message: "Changed file"}
68
-	if err := working.CommitAndPush(ctx, commitAction, nil); err != nil {
68
+	if err := checkout.CommitAndPush(ctx, commitAction, nil); err != nil {
69 69
 		t.Fatal(err)
70 70
 	}
71 71
 
72
-	path := filepath.Join(working.ManifestDir(), changedFile)
72
+	path := filepath.Join(checkout.ManifestDir(), changedFile)
73 73
 	if err := ioutil.WriteFile(path, []byte("SECOND CHANGE"), 0666); err != nil {
74 74
 		t.Fatal(err)
75 75
 	}
@@ -89,7 +89,7 @@ func TestCheckout(t *testing.T) {
89 89
 		},
90 90
 	}
91 91
 	commitAction = &git.CommitAction{Author: "", Message: "Changed file again"}
92
-	if err := working.CommitAndPush(ctx, commitAction, &expectedNote); err != nil {
92
+	if err := checkout.CommitAndPush(ctx, commitAction, &expectedNote); err != nil {
93 93
 		t.Fatal(err)
94 94
 	}
95 95
 
@@ -114,17 +114,18 @@ func TestCheckout(t *testing.T) {
114 114
 		}
115 115
 	}
116 116
 
117
-	// Do we see the changes if we pull into the original checkout?
118
-	if err := checkout.Pull(ctx); err != nil {
119
-		t.Fatal(err)
117
+	// Do we see the changes if we make another working checkout?
118
+	if err := repo.Refresh(ctx); err != nil {
119
+		t.Error(err)
120 120
 	}
121
-	check(checkout)
122 121
 
123
-	// Do we see the changes if we clone again?
124
-	anotherCheckout, err := repo.Clone(ctx, params)
122
+	another, err := repo.Clone(ctx, params)
125 123
 	if err != nil {
126 124
 		t.Fatal(err)
127 125
 	}
128
-	defer anotherCheckout.Clean()
129
-	check(checkout)
126
+	defer another.Clean()
127
+	check(another)
128
+
129
+	close(sd)
130
+	sg.Wait()
130 131
 }

+ 18
- 14
git/operations.go View File

@@ -28,7 +28,7 @@ func config(ctx context.Context, workingDir, user, email string) error {
28 28
 	return nil
29 29
 }
30 30
 
31
-func clone(ctx context.Context, workingDir string, repoURL, repoBranch string) (path string, err error) {
31
+func clone(ctx context.Context, workingDir, repoURL, repoBranch string) (path string, err error) {
32 32
 	repoPath := filepath.Join(workingDir, "repo")
33 33
 	args := []string{"clone"}
34 34
 	if repoBranch != "" {
@@ -41,9 +41,19 @@ func clone(ctx context.Context, workingDir string, repoURL, repoBranch string) (
41 41
 	return repoPath, nil
42 42
 }
43 43
 
44
-// checkPush sanity-checks that we can write to the upstream repo with
45
-// the given keyring (being able to `clone` is an adequate check that
46
-// we can read the upstream).
44
+func mirror(ctx context.Context, workingDir, repoURL string) (path string, err error) {
45
+	repoPath := filepath.Join(workingDir, "repo")
46
+	args := []string{"clone", "--mirror"}
47
+	args = append(args, repoURL, repoPath)
48
+	if err := execGitCmd(ctx, workingDir, nil, args...); err != nil {
49
+		return "", errors.Wrap(err, "git clone --mirror")
50
+	}
51
+	return repoPath, nil
52
+}
53
+
54
+// checkPush sanity-checks that we can write to the upstream repo
55
+// (being able to `clone` is an adequate check that we can read the
56
+// upstream).
47 57
 func checkPush(ctx context.Context, workingDir, upstream string) error {
48 58
 	// --force just in case we fetched the tag from upstream when cloning
49 59
 	if err := execGitCmd(ctx, workingDir, nil, "tag", "--force", CheckPushTag); err != nil {
@@ -86,16 +96,10 @@ func push(ctx context.Context, workingDir, upstream string, refs []string) error
86 96
 	return nil
87 97
 }
88 98
 
89
-// pull the specific ref from upstream
90
-func pull(ctx context.Context, workingDir, upstream, ref string) error {
91
-	if err := execGitCmd(ctx, workingDir, nil, "pull", "--ff-only", upstream, ref); err != nil {
92
-		return errors.Wrap(err, fmt.Sprintf("git pull --ff-only %s %s", upstream, ref))
93
-	}
94
-	return nil
95
-}
96
-
97
-func fetch(ctx context.Context, workingDir, upstream, refspec string) error {
98
-	if err := execGitCmd(ctx, workingDir, nil, "fetch", "--tags", upstream, refspec); err != nil &&
99
+// fetch updates refs from the upstream.
100
+func fetch(ctx context.Context, workingDir, upstream string, refspec ...string) error {
101
+	args := append([]string{"fetch", "--tags", upstream}, refspec...)
102
+	if err := execGitCmd(ctx, workingDir, nil, args...); err != nil &&
99 103
 		!strings.Contains(err.Error(), "Couldn't find remote ref") {
100 104
 		return errors.Wrap(err, fmt.Sprintf("git fetch --tags %s %s", upstream, refspec))
101 105
 	}

+ 203
- 201
git/repo.go View File

@@ -4,7 +4,6 @@ import (
4 4
 	"errors"
5 5
 	"io/ioutil"
6 6
 	"os"
7
-	"path/filepath"
8 7
 	"sync"
9 8
 
10 9
 	"context"
@@ -14,253 +13,256 @@ import (
14 13
 )
15 14
 
16 15
 const (
16
+	interval  = 5 * time.Minute
17
+	opTimeout = 20 * time.Second
18
+
17 19
 	DefaultCloneTimeout = 2 * time.Minute
18 20
 	CheckPushTag        = "flux-write-check"
19 21
 )
20 22
 
21 23
 var (
22 24
 	ErrNoChanges = errors.New("no changes made in repo")
25
+	ErrNotReady  = errors.New("git repo not ready")
26
+	ErrNoConfig  = errors.New("git repo has not valid config")
23 27
 )
24 28
 
25
-// Repo represents a (remote) git repo.
26
-type Repo struct {
27
-	flux.GitRemoteConfig
28
-}
29
-
30
-// Checkout is a local clone of the remote repo.
31
-type Checkout struct {
32
-	repo Repo
33
-	Dir  string
34
-	Config
35
-	realNotesRef string
36
-	sync.RWMutex
29
+// Remote points at a git repo somewhere.
30
+type Remote struct {
31
+	URL string // clone from here
37 32
 }
38 33
 
39
-// Config holds some values we use when working in the local copy of
40
-// the repo
41
-type Config struct {
42
-	SyncTag   string
43
-	NotesRef  string
44
-	UserName  string
45
-	UserEmail string
46
-	SetAuthor bool
47
-}
34
+type Repo struct {
35
+	// As supplied to constructor
36
+	origin Remote
48 37
 
49
-type Commit struct {
50
-	Revision string
51
-	Message  string
52
-}
38
+	// State
39
+	mu     sync.RWMutex
40
+	status flux.GitRepoStatus
41
+	err    error
42
+	dir    string
53 43
 
54
-// CommitAction - struct holding commit information
55
-type CommitAction struct {
56
-	Author  string
57
-	Message string
44
+	notify chan struct{}
45
+	C      chan struct{}
58 46
 }
59 47
 
60
-// Get a local clone of the upstream repo, and use the config given.
61
-func (r Repo) Clone(ctx context.Context, c Config) (*Checkout, error) {
62
-	if r.URL == "" {
63
-		return nil, NoRepoError
64
-	}
65
-
66
-	workingDir, err := ioutil.TempDir(os.TempDir(), "flux-gitclone")
67
-	if err != nil {
68
-		return nil, err
69
-	}
70
-
71
-	repoDir, err := clone(ctx, workingDir, r.URL, r.Branch)
72
-	if err != nil {
73
-		return nil, CloningError(r.URL, err)
74
-	}
75
-
76
-	if err := config(ctx, repoDir, c.UserName, c.UserEmail); err != nil {
77
-		return nil, err
78
-	}
79
-
80
-	notesRef, err := getNotesRef(ctx, repoDir, c.NotesRef)
81
-	if err != nil {
82
-		return nil, err
83
-	}
84
-
85
-	// this fetches and updates the local ref, so we'll see notes
86
-	if err := fetch(ctx, repoDir, r.URL, notesRef+":"+notesRef); err != nil {
87
-		return nil, err
48
+// NewRepo constructs a repo mirror which will sync itself.
49
+func NewRepo(origin Remote) *Repo {
50
+	r := &Repo{
51
+		origin: origin,
52
+		status: flux.RepoNew,
53
+		err:    nil,
54
+		notify: make(chan struct{}, 1), // `1` so that Notify doesn't block
55
+		C:      make(chan struct{}, 1), // `1` so we don't block on completing a refresh
88 56
 	}
57
+	return r
58
+}
89 59
 
90
-	return &Checkout{
91
-		repo:         r,
92
-		Dir:          repoDir,
93
-		Config:       c,
94
-		realNotesRef: notesRef,
95
-	}, nil
60
+// Origin returns the Remote with which the Repo was constructed.
61
+func (r *Repo) Origin() Remote {
62
+	r.mu.RLock()
63
+	defer r.mu.RUnlock()
64
+	return r.origin
96 65
 }
97 66
 
98
-// WorkingClone makes a(nother) clone of the repository to use for
99
-// e.g., rewriting files, so we can keep a pristine clone for reading
100
-// out of.
101
-func (c *Checkout) WorkingClone(ctx context.Context) (*Checkout, error) {
102
-	c.Lock()
103
-	defer c.Unlock()
104
-	workingDir, err := ioutil.TempDir(os.TempDir(), "flux-working")
105
-	if err != nil {
106
-		return nil, err
107
-	}
67
+// Dir returns the local directory into which the repo has been
68
+// cloned, if it has been cloned.
69
+func (r *Repo) Dir() string {
70
+	r.mu.RLock()
71
+	defer r.mu.RUnlock()
72
+	return r.dir
73
+}
108 74
 
109
-	repoDir, err := clone(ctx, workingDir, c.Dir, c.repo.Branch)
110
-	if err != nil {
111
-		return nil, err
112
-	}
75
+// Status reports that readiness status of this Git repo: whether it
76
+// has been cloned, whether it is writable, and if not, the error
77
+// stopping it getting to the next state.
78
+func (r *Repo) Status() (flux.GitRepoStatus, error) {
79
+	r.mu.RLock()
80
+	defer r.mu.RUnlock()
81
+	return r.status, r.err
82
+}
113 83
 
114
-	if err := config(ctx, repoDir, c.UserName, c.UserEmail); err != nil {
115
-		return nil, err
116
-	}
84
+func (r *Repo) setStatus(s flux.GitRepoStatus, err error) {
85
+	r.mu.Lock()
86
+	r.status = s
87
+	r.err = err
88
+	r.mu.Unlock()
89
+}
117 90
 
118
-	// this fetches and updates the local ref, so we'll see notes
119
-	if err := fetch(ctx, repoDir, c.Dir, c.realNotesRef+":"+c.realNotesRef); err != nil {
120
-		return nil, err
91
+// Notify tells the repo that it should fetch from the origin as soon
92
+// as possible. It does not block.
93
+func (r *Repo) Notify() {
94
+	select {
95
+	case r.notify <- struct{}{}:
96
+		// duly notified
97
+	default:
98
+		// notification already pending
121 99
 	}
122
-
123
-	return &Checkout{
124
-		repo:         c.repo,
125
-		Dir:          repoDir,
126
-		Config:       c.Config,
127
-		realNotesRef: c.realNotesRef,
128
-	}, nil
129 100
 }
130 101
 
131
-// Clean a Checkout up (remove the clone)
132
-func (c *Checkout) Clean() {
133
-	if c.Dir != "" {
134
-		os.RemoveAll(c.Dir)
102
+// Revision returns the revision (SHA1) of the ref passed in
103
+func (r *Repo) Revision(ctx context.Context, ref string) (string, error) {
104
+	r.mu.RLock()
105
+	defer r.mu.RUnlock()
106
+	if r.dir == "" {
107
+		return "", errors.New("git repo not initialised")
135 108
 	}
109
+	return refRevision(ctx, r.dir, ref)
136 110
 }
137 111
 
138
-// ManifestDir returns a path to where the files are
139
-func (c *Checkout) ManifestDir() string {
140
-	return filepath.Join(c.Dir, c.repo.Path)
112
+func (r *Repo) CommitsBefore(ctx context.Context, ref, path string) ([]Commit, error) {
113
+	r.mu.RLock()
114
+	defer r.mu.RUnlock()
115
+	return onelinelog(ctx, r.dir, ref, path)
141 116
 }
142 117
 
143
-// CheckOriginWritable tests that we can write to the origin
144
-// repository; we need to be able to do this to push the sync tag, for
145
-// example.
146
-func (c *Checkout) CheckOriginWritable(ctx context.Context) error {
147
-	c.Lock()
148
-	defer c.Unlock()
149
-	if err := checkPush(ctx, c.Dir, c.repo.URL); err != nil {
150
-		return ErrUpstreamNotWritable(c.repo.URL, err)
151
-	}
152
-	return nil
118
+func (r *Repo) CommitsBetween(ctx context.Context, ref1, ref2, path string) ([]Commit, error) {
119
+	r.mu.RLock()
120
+	defer r.mu.RUnlock()
121
+	return onelinelog(ctx, r.dir, ref1+".."+ref2, path)
153 122
 }
154 123
 
155
-// CommitAndPush commits changes made in this checkout, along with any
156
-// extra data as a note, and pushes the commit and note to the remote repo.
157
-func (c *Checkout) CommitAndPush(ctx context.Context, commitAction *CommitAction, note *Note) error {
158
-	c.Lock()
159
-	defer c.Unlock()
160
-	if !check(ctx, c.Dir, c.repo.Path) {
161
-		return ErrNoChanges
162
-	}
163
-	if err := commit(ctx, c.Dir, commitAction); err != nil {
164
-		return err
165
-	}
166
-
167
-	if note != nil {
168
-		rev, err := refRevision(ctx, c.Dir, "HEAD")
169
-		if err != nil {
170
-			return err
124
+// Start begins synchronising the repo by cloning it, then fetching
125
+// the required tags and so on.
126
+func (r *Repo) Start(shutdown <-chan struct{}, done *sync.WaitGroup) error {
127
+	defer done.Done()
128
+
129
+	for {
130
+
131
+		r.mu.RLock()
132
+		url := r.origin.URL
133
+		dir := r.dir
134
+		status := r.status
135
+		r.mu.RUnlock()
136
+
137
+		bg := context.Background()
138
+
139
+		switch status {
140
+
141
+		// TODO(michael): I don't think this is a real status; perhaps
142
+		// have a no-op repo instead.
143
+		case flux.RepoNoConfig:
144
+			// this is not going to change in the lifetime of this
145
+			// process
146
+			return ErrNoConfig
147
+		case flux.RepoNew:
148
+
149
+			rootdir, err := ioutil.TempDir(os.TempDir(), "flux-gitclone")
150
+			if err != nil {
151
+				return err
152
+			}
153
+
154
+			ctx, cancel := context.WithTimeout(bg, opTimeout)
155
+			dir, err = mirror(ctx, rootdir, url)
156
+			cancel()
157
+			if err == nil {
158
+				r.mu.Lock()
159
+				r.dir = dir
160
+				ctx, cancel := context.WithTimeout(bg, opTimeout)
161
+				err = r.fetch(ctx)
162
+				cancel()
163
+				r.mu.Unlock()
164
+			}
165
+			if err == nil {
166
+				r.setStatus(flux.RepoCloned, nil)
167
+				continue // with new status, skipping timer
168
+			}
169
+			dir = ""
170
+			os.RemoveAll(rootdir)
171
+			r.setStatus(flux.RepoNew, err)
172
+
173
+		case flux.RepoCloned:
174
+			ctx, cancel := context.WithTimeout(bg, opTimeout)
175
+			err := checkPush(ctx, dir, url)
176
+			cancel()
177
+			if err == nil {
178
+				r.setStatus(flux.RepoReady, nil)
179
+				continue // with new status, skipping timer
180
+			}
181
+			r.setStatus(flux.RepoCloned, err)
182
+
183
+		case flux.RepoReady:
184
+			if err := r.refreshLoop(shutdown); err != nil {
185
+				r.setStatus(flux.RepoNew, err)
186
+				continue // with new status, skipping timer
187
+			}
171 188
 		}
172
-		if err := addNote(ctx, c.Dir, rev, c.realNotesRef, note); err != nil {
173
-			return err
189
+
190
+		tryAgain := time.NewTimer(10 * time.Second)
191
+		select {
192
+		case <-shutdown:
193
+			if !tryAgain.Stop() {
194
+				<-tryAgain.C
195
+			}
196
+			return nil
197
+		case <-tryAgain.C:
198
+			continue
174 199
 		}
175 200
 	}
201
+}
176 202
 
177
-	refs := []string{c.repo.Branch}
178
-	ok, err := refExists(ctx, c.Dir, c.realNotesRef)
179
-	if ok {
180
-		refs = append(refs, c.realNotesRef)
181
-	} else if err != nil {
203
+func (r *Repo) Refresh(ctx context.Context) error {
204
+	// the lock here and below is difficult to avoid; possibly we
205
+	// could clone to another repo and pull there, then swap when complete.
206
+	r.mu.Lock()
207
+	defer r.mu.Unlock()
208
+	if r.status != flux.RepoReady {
209
+		return ErrNotReady
210
+	}
211
+	if err := r.fetch(ctx); err != nil {
182 212
 		return err
183 213
 	}
184
-
185
-	if err := push(ctx, c.Dir, c.repo.URL, refs); err != nil {
186
-		return PushError(c.repo.URL, err)
214
+	select {
215
+	case r.C <- struct{}{}:
216
+	default:
187 217
 	}
188 218
 	return nil
189 219
 }
190 220
 
191
-// GetNote gets a note for the revision specified, or nil if there is no such note.
192
-func (c *Checkout) GetNote(ctx context.Context, rev string) (*Note, error) {
193
-	c.RLock()
194
-	defer c.RUnlock()
195
-	return getNote(ctx, c.Dir, c.realNotesRef, rev)
221
+func (r *Repo) refreshLoop(shutdown <-chan struct{}) error {
222
+	gitPoll := time.NewTimer(interval)
223
+	for {
224
+		select {
225
+		case <-shutdown:
226
+			if !gitPoll.Stop() {
227
+				<-gitPoll.C
228
+			}
229
+			return nil
230
+		case <-gitPoll.C:
231
+			r.Notify()
232
+		case <-r.notify:
233
+			if !gitPoll.Stop() {
234
+				select {
235
+				case <-gitPoll.C:
236
+				default:
237
+				}
238
+			}
239
+			ctx, cancel := context.WithTimeout(context.Background(), interval)
240
+			err := r.Refresh(ctx)
241
+			cancel()
242
+			if err != nil {
243
+				return err
244
+			}
245
+			gitPoll.Reset(interval)
246
+		}
247
+	}
196 248
 }
197 249
 
198
-// Pull fetches the latest commits on the branch we're using, and the latest notes
199
-func (c *Checkout) Pull(ctx context.Context) error {
200
-	c.Lock()
201
-	defer c.Unlock()
202
-	if err := pull(ctx, c.Dir, c.repo.URL, c.repo.Branch); err != nil {
250
+// fetch gets updated refs, and associated objects, from the upstream.
251
+func (r *Repo) fetch(ctx context.Context) error {
252
+	if err := fetch(ctx, r.dir, "origin"); err != nil {
203 253
 		return err
204 254
 	}
205
-	for _, ref := range []string{
206
-		c.realNotesRef + ":" + c.realNotesRef,
207
-		c.SyncTag,
208
-	} {
209
-		// this fetches and updates the local ref, so we'll see the new
210
-		// notes; but it's possible that the upstream doesn't have this
211
-		// ref.
212
-		if err := fetch(ctx, c.Dir, c.repo.URL, ref); err != nil {
213
-			return err
214
-		}
215
-	}
216 255
 	return nil
217 256
 }
218 257
 
219
-func (c *Checkout) HeadRevision(ctx context.Context) (string, error) {
220
-	c.RLock()
221
-	defer c.RUnlock()
222
-	return refRevision(ctx, c.Dir, "HEAD")
223
-}
224
-
225
-func (c *Checkout) TagRevision(ctx context.Context, tag string) (string, error) {
226
-	c.RLock()
227
-	defer c.RUnlock()
228
-	return refRevision(ctx, c.Dir, tag)
229
-}
230
-
231
-func (c *Checkout) CommitsBetween(ctx context.Context, ref1, ref2 string) ([]Commit, error) {
232
-	c.RLock()
233
-	defer c.RUnlock()
234
-	return onelinelog(ctx, c.Dir, ref1+".."+ref2, c.repo.GitRemoteConfig.Path)
235
-}
236
-
237
-func (c *Checkout) CommitsBefore(ctx context.Context, ref string) ([]Commit, error) {
238
-	c.RLock()
239
-	defer c.RUnlock()
240
-	return onelinelog(ctx, c.Dir, ref, c.repo.GitRemoteConfig.Path)
241
-}
242
-
243
-func (c *Checkout) MoveTagAndPush(ctx context.Context, ref, msg string) error {
244
-	c.Lock()
245
-	defer c.Unlock()
246
-	return moveTagAndPush(ctx, c.Dir, c.SyncTag, ref, msg, c.repo.URL)
247
-}
248
-
249
-// ChangedFiles does a git diff listing changed files
250
-func (c *Checkout) ChangedFiles(ctx context.Context, ref string) ([]string, error) {
251
-	c.Lock()
252
-	defer c.Unlock()
253
-	list, err := changedFiles(ctx, c.Dir, c.repo.Path, ref)
254
-	if err == nil {
255
-		for i, file := range list {
256
-			list[i] = filepath.Join(c.Dir, file)
257
-		}
258
+// workingClone makes a non-bare clone, at `ref` (probably a branch),
259
+// and returns the filesystem path to it.
260
+func (r *Repo) workingClone(ctx context.Context, ref string) (string, error) {
261
+	r.mu.RLock()
262
+	defer r.mu.RUnlock()
263
+	working, err := ioutil.TempDir(os.TempDir(), "flux-working")
264
+	if err != nil {
265
+		return "", err
258 266
 	}
259
-	return list, err
260
-}
261
-
262
-func (c *Checkout) NoteRevList(ctx context.Context) (map[string]struct{}, error) {
263
-	c.Lock()
264
-	defer c.Unlock()
265
-	return noteRevList(ctx, c.Dir, c.realNotesRef)
267
+	return clone(ctx, working, r.dir, ref)
266 268
 }

+ 171
- 0
git/working.go View File

@@ -0,0 +1,171 @@
1
+package git
2
+
3
+import (
4
+	"context"
5
+	"os"
6
+	"path/filepath"
7
+	"sync"
8
+)
9
+
10
+// Config holds some values we use when working in the working clone of
11
+// a repo.
12
+type Config struct {
13
+	Branch    string // branch we're syncing to
14
+	Path      string // path within the repo containing files we care about
15
+	SyncTag   string
16
+	NotesRef  string
17
+	UserName  string
18
+	UserEmail string
19
+	SetAuthor bool
20
+}
21
+
22
+// Checkout is a local working clone of the remote repo.
23
+type Checkout struct {
24
+	Dir string
25
+	Config
26
+
27
+	upstream     Remote
28
+	realNotesRef string // cache the notes ref, since we use it to push as well
29
+	sync.RWMutex        // the release code at least needs to lock/unlock the checkout
30
+}
31
+
32
+type Commit struct {
33
+	Revision string
34
+	Message  string
35
+}
36
+
37
+// CommitAction - struct holding commit information
38
+type CommitAction struct {
39
+	Author  string
40
+	Message string
41
+}
42
+
43
+// Clone returns a local working clone of the sync'ed `*Repo`, using
44
+// the config given.
45
+func (r *Repo) Clone(ctx context.Context, conf Config) (*Checkout, error) {
46
+	upstream := r.Origin()
47
+	repoDir, err := r.workingClone(ctx, conf.Branch)
48
+	if err != nil {
49
+		return nil, CloningError(upstream.URL, err)
50
+	}
51
+
52
+	if err := config(ctx, repoDir, conf.UserName, conf.UserEmail); err != nil {
53
+		os.RemoveAll(repoDir)
54
+		return nil, err
55
+	}
56
+
57
+	// We'll need the notes ref for pushing it, so make sure we have
58
+	// it. This assumes we're syncing it (otherwise we'll likely get conflicts)
59
+	realNotesRef, err := getNotesRef(ctx, repoDir, conf.NotesRef)
60
+	if err != nil {
61
+		os.RemoveAll(repoDir)
62
+		return nil, err
63
+	}
64
+
65
+	r.mu.RLock()
66
+	if err := fetch(ctx, repoDir, r.dir, realNotesRef+":"+realNotesRef); err != nil {
67
+		os.RemoveAll(repoDir)
68
+		r.mu.RUnlock()
69
+		return nil, err
70
+	}
71
+	r.mu.RUnlock()
72
+
73
+	return &Checkout{
74
+		upstream:     upstream,
75
+		realNotesRef: realNotesRef,
76
+		Dir:          repoDir,
77
+		Config:       conf,
78
+	}, nil
79
+}
80
+
81
+// Clean a Checkout up (remove the clone)
82
+func (c *Checkout) Clean() {
83
+	if c.Dir != "" {
84
+		os.RemoveAll(c.Dir)
85
+	}
86
+}
87
+
88
+// ManifestDir returns a path to where the files are
89
+func (c *Checkout) ManifestDir() string {
90
+	return filepath.Join(c.Dir, c.Config.Path)
91
+}
92
+
93
+// CommitAndPush commits changes made in this checkout, along with any
94
+// extra data as a note, and pushes the commit and note to the remote repo.
95
+func (c *Checkout) CommitAndPush(ctx context.Context, commitAction *CommitAction, note *Note) error {
96
+	c.Lock()
97
+	defer c.Unlock()
98
+	if !check(ctx, c.Dir, c.Config.Path) {
99
+		return ErrNoChanges
100
+	}
101
+	if err := commit(ctx, c.Dir, commitAction); err != nil {
102
+		return err
103
+	}
104
+
105
+	if note != nil {
106
+		rev, err := refRevision(ctx, c.Dir, "HEAD")
107
+		if err != nil {
108
+			return err
109
+		}
110
+		if err := addNote(ctx, c.Dir, rev, c.Config.NotesRef, note); err != nil {
111
+			return err
112
+		}
113
+	}
114
+
115
+	refs := []string{c.Config.Branch}
116
+	ok, err := refExists(ctx, c.Dir, c.realNotesRef)
117
+	if ok {
118
+		refs = append(refs, c.realNotesRef)
119
+	} else if err != nil {
120
+		return err
121
+	}
122
+
123
+	if err := push(ctx, c.Dir, c.upstream.URL, refs); err != nil {
124
+		return PushError(c.upstream.URL, err)
125
+	}
126
+	return nil
127
+}
128
+
129
+// GetNote gets a note for the revision specified, or nil if there is no such note.
130
+func (c *Checkout) GetNote(ctx context.Context, rev string) (*Note, error) {
131
+	c.RLock()
132
+	defer c.RUnlock()
133
+	return getNote(ctx, c.Dir, c.realNotesRef, rev)
134
+}
135
+
136
+func (c *Checkout) HeadRevision(ctx context.Context) (string, error) {
137
+	c.RLock()
138
+	defer c.RUnlock()
139
+	return refRevision(ctx, c.Dir, "HEAD")
140
+}
141
+
142
+func (c *Checkout) TagRevision(ctx context.Context, tag string) (string, error) {
143
+	c.RLock()
144
+	defer c.RUnlock()
145
+	return refRevision(ctx, c.Dir, tag)
146
+}
147
+
148
+func (c *Checkout) MoveTagAndPush(ctx context.Context, ref, msg string) error {
149
+	c.Lock()
150
+	defer c.Unlock()
151
+	return moveTagAndPush(ctx, c.Dir, c.Config.SyncTag, ref, msg, c.upstream.URL)
152
+}
153
+
154
+// ChangedFiles does a git diff listing changed files
155
+func (c *Checkout) ChangedFiles(ctx context.Context, ref string) ([]string, error) {
156
+	c.Lock()
157
+	defer c.Unlock()
158
+	list, err := changedFiles(ctx, c.Dir, c.Config.Path, ref)
159
+	if err == nil {
160
+		for i, file := range list {
161
+			list[i] = filepath.Join(c.Dir, file)
162
+		}
163
+	}
164
+	return list, err
165
+}
166
+
167
+func (c *Checkout) NoteRevList(ctx context.Context) (map[string]struct{}, error) {
168
+	c.Lock()
169
+	defer c.Unlock()
170
+	return noteRevList(ctx, c.Dir, c.realNotesRef)
171
+}

+ 1
- 14
sync/sync_test.go View File

@@ -188,20 +188,7 @@ var gitconf = git.Config{
188 188
 }
189 189
 
190 190
 func setup(t *testing.T) (*git.Checkout, func()) {
191
-	// All the mocks, mockity mock.
192
-	repo, cleanupRepo := gittest.Repo(t)
193
-
194
-	// Clone the repo so we can mess with the files
195
-	working, err := repo.Clone(context.Background(), gitconf)
196
-	if err != nil {
197
-		t.Fatal(err)
198
-	}
199
-	cleanup := func() {
200
-		cleanupRepo()
201
-		working.Clean()
202
-	}
203
-
204
-	return working, cleanup
191
+	return gittest.Checkout(t)
205 192
 }
206 193
 
207 194
 func execCommand(cmd string, args ...string) error {

Loading…
Cancel
Save