GitOps for k8s

daemon.go 18KB

package daemon

import (
	"bytes"
	"context"
	"fmt"
	"sort"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/pkg/errors"

	"github.com/weaveworks/flux"
	"github.com/weaveworks/flux/api"
	"github.com/weaveworks/flux/api/v6"
	"github.com/weaveworks/flux/api/v9"
	"github.com/weaveworks/flux/cluster"
	fluxerr "github.com/weaveworks/flux/errors"
	"github.com/weaveworks/flux/event"
	"github.com/weaveworks/flux/git"
	"github.com/weaveworks/flux/guid"
	"github.com/weaveworks/flux/image"
	"github.com/weaveworks/flux/job"
	"github.com/weaveworks/flux/policy"
	"github.com/weaveworks/flux/registry"
	"github.com/weaveworks/flux/release"
	"github.com/weaveworks/flux/update"
)

const (
	// This is set to be in sympathy with the request / RPC timeout (i.e., empirically)
	defaultHandlerTimeout = 10 * time.Second
	// A job can take an arbitrary amount of time but we want to have
	// a (generous) threshold for considering a job stuck and
	// abandoning it
	defaultJobTimeout = 60 * time.Second
)

// Daemon is the fully-functional state of a daemon (compare to
// `NotReadyDaemon`).
type Daemon struct {
	V              string
	Cluster        cluster.Cluster
	Manifests      cluster.Manifests
	Registry       registry.Registry
	ImageRefresh   chan image.Name
	Repo           *git.Repo
	GitConfig      git.Config
	Jobs           *job.Queue
	JobStatusCache *job.StatusCache
	EventWriter    event.EventWriter
	Logger         log.Logger
	// bookkeeping
	*LoopVars
}

// Invariant.
var _ api.Server = &Daemon{}

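// Version returns the version string the daemon was configured with (its V field).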
func (d *Daemon) Version(ctx context.Context) (string, error) {
	return d.V, nil
}

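// Ping checks that the daemon can talk to the cluster.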
func (d *Daemon) Ping(ctx context.Context) error {
	return d.Cluster.Ping()
}

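// Export asks the cluster to export its resource definitions.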
func (d *Daemon) Export(ctx context.Context) ([]byte, error) {
	return d.Cluster.Export()
}

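// ListServices lists the controllers running in the given namespace,
// combined with the policies recorded for them in the git repo.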
func (d *Daemon) ListServices(ctx context.Context, namespace string) ([]v6.ControllerStatus, error) {
	clusterServices, err := d.Cluster.AllControllers(namespace)
	if err != nil {
		return nil, errors.Wrap(err, "getting services from cluster")
	}

	var services policy.ResourceMap
	err = d.WithClone(ctx, func(checkout *git.Checkout) error {
		var err error
		services, err = d.Manifests.ServicesWithPolicies(checkout.ManifestDir())
		return err
	})
	if err != nil {
		return nil, errors.Wrap(err, "getting service policies")
	}

	var res []v6.ControllerStatus
	for _, service := range clusterServices {
		policies := services[service.ID]
		res = append(res, v6.ControllerStatus{
			ID:         service.ID,
			Containers: containers2containers(service.ContainersOrNil()),
			Status:     service.Status,
			Automated:  policies.Contains(policy.Automated),
			Locked:     policies.Contains(policy.Locked),
			Ignore:     policies.Contains(policy.Ignore),
			Policies:   policies.ToStringMap(),
		})
	}
	return res, nil
}

// ListImages lists the images available for a set of services.
func (d *Daemon) ListImages(ctx context.Context, spec update.ResourceSpec) ([]v6.ImageStatus, error) {
	var services []cluster.Controller
	var err error
	if spec == update.ResourceSpecAll {
		services, err = d.Cluster.AllControllers("")
	} else {
		var id flux.ResourceID
		id, err = spec.AsID()
		if err != nil {
			return nil, errors.Wrap(err, "treating service spec as ID")
		}
		services, err = d.Cluster.SomeControllers([]flux.ResourceID{id})
	}
	// check the error from whichever branch was taken
	if err != nil {
		return nil, errors.Wrap(err, "getting services from cluster")
	}

	images, err := update.CollectAvailableImages(d.Registry, services, d.Logger)
	if err != nil {
		return nil, errors.Wrap(err, "getting images for services")
	}

	var res []v6.ImageStatus
	for _, service := range services {
		containers := containersWithAvailable(service, images)
		res = append(res, v6.ImageStatus{
			ID:         service.ID,
			Containers: containers,
		})
	}
	return res, nil
}

// Let's use the CommitEventMetadata as a convenient transport for the
// results of a job; if no commit was made (e.g., if it was a dry
// run), leave the revision field empty.
type DaemonJobFunc func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (*event.CommitEventMetadata, error)

// executeJob runs a job func in a cloned working directory, keeping track of its status.
func (d *Daemon) executeJob(id job.ID, do DaemonJobFunc, logger log.Logger) (*event.CommitEventMetadata, error) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultJobTimeout)
	defer cancel()
	d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusRunning})
	// make a working clone so we don't mess with files we
	// will be reading from elsewhere
	var metadata *event.CommitEventMetadata
	err := d.WithClone(ctx, func(working *git.Checkout) error {
		var err error
		metadata, err = do(ctx, id, working, logger)
		if err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusFailed, Err: err.Error()})
		return nil, err
	}
	d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusSucceeded, Result: *metadata})
	return metadata, nil
}

// queueJob queues a job func to be executed.
func (d *Daemon) queueJob(do DaemonJobFunc) job.ID {
	id := job.ID(guid.New())
	enqueuedAt := time.Now()
	d.Jobs.Enqueue(&job.Job{
		ID: id,
		Do: func(logger log.Logger) error {
			queueDuration.Observe(time.Since(enqueuedAt).Seconds())
			started := time.Now().UTC()
			metadata, err := d.executeJob(id, do, logger)
			if err != nil {
				return err
			}
			logger.Log("revision", metadata.Revision)
			if metadata.Revision != "" {
				var serviceIDs []flux.ResourceID
				for id, result := range metadata.Result {
					if result.Status == update.ReleaseStatusSuccess {
						serviceIDs = append(serviceIDs, id)
					}
				}
				return d.LogEvent(event.Event{
					ServiceIDs: serviceIDs,
					Type:       event.EventCommit,
					StartedAt:  started,
					EndedAt:    started,
					LogLevel:   event.LogLevelInfo,
					Metadata:   metadata,
				})
			}
			return nil
		},
	})
	queueLength.Set(float64(d.Jobs.Len()))
	d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusQueued})
	return id
}

// UpdateManifests applies the desired changes to the config files.
func (d *Daemon) UpdateManifests(ctx context.Context, spec update.Spec) (job.ID, error) {
	var id job.ID
	if spec.Type == "" {
		return id, errors.New("no type in update spec")
	}
	switch s := spec.Spec.(type) {
	case release.Changes:
		if s.ReleaseKind() == update.ReleaseKindPlan {
			id := job.ID(guid.New())
			_, err := d.executeJob(id, d.release(spec, s), d.Logger)
			return id, err
		}
		return d.queueJob(d.release(spec, s)), nil
	case policy.Updates:
		return d.queueJob(d.updatePolicy(spec, s)), nil
	default:
		return id, fmt.Errorf(`unknown update type "%s"`, spec.Type)
	}
}

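// updatePolicy returns a job func which applies the given policy updates
// to the manifests in a working clone of the config repo, then commits
// and pushes the result if anything changed.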
func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) DaemonJobFunc {
	return func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (*event.CommitEventMetadata, error) {
		// For each update
		var serviceIDs []flux.ResourceID
		metadata := &event.CommitEventMetadata{
			Spec:   &spec,
			Result: update.Result{},
		}
		// A shortcut to make things more responsive: if anything
		// was (probably) set to automated, we will ask for an
		// automation run straight away.
		var anythingAutomated bool
		for serviceID, u := range updates {
			if policy.Set(u.Add).Contains(policy.Automated) {
				anythingAutomated = true
			}
			// find the service manifest
			err := cluster.UpdateManifest(d.Manifests, working.ManifestDir(), serviceID, func(def []byte) ([]byte, error) {
				newDef, err := d.Manifests.UpdatePolicies(def, u)
				if err != nil {
					metadata.Result[serviceID] = update.ControllerResult{
						Status: update.ReleaseStatusFailed,
						Error:  err.Error(),
					}
					return nil, err
				}
				if string(newDef) == string(def) {
					metadata.Result[serviceID] = update.ControllerResult{
						Status: update.ReleaseStatusSkipped,
					}
				} else {
					serviceIDs = append(serviceIDs, serviceID)
					metadata.Result[serviceID] = update.ControllerResult{
						Status: update.ReleaseStatusSuccess,
					}
				}
				return newDef, nil
			})
			switch err {
			case cluster.ErrNoResourceFilesFoundForService, cluster.ErrMultipleResourceFilesFoundForService:
				metadata.Result[serviceID] = update.ControllerResult{
					Status: update.ReleaseStatusFailed,
					Error:  err.Error(),
				}
			case nil:
				// continue
			default:
				return nil, err
			}
		}
		if len(serviceIDs) == 0 {
			return metadata, nil
		}

		commitAuthor := ""
		if d.GitConfig.SetAuthor {
			commitAuthor = spec.Cause.User
		}
		commitAction := git.CommitAction{Author: commitAuthor, Message: policyCommitMessage(updates, spec.Cause)}
		if err := working.CommitAndPush(ctx, commitAction, &git.Note{JobID: jobID, Spec: spec}); err != nil {
			// On the chance pushing failed because it was not
			// possible to fast-forward, ask for a sync so the
			// next attempt is more likely to succeed.
			d.AskForSync()
			return nil, err
		}
		if anythingAutomated {
			d.AskForImagePoll()
		}

		var err error
		metadata.Revision, err = working.HeadRevision(ctx)
		if err != nil {
			return nil, err
		}
		return metadata, nil
	}
}

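// release returns a job func which calculates a release for the given
// changes; when the release kind is execute (rather than a dry-run plan),
// the updated manifests are also committed and pushed.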
func (d *Daemon) release(spec update.Spec, c release.Changes) DaemonJobFunc {
	return func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (*event.CommitEventMetadata, error) {
		rc := release.NewReleaseContext(d.Cluster, d.Manifests, d.Registry, working)
		result, err := release.Release(rc, c, logger)
		if err != nil {
			return nil, err
		}

		var revision string
		if c.ReleaseKind() == update.ReleaseKindExecute {
			commitMsg := spec.Cause.Message
			if commitMsg == "" {
				commitMsg = c.CommitMessage()
			}
			commitAuthor := ""
			if d.GitConfig.SetAuthor {
				commitAuthor = spec.Cause.User
			}
			commitAction := git.CommitAction{Author: commitAuthor, Message: commitMsg}
			if err := working.CommitAndPush(ctx, commitAction, &git.Note{JobID: jobID, Spec: spec, Result: result}); err != nil {
				// On the chance pushing failed because it was not
				// possible to fast-forward, ask the repo to fetch
				// from upstream ASAP, so the next attempt is more
				// likely to succeed.
				d.Repo.Notify()
				return nil, err
			}
			revision, err = working.HeadRevision(ctx)
			if err != nil {
				return nil, err
			}
		}
		return &event.CommitEventMetadata{
			Revision: revision,
			Spec:     &spec,
			Result:   result,
		}, nil
	}
}

// NotifyChange tells the daemon to synchronise the cluster with the
// manifests in the git repo. This has an error return value because
// upstream there may be comms difficulties or other sources of problems;
// here, we always succeed because it's just bookkeeping.
func (d *Daemon) NotifyChange(ctx context.Context, change v9.Change) error {
	switch change.Kind {
	case v9.GitChange:
		gitUpdate := change.Source.(v9.GitUpdate)
		if gitUpdate.URL != d.Repo.Origin().URL && gitUpdate.Branch != d.GitConfig.Branch {
			// It isn't strictly an _error_ to be notified about a repo/branch pair
			// that isn't ours, but it's worth logging anyway for debugging.
			d.Logger.Log("msg", "notified about unrelated change",
				"url", gitUpdate.URL,
				"branch", gitUpdate.Branch)
			break
		}
		d.Repo.Notify()
	case v9.ImageChange:
		imageUpdate := change.Source.(v9.ImageUpdate)
		d.ImageRefresh <- imageUpdate.Name
	}
	return nil
}

// JobStatus - Ask the daemon how far it's got committing things; in particular, is the job
// queued? running? committed? If it is done, the commit ref is returned.
func (d *Daemon) JobStatus(ctx context.Context, jobID job.ID) (job.Status, error) {
	// Is the job queued, running, or recently finished?
	status, ok := d.JobStatusCache.Status(jobID)
	if ok {
		return status, nil
	}

	// Look through the commits for a note referencing this job. This
	// means that even if fluxd restarts, we will at least remember
	// jobs which have pushed a commit.
	// FIXME(michael): consider looking at the repo for this, since read op
	err := d.WithClone(ctx, func(working *git.Checkout) error {
		notes, err := working.NoteRevList(ctx)
		if err != nil {
			return errors.Wrap(err, "enumerating commit notes")
		}
		commits, err := d.Repo.CommitsBefore(ctx, "HEAD", d.GitConfig.Path)
		if err != nil {
			return errors.Wrap(err, "checking revisions for status")
		}
		for _, commit := range commits {
			if _, ok := notes[commit.Revision]; ok {
				note, _ := working.GetNote(ctx, commit.Revision)
				if note != nil && note.JobID == jobID {
					status = job.Status{
						StatusString: job.StatusSucceeded,
						Result: event.CommitEventMetadata{
							Revision: commit.Revision,
							Spec:     &note.Spec,
							Result:   note.Result,
						},
					}
					return nil
				}
			}
		}
		return unknownJobError(jobID)
	})
	return status, err
}

// SyncStatus - Ask the daemon how far it's got applying things; in
// particular, is it past the given commit? Return the list of commits
// between where we have applied (the sync tag) and the ref given,
// inclusive. E.g., if you send HEAD, you'll get all the commits yet to
// be applied. If you send a hash and it's applied at or _past_ it,
// you'll get an empty list.
func (d *Daemon) SyncStatus(ctx context.Context, commitRef string) ([]string, error) {
	commits, err := d.Repo.CommitsBetween(ctx, d.GitConfig.SyncTag, commitRef, d.GitConfig.Path)
	if err != nil {
		return nil, err
	}
	// NB we could use the messages too if we decide to change the
	// signature of the API to include it.
	revs := make([]string, len(commits))
	for i, commit := range commits {
		revs[i] = commit.Revision
	}
	return revs, nil
}

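// GitRepoConfig reports the git remote the daemon syncs from, the
// cluster's public SSH key (regenerated first if requested), and the
// current status of the repo.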
func (d *Daemon) GitRepoConfig(ctx context.Context, regenerate bool) (v6.GitConfig, error) {
	publicSSHKey, err := d.Cluster.PublicSSHKey(regenerate)
	if err != nil {
		return v6.GitConfig{}, err
	}
	origin := d.Repo.Origin()
	status, _ := d.Repo.Status()
	return v6.GitConfig{
		Remote: v6.GitRemoteConfig{
			URL:    origin.URL,
			Branch: d.GitConfig.Branch,
			Path:   d.GitConfig.Path,
		},
		PublicSSHKey: publicSSHKey,
		Status:       status,
	}, nil
}

// Non-api.Server methods

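// WithClone clones the config repo, runs fn against the fresh checkout,
// and cleans up the working directory afterwards.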
func (d *Daemon) WithClone(ctx context.Context, fn func(*git.Checkout) error) error {
	co, err := d.Repo.Clone(ctx, d.GitConfig)
	if err != nil {
		return err
	}
	defer co.Clean()
	return fn(co)
}

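// unknownJobError is returned when a job ID appears in neither the
// status cache nor the notes attached to any commit.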
func unknownJobError(id job.ID) error {
	return &fluxerr.Error{
		Type: fluxerr.Missing,
		Err:  fmt.Errorf("unknown job %q", string(id)),
		Help: `Job not found

This is often because the job did not result in committing changes,
and therefore had no lasting effect. A release dry-run is an example
of a job that does not result in a commit.

If you were expecting changes to be committed, this may mean that the
job failed, but its status was lost.

In both of the above cases it is OK to retry the operation that
resulted in this error.

If you get this error repeatedly, it's probably a bug. Please log an
issue describing what you were attempting, and post logs from the
daemon if possible:

https://github.com/weaveworks/flux/issues
`,
	}
}

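// LogEvent records an event upstream via the event writer, if one is
// configured; otherwise the event is only logged locally.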
func (d *Daemon) LogEvent(ev event.Event) error {
	if d.EventWriter == nil {
		d.Logger.Log("event", ev, "logupstream", "false")
		return nil
	}
	d.Logger.Log("event", ev, "logupstream", "true")
	return d.EventWriter.LogEvent(ev)
}

// vvv helpers vvv

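// containers2containers converts the cluster's view of a workload's
// containers into their API (v6) representation.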
func containers2containers(cs []cluster.Container) []v6.Container {
	res := make([]v6.Container, len(cs))
	for i, c := range cs {
		id, _ := image.ParseRef(c.Image)
		res[i] = v6.Container{
			Name: c.Name,
			Current: image.Info{
				ID: id,
			},
		}
	}
	return res
}

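// containersWithAvailable pairs each of a controller's containers with
// the images known to be available for it; if the registry has no data
// for an image, that is recorded as an error on the container.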
func containersWithAvailable(service cluster.Controller, images update.ImageMap) (res []v6.Container) {
	for _, c := range service.ContainersOrNil() {
		im, _ := image.ParseRef(c.Image)
		available := images.Available(im.Name)
		availableErr := ""
		if available == nil {
			availableErr = registry.ErrNoImageData.Error()
		}
		res = append(res, v6.Container{
			Name: c.Name,
			Current: image.Info{
				ID: im,
			},
			Available:      available,
			AvailableError: availableErr,
		})
	}
	return res
}

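// policyCommitMessage composes the commit message for a set of policy
// updates: the cause's message (or a generic heading) followed by one
// line per resulting event.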
func policyCommitMessage(us policy.Updates, cause update.Cause) string {
	// shortcut, since we want roughly the same information
	events := policyEvents(us, time.Now())
	commitMsg := &bytes.Buffer{}
	prefix := "- "
	switch {
	case cause.Message != "":
		fmt.Fprintf(commitMsg, "%s\n\n", cause.Message)
	case len(events) > 1:
		fmt.Fprintf(commitMsg, "Updated service policies\n\n")
	default:
		prefix = ""
	}
	for _, event := range events {
		fmt.Fprintf(commitMsg, "%s%v\n", prefix, event)
	}
	return commitMsg.String()
}

// policyEvents builds a map of events (by type), for all the events in this set of
// updates. There will be one event per type, containing all service ids
// affected by that event. e.g. all automated services will share an event.
func policyEvents(us policy.Updates, now time.Time) map[string]event.Event {
	eventsByType := map[string]event.Event{}
	for serviceID, update := range us {
		for _, eventType := range policyEventTypes(update) {
			e, ok := eventsByType[eventType]
			if !ok {
				e = event.Event{
					ServiceIDs: []flux.ResourceID{},
					Type:       eventType,
					StartedAt:  now,
					EndedAt:    now,
					LogLevel:   event.LogLevelInfo,
				}
			}
			e.ServiceIDs = append(e.ServiceIDs, serviceID)
			eventsByType[eventType] = e
		}
	}
	return eventsByType
}

// policyEventTypes is a deduped list of all event types this update contains
func policyEventTypes(u policy.Update) []string {
	types := map[string]struct{}{}
	for p := range u.Add {
		switch {
		case p == policy.Automated:
			types[event.EventAutomate] = struct{}{}
		case p == policy.Locked:
			types[event.EventLock] = struct{}{}
		default:
			types[event.EventUpdatePolicy] = struct{}{}
		}
	}
	for p := range u.Remove {
		switch {
		case p == policy.Automated:
			types[event.EventDeautomate] = struct{}{}
		case p == policy.Locked:
			types[event.EventUnlock] = struct{}{}
		default:
			types[event.EventUpdatePolicy] = struct{}{}
		}
	}
	var result []string
	for t := range types {
		result = append(result, t)
	}
	sort.Strings(result)
	return result
}