GitOps for k8s
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

upstream.go 4.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. package daemon
  2. import (
  3. "context"
  4. "net/http"
  5. "net/url"
  6. "os"
  7. "time"
  8. "github.com/go-kit/kit/log"
  9. "github.com/go-kit/kit/metrics/prometheus"
  10. "github.com/gorilla/mux"
  11. "github.com/pkg/errors"
  12. stdprometheus "github.com/prometheus/client_golang/prometheus"
  13. "github.com/weaveworks/flux/api"
  14. "github.com/weaveworks/flux/event"
  15. transport "github.com/weaveworks/flux/http"
  16. fluxclient "github.com/weaveworks/flux/http/client"
  17. "github.com/weaveworks/flux/http/websocket"
  18. "github.com/weaveworks/flux/remote/rpc"
  19. )
// Upstream handles communication from the daemon to a service. It keeps a
// websocket connection to the service open (re-dialing when it drops), serves
// RPC requests arriving over that connection, and posts events via the HTTP
// API client.
type Upstream struct {
	// client is the HTTP client used both for dialing the websocket and by
	// apiClient.
	client *http.Client
	// ua is passed to websocket.Dial; presumably the User-Agent string —
	// TODO confirm against the websocket package.
	ua string
	// token is the credential handed to websocket.Dial and to the API client.
	token fluxclient.Token
	// url is the websocket URL of the "RegisterDaemonV9" route.
	url *url.URL
	// endpoint is the websocket endpoint string; it is used as the "target"
	// label of the connection-duration metric.
	endpoint string
	// apiClient talks to the HTTP flavour of the endpoint (used by LogEvent).
	apiClient *fluxclient.Client
	// server is the implementation exposed over RPC on each connection.
	server api.UpstreamServer
	logger log.Logger
	// quit is closed by Close to stop the connection loop.
	quit chan struct{}
	// ws is the currently open websocket, or nil when not connected. It is
	// written by connect and read by Close.
	ws websocket.Websocket
}
  33. var (
  34. ErrEndpointDeprecated = errors.New("Your fluxd version is deprecated - please upgrade, see https://github.com/weaveworks/flux/releases")
  35. connectionDuration = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
  36. Namespace: "flux",
  37. Subsystem: "fluxd",
  38. Name: "connection_duration_seconds",
  39. Help: "Duration in seconds of the current connection to fluxsvc. Zero means unconnected.",
  40. }, []string{"target"})
  41. )
  42. func NewUpstream(client *http.Client, ua string, t fluxclient.Token, router *mux.Router, endpoint string, s api.UpstreamServer, logger log.Logger) (*Upstream, error) {
  43. httpEndpoint, wsEndpoint, err := inferEndpoints(endpoint)
  44. if err != nil {
  45. return nil, errors.Wrap(err, "inferring WS/HTTP endpoints")
  46. }
  47. u, err := transport.MakeURL(wsEndpoint, router, "RegisterDaemonV9")
  48. if err != nil {
  49. return nil, errors.Wrap(err, "constructing URL")
  50. }
  51. a := &Upstream{
  52. client: client,
  53. ua: ua,
  54. token: t,
  55. url: u,
  56. endpoint: wsEndpoint,
  57. apiClient: fluxclient.New(client, router, httpEndpoint, t),
  58. server: s,
  59. logger: logger,
  60. quit: make(chan struct{}),
  61. }
  62. go a.loop()
  63. return a, nil
  64. }
  65. func inferEndpoints(endpoint string) (httpEndpoint, wsEndpoint string, err error) {
  66. endpointURL, err := url.Parse(endpoint)
  67. if err != nil {
  68. return "", "", errors.Wrapf(err, "parsing endpoint %s", endpoint)
  69. }
  70. switch endpointURL.Scheme {
  71. case "ws":
  72. httpURL := *endpointURL
  73. httpURL.Scheme = "http"
  74. return httpURL.String(), endpointURL.String(), nil
  75. case "wss":
  76. httpURL := *endpointURL
  77. httpURL.Scheme = "https"
  78. return httpURL.String(), endpointURL.String(), nil
  79. case "http":
  80. wsURL := *endpointURL
  81. wsURL.Scheme = "ws"
  82. return endpointURL.String(), wsURL.String(), nil
  83. case "https":
  84. wsURL := *endpointURL
  85. wsURL.Scheme = "wss"
  86. return endpointURL.String(), wsURL.String(), nil
  87. default:
  88. return "", "", errors.Errorf("unsupported scheme %s", endpointURL.Scheme)
  89. }
  90. }
  91. func (a *Upstream) loop() {
  92. backoff := 5 * time.Second
  93. errc := make(chan error, 1)
  94. for {
  95. go func() {
  96. errc <- a.connect()
  97. }()
  98. select {
  99. case err := <-errc:
  100. if err != nil {
  101. a.logger.Log("err", err)
  102. if err == ErrEndpointDeprecated {
  103. // We have logged the deprecation error, now crashloop to garner attention
  104. os.Exit(1)
  105. }
  106. }
  107. time.Sleep(backoff)
  108. case <-a.quit:
  109. return
  110. }
  111. }
  112. }
  113. func (a *Upstream) connect() error {
  114. a.setConnectionDuration(0)
  115. a.logger.Log("connecting", true)
  116. ws, err := websocket.Dial(a.client, a.ua, a.token, a.url)
  117. if err != nil {
  118. if err, ok := err.(*websocket.DialErr); ok && err.HTTPResponse != nil && err.HTTPResponse.StatusCode == http.StatusGone {
  119. return ErrEndpointDeprecated
  120. }
  121. return errors.Wrapf(err, "executing websocket %s", a.url)
  122. }
  123. a.ws = ws
  124. defer func() {
  125. a.ws = nil
  126. // TODO: handle this error
  127. a.logger.Log("connection closing", true, "err", ws.Close())
  128. }()
  129. a.logger.Log("connected", true)
  130. // Instrument connection lifespan
  131. connectedAt := time.Now()
  132. disconnected := make(chan struct{})
  133. defer close(disconnected)
  134. go func() {
  135. t := time.NewTicker(1 * time.Second)
  136. for {
  137. select {
  138. case now := <-t.C:
  139. a.setConnectionDuration(now.Sub(connectedAt).Seconds())
  140. case <-disconnected:
  141. t.Stop()
  142. a.setConnectionDuration(0)
  143. return
  144. }
  145. }
  146. }()
  147. // Hook up the rpc server. We are a websocket _client_, but an RPC
  148. // _server_.
  149. rpcserver, err := rpc.NewServer(a.server)
  150. if err != nil {
  151. return errors.Wrap(err, "initializing rpc server")
  152. }
  153. rpcserver.ServeConn(ws)
  154. a.logger.Log("disconnected", true)
  155. return nil
  156. }
  157. func (a *Upstream) setConnectionDuration(duration float64) {
  158. connectionDuration.With("target", a.endpoint).Set(duration)
  159. }
  160. func (a *Upstream) LogEvent(event event.Event) error {
  161. return a.apiClient.LogEvent(context.TODO(), event)
  162. }
  163. // Close closes the connection to the service
  164. func (a *Upstream) Close() error {
  165. close(a.quit)
  166. if a.ws == nil {
  167. return nil
  168. }
  169. return a.ws.Close()
  170. }