Browse Source

Move Upstream methods into public API

This commit moves the Upstream methods (Ping, etc., available only
over RPC) to the Server v11 interface, and abandons the Upstream
interface. This is so that any client, including
weaveworks/flux-adapter, can access these methods.

It also updates the upstream registration point to v11 -- an
incidental but related change.
Michael Bridgen 2 months ago
parent
commit
cf792287bb

+ 0
- 7
api/api.go View File

@@ -8,10 +8,3 @@ import "github.com/weaveworks/flux/api/v11"
8 8
 type Server interface {
9 9
 	v11.Server
10 10
 }
11
-
12
-// UpstreamServer is the interface a Flux must satisfy in order to communicate with
13
-// Weave Cloud.
14
-type UpstreamServer interface {
15
-	v11.Server
16
-	v11.Upstream
17
-}

+ 2
- 2
api/v11/api.go View File

@@ -18,8 +18,8 @@ type Server interface {
18 18
 	v10.Server
19 19
 
20 20
 	ListServicesWithOptions(ctx context.Context, opts ListServicesOptions) ([]v6.ControllerStatus, error)
21
-}
22 21
 
23
-type Upstream interface {
22
+	// NB Upstream methods move into the public API, since
23
+	// weaveworks/flux-adapter now relies on the public API
24 24
 	v10.Upstream
25 25
 }

+ 1
- 1
cmd/fluxd/main.go View File

@@ -609,7 +609,7 @@ func main() {
609 609
 				client.Token(*token),
610 610
 				transport.NewUpstreamRouter(),
611 611
 				*upstreamURL,
612
-				remote.NewErrorLoggingUpstreamServer(daemon, upstreamLogger),
612
+				remote.NewErrorLoggingServer(daemon, upstreamLogger),
613 613
 				*rpcTimeout,
614 614
 				upstreamLogger,
615 615
 			)

+ 22
- 5
http/client/client.go View File

@@ -16,6 +16,7 @@ import (
16 16
 	"github.com/weaveworks/flux/api/v10"
17 17
 	"github.com/weaveworks/flux/api/v11"
18 18
 	"github.com/weaveworks/flux/api/v6"
19
+	"github.com/weaveworks/flux/api/v9"
19 20
 	fluxerr "github.com/weaveworks/flux/errors"
20 21
 	"github.com/weaveworks/flux/event"
21 22
 	transport "github.com/weaveworks/flux/http"
@@ -53,6 +54,20 @@ func New(c *http.Client, router *mux.Router, endpoint string, t Token) *Client {
53 54
 	}
54 55
 }
55 56
 
57
+func (c *Client) Ping(ctx context.Context) error {
58
+	return c.Get(ctx, nil, transport.Ping)
59
+}
60
+
61
+func (c *Client) Version(ctx context.Context) (string, error) {
62
+	var v string
63
+	err := c.Get(ctx, &v, transport.Version)
64
+	return v, err
65
+}
66
+
67
+func (c *Client) NotifyChange(ctx context.Context, change v9.Change) error {
68
+	return c.PostWithBody(ctx, transport.Notify, change)
69
+}
70
+
56 71
 func (c *Client) ListServices(ctx context.Context, namespace string) ([]v6.ControllerStatus, error) {
57 72
 	var res []v6.ControllerStatus
58 73
 	err := c.Get(ctx, &res, transport.ListServices, "namespace", namespace)
@@ -117,7 +132,7 @@ func (c *Client) GitRepoConfig(ctx context.Context, regenerate bool) (v6.GitConf
117 132
 
118 133
 // --- Request helpers
119 134
 
120
-// post is a simple query-param only post request
135
+// Post is a simple query-param only post request
121 136
 func (c *Client) Post(ctx context.Context, route string, queryParams ...string) error {
122 137
 	return c.PostWithBody(ctx, route, nil, queryParams...)
123 138
 }
@@ -178,7 +193,7 @@ func (c *Client) methodWithResp(ctx context.Context, method string, dest interfa
178 193
 	return nil
179 194
 }
180 195
 
181
-// get executes a get request against the Flux server. it unmarshals the response into dest.
196
+// Get executes a get request against the Flux server. it unmarshals the response into dest, if not nil.
182 197
 func (c *Client) Get(ctx context.Context, dest interface{}, route string, queryParams ...string) error {
183 198
 	u, err := transport.MakeURL(c.endpoint, c.router, route, queryParams...)
184 199
 	if err != nil {
@@ -200,8 +215,10 @@ func (c *Client) Get(ctx context.Context, dest interface{}, route string, queryP
200 215
 	}
201 216
 	defer resp.Body.Close()
202 217
 
203
-	if err := json.NewDecoder(resp.Body).Decode(dest); err != nil {
204
-		return errors.Wrap(err, "decoding response from server")
218
+	if dest != nil {
219
+		if err := json.NewDecoder(resp.Body).Decode(dest); err != nil {
220
+			return errors.Wrap(err, "decoding response from server")
221
+		}
205 222
 	}
206 223
 	return nil
207 224
 }
@@ -212,7 +229,7 @@ func (c *Client) executeRequest(req *http.Request) (*http.Response, error) {
212 229
 		return nil, errors.Wrap(err, "executing HTTP request")
213 230
 	}
214 231
 	switch resp.StatusCode {
215
-	case http.StatusOK, http.StatusCreated, http.StatusNoContent:
232
+	case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
216 233
 		return resp, nil
217 234
 	case http.StatusUnauthorized:
218 235
 		return resp, transport.ErrorUnauthorized

+ 41
- 0
http/daemon/server.go View File

@@ -13,6 +13,7 @@ import (
13 13
 	"github.com/weaveworks/flux/api"
14 14
 	"github.com/weaveworks/flux/api/v10"
15 15
 	"github.com/weaveworks/flux/api/v11"
16
+	"github.com/weaveworks/flux/api/v9"
16 17
 	transport "github.com/weaveworks/flux/http"
17 18
 	"github.com/weaveworks/flux/job"
18 19
 	fluxmetrics "github.com/weaveworks/flux/metrics"
@@ -47,6 +48,13 @@ func NewRouter() *mux.Router {
47 48
 
48 49
 func NewHandler(s api.Server, r *mux.Router) http.Handler {
49 50
 	handle := HTTPServer{s}
51
+
52
+	// Erstwhile Upstream(Server) methods, now part of v11
53
+	r.Get(transport.Ping).HandlerFunc(handle.Ping)
54
+	r.Get(transport.Version).HandlerFunc(handle.Version)
55
+	r.Get(transport.Notify).HandlerFunc(handle.Notify)
56
+
57
+	// v6-v11 handlers
50 58
 	r.Get(transport.ListServices).HandlerFunc(handle.ListServicesWithOptions)
51 59
 	r.Get(transport.ListServicesWithOptions).HandlerFunc(handle.ListServicesWithOptions)
52 60
 	r.Get(transport.ListImages).HandlerFunc(handle.ListImagesWithOptions)
@@ -74,6 +82,39 @@ type HTTPServer struct {
74 82
 	server api.Server
75 83
 }
76 84
 
85
+func (s HTTPServer) Ping(w http.ResponseWriter, r *http.Request) {
86
+	if err := s.server.Ping(r.Context()); err != nil {
87
+		transport.ErrorResponse(w, r, err)
88
+		return
89
+	}
90
+	w.WriteHeader(http.StatusNoContent)
91
+	return
92
+}
93
+
94
+func (s HTTPServer) Version(w http.ResponseWriter, r *http.Request) {
95
+	version, err := s.server.Version(r.Context())
96
+	if err != nil {
97
+		transport.ErrorResponse(w, r, err)
98
+		return
99
+	}
100
+	transport.JSONResponse(w, r, version)
101
+}
102
+
103
+func (s HTTPServer) Notify(w http.ResponseWriter, r *http.Request) {
104
+	var change v9.Change
105
+	defer r.Body.Close()
106
+
107
+	if err := json.NewDecoder(r.Body).Decode(&change); err != nil {
108
+		transport.WriteError(w, r, http.StatusBadRequest, err)
109
+		return
110
+	}
111
+	if err := s.server.NotifyChange(r.Context(), change); err != nil {
112
+		transport.ErrorResponse(w, r, err)
113
+		return
114
+	}
115
+	w.WriteHeader(http.StatusAccepted)
116
+}
117
+
77 118
 func (s HTTPServer) JobStatus(w http.ResponseWriter, r *http.Request) {
78 119
 	id := job.ID(mux.Vars(r)["id"])
79 120
 	status, err := s.server.JobStatus(r.Context(), id)

+ 3
- 3
http/daemon/upstream.go View File

@@ -32,7 +32,7 @@ type Upstream struct {
32 32
 	url       *url.URL
33 33
 	endpoint  string
34 34
 	apiClient *fluxclient.Client
35
-	server    api.UpstreamServer
35
+	server    api.Server
36 36
 	timeout   time.Duration
37 37
 	logger    log.Logger
38 38
 	quit      chan struct{}
@@ -50,13 +50,13 @@ var (
50 50
 	}, []string{"target"})
51 51
 )
52 52
 
53
-func NewUpstream(client *http.Client, ua string, t fluxclient.Token, router *mux.Router, endpoint string, s api.UpstreamServer, timeout time.Duration, logger log.Logger) (*Upstream, error) {
53
+func NewUpstream(client *http.Client, ua string, t fluxclient.Token, router *mux.Router, endpoint string, s api.Server, timeout time.Duration, logger log.Logger) (*Upstream, error) {
54 54
 	httpEndpoint, wsEndpoint, err := inferEndpoints(endpoint)
55 55
 	if err != nil {
56 56
 		return nil, errors.Wrap(err, "inferring WS/HTTP endpoints")
57 57
 	}
58 58
 
59
-	u, err := transport.MakeURL(wsEndpoint, router, transport.RegisterDaemonV10)
59
+	u, err := transport.MakeURL(wsEndpoint, router, transport.RegisterDaemonV11)
60 60
 	if err != nil {
61 61
 		return nil, errors.Wrap(err, "constructing URL")
62 62
 	}

+ 5
- 0
http/routes.go View File

@@ -1,6 +1,11 @@
1 1
 package http
2 2
 
3 3
 const (
4
+	// Formerly Upstream methods, now (in v11) included in server API
5
+	Ping    = "Ping"
6
+	Version = "Version"
7
+	Notify  = "Notify"
8
+
4 9
 	ListServices            = "ListServices"
5 10
 	ListServicesWithOptions = "ListServicesWithOptions"
6 11
 	ListImages              = "ListImages"

+ 4
- 0
http/transport.go View File

@@ -29,6 +29,10 @@ func DeprecateVersions(r *mux.Router, versions ...string) {
29 29
 func NewAPIRouter() *mux.Router {
30 30
 	r := mux.NewRouter()
31 31
 
32
+	r.NewRoute().Name(Ping).Methods("GET").Path("/v11/ping")
33
+	r.NewRoute().Name(Version).Methods("GET").Path("/v11/version")
34
+	r.NewRoute().Name(Notify).Methods("POST").Path("/v11/notify")
35
+
32 36
 	r.NewRoute().Name(ListServices).Methods("GET").Path("/v6/services")
33 37
 	r.NewRoute().Name(ListServicesWithOptions).Methods("GET").Path("/v11/services")
34 38
 	r.NewRoute().Name(ListImages).Methods("GET").Path("/v6/images")

+ 3
- 16
remote/logging.go View File

@@ -15,7 +15,6 @@ import (
15 15
 )
16 16
 
17 17
 var _ api.Server = &ErrorLoggingServer{}
18
-var _ api.UpstreamServer = &ErrorLoggingUpstreamServer{}
19 18
 
20 19
 type ErrorLoggingServer struct {
21 20
 	server api.Server
@@ -108,19 +107,7 @@ func (p *ErrorLoggingServer) GitRepoConfig(ctx context.Context, regenerate bool)
108 107
 	return p.server.GitRepoConfig(ctx, regenerate)
109 108
 }
110 109
 
111
-type ErrorLoggingUpstreamServer struct {
112
-	*ErrorLoggingServer
113
-	server api.UpstreamServer
114
-}
115
-
116
-func NewErrorLoggingUpstreamServer(s api.UpstreamServer, l log.Logger) *ErrorLoggingUpstreamServer {
117
-	return &ErrorLoggingUpstreamServer{
118
-		NewErrorLoggingServer(s, l),
119
-		s,
120
-	}
121
-}
122
-
123
-func (p *ErrorLoggingUpstreamServer) Ping(ctx context.Context) (err error) {
110
+func (p *ErrorLoggingServer) Ping(ctx context.Context) (err error) {
124 111
 	defer func() {
125 112
 		if err != nil {
126 113
 			p.logger.Log("method", "Ping", "error", err)
@@ -129,7 +116,7 @@ func (p *ErrorLoggingUpstreamServer) Ping(ctx context.Context) (err error) {
129 116
 	return p.server.Ping(ctx)
130 117
 }
131 118
 
132
-func (p *ErrorLoggingUpstreamServer) Version(ctx context.Context) (v string, err error) {
119
+func (p *ErrorLoggingServer) Version(ctx context.Context) (v string, err error) {
133 120
 	defer func() {
134 121
 		if err != nil {
135 122
 			p.logger.Log("method", "Version", "error", err, "version", v)
@@ -138,7 +125,7 @@ func (p *ErrorLoggingUpstreamServer) Version(ctx context.Context) (v string, err
138 125
 	return p.server.Version(ctx)
139 126
 }
140 127
 
141
-func (p *ErrorLoggingUpstreamServer) NotifyChange(ctx context.Context, change v9.Change) (err error) {
128
+func (p *ErrorLoggingServer) NotifyChange(ctx context.Context, change v9.Change) (err error) {
142 129
 	defer func() {
143 130
 		if err != nil {
144 131
 			p.logger.Log("method", "NotifyChange", "error", err)

+ 3
- 17
remote/metrics.go View File

@@ -128,21 +128,7 @@ func (i *instrumentedServer) GitRepoConfig(ctx context.Context, regenerate bool)
128 128
 	return i.s.GitRepoConfig(ctx, regenerate)
129 129
 }
130 130
 
131
-var _ api.UpstreamServer = &instrumentedUpstreamServer{}
132
-
133
-type instrumentedUpstreamServer struct {
134
-	*instrumentedServer
135
-	s api.UpstreamServer
136
-}
137
-
138
-func InstrumentUpstream(s api.UpstreamServer) *instrumentedUpstreamServer {
139
-	return &instrumentedUpstreamServer{
140
-		Instrument(s),
141
-		s,
142
-	}
143
-}
144
-
145
-func (i *instrumentedUpstreamServer) Ping(ctx context.Context) (err error) {
131
+func (i *instrumentedServer) Ping(ctx context.Context) (err error) {
146 132
 	defer func(begin time.Time) {
147 133
 		requestDuration.With(
148 134
 			fluxmetrics.LabelMethod, "Ping",
@@ -152,7 +138,7 @@ func (i *instrumentedUpstreamServer) Ping(ctx context.Context) (err error) {
152 138
 	return i.s.Ping(ctx)
153 139
 }
154 140
 
155
-func (i *instrumentedUpstreamServer) Version(ctx context.Context) (v string, err error) {
141
+func (i *instrumentedServer) Version(ctx context.Context) (v string, err error) {
156 142
 	defer func(begin time.Time) {
157 143
 		requestDuration.With(
158 144
 			fluxmetrics.LabelMethod, "Version",
@@ -162,7 +148,7 @@ func (i *instrumentedUpstreamServer) Version(ctx context.Context) (v string, err
162 148
 	return i.s.Version(ctx)
163 149
 }
164 150
 
165
-func (i *instrumentedUpstreamServer) NotifyChange(ctx context.Context, change v9.Change) (err error) {
151
+func (i *instrumentedServer) NotifyChange(ctx context.Context, change v9.Change) (err error) {
166 152
 	defer func(begin time.Time) {
167 153
 		requestDuration.With(
168 154
 			fluxmetrics.LabelMethod, "NotifyChange",

+ 2
- 2
remote/mock.go View File

@@ -104,13 +104,13 @@ func (p *MockServer) GitRepoConfig(ctx context.Context, regenerate bool) (v6.Git
104 104
 	return p.GitRepoConfigAnswer, p.GitRepoConfigError
105 105
 }
106 106
 
107
-var _ api.UpstreamServer = &MockServer{}
107
+var _ api.Server = &MockServer{}
108 108
 
109 109
 // -- Battery of tests for an api.Server implementation. Since these
110 110
 // essentially wrap the server in various transports, we expect
111 111
 // arguments and answers to be preserved.
112 112
 
113
-func ServerTestBattery(t *testing.T, wrap func(mock api.UpstreamServer) api.UpstreamServer) {
113
+func ServerTestBattery(t *testing.T, wrap func(mock api.Server) api.Server) {
114 114
 	// set up
115 115
 	namespace := "the-space-of-names"
116 116
 	serviceID := resource.MustParseID(namespace + "/service")

+ 1
- 1
remote/mock_test.go View File

@@ -8,5 +8,5 @@ import (
8 8
 
9 9
 // Just test that the mock does its job.
10 10
 func TestMock(t *testing.T) {
11
-	ServerTestBattery(t, func(mock api.UpstreamServer) api.UpstreamServer { return mock })
11
+	ServerTestBattery(t, func(mock api.Server) api.Server { return mock })
12 12
 }

+ 1
- 1
remote/rpc/baseclient.go View File

@@ -17,7 +17,7 @@ import (
17 17
 
18 18
 type baseClient struct{}
19 19
 
20
-var _ api.UpstreamServer = baseClient{}
20
+var _ api.Server = baseClient{}
21 21
 
22 22
 func (bc baseClient) Version(context.Context) (string, error) {
23 23
 	return "", remote.UpgradeNeededError(errors.New("Version method not implemented"))

+ 0
- 1
remote/rpc/clientV11.go View File

@@ -19,7 +19,6 @@ type RPCClientV11 struct {
19 19
 
20 20
 type clientV11 interface {
21 21
 	v11.Server
22
-	v11.Upstream
23 22
 }
24 23
 
25 24
 var _ clientV11 = &RPCClientV11{}

+ 1
- 1
remote/rpc/rpc_test.go View File

@@ -24,7 +24,7 @@ func pipes() (io.ReadWriteCloser, io.ReadWriteCloser) {
24 24
 }
25 25
 
26 26
 func TestRPC(t *testing.T) {
27
-	wrap := func(mock api.UpstreamServer) api.UpstreamServer {
27
+	wrap := func(mock api.Server) api.Server {
28 28
 		clientConn, serverConn := pipes()
29 29
 
30 30
 		server, err := NewServer(mock, 10*time.Second)

+ 2
- 2
remote/rpc/server.go View File

@@ -26,7 +26,7 @@ type Server struct {
26 26
 
27 27
 // NewServer instantiates a new RPC server, handling requests on the
28 28
 // conn by invoking methods on the underlying (assumed local) server.
29
-func NewServer(s api.UpstreamServer, t time.Duration) (*Server, error) {
29
+func NewServer(s api.Server, t time.Duration) (*Server, error) {
30 30
 	server := rpc.NewServer()
31 31
 	if err := server.Register(&RPCServer{s, t}); err != nil {
32 32
 		return nil, err
@@ -39,7 +39,7 @@ func (c *Server) ServeConn(conn io.ReadWriteCloser) {
39 39
 }
40 40
 
41 41
 type RPCServer struct {
42
-	s       api.UpstreamServer
42
+	s       api.Server
43 43
 	timeout time.Duration
44 44
 }
45 45
 

Loading…
Cancel
Save