Browse Source

Provide hook to instruct helm op to sync mirrors

Todo:
- put some authentication (simple user configured key?) in front of
  API route
- make it pretty
- tests
Hidde Beydals 6 months ago
parent
commit
c20e5c84c2

+ 1
- 1
cmd/helm-operator/main.go View File

@@ -186,7 +186,7 @@ func main() {
186 186
 	checkpoint.CheckForUpdates(product, version, nil, log.With(logger, "component", "checkpoint"))
187 187
 
188 188
 	// start HTTP server
189
-	go daemonhttp.ListenAndServe(*listenAddr, log.With(logger, "component", "daemonhttp"), shutdown)
189
+	go daemonhttp.ListenAndServe(*listenAddr, chartSync, log.With(logger, "component", "daemonhttp"), shutdown)
190 190
 
191 191
 	// start operator
192 192
 	go func() {

+ 24
- 2
git/mirrors.go View File

@@ -1,7 +1,9 @@
1 1
 package git
2 2
 
3 3
 import (
4
+	"context"
4 5
 	"sync"
6
+	"time"
5 7
 )
6 8
 
7 9
 // Maintains several git mirrors as a set, with a mechanism for
@@ -95,7 +97,7 @@ func (m *Mirrors) Get(name string) (*Repo, bool) {
95 97
 	return nil, false
96 98
 }
97 99
 
98
-// stopAllAndWait stops all the repos refreshing, and waits for them
100
+// StopAllAndWait stops all the repos refreshing, and waits for them
99 101
 // to indicate they've done so.
100 102
 func (m *Mirrors) StopAllAndWait() {
101 103
 	m.reposMu.Lock()
@@ -108,7 +110,7 @@ func (m *Mirrors) StopAllAndWait() {
108 110
 	m.wg.Wait()
109 111
 }
110 112
 
111
-// stopAndRemove stops the repo given by `remote`, and cleans up after
113
+// StopOne stops the repo given by `remote`, and cleans up after
112 114
 // it (i.e., removes filesystem traces), if it is being tracked.
113 115
 func (m *Mirrors) StopOne(name string) {
114 116
 	m.reposMu.Lock()
@@ -120,6 +122,26 @@ func (m *Mirrors) StopOne(name string) {
120 122
 	m.reposMu.Unlock()
121 123
 }
122 124
 
125
+// RefreshAll instructs all the repos to refresh, this means
126
+// fetching updated refs, and associated objects. The given
127
+// timeout is the timeout per mirror and _not_ the timeout
128
+// for the whole operation. It returns a collection of
129
+// eventual errors it encountered.
130
+func (m *Mirrors) RefreshAll(timeout time.Duration) []error {
131
+	m.reposMu.Lock()
132
+	defer m.reposMu.Unlock()
133
+
134
+	var errs []error
135
+	for _, state := range m.repos {
136
+		ctx, cancel := context.WithTimeout(context.Background(), timeout)
137
+		if err := state.repo.Refresh(ctx); err != nil {
138
+			errs = append(errs, err)
139
+		}
140
+		cancel()
141
+	}
142
+	return errs
143
+}
144
+
123 145
 // ---
124 146
 
125 147
 type mirroringState struct {

+ 7
- 0
integrations/helm/api/api.go View File

@@ -0,0 +1,7 @@
1
+package api
2
+
3
+// Server is the interface that must be satisfied in order to serve
4
+// HTTP API requests.
5
+type Server interface {
6
+	SyncMirrors()
7
+}

+ 12
- 0
integrations/helm/chartsync/chartsync.go View File

@@ -59,6 +59,7 @@ import (
59 59
 	fluxv1beta1 "github.com/weaveworks/flux/integrations/apis/flux.weave.works/v1beta1"
60 60
 	ifclientset "github.com/weaveworks/flux/integrations/client/clientset/versioned"
61 61
 	helmop "github.com/weaveworks/flux/integrations/helm"
62
+	"github.com/weaveworks/flux/integrations/helm/api"
62 63
 	"github.com/weaveworks/flux/integrations/helm/release"
63 64
 	"github.com/weaveworks/flux/integrations/helm/status"
64 65
 )
@@ -121,6 +122,8 @@ type ChartChangeSync struct {
121 122
 	namespace string
122 123
 }
123 124
 
125
+var _ api.Server = &ChartChangeSync{}
126
+
124 127
 func New(logger log.Logger, polling Polling, clients Clients, release *release.Release, config Config, namespace string) *ChartChangeSync {
125 128
 	return &ChartChangeSync{
126 129
 		logger:     logger,
@@ -402,6 +405,15 @@ func (chs *ChartChangeSync) DeleteRelease(fhr fluxv1beta1.HelmRelease) {
402 405
 	}
403 406
 }
404 407
 
408
+// SyncMirrors instructs all mirrors to refresh from their upstream.
409
+func (chs *ChartChangeSync) SyncMirrors() {
410
+	chs.logger.Log("info", "Starting mirror sync")
411
+	for _, err := range chs.mirrors.RefreshAll(chs.config.GitTimeout) {
412
+		chs.logger.Log("error", fmt.Sprintf("Failure while syncing mirror: %s", err))
413
+	}
414
+	chs.logger.Log("info", "Finished syncing mirrors")
415
+}
416
+
405 417
 // getCustomResources assembles all custom resources in all namespaces
406 418
 // or in the allowed namespace if specified
407 419
 func (chs *ChartChangeSync) getCustomResources() ([]fluxv1beta1.HelmRelease, error) {

+ 36
- 2
integrations/helm/http/daemon/server.go View File

@@ -4,20 +4,30 @@ import (
4 4
 	"context"
5 5
 	"fmt"
6 6
 	"github.com/go-kit/kit/log"
7
+	"github.com/gorilla/mux"
7 8
 	"github.com/prometheus/client_golang/prometheus/promhttp"
9
+	"github.com/weaveworks/flux/integrations/helm/api"
10
+	transport "github.com/weaveworks/flux/integrations/helm/http"
8 11
 	"net/http"
9 12
 	"time"
10 13
 )
11 14
 
12
-// ListenAndServe starts a HTTP server instrumented with Prometheus on the specified address
13
-func ListenAndServe(listenAddr string, logger log.Logger, stopCh <-chan struct{}) {
15
+// ListenAndServe starts a HTTP server instrumented with Prometheus metrics,
16
+// health and API endpoints on the specified address.
17
+func ListenAndServe(listenAddr string, apiServer api.Server, logger log.Logger, stopCh <-chan struct{}) {
14 18
 	mux := http.DefaultServeMux
19
+
20
+	// setup metrics and health endpoints
15 21
 	mux.Handle("/metrics", promhttp.Handler())
16 22
 	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
17 23
 		w.WriteHeader(http.StatusOK)
18 24
 		w.Write([]byte("OK"))
19 25
 	})
20 26
 
27
+	// setup api endpoints
28
+	handler := NewHandler(apiServer, transport.NewRouter())
29
+	mux.Handle("/api/", http.StripPrefix("/api", handler))
30
+
21 31
 	srv := &http.Server{
22 32
 		Addr:         listenAddr,
23 33
 		Handler:      mux,
@@ -46,3 +56,27 @@ func ListenAndServe(listenAddr string, logger log.Logger, stopCh <-chan struct{}
46 56
 		logger.Log("info", "HTTP server stopped")
47 57
 	}
48 58
 }
59
+
60
+// NewHandler registers handlers on the given router.
61
+func NewHandler(s api.Server, r *mux.Router) http.Handler {
62
+	handle := APIServer{s}
63
+	r.Get(transport.SyncGit).HandlerFunc(handle.SyncGit)
64
+	return r
65
+}
66
+
67
+type APIServer struct {
68
+	server api.Server
69
+}
70
+
71
+// SyncGit starts a goroutine in the background to sync all git mirrors
72
+// and writes back a HTTP 200 status header and 'OK' body to inform it
73
+// has been started.
74
+// TODO(hidde): in the future we may want to give users the option to
75
+// request the status after it has been started. The Flux (daemon) API
76
+// archives this by working with jobs whos IDs can be tracked.
77
+func (s APIServer) SyncGit(w http.ResponseWriter, r *http.Request) {
78
+	go s.server.SyncMirrors()
79
+
80
+	w.WriteHeader(http.StatusOK)
81
+	w.Write([]byte("OK"))
82
+}

+ 5
- 0
integrations/helm/http/routes.go View File

@@ -0,0 +1,5 @@
1
+package http
2
+
3
+const (
4
+	SyncGit = "SyncGit"
5
+)

+ 13
- 0
integrations/helm/http/transport.go View File

@@ -0,0 +1,13 @@
1
+package http
2
+
3
+import (
4
+	"github.com/gorilla/mux"
5
+)
6
+
7
+// NewRouter creates a new routeri nstance, registers all API routes
8
+// and returns it.
9
+func NewRouter() *mux.Router {
10
+	r := mux.NewRouter()
11
+	r.NewRoute().Name(SyncGit).Methods("POST").Path("/v1/sync-git")
12
+	return r
13
+}

+ 2
- 2
integrations/helm/operator/operator.go View File

@@ -109,7 +109,7 @@ func New(
109 109
 			}
110 110
 		},
111 111
 		UpdateFunc: func(old, new interface{}) {
112
-			controller.enqueueUpateJob(old, new)
112
+			controller.enqueueUpdateJob(old, new)
113 113
 		},
114 114
 		DeleteFunc: func(old interface{}) {
115 115
 			fhr, ok := checkCustomResourceType(controller.logger, old)
@@ -287,7 +287,7 @@ func (c *Controller) enqueueJob(obj interface{}) {
287 287
 }
288 288
 
289 289
 // enqueueUpdateJob decides if there is a genuine resource update
290
-func (c *Controller) enqueueUpateJob(old, new interface{}) {
290
+func (c *Controller) enqueueUpdateJob(old, new interface{}) {
291 291
 	oldFhr, ok := checkCustomResourceType(c.logger, old)
292 292
 	if !ok {
293 293
 		return

Loading…
Cancel
Save