GitOps for k8s
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

sync.go 12KB


  1. package daemon
  2. import (
  3. "context"
  4. "crypto/sha256"
  5. "encoding/base64"
  6. "github.com/go-kit/kit/log"
  7. "github.com/pkg/errors"
  8. "path/filepath"
  9. "time"
  10. "github.com/fluxcd/flux/cluster"
  11. "github.com/fluxcd/flux/event"
  12. "github.com/fluxcd/flux/git"
  13. "github.com/fluxcd/flux/manifests"
  14. "github.com/fluxcd/flux/resource"
  15. fluxsync "github.com/fluxcd/flux/sync"
  16. "github.com/fluxcd/flux/update"
  17. )
  // revisionRatchet tracks the revision the sync state points to and
  // mediates transitions between revisions. It is more than a plain
  // setter because unexpected transitions should be noticed, e.g. when
  // the apparent current state differs from what had been recorded.
  type revisionRatchet interface {
  	// Current returns the revision presently recorded. getChangeSet
  	// treats an empty string as "nothing synced yet".
  	Current(ctx context.Context) (string, error)

  	// Update asks for the recorded revision to move from oldRev to
  	// newRev. Sync skips the repository refresh when it returns false
  	// with a nil error.
  	// NOTE(review): presumably false means the transition was not
  	// applied -- confirm against the implementations.
  	Update(ctx context.Context, oldRev, newRev string) (bool, error)
  }
  26. type eventLogger interface {
  27. LogEvent(e event.Event) error
  28. }
  29. type changeSet struct {
  30. commits []git.Commit
  31. oldTagRev string
  32. newTagRev string
  33. initialSync bool
  34. }
  35. // Sync starts the synchronization of the cluster with git.
  36. func (d *Daemon) Sync(ctx context.Context, started time.Time, newRevision string, ratchet revisionRatchet) error {
  37. // Make a read-only clone used for this sync
  38. ctxt, cancel := context.WithTimeout(ctx, d.GitTimeout)
  39. working, err := d.Repo.Export(ctxt, newRevision)
  40. if err != nil {
  41. return err
  42. }
  43. cancel()
  44. defer working.Clean()
  45. // Retrieve change set of commits we need to sync
  46. c, err := getChangeSet(ctx, ratchet, newRevision, d.Repo, d.GitTimeout, d.GitConfig.Paths)
  47. if err != nil {
  48. return err
  49. }
  50. // Run actual sync of resources on cluster
  51. syncSetName := makeGitConfigHash(d.Repo.Origin(), d.GitConfig)
  52. resourceStore, err := d.getManifestStore(working)
  53. if err != nil {
  54. return errors.Wrap(err, "reading the repository checkout")
  55. }
  56. resources, resourceErrors, err := doSync(ctx, resourceStore, d.Cluster, syncSetName, d.Logger)
  57. if err != nil {
  58. return err
  59. }
  60. // Determine what resources changed during the sync
  61. changedResources, err := d.getChangedResources(ctx, c, d.GitTimeout, working, resourceStore, resources)
  62. serviceIDs := resource.IDSet{}
  63. for _, r := range changedResources {
  64. serviceIDs.Add([]resource.ID{r.ResourceID()})
  65. }
  66. // Retrieve git notes and collect events from them
  67. notes, err := d.getNotes(ctx, d.GitTimeout)
  68. if err != nil {
  69. return err
  70. }
  71. noteEvents, includesEvents, err := d.collectNoteEvents(ctx, c, notes, d.GitTimeout, started, d.Logger)
  72. if err != nil {
  73. return err
  74. }
  75. // Report all synced commits
  76. if err := logCommitEvent(d, c, serviceIDs, started, includesEvents, resourceErrors, d.Logger); err != nil {
  77. return err
  78. }
  79. // Report all collected events
  80. for _, event := range noteEvents {
  81. if err = d.LogEvent(event); err != nil {
  82. d.Logger.Log("err", err)
  83. // Abort early to ensure at least once delivery of events
  84. return err
  85. }
  86. }
  87. // Move the revision the sync state points to
  88. if ok, err := ratchet.Update(ctx, c.oldTagRev, c.newTagRev); err != nil {
  89. return err
  90. } else if !ok {
  91. return nil
  92. }
  93. err = refresh(ctx, d.GitTimeout, d.Repo)
  94. return err
  95. }
  96. // getChangeSet returns the change set of commits for this sync,
  97. // including the revision range and if it is an initial sync.
  98. func getChangeSet(ctx context.Context, state revisionRatchet, headRev string, repo *git.Repo, timeout time.Duration, paths []string) (changeSet, error) {
  99. var c changeSet
  100. var err error
  101. currentRev, err := state.Current(ctx)
  102. if err != nil {
  103. return c, err
  104. }
  105. c.oldTagRev = currentRev
  106. c.newTagRev = headRev
  107. ctx, cancel := context.WithTimeout(ctx, timeout)
  108. if c.oldTagRev != "" {
  109. c.commits, err = repo.CommitsBetween(ctx, c.oldTagRev, c.newTagRev, paths...)
  110. } else {
  111. c.initialSync = true
  112. c.commits, err = repo.CommitsBefore(ctx, c.newTagRev, paths...)
  113. }
  114. cancel()
  115. return c, err
  116. }
  117. // doSync runs the actual sync of workloads on the cluster. It returns
  118. // a map with all resources it applied and sync errors it encountered.
  119. func doSync(ctx context.Context, manifestsStore manifests.Store, clus cluster.Cluster, syncSetName string,
  120. logger log.Logger) (map[string]resource.Resource, []event.ResourceError, error) {
  121. resources, err := manifestsStore.GetAllResourcesByID(ctx)
  122. if err != nil {
  123. return nil, nil, errors.Wrap(err, "loading resources from repo")
  124. }
  125. var resourceErrors []event.ResourceError
  126. if err := fluxsync.Sync(syncSetName, resources, clus); err != nil {
  127. switch syncerr := err.(type) {
  128. case cluster.SyncError:
  129. logger.Log("err", err)
  130. for _, e := range syncerr {
  131. resourceErrors = append(resourceErrors, event.ResourceError{
  132. ID: e.ResourceID,
  133. Path: e.Source,
  134. Error: e.Error.Error(),
  135. })
  136. }
  137. default:
  138. return nil, nil, err
  139. }
  140. }
  141. return resources, resourceErrors, nil
  142. }
  143. // getChangedResources calculates what resources are modified during
  144. // this sync.
  145. func (d *Daemon) getChangedResources(ctx context.Context, c changeSet, timeout time.Duration, working *git.Export,
  146. manifestsStore manifests.Store, resources map[string]resource.Resource) (map[string]resource.Resource, error) {
  147. if c.initialSync {
  148. return resources, nil
  149. }
  150. errorf := func(err error) error { return errors.Wrap(err, "loading resources from repo") }
  151. ctx, cancel := context.WithTimeout(ctx, timeout)
  152. changedFiles, err := working.ChangedFiles(ctx, c.oldTagRev, d.GitConfig.Paths)
  153. if err != nil {
  154. return nil, errorf(err)
  155. }
  156. cancel()
  157. // Get the resources by source
  158. resourcesByID, err := manifestsStore.GetAllResourcesByID(ctx)
  159. if err != nil {
  160. return nil, errorf(err)
  161. }
  162. resourcesBySource := make(map[string]resource.Resource, len(resourcesByID))
  163. for _, r := range resourcesByID {
  164. resourcesBySource[r.Source()] = r
  165. }
  166. changedResources := map[string]resource.Resource{}
  167. // FIXME(michael): this won't be accurate when a file can have more than one resource
  168. for _, absolutePath := range changedFiles {
  169. relPath, err := filepath.Rel(working.Dir(), absolutePath)
  170. if err != nil {
  171. return nil, errorf(err)
  172. }
  173. if r, ok := resourcesBySource[relPath]; ok {
  174. changedResources[r.ResourceID().String()] = r
  175. }
  176. }
  177. // All resources generated from .flux.yaml files need to be considered as changed
  178. // (even if the .flux.yaml file itself didn't) since external dependencies of the file
  179. // (e.g. scripts invoked), which we cannot track, may have changed
  180. for sourcePath, r := range resourcesBySource {
  181. _, sourceFilename := filepath.Split(sourcePath)
  182. if sourceFilename == manifests.ConfigFilename {
  183. changedResources[r.ResourceID().String()] = r
  184. }
  185. }
  186. return changedResources, nil
  187. }
  188. // getNotes retrieves the git notes from the working clone.
  189. func (d *Daemon) getNotes(ctx context.Context, timeout time.Duration) (map[string]struct{}, error) {
  190. ctx, cancel := context.WithTimeout(ctx, timeout)
  191. notes, err := d.Repo.NoteRevList(ctx, d.GitConfig.NotesRef)
  192. cancel()
  193. if err != nil {
  194. return nil, errors.Wrap(err, "loading notes from repo")
  195. }
  196. return notes, nil
  197. }
  198. // collectNoteEvents collects any events that come from notes attached
  199. // to the commits we just synced. While we're doing this, keep track
  200. // of what other things this sync includes e.g., releases and
  201. // autoreleases, that we're already posting as events, so upstream
  202. // can skip the sync event if it wants to.
  203. func (d *Daemon) collectNoteEvents(ctx context.Context, c changeSet, notes map[string]struct{}, timeout time.Duration,
  204. started time.Time, logger log.Logger) ([]event.Event, map[string]bool, error) {
  205. if len(c.commits) == 0 {
  206. return nil, nil, nil
  207. }
  208. var noteEvents []event.Event
  209. var eventTypes = make(map[string]bool)
  210. // Find notes in revisions.
  211. for i := len(c.commits) - 1; i >= 0; i-- {
  212. if _, ok := notes[c.commits[i].Revision]; !ok {
  213. eventTypes[event.NoneOfTheAbove] = true
  214. continue
  215. }
  216. var n note
  217. ctx, cancel := context.WithTimeout(ctx, timeout)
  218. ok, err := d.Repo.GetNote(ctx, c.commits[i].Revision, d.GitConfig.NotesRef, &n)
  219. cancel()
  220. if err != nil {
  221. return nil, nil, errors.Wrap(err, "loading notes from repo")
  222. }
  223. if !ok {
  224. eventTypes[event.NoneOfTheAbove] = true
  225. continue
  226. }
  227. // If this is the first sync, we should expect no notes,
  228. // since this is supposedly the first time we're seeing
  229. // the repo. But there are circumstances in which we can
  230. // nonetheless see notes -- if the tag was deleted from
  231. // the upstream repo, or if this accidentally has the same
  232. // notes ref as another daemon using the same repo (but a
  233. // different tag). Either way, we don't want to report any
  234. // notes on an initial sync, since they (most likely)
  235. // don't belong to us.
  236. if c.initialSync {
  237. logger.Log("warning", "no notes expected on initial sync; this repo may be in use by another fluxd")
  238. return noteEvents, eventTypes, nil
  239. }
  240. // Interpret some notes as events to send to the upstream
  241. switch n.Spec.Type {
  242. case update.Containers:
  243. spec := n.Spec.Spec.(update.ReleaseContainersSpec)
  244. noteEvents = append(noteEvents, event.Event{
  245. ServiceIDs: n.Result.AffectedResources(),
  246. Type: event.EventRelease,
  247. StartedAt: started,
  248. EndedAt: time.Now().UTC(),
  249. LogLevel: event.LogLevelInfo,
  250. Metadata: &event.ReleaseEventMetadata{
  251. ReleaseEventCommon: event.ReleaseEventCommon{
  252. Revision: c.commits[i].Revision,
  253. Result: n.Result,
  254. Error: n.Result.Error(),
  255. },
  256. Spec: event.ReleaseSpec{
  257. Type: event.ReleaseContainersSpecType,
  258. ReleaseContainersSpec: &spec,
  259. },
  260. Cause: n.Spec.Cause,
  261. },
  262. })
  263. eventTypes[event.EventRelease] = true
  264. case update.Images:
  265. spec := n.Spec.Spec.(update.ReleaseImageSpec)
  266. noteEvents = append(noteEvents, event.Event{
  267. ServiceIDs: n.Result.AffectedResources(),
  268. Type: event.EventRelease,
  269. StartedAt: started,
  270. EndedAt: time.Now().UTC(),
  271. LogLevel: event.LogLevelInfo,
  272. Metadata: &event.ReleaseEventMetadata{
  273. ReleaseEventCommon: event.ReleaseEventCommon{
  274. Revision: c.commits[i].Revision,
  275. Result: n.Result,
  276. Error: n.Result.Error(),
  277. },
  278. Spec: event.ReleaseSpec{
  279. Type: event.ReleaseImageSpecType,
  280. ReleaseImageSpec: &spec,
  281. },
  282. Cause: n.Spec.Cause,
  283. },
  284. })
  285. eventTypes[event.EventRelease] = true
  286. case update.Auto:
  287. spec := n.Spec.Spec.(update.Automated)
  288. noteEvents = append(noteEvents, event.Event{
  289. ServiceIDs: n.Result.AffectedResources(),
  290. Type: event.EventAutoRelease,
  291. StartedAt: started,
  292. EndedAt: time.Now().UTC(),
  293. LogLevel: event.LogLevelInfo,
  294. Metadata: &event.AutoReleaseEventMetadata{
  295. ReleaseEventCommon: event.ReleaseEventCommon{
  296. Revision: c.commits[i].Revision,
  297. Result: n.Result,
  298. Error: n.Result.Error(),
  299. },
  300. Spec: spec,
  301. },
  302. })
  303. eventTypes[event.EventAutoRelease] = true
  304. case update.Policy:
  305. // Use this to mean any change to policy
  306. eventTypes[event.EventUpdatePolicy] = true
  307. default:
  308. // Presume it's not something we're otherwise sending
  309. // as an event
  310. eventTypes[event.NoneOfTheAbove] = true
  311. }
  312. }
  313. return noteEvents, eventTypes, nil
  314. }
  315. // logCommitEvent reports all synced commits to the upstream.
  316. func logCommitEvent(el eventLogger, c changeSet, serviceIDs resource.IDSet, started time.Time,
  317. includesEvents map[string]bool, resourceErrors []event.ResourceError, logger log.Logger) error {
  318. if len(c.commits) == 0 {
  319. return nil
  320. }
  321. cs := make([]event.Commit, len(c.commits))
  322. for i, ci := range c.commits {
  323. cs[i].Revision = ci.Revision
  324. cs[i].Message = ci.Message
  325. }
  326. if err := el.LogEvent(event.Event{
  327. ServiceIDs: serviceIDs.ToSlice(),
  328. Type: event.EventSync,
  329. StartedAt: started,
  330. EndedAt: started,
  331. LogLevel: event.LogLevelInfo,
  332. Metadata: &event.SyncEventMetadata{
  333. Commits: cs,
  334. InitialSync: c.initialSync,
  335. Includes: includesEvents,
  336. Errors: resourceErrors,
  337. },
  338. }); err != nil {
  339. logger.Log("err", err)
  340. return err
  341. }
  342. return nil
  343. }
  344. // refresh refreshes the repository, notifying the daemon we have a new
  345. // sync head.
  346. func refresh(ctx context.Context, timeout time.Duration, repo *git.Repo) error {
  347. ctx, cancel := context.WithTimeout(ctx, timeout)
  348. err := repo.Refresh(ctx)
  349. cancel()
  350. return err
  351. }
  352. func makeGitConfigHash(remote git.Remote, conf git.Config) string {
  353. urlbit := remote.SafeURL()
  354. pathshash := sha256.New()
  355. pathshash.Write([]byte(urlbit))
  356. pathshash.Write([]byte(conf.Branch))
  357. for _, path := range conf.Paths {
  358. pathshash.Write([]byte(path))
  359. }
  360. return base64.RawURLEncoding.EncodeToString(pathshash.Sum(nil))
  361. }