GitOps for k8s
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

operator.go 9.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. package operator
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/go-kit/kit/log"
  8. "github.com/google/go-cmp/cmp"
  9. corev1 "k8s.io/api/core/v1"
  10. k8serrors "k8s.io/apimachinery/pkg/api/errors"
  11. "k8s.io/apimachinery/pkg/util/runtime"
  12. "k8s.io/apimachinery/pkg/util/wait"
  13. "k8s.io/client-go/kubernetes"
  14. "k8s.io/client-go/kubernetes/scheme"
  15. typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
  16. "k8s.io/client-go/tools/cache"
  17. "k8s.io/client-go/tools/record"
  18. "k8s.io/client-go/util/workqueue"
  19. flux_v1beta1 "github.com/weaveworks/flux/integrations/apis/flux.weave.works/v1beta1"
  20. ifscheme "github.com/weaveworks/flux/integrations/client/clientset/versioned/scheme"
  21. fhrv1 "github.com/weaveworks/flux/integrations/client/informers/externalversions/flux.weave.works/v1beta1"
  22. iflister "github.com/weaveworks/flux/integrations/client/listers/flux.weave.works/v1beta1"
  23. "github.com/weaveworks/flux/integrations/helm/chartsync"
  24. )
  const (
	// controllerAgentName is the Component name of the EventSource that the
	// operator's Event recorder reports as.
	controllerAgentName = "helm-operator"
	// CacheSyncTimeout is an upper bound for waiting on informer caches to
	// sync. NOTE(review): Run does not apply it; presumably it is used by
	// other code in this package or by its callers — confirm before changing.
	CacheSyncTimeout = 180 * time.Second
)

const (
	// ChartSynced is the Event reason recorded against a HelmRelease once its
	// chart release has been handed off for reconciliation (released/updated).
	ChartSynced = "ChartSynced"
	// ErrChartSync is the Event reason for when releasing/updating the chart
	// of a HelmRelease fails.
	ErrChartSync = "ErrChartSync"
	// MessageChartSynced is the Event message recorded together with the
	// ChartSynced reason when a HelmRelease has been synced.
	MessageChartSynced = "Chart managed by HelmRelease processed"
)
  40. // Controller is the operator implementation for HelmRelease resources
  41. type Controller struct {
  42. logger log.Logger
  43. logDiffs bool
  44. fhrLister iflister.HelmReleaseLister
  45. fhrSynced cache.InformerSynced
  46. sync *chartsync.ChartChangeSync
  47. // workqueue is a rate limited work queue. This is used to queue work to be
  48. // processed instead of performing it as soon as a change happens. This
  49. // means we can ensure we only process a fixed amount of resources at a
  50. // time, and makes it easy to ensure we are never processing the same item
  51. // simultaneously in two different workers.
  52. releaseWorkqueue workqueue.RateLimitingInterface
  53. // recorder is an event recorder for recording Event resources to the
  54. // Kubernetes API.
  55. recorder record.EventRecorder
  56. }
  57. // New returns a new helm-operator
  58. func New(
  59. logger log.Logger,
  60. logReleaseDiffs bool,
  61. kubeclientset kubernetes.Interface,
  62. fhrInformer fhrv1.HelmReleaseInformer,
  63. releaseWorkqueue workqueue.RateLimitingInterface,
  64. sync *chartsync.ChartChangeSync) *Controller {
  65. // Add helm-operator types to the default Kubernetes Scheme so Events can be
  66. // logged for helm-operator types.
  67. ifscheme.AddToScheme(scheme.Scheme)
  68. eventBroadcaster := record.NewBroadcaster()
  69. eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
  70. recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
  71. controller := &Controller{
  72. logger: logger,
  73. logDiffs: logReleaseDiffs,
  74. fhrLister: fhrInformer.Lister(),
  75. fhrSynced: fhrInformer.Informer().HasSynced,
  76. releaseWorkqueue: releaseWorkqueue,
  77. recorder: recorder,
  78. sync: sync,
  79. }
  80. controller.logger.Log("info", "setting up event handlers")
  81. // ----- EVENT HANDLERS for HelmRelease resources change ---------
  82. fhrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  83. AddFunc: func(new interface{}) {
  84. _, ok := checkCustomResourceType(controller.logger, new)
  85. if ok {
  86. controller.enqueueJob(new)
  87. }
  88. },
  89. UpdateFunc: func(old, new interface{}) {
  90. controller.enqueueUpdateJob(old, new)
  91. },
  92. DeleteFunc: func(old interface{}) {
  93. fhr, ok := checkCustomResourceType(controller.logger, old)
  94. if ok {
  95. controller.deleteRelease(fhr)
  96. }
  97. },
  98. })
  99. controller.logger.Log("info", "event handlers set up")
  100. return controller
  101. }
  102. // Run sets up the event handlers for our Custom Resource, as well
  103. // as syncing informer caches and starting workers. It will block until stopCh
  104. // is closed, at which point it will shutdown the workqueue and wait for
  105. // workers to finish processing their current work items.
  106. func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitGroup) error {
  107. defer runtime.HandleCrash()
  108. defer c.releaseWorkqueue.ShutDown()
  109. c.logger.Log("info", "starting operator")
  110. // Wait for the caches to be synced before starting workers
  111. c.logger.Log("info", "waiting for informer caches to sync")
  112. if ok := cache.WaitForCacheSync(stopCh, c.fhrSynced); !ok {
  113. return errors.New("failed to wait for caches to sync")
  114. }
  115. c.logger.Log("info", "unformer caches synced")
  116. c.logger.Log("info", "starting workers")
  117. for i := 0; i < threadiness; i++ {
  118. wg.Add(1)
  119. go wait.Until(c.runWorker, time.Second, stopCh)
  120. }
  121. <-stopCh
  122. for i := 0; i < threadiness; i++ {
  123. wg.Done()
  124. }
  125. c.logger.Log("info", "stopping workers")
  126. return nil
  127. }
  128. // runWorker is a long-running function calling the
  129. // processNextWorkItem function to read and process a message
  130. // on a workqueue.
  131. func (c *Controller) runWorker() {
  132. for c.processNextWorkItem() {
  133. }
  134. }
  135. // processNextWorkItem will read a single work item off the workqueue and
  136. // attempt to process it, by calling the syncHandler.
  137. func (c *Controller) processNextWorkItem() bool {
  138. obj, shutdown := c.releaseWorkqueue.Get()
  139. if shutdown {
  140. return false
  141. }
  142. // wrapping block in a func to defer c.workqueue.Done
  143. err := func(obj interface{}) error {
  144. // We call Done here so the workqueue knows we have finished
  145. // processing this item. We must call Forget if we do not want
  146. // this work item being re-queued. If a transient error
  147. // occurs, we do not call Forget. Instead the item is put back
  148. // on the workqueue and attempted again after a back-off
  149. // period.
  150. defer c.releaseWorkqueue.Done(obj)
  151. var key string
  152. var ok bool
  153. // We expect strings to come off the workqueue. These are of the
  154. // form "namespace/fhr(custom resource) name". We do this as the delayed nature of the
  155. // workqueue means the items in the informer cache may actually be
  156. // more up to date than when the item was initially put onto the
  157. // workqueue.
  158. if key, ok = obj.(string); !ok {
  159. // As the item in the workqueue is actually invalid, we call
  160. // Forget not to get into a loop of attempting to
  161. // process a work item that is invalid.
  162. c.releaseWorkqueue.Forget(obj)
  163. runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
  164. return nil
  165. }
  166. // Run the syncHandler, passing it the namespace/name string of the
  167. // HelmRelease resource to sync the corresponding Chart release.
  168. // If the sync failed, then we return while the item will get requeued
  169. if err := c.syncHandler(key); err != nil {
  170. return fmt.Errorf("errored syncing HelmRelease '%s': %s", key, err.Error())
  171. }
  172. // If no error occurs we Forget this item so it does not
  173. // get queued again until another change happens.
  174. c.releaseWorkqueue.Forget(obj)
  175. return nil
  176. }(obj)
  177. if err != nil {
  178. runtime.HandleError(err)
  179. return true
  180. }
  181. return true
  182. }
  183. // syncHandler acts according to the action
  184. // Deletes/creates or updates a Chart release
  185. func (c *Controller) syncHandler(key string) error {
  186. // Retrieve namespace and Custom Resource name from the key
  187. namespace, name, err := cache.SplitMetaNamespaceKey(key)
  188. if err != nil {
  189. c.logger.Log("error", fmt.Sprintf("key '%s' is invalid: %v", key, err))
  190. runtime.HandleError(fmt.Errorf("key '%s' is invalid", key))
  191. return nil
  192. }
  193. // Custom Resource fhr contains all information we need to know about the Chart release
  194. fhr, err := c.fhrLister.HelmReleases(namespace).Get(name)
  195. if err != nil {
  196. if k8serrors.IsNotFound(err) {
  197. c.logger.Log("info", fmt.Sprintf("HelmRelease '%s' referred to in work queue no longer exists", key))
  198. runtime.HandleError(fmt.Errorf("HelmRelease '%s' referred to in work queue no longer exists", key))
  199. return nil
  200. }
  201. c.logger.Log("error", err.Error())
  202. return err
  203. }
  204. c.sync.ReconcileReleaseDef(*fhr)
  205. c.recorder.Event(fhr, corev1.EventTypeNormal, ChartSynced, MessageChartSynced)
  206. return nil
  207. }
  208. func checkCustomResourceType(logger log.Logger, obj interface{}) (flux_v1beta1.HelmRelease, bool) {
  209. var fhr *flux_v1beta1.HelmRelease
  210. var ok bool
  211. if fhr, ok = obj.(*flux_v1beta1.HelmRelease); !ok {
  212. logger.Log("error", fmt.Sprintf("HelmRelease Event Watch received an invalid object: %#v", obj))
  213. return flux_v1beta1.HelmRelease{}, false
  214. }
  215. return *fhr, true
  216. }
  217. func getCacheKey(obj interface{}) (string, error) {
  218. var key string
  219. var err error
  220. if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
  221. runtime.HandleError(err)
  222. return "", err
  223. }
  224. return key, nil
  225. }
  226. // enqueueJob takes a HelmRelease resource and converts it into a namespace/name
  227. // string which is then put onto the work queue. This method should not be
  228. // passed resources of any type other than HelmRelease.
  229. func (c *Controller) enqueueJob(obj interface{}) {
  230. var key string
  231. var err error
  232. if key, err = getCacheKey(obj); err != nil {
  233. return
  234. }
  235. c.releaseWorkqueue.AddRateLimited(key)
  236. }
  237. // enqueueUpdateJob decides if there is a genuine resource update
  238. func (c *Controller) enqueueUpdateJob(old, new interface{}) {
  239. oldFhr, ok := checkCustomResourceType(c.logger, old)
  240. if !ok {
  241. return
  242. }
  243. newFhr, ok := checkCustomResourceType(c.logger, new)
  244. if !ok {
  245. return
  246. }
  247. log := []string{"info", "enqueuing release upgrade"}
  248. if diff := cmp.Diff(oldFhr.Spec, newFhr.Spec); diff != "" && c.logDiffs {
  249. log = append(log, "diff", diff)
  250. }
  251. log = append(log, "resource", newFhr.ResourceID().String())
  252. l := make([]interface{}, len(log))
  253. for i, v := range log {
  254. l[i] = v
  255. }
  256. c.logger.Log(l...)
  257. c.enqueueJob(new)
  258. }
  259. func (c *Controller) deleteRelease(fhr flux_v1beta1.HelmRelease) {
  260. c.logger.Log("info", "deleting release", "resource", fhr.ResourceID().String())
  261. c.sync.DeleteRelease(fhr)
  262. }