GitOps for k8s
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

daemon.go 18KB


  1. package daemon
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "sort"
  7. "time"
  8. "github.com/go-kit/kit/log"
  9. "github.com/pkg/errors"
  10. "github.com/weaveworks/flux"
  11. "github.com/weaveworks/flux/api"
  12. "github.com/weaveworks/flux/api/v6"
  13. "github.com/weaveworks/flux/api/v9"
  14. "github.com/weaveworks/flux/cluster"
  15. fluxerr "github.com/weaveworks/flux/errors"
  16. "github.com/weaveworks/flux/event"
  17. "github.com/weaveworks/flux/git"
  18. "github.com/weaveworks/flux/guid"
  19. "github.com/weaveworks/flux/image"
  20. "github.com/weaveworks/flux/job"
  21. "github.com/weaveworks/flux/policy"
  22. "github.com/weaveworks/flux/registry"
  23. "github.com/weaveworks/flux/release"
  24. "github.com/weaveworks/flux/resource"
  25. "github.com/weaveworks/flux/update"
  26. )
const (
	// defaultHandlerTimeout is set to be in sympathy with the request /
	// RPC timeout (i.e., it was chosen empirically).
	defaultHandlerTimeout = 10 * time.Second
	// defaultJobTimeout bounds how long a job may run. A job can take
	// an arbitrary amount of time but we want to have a (generous)
	// threshold for considering a job stuck and abandoning it.
	defaultJobTimeout = 60 * time.Second
)
// Daemon is the fully-functional state of a daemon (compare to
// `NotReadyDaemon`).
type Daemon struct {
	// V is the version string reported by Version.
	V string
	// Cluster is queried for controllers, exports, pings and the
	// public SSH key.
	Cluster cluster.Cluster
	// Manifests reads and updates policies in the manifest files of a
	// checkout.
	Manifests cluster.Manifests
	// Registry is handed to the image collection and release code.
	Registry registry.Registry
	// ImageRefresh receives the image names from image change
	// notifications (see NotifyChange).
	ImageRefresh chan image.Name
	// Repo is the git repo that working clones are made from.
	Repo *git.Repo
	// GitConfig supplies the branch, path, sync tag and author
	// settings used when working with Repo.
	GitConfig git.Config
	// Jobs is the queue that queueJob enqueues onto.
	Jobs *job.Queue
	// JobStatusCache records the status of queued, running and
	// finished jobs.
	JobStatusCache *job.StatusCache
	// EventWriter, when non-nil, is sent every event passed to
	// LogEvent; when nil, events are only written to Logger.
	EventWriter event.EventWriter
	Logger      log.Logger
	// bookkeeping
	*LoopVars
}
// Invariant: *Daemon must satisfy api.Server. This assignment exists
// only so that the compiler checks it.
var _ api.Server = &Daemon{}
  54. func (d *Daemon) Version(ctx context.Context) (string, error) {
  55. return d.V, nil
  56. }
  57. func (d *Daemon) Ping(ctx context.Context) error {
  58. return d.Cluster.Ping()
  59. }
  60. func (d *Daemon) Export(ctx context.Context) ([]byte, error) {
  61. return d.Cluster.Export()
  62. }
  63. func (d *Daemon) ListServices(ctx context.Context, namespace string) ([]v6.ControllerStatus, error) {
  64. clusterServices, err := d.Cluster.AllControllers(namespace)
  65. if err != nil {
  66. return nil, errors.Wrap(err, "getting services from cluster")
  67. }
  68. var services policy.ResourceMap
  69. var globalReadOnly v6.ReadOnlyReason
  70. err = d.WithClone(ctx, func(checkout *git.Checkout) error {
  71. var err error
  72. services, err = d.Manifests.ServicesWithPolicies(checkout.ManifestDir())
  73. return err
  74. })
  75. switch {
  76. case err == git.ErrNotReady:
  77. globalReadOnly = v6.ReadOnlyNotReady
  78. case err == git.ErrNoConfig:
  79. globalReadOnly = v6.ReadOnlyNoRepo
  80. case err != nil:
  81. return nil, errors.Wrap(err, "getting service policies")
  82. }
  83. var res []v6.ControllerStatus
  84. for _, service := range clusterServices {
  85. var readOnly v6.ReadOnlyReason
  86. policies, ok := services[service.ID]
  87. switch {
  88. case globalReadOnly != "":
  89. readOnly = globalReadOnly
  90. case !ok:
  91. readOnly = v6.ReadOnlyMissing
  92. case service.IsSystem:
  93. readOnly = v6.ReadOnlySystem
  94. }
  95. res = append(res, v6.ControllerStatus{
  96. ID: service.ID,
  97. Containers: containers2containers(service.ContainersOrNil()),
  98. ReadOnly: readOnly,
  99. Status: service.Status,
  100. Automated: policies.Contains(policy.Automated),
  101. Locked: policies.Contains(policy.Locked),
  102. Ignore: policies.Contains(policy.Ignore),
  103. Policies: policies.ToStringMap(),
  104. })
  105. }
  106. return res, nil
  107. }
  108. // List the images available for set of services
  109. func (d *Daemon) ListImages(ctx context.Context, spec update.ResourceSpec) ([]v6.ImageStatus, error) {
  110. var services []cluster.Controller
  111. var err error
  112. if spec == update.ResourceSpecAll {
  113. services, err = d.Cluster.AllControllers("")
  114. } else {
  115. id, err := spec.AsID()
  116. if err != nil {
  117. return nil, errors.Wrap(err, "treating service spec as ID")
  118. }
  119. services, err = d.Cluster.SomeControllers([]flux.ResourceID{id})
  120. }
  121. images, err := update.CollectAvailableImages(d.Registry, services, d.Logger)
  122. if err != nil {
  123. return nil, errors.Wrap(err, "getting images for services")
  124. }
  125. var res []v6.ImageStatus
  126. for _, service := range services {
  127. containers := containersWithAvailable(service, images)
  128. res = append(res, v6.ImageStatus{
  129. ID: service.ID,
  130. Containers: containers,
  131. })
  132. }
  133. return res, nil
  134. }
// daemonJobFunc is the work done by a job. It is given the job's ID, a
// working clone of the git repo and a logger, and returns the job's
// result.
type daemonJobFunc func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (job.Result, error)
  136. // executeJob runs a job func in a cloned working directory, keeping track of its status.
  137. func (d *Daemon) executeJob(id job.ID, do daemonJobFunc, logger log.Logger) (job.Result, error) {
  138. ctx, cancel := context.WithTimeout(context.Background(), defaultJobTimeout)
  139. defer cancel()
  140. d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusRunning})
  141. // make a working clone so we don't mess with files we
  142. // will be reading from elsewhere
  143. var result job.Result
  144. err := d.WithClone(ctx, func(working *git.Checkout) error {
  145. var err error
  146. result, err = do(ctx, id, working, logger)
  147. if err != nil {
  148. return err
  149. }
  150. return nil
  151. })
  152. if err != nil {
  153. d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusFailed, Err: err.Error()})
  154. return result, err
  155. }
  156. d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusSucceeded, Result: result})
  157. return result, nil
  158. }
  159. // queueJob queues a job func to be executed.
  160. func (d *Daemon) queueJob(do daemonJobFunc) job.ID {
  161. id := job.ID(guid.New())
  162. enqueuedAt := time.Now()
  163. d.Jobs.Enqueue(&job.Job{
  164. ID: id,
  165. Do: func(logger log.Logger) error {
  166. queueDuration.Observe(time.Since(enqueuedAt).Seconds())
  167. started := time.Now().UTC()
  168. result, err := d.executeJob(id, do, logger)
  169. if err != nil {
  170. return err
  171. }
  172. logger.Log("revision", result.Revision)
  173. if result.Revision != "" {
  174. var serviceIDs []flux.ResourceID
  175. for id, result := range result.Result {
  176. if result.Status == update.ReleaseStatusSuccess {
  177. serviceIDs = append(serviceIDs, id)
  178. }
  179. }
  180. metadata := &event.CommitEventMetadata{
  181. Revision: result.Revision,
  182. Spec: result.Spec,
  183. Result: result.Result,
  184. }
  185. return d.LogEvent(event.Event{
  186. ServiceIDs: serviceIDs,
  187. Type: event.EventCommit,
  188. StartedAt: started,
  189. EndedAt: started,
  190. LogLevel: event.LogLevelInfo,
  191. Metadata: metadata,
  192. })
  193. }
  194. return nil
  195. },
  196. })
  197. queueLength.Set(float64(d.Jobs.Len()))
  198. d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusQueued})
  199. return id
  200. }
  201. // Apply the desired changes to the config files
  202. func (d *Daemon) UpdateManifests(ctx context.Context, spec update.Spec) (job.ID, error) {
  203. var id job.ID
  204. if spec.Type == "" {
  205. return id, errors.New("no type in update spec")
  206. }
  207. switch s := spec.Spec.(type) {
  208. case release.Changes:
  209. if s.ReleaseKind() == update.ReleaseKindPlan {
  210. id := job.ID(guid.New())
  211. _, err := d.executeJob(id, d.release(spec, s), d.Logger)
  212. return id, err
  213. }
  214. return d.queueJob(d.release(spec, s)), nil
  215. case policy.Updates:
  216. return d.queueJob(d.updatePolicy(spec, s)), nil
  217. default:
  218. return id, fmt.Errorf(`unknown update type "%s"`, spec.Type)
  219. }
  220. }
  221. func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) daemonJobFunc {
  222. return func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (job.Result, error) {
  223. // For each update
  224. var serviceIDs []flux.ResourceID
  225. result := job.Result{
  226. Spec: &spec,
  227. Result: update.Result{},
  228. }
  229. // A shortcut to make things more responsive: if anything
  230. // was (probably) set to automated, we will ask for an
  231. // automation run straight ASAP.
  232. var anythingAutomated bool
  233. for serviceID, u := range updates {
  234. if policy.Set(u.Add).Contains(policy.Automated) {
  235. anythingAutomated = true
  236. }
  237. // find the service manifest
  238. err := cluster.UpdateManifest(d.Manifests, working.ManifestDir(), serviceID, func(def []byte) ([]byte, error) {
  239. newDef, err := d.Manifests.UpdatePolicies(def, u)
  240. if err != nil {
  241. result.Result[serviceID] = update.ControllerResult{
  242. Status: update.ReleaseStatusFailed,
  243. Error: err.Error(),
  244. }
  245. return nil, err
  246. }
  247. if string(newDef) == string(def) {
  248. result.Result[serviceID] = update.ControllerResult{
  249. Status: update.ReleaseStatusSkipped,
  250. }
  251. } else {
  252. serviceIDs = append(serviceIDs, serviceID)
  253. result.Result[serviceID] = update.ControllerResult{
  254. Status: update.ReleaseStatusSuccess,
  255. }
  256. }
  257. return newDef, nil
  258. })
  259. switch err {
  260. case cluster.ErrNoResourceFilesFoundForService, cluster.ErrMultipleResourceFilesFoundForService:
  261. result.Result[serviceID] = update.ControllerResult{
  262. Status: update.ReleaseStatusFailed,
  263. Error: err.Error(),
  264. }
  265. case nil:
  266. // continue
  267. default:
  268. return result, err
  269. }
  270. }
  271. if len(serviceIDs) == 0 {
  272. return result, nil
  273. }
  274. commitAuthor := ""
  275. if d.GitConfig.SetAuthor {
  276. commitAuthor = spec.Cause.User
  277. }
  278. commitAction := git.CommitAction{Author: commitAuthor, Message: policyCommitMessage(updates, spec.Cause)}
  279. if err := working.CommitAndPush(ctx, commitAction, &note{JobID: jobID, Spec: spec}); err != nil {
  280. // On the chance pushing failed because it was not
  281. // possible to fast-forward, ask for a sync so the
  282. // next attempt is more likely to succeed.
  283. d.AskForSync()
  284. return result, err
  285. }
  286. if anythingAutomated {
  287. d.AskForImagePoll()
  288. }
  289. var err error
  290. result.Revision, err = working.HeadRevision(ctx)
  291. if err != nil {
  292. return result, err
  293. }
  294. return result, nil
  295. }
  296. }
  297. func (d *Daemon) release(spec update.Spec, c release.Changes) daemonJobFunc {
  298. return func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (job.Result, error) {
  299. rc := release.NewReleaseContext(d.Cluster, d.Manifests, d.Registry, working)
  300. result, err := release.Release(rc, c, logger)
  301. var zero job.Result
  302. if err != nil {
  303. return zero, err
  304. }
  305. var revision string
  306. if c.ReleaseKind() == update.ReleaseKindExecute {
  307. commitMsg := spec.Cause.Message
  308. if commitMsg == "" {
  309. commitMsg = c.CommitMessage()
  310. }
  311. commitAuthor := ""
  312. if d.GitConfig.SetAuthor {
  313. commitAuthor = spec.Cause.User
  314. }
  315. commitAction := git.CommitAction{Author: commitAuthor, Message: commitMsg}
  316. if err := working.CommitAndPush(ctx, commitAction, &note{JobID: jobID, Spec: spec, Result: result}); err != nil {
  317. // On the chance pushing failed because it was not
  318. // possible to fast-forward, ask the repo to fetch
  319. // from upstream ASAP, so the next attempt is more
  320. // likely to succeed.
  321. d.Repo.Notify()
  322. return zero, err
  323. }
  324. revision, err = working.HeadRevision(ctx)
  325. if err != nil {
  326. return zero, err
  327. }
  328. }
  329. return job.Result{
  330. Revision: revision,
  331. Spec: &spec,
  332. Result: result,
  333. }, nil
  334. }
  335. }
  336. // Tell the daemon to synchronise the cluster with the manifests in
  337. // the git repo. This has an error return value because upstream there
  338. // may be comms difficulties or other sources of problems; here, we
  339. // always succeed because it's just bookkeeping.
  340. func (d *Daemon) NotifyChange(ctx context.Context, change v9.Change) error {
  341. switch change.Kind {
  342. case v9.GitChange:
  343. gitUpdate := change.Source.(v9.GitUpdate)
  344. if gitUpdate.URL != d.Repo.Origin().URL && gitUpdate.Branch != d.GitConfig.Branch {
  345. // It isn't strictly an _error_ to be notified about a repo/branch pair
  346. // that isn't ours, but it's worth logging anyway for debugging.
  347. d.Logger.Log("msg", "notified about unrelated change",
  348. "url", gitUpdate.URL,
  349. "branch", gitUpdate.Branch)
  350. break
  351. }
  352. d.Repo.Notify()
  353. case v9.ImageChange:
  354. imageUpdate := change.Source.(v9.ImageUpdate)
  355. d.ImageRefresh <- imageUpdate.Name
  356. }
  357. return nil
  358. }
  359. // JobStatus - Ask the daemon how far it's got committing things; in particular, is the job
  360. // queued? running? committed? If it is done, the commit ref is returned.
  361. func (d *Daemon) JobStatus(ctx context.Context, jobID job.ID) (job.Status, error) {
  362. // Is the job queued, running, or recently finished?
  363. status, ok := d.JobStatusCache.Status(jobID)
  364. if ok {
  365. return status, nil
  366. }
  367. // Look through the commits for a note referencing this job. This
  368. // means that even if fluxd restarts, we will at least remember
  369. // jobs which have pushed a commit.
  370. // FIXME(michael): consider looking at the repo for this, since read op
  371. err := d.WithClone(ctx, func(working *git.Checkout) error {
  372. notes, err := working.NoteRevList(ctx)
  373. if err != nil {
  374. return errors.Wrap(err, "enumerating commit notes")
  375. }
  376. commits, err := d.Repo.CommitsBefore(ctx, "HEAD", d.GitConfig.Path)
  377. if err != nil {
  378. return errors.Wrap(err, "checking revisions for status")
  379. }
  380. for _, commit := range commits {
  381. if _, ok := notes[commit.Revision]; ok {
  382. var n note
  383. ok, err := working.GetNote(ctx, commit.Revision, &n)
  384. if ok && err == nil && n.JobID == jobID {
  385. status = job.Status{
  386. StatusString: job.StatusSucceeded,
  387. Result: job.Result{
  388. Revision: commit.Revision,
  389. Spec: &n.Spec,
  390. Result: n.Result,
  391. },
  392. }
  393. return nil
  394. }
  395. }
  396. }
  397. return unknownJobError(jobID)
  398. })
  399. return status, err
  400. }
  401. // Ask the daemon how far it's got applying things; in particular, is it
  402. // past the given commit? Return the list of commits between where
  403. // we have applied (the sync tag) and the ref given, inclusive. E.g., if you send HEAD,
  404. // you'll get all the commits yet to be applied. If you send a hash
  405. // and it's applied at or _past_ it, you'll get an empty list.
  406. func (d *Daemon) SyncStatus(ctx context.Context, commitRef string) ([]string, error) {
  407. commits, err := d.Repo.CommitsBetween(ctx, d.GitConfig.SyncTag, commitRef, d.GitConfig.Path)
  408. if err != nil {
  409. return nil, err
  410. }
  411. // NB we could use the messages too if we decide to change the
  412. // signature of the API to include it.
  413. revs := make([]string, len(commits))
  414. for i, commit := range commits {
  415. revs[i] = commit.Revision
  416. }
  417. return revs, nil
  418. }
  419. func (d *Daemon) GitRepoConfig(ctx context.Context, regenerate bool) (v6.GitConfig, error) {
  420. publicSSHKey, err := d.Cluster.PublicSSHKey(regenerate)
  421. if err != nil {
  422. return v6.GitConfig{}, err
  423. }
  424. origin := d.Repo.Origin()
  425. status, _ := d.Repo.Status()
  426. return v6.GitConfig{
  427. Remote: v6.GitRemoteConfig{
  428. URL: origin.URL,
  429. Branch: d.GitConfig.Branch,
  430. Path: d.GitConfig.Path,
  431. },
  432. PublicSSHKey: publicSSHKey,
  433. Status: status,
  434. }, nil
  435. }
  436. // Non-api.Server methods
  437. func (d *Daemon) WithClone(ctx context.Context, fn func(*git.Checkout) error) error {
  438. co, err := d.Repo.Clone(ctx, d.GitConfig)
  439. if err != nil {
  440. return err
  441. }
  442. defer co.Clean()
  443. return fn(co)
  444. }
  445. func unknownJobError(id job.ID) error {
  446. return &fluxerr.Error{
  447. Type: fluxerr.Missing,
  448. Err: fmt.Errorf("unknown job %q", string(id)),
  449. Help: `Job not found
  450. This is often because the job did not result in committing changes,
  451. and therefore had no lasting effect. A release dry-run is an example
  452. of a job that does not result in a commit.
  453. If you were expecting changes to be committed, this may mean that the
  454. job failed, but its status was lost.
  455. In both of the above cases it is OK to retry the operation that
  456. resulted in this error.
  457. If you get this error repeatedly, it's probably a bug. Please log an
  458. issue describing what you were attempting, and posting logs from the
  459. daemon if possible:
  460. https://github.com/weaveworks/flux/issues
  461. `,
  462. }
  463. }
  464. func (d *Daemon) LogEvent(ev event.Event) error {
  465. if d.EventWriter == nil {
  466. d.Logger.Log("event", ev, "logupstream", "false")
  467. return nil
  468. }
  469. d.Logger.Log("event", ev, "logupstream", "true")
  470. return d.EventWriter.LogEvent(ev)
  471. }
  472. // vvv helpers vvv
  473. func containers2containers(cs []resource.Container) []v6.Container {
  474. res := make([]v6.Container, len(cs))
  475. for i, c := range cs {
  476. res[i] = v6.Container{
  477. Name: c.Name,
  478. Current: image.Info{
  479. ID: c.Image,
  480. },
  481. }
  482. }
  483. return res
  484. }
  485. func containersWithAvailable(service cluster.Controller, images update.ImageMap) (res []v6.Container) {
  486. for _, c := range service.ContainersOrNil() {
  487. available := images.Available(c.Image.Name)
  488. availableErr := ""
  489. if available == nil {
  490. availableErr = registry.ErrNoImageData.Error()
  491. }
  492. res = append(res, v6.Container{
  493. Name: c.Name,
  494. Current: image.Info{
  495. ID: c.Image,
  496. },
  497. Available: available,
  498. AvailableError: availableErr,
  499. })
  500. }
  501. return res
  502. }
  503. func policyCommitMessage(us policy.Updates, cause update.Cause) string {
  504. // shortcut, since we want roughly the same information
  505. events := policyEvents(us, time.Now())
  506. commitMsg := &bytes.Buffer{}
  507. prefix := "- "
  508. switch {
  509. case cause.Message != "":
  510. fmt.Fprintf(commitMsg, "%s\n\n", cause.Message)
  511. case len(events) > 1:
  512. fmt.Fprintf(commitMsg, "Updated service policies\n\n")
  513. default:
  514. prefix = ""
  515. }
  516. for _, event := range events {
  517. fmt.Fprintf(commitMsg, "%s%v\n", prefix, event)
  518. }
  519. return commitMsg.String()
  520. }
  521. // policyEvents builds a map of events (by type), for all the events in this set of
  522. // updates. There will be one event per type, containing all service ids
  523. // affected by that event. e.g. all automated services will share an event.
  524. func policyEvents(us policy.Updates, now time.Time) map[string]event.Event {
  525. eventsByType := map[string]event.Event{}
  526. for serviceID, update := range us {
  527. for _, eventType := range policyEventTypes(update) {
  528. e, ok := eventsByType[eventType]
  529. if !ok {
  530. e = event.Event{
  531. ServiceIDs: []flux.ResourceID{},
  532. Type: eventType,
  533. StartedAt: now,
  534. EndedAt: now,
  535. LogLevel: event.LogLevelInfo,
  536. }
  537. }
  538. e.ServiceIDs = append(e.ServiceIDs, serviceID)
  539. eventsByType[eventType] = e
  540. }
  541. }
  542. return eventsByType
  543. }
  544. // policyEventTypes is a deduped list of all event types this update contains
  545. func policyEventTypes(u policy.Update) []string {
  546. types := map[string]struct{}{}
  547. for p, _ := range u.Add {
  548. switch {
  549. case p == policy.Automated:
  550. types[event.EventAutomate] = struct{}{}
  551. case p == policy.Locked:
  552. types[event.EventLock] = struct{}{}
  553. default:
  554. types[event.EventUpdatePolicy] = struct{}{}
  555. }
  556. }
  557. for p, _ := range u.Remove {
  558. switch {
  559. case p == policy.Automated:
  560. types[event.EventDeautomate] = struct{}{}
  561. case p == policy.Locked:
  562. types[event.EventUnlock] = struct{}{}
  563. default:
  564. types[event.EventUpdatePolicy] = struct{}{}
  565. }
  566. }
  567. var result []string
  568. for t := range types {
  569. result = append(result, t)
  570. }
  571. sort.Strings(result)
  572. return result
  573. }