GitOps for k8s
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

main.go 28KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763
  1. package main
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "net/http"
  6. _ "net/http/pprof"
  7. "os"
  8. "os/exec"
  9. "os/signal"
  10. "path/filepath"
  11. "runtime"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "syscall"
  16. "time"
  17. helmopclient "github.com/fluxcd/helm-operator/pkg/client/clientset/versioned"
  18. "github.com/go-kit/kit/log"
  19. "github.com/prometheus/client_golang/prometheus/promhttp"
  20. "github.com/spf13/pflag"
  21. crd "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
  22. k8serrors "k8s.io/apimachinery/pkg/api/errors"
  23. k8sruntime "k8s.io/apimachinery/pkg/util/runtime"
  24. k8sclientdynamic "k8s.io/client-go/dynamic"
  25. k8sclient "k8s.io/client-go/kubernetes"
  26. "k8s.io/client-go/rest"
  27. "k8s.io/client-go/tools/clientcmd"
  28. "k8s.io/klog"
  29. "github.com/weaveworks/flux/checkpoint"
  30. "github.com/weaveworks/flux/cluster"
  31. "github.com/weaveworks/flux/cluster/kubernetes"
  32. "github.com/weaveworks/flux/daemon"
  33. "github.com/weaveworks/flux/git"
  34. "github.com/weaveworks/flux/gpg"
  35. transport "github.com/weaveworks/flux/http"
  36. "github.com/weaveworks/flux/http/client"
  37. daemonhttp "github.com/weaveworks/flux/http/daemon"
  38. "github.com/weaveworks/flux/image"
  39. hrclient "github.com/weaveworks/flux/integrations/client/clientset/versioned"
  40. "github.com/weaveworks/flux/job"
  41. "github.com/weaveworks/flux/manifests"
  42. "github.com/weaveworks/flux/registry"
  43. "github.com/weaveworks/flux/registry/cache"
  44. registryMemcache "github.com/weaveworks/flux/registry/cache/memcached"
  45. registryMiddleware "github.com/weaveworks/flux/registry/middleware"
  46. "github.com/weaveworks/flux/remote"
  47. "github.com/weaveworks/flux/ssh"
  48. fluxsync "github.com/weaveworks/flux/sync"
  49. )
  50. var version = "unversioned"
  51. const (
  52. product = "weave-flux"
  53. // This is used as the "burst" value for rate limiting, and
  54. // therefore also as the limit to the number of concurrent fetches
  55. // and memcached connections, since these in general can't do any
  56. // more work than is allowed by the burst amount.
  57. defaultRemoteConnections = 10
  58. // There are running systems that assume these defaults (by not
  59. // supplying a value for one or both). Don't change them.
  60. defaultGitSyncTag = "flux-sync"
  61. defaultGitNotesRef = "flux"
  62. defaultGitSkipMessage = "\n\n[ci skip]"
  63. RequireECR = "ecr"
  64. )
  65. var (
  66. RequireValues = []string{RequireECR}
  67. )
  68. func optionalVar(fs *pflag.FlagSet, value ssh.OptionalValue, name, usage string) ssh.OptionalValue {
  69. fs.Var(value, name, usage)
  70. return value
  71. }
  72. type stringset []string
  73. func (set stringset) has(possible string) bool {
  74. for _, s := range set {
  75. if s == possible {
  76. return true
  77. }
  78. }
  79. return false
  80. }
  81. func main() {
  82. // Flag domain.
  83. fs := pflag.NewFlagSet("default", pflag.ContinueOnError)
  84. fs.Usage = func() {
  85. fmt.Fprintf(os.Stderr, "DESCRIPTION\n")
  86. fmt.Fprintf(os.Stderr, " fluxd is the agent of flux.\n")
  87. fmt.Fprintf(os.Stderr, "\n")
  88. fmt.Fprintf(os.Stderr, "FLAGS\n")
  89. fs.PrintDefaults()
  90. }
  91. // This mirrors how kubectl extracts information from the environment.
  92. var (
  93. logFormat = fs.String("log-format", "fmt", "change the log format.")
  94. listenAddr = fs.StringP("listen", "l", ":3030", "listen address where /metrics and API will be served")
  95. listenMetricsAddr = fs.String("listen-metrics", "", "listen address for /metrics endpoint")
  96. kubernetesKubectl = fs.String("kubernetes-kubectl", "", "optional, explicit path to kubectl tool")
  97. versionFlag = fs.Bool("version", false, "get version number")
  98. // Git repo & key etc.
  99. gitURL = fs.String("git-url", "", "URL of git repo with Kubernetes manifests; e.g., git@github.com:weaveworks/flux-get-started")
  100. gitBranch = fs.String("git-branch", "master", "branch of git repo to use for Kubernetes manifests")
  101. gitPath = fs.StringSlice("git-path", []string{}, "relative paths within the git repo to locate Kubernetes manifests")
  102. gitReadonly = fs.Bool("git-readonly", false, fmt.Sprintf("use to prevent Flux from pushing changes to git; implies --sync-state=%s", fluxsync.NativeStateMode))
  103. gitUser = fs.String("git-user", "Weave Flux", "username to use as git committer")
  104. gitEmail = fs.String("git-email", "support@weave.works", "email to use as git committer")
  105. gitSetAuthor = fs.Bool("git-set-author", false, "if set, the author of git commits will reflect the user who initiated the commit and will differ from the git committer.")
  106. gitLabel = fs.String("git-label", "", "label to keep track of sync progress; overrides both --git-sync-tag and --git-notes-ref")
  107. gitSecret = fs.Bool("git-secret", false, `if set, git-secret will be run on every git checkout. A gpg key must be imported using --git-gpg-key-import or by mounting a keyring containing it directly`)
  108. // Old git config; still used if --git-label is not supplied, but --git-label is preferred.
  109. gitSyncTag = fs.String("git-sync-tag", defaultGitSyncTag, fmt.Sprintf("tag to use to mark sync progress for this cluster (only relevant when --sync-state=%s)", fluxsync.GitTagStateMode))
  110. gitNotesRef = fs.String("git-notes-ref", defaultGitNotesRef, "ref to use for keeping commit annotations in git notes")
  111. gitSkip = fs.Bool("git-ci-skip", false, `append "[ci skip]" to commit messages so that CI will skip builds`)
  112. gitSkipMessage = fs.String("git-ci-skip-message", "", "additional text for commit messages, useful for skipping builds in CI. Use this to supply specific text, or set --git-ci-skip")
  113. gitPollInterval = fs.Duration("git-poll-interval", 5*time.Minute, "period at which to poll git repo for new commits")
  114. gitTimeout = fs.Duration("git-timeout", 20*time.Second, "duration after which git operations time out")
  115. // GPG commit signing
  116. gitImportGPG = fs.StringSlice("git-gpg-key-import", []string{}, "keys at the paths given will be imported for use of signing and verifying commits")
  117. gitSigningKey = fs.String("git-signing-key", "", "if set, commits Flux makes will be signed with this GPG key")
  118. gitVerifySignatures = fs.Bool("git-verify-signatures", false, "if set, the signature of commits will be verified before Flux applies them")
  119. // syncing
  120. syncInterval = fs.Duration("sync-interval", 5*time.Minute, "apply config in git to cluster at least this often, even if there are no new commits")
  121. syncGC = fs.Bool("sync-garbage-collection", false, "experimental; delete resources that were created by fluxd, but are no longer in the git repo")
  122. dryGC = fs.Bool("sync-garbage-collection-dry", false, "experimental; only log what would be garbage collected, rather than deleting. Implies --sync-garbage-collection")
  123. syncState = fs.String("sync-state", fluxsync.GitTagStateMode, fmt.Sprintf("method used by flux for storing state (one of {%s})", strings.Join([]string{fluxsync.GitTagStateMode, fluxsync.NativeStateMode}, ",")))
  124. // registry
  125. memcachedHostname = fs.String("memcached-hostname", "memcached", "hostname for memcached service.")
  126. memcachedPort = fs.Int("memcached-port", 11211, "memcached service port.")
  127. memcachedTimeout = fs.Duration("memcached-timeout", time.Second, "maximum time to wait before giving up on memcached requests.")
  128. memcachedService = fs.String("memcached-service", "memcached", "SRV service used to discover memcache servers.")
  129. automationInterval = fs.Duration("automation-interval", 5*time.Minute, "period at which to check for image updates for automated workloads")
  130. registryPollInterval = fs.Duration("registry-poll-interval", 5*time.Minute, "period at which to check for updated images")
  131. registryRPS = fs.Float64("registry-rps", 50, "maximum registry requests per second per host")
  132. registryBurst = fs.Int("registry-burst", defaultRemoteConnections, "maximum number of warmer connections to remote and memcache")
  133. registryTrace = fs.Bool("registry-trace", false, "output trace of image registry requests to log")
  134. registryInsecure = fs.StringSlice("registry-insecure-host", []string{}, "let these registry hosts skip TLS host verification and fall back to using HTTP instead of HTTPS; this allows man-in-the-middle attacks, so use with extreme caution")
  135. registryExcludeImage = fs.StringSlice("registry-exclude-image", []string{"k8s.gcr.io/*"}, "do not scan images that match these glob expressions; the default is to exclude the 'k8s.gcr.io/*' images")
  136. registryUseLabels = fs.StringSlice("registry-use-labels", []string{"index.docker.io/weaveworks/*", "index.docker.io/fluxcd/*"}, "use the timestamp (RFC3339) from labels for (canonical) image refs that match these glob expression")
  137. // AWS authentication
  138. registryAWSRegions = fs.StringSlice("registry-ecr-region", nil, "include just these AWS regions when scanning images in ECR; when not supplied, the cluster's region will included if it can be detected through the AWS API")
  139. registryAWSAccountIDs = fs.StringSlice("registry-ecr-include-id", nil, "restrict ECR scanning to these AWS account IDs; if not supplied, all account IDs that aren't excluded may be scanned")
  140. registryAWSBlockAccountIDs = fs.StringSlice("registry-ecr-exclude-id", []string{registry.EKS_SYSTEM_ACCOUNT}, "do not scan ECR for images in these AWS account IDs; the default is to exclude the EKS system account")
  141. registryRequire = fs.StringSlice("registry-require", nil, fmt.Sprintf(`exit with an error if auto-authentication with any of the given registries is not possible (possible values: {%s})`, strings.Join(RequireValues, ",")))
  142. // k8s-secret backed ssh keyring configuration
  143. k8sInCluster = fs.Bool("k8s-in-cluster", true, "set this to true if fluxd is deployed as a container inside Kubernetes")
  144. k8sSecretName = fs.String("k8s-secret-name", "flux-git-deploy", "name of the k8s secret used to store the private SSH key")
  145. k8sSecretVolumeMountPath = fs.String("k8s-secret-volume-mount-path", "/etc/fluxd/ssh", "mount location of the k8s secret storing the private SSH key")
  146. k8sSecretDataKey = fs.String("k8s-secret-data-key", "identity", "data key holding the private SSH key within the k8s secret")
  147. k8sNamespaceWhitelist = fs.StringSlice("k8s-namespace-whitelist", []string{}, "experimental, optional: restrict the view of the cluster to the namespaces listed. All namespaces are included if this is not set")
  148. k8sAllowNamespace = fs.StringSlice("k8s-allow-namespace", []string{}, "experimental: restrict all operations to the provided namespaces")
  149. // SSH key generation
  150. sshKeyBits = optionalVar(fs, &ssh.KeyBitsValue{}, "ssh-keygen-bits", "-b argument to ssh-keygen (default unspecified)")
  151. sshKeyType = optionalVar(fs, &ssh.KeyTypeValue{}, "ssh-keygen-type", "-t argument to ssh-keygen (default unspecified)")
  152. sshKeygenDir = fs.String("ssh-keygen-dir", "", "directory, ideally on a tmpfs volume, in which to generate new SSH keys when necessary")
  153. // manifest generation
  154. manifestGeneration = fs.Bool("manifest-generation", false, "experimental; search for .flux.yaml files to generate manifests")
  155. // upstream connection settings
  156. upstreamURL = fs.String("connect", "", "connect to an upstream service e.g., Weave Cloud, at this base address")
  157. token = fs.String("token", "", "authentication token for upstream service")
  158. rpcTimeout = fs.Duration("rpc-timeout", 10*time.Second, "maximum time an operation requested by the upstream may take")
  159. dockerConfig = fs.String("docker-config", "", "path to a docker config to use for image registry credentials")
  160. _ = fs.Duration("registry-cache-expiry", 0, "")
  161. )
  162. fs.MarkDeprecated("registry-cache-expiry", "no longer used; cache entries are expired adaptively according to how often they change")
  163. fs.MarkDeprecated("k8s-namespace-whitelist", "changed to --k8s-allow-namespace, use that instead")
  164. fs.MarkDeprecated("registry-poll-interval", "changed to --automation-interval, use that instead")
  165. var kubeConfig *string
  166. {
  167. // Set the default kube config
  168. if home := homeDir(); home != "" {
  169. kubeConfig = fs.String("kube-config", filepath.Join(home, ".kube", "config"), "the absolute path of the k8s config file.")
  170. } else {
  171. kubeConfig = fs.String("kube-config", "", "the absolute path of the k8s config file.")
  172. }
  173. }
  174. // Explicitly initialize klog to enable stderr logging,
  175. // and parse our own flags.
  176. klog.InitFlags(nil)
  177. err := fs.Parse(os.Args[1:])
  178. switch {
  179. case err == pflag.ErrHelp:
  180. os.Exit(0)
  181. case err != nil:
  182. fmt.Fprintf(os.Stderr, "Error: %s\n\n", err.Error())
  183. fs.Usage()
  184. os.Exit(2)
  185. case *versionFlag:
  186. fmt.Println(version)
  187. os.Exit(0)
  188. }
  189. // Logger component.
  190. var logger log.Logger
  191. {
  192. switch *logFormat {
  193. case "json":
  194. logger = log.NewJSONLogger(log.NewSyncWriter(os.Stderr))
  195. case "fmt":
  196. logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
  197. default:
  198. logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
  199. }
  200. logger = log.With(logger, "ts", log.DefaultTimestampUTC)
  201. logger = log.With(logger, "caller", log.DefaultCaller)
  202. }
  203. logger.Log("version", version)
  204. // Silence access errors logged internally by client-go
  205. k8slog := log.With(logger,
  206. "type", "internal kubernetes error",
  207. "kubernetes_caller", log.Valuer(func() interface{} {
  208. _, file, line, _ := runtime.Caller(5) // we want to log one level deeper than k8sruntime.HandleError
  209. idx := strings.Index(file, "/k8s.io/")
  210. return file[idx+1:] + ":" + strconv.Itoa(line)
  211. }))
  212. logErrorUnlessAccessRelated := func(err error) {
  213. errLower := strings.ToLower(err.Error())
  214. if k8serrors.IsForbidden(err) || k8serrors.IsNotFound(err) ||
  215. strings.Contains(errLower, "forbidden") ||
  216. strings.Contains(errLower, "not found") {
  217. return
  218. }
  219. k8slog.Log("err", err)
  220. }
  221. k8sruntime.ErrorHandlers = []func(error){logErrorUnlessAccessRelated}
  222. // Argument validation
  223. if *gitReadonly {
  224. if *syncState == fluxsync.GitTagStateMode {
  225. logger.Log("warning", fmt.Sprintf("--git-readonly prevents use of --sync-state=%s. Forcing to --sync-state=%s", fluxsync.GitTagStateMode, fluxsync.NativeStateMode))
  226. *syncState = fluxsync.NativeStateMode
  227. }
  228. gitRelatedFlags := []string{
  229. "git-user",
  230. "git-email",
  231. "git-sync-tag",
  232. "git-set-author",
  233. "git-ci-skip",
  234. "git-ci-skip-message",
  235. }
  236. var changedGitRelatedFlags []string
  237. for _, gitRelatedFlag := range gitRelatedFlags {
  238. if fs.Changed(gitRelatedFlag) {
  239. changedGitRelatedFlags = append(changedGitRelatedFlags, gitRelatedFlag)
  240. }
  241. }
  242. if len(changedGitRelatedFlags) > 0 {
  243. logger.Log("warning", fmt.Sprintf("configuring any of {%s} has no effect when --git-readonly is set", strings.Join(changedGitRelatedFlags, ", ")))
  244. }
  245. }
  246. // Maintain backwards compatibility with the --registry-poll-interval
  247. // flag, but only if the --automation-interval is not set to a custom
  248. // (non default) value.
  249. if fs.Changed("registry-poll-interval") && !fs.Changed("automation-interval") {
  250. *automationInterval = *registryPollInterval
  251. }
  252. // Sort out values for the git tag and notes ref. There are
  253. // running deployments that assume the defaults as given, so don't
  254. // mess with those unless explicitly told.
  255. if fs.Changed("git-label") {
  256. *gitSyncTag = *gitLabel
  257. *gitNotesRef = *gitLabel
  258. for _, f := range []string{"git-sync-tag", "git-notes-ref"} {
  259. if fs.Changed(f) {
  260. logger.Log("overridden", f, "value", *gitLabel)
  261. }
  262. }
  263. }
  264. if *gitSkipMessage == "" && *gitSkip {
  265. *gitSkipMessage = defaultGitSkipMessage
  266. }
  267. for _, path := range *gitPath {
  268. if len(path) > 0 && path[0] == '/' {
  269. logger.Log("err", "subdirectory given as --git-path should not have leading forward slash")
  270. os.Exit(1)
  271. }
  272. }
  273. if *sshKeygenDir == "" {
  274. logger.Log("info", fmt.Sprintf("SSH keygen dir (--ssh-keygen-dir) not provided, so using the deploy key volume (--k8s-secret-volume-mount-path=%s); this may cause problems if the deploy key volume is mounted read-only", *k8sSecretVolumeMountPath))
  275. *sshKeygenDir = *k8sSecretVolumeMountPath
  276. }
  277. // Import GPG keys, if we've been told where to look for them
  278. for _, p := range *gitImportGPG {
  279. keyfiles, err := gpg.ImportKeys(p, *gitVerifySignatures)
  280. if err != nil {
  281. logger.Log("error", fmt.Sprintf("failed to import GPG key(s) from %s", p), "err", err.Error())
  282. }
  283. if keyfiles != nil {
  284. logger.Log("info", fmt.Sprintf("imported GPG key(s) from %s", p), "files", fmt.Sprintf("%v", keyfiles))
  285. }
  286. }
  287. possiblyRequired := stringset(RequireValues)
  288. for _, r := range *registryRequire {
  289. if !possiblyRequired.has(r) {
  290. logger.Log("err", fmt.Sprintf("--registry-require value %q is not in possible values {%s}", r, strings.Join(RequireValues, ",")))
  291. os.Exit(1)
  292. }
  293. }
  294. mandatoryRegistry := stringset(*registryRequire)
  295. if *gitSecret && len(*gitImportGPG) == 0 {
  296. logger.Log("warning", fmt.Sprintf("--git-secret is enabled but there is no GPG key(s) provided using --git-gpg-key-import, we assume you mounted the keyring directly and continue"))
  297. }
  298. // Mechanical components.
  299. // When we can receive from this channel, it indicates that we
  300. // are ready to shut down.
  301. errc := make(chan error)
  302. // This signals other routines to shut down;
  303. shutdown := make(chan struct{})
  304. // .. and this is to wait for other routines to shut down cleanly.
  305. shutdownWg := &sync.WaitGroup{}
  306. go func() {
  307. c := make(chan os.Signal, 1)
  308. signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
  309. errc <- fmt.Errorf("%s", <-c)
  310. }()
  311. // Cluster component.
  312. var restClientConfig *rest.Config
  313. {
  314. if *k8sInCluster {
  315. logger.Log("msg", "using in cluster config to connect to the cluster")
  316. restClientConfig, err = rest.InClusterConfig()
  317. if err != nil {
  318. logger.Log("err", err)
  319. os.Exit(1)
  320. }
  321. } else {
  322. logger.Log("msg", fmt.Sprintf("using kube config: %q to connect to the cluster", *kubeConfig))
  323. restClientConfig, err = clientcmd.BuildConfigFromFlags("", *kubeConfig)
  324. if err != nil {
  325. logger.Log("err", err)
  326. os.Exit(1)
  327. }
  328. }
  329. restClientConfig.QPS = 50.0
  330. restClientConfig.Burst = 100
  331. }
  332. var clusterVersion string
  333. var sshKeyRing ssh.KeyRing
  334. var k8s cluster.Cluster
  335. var k8sManifests manifests.Manifests
  336. var imageCreds func() registry.ImageCreds
  337. {
  338. clientset, err := k8sclient.NewForConfig(restClientConfig)
  339. if err != nil {
  340. logger.Log("err", err)
  341. os.Exit(1)
  342. }
  343. dynamicClientset, err := k8sclientdynamic.NewForConfig(restClientConfig)
  344. if err != nil {
  345. logger.Log("err", err)
  346. os.Exit(1)
  347. }
  348. fhrClientset, err := hrclient.NewForConfig(restClientConfig)
  349. if err != nil {
  350. logger.Log("error", fmt.Sprintf("Error building hrclient clientset: %v", err))
  351. os.Exit(1)
  352. }
  353. hrClientset, err := helmopclient.NewForConfig(restClientConfig)
  354. if err != nil {
  355. logger.Log("error", fmt.Sprintf("Error building helm operator clientset: %v", err))
  356. os.Exit(1)
  357. }
  358. crdClient, err := crd.NewForConfig(restClientConfig)
  359. if err != nil {
  360. logger.Log("error", fmt.Sprintf("Error building API extensions (CRD) clientset: %v", err))
  361. os.Exit(1)
  362. }
  363. discoClientset := kubernetes.MakeCachedDiscovery(clientset.Discovery(), crdClient, shutdown)
  364. serverVersion, err := clientset.ServerVersion()
  365. if err != nil {
  366. logger.Log("err", err)
  367. os.Exit(1)
  368. }
  369. clusterVersion = "kubernetes-" + serverVersion.GitVersion
  370. if *k8sInCluster {
  371. namespace, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
  372. if err != nil {
  373. logger.Log("err", err)
  374. os.Exit(1)
  375. }
  376. sshKeyRing, err = kubernetes.NewSSHKeyRing(kubernetes.SSHKeyRingConfig{
  377. SecretAPI: clientset.CoreV1().Secrets(string(namespace)),
  378. SecretName: *k8sSecretName,
  379. SecretVolumeMountPath: *k8sSecretVolumeMountPath,
  380. SecretDataKey: *k8sSecretDataKey,
  381. KeyBits: sshKeyBits,
  382. KeyType: sshKeyType,
  383. KeyGenDir: *sshKeygenDir,
  384. })
  385. if err != nil {
  386. logger.Log("err", err)
  387. os.Exit(1)
  388. }
  389. publicKey, privateKeyPath := sshKeyRing.KeyPair()
  390. logger := log.With(logger, "component", "cluster")
  391. logger.Log("identity", privateKeyPath)
  392. logger.Log("identity.pub", strings.TrimSpace(publicKey.Key))
  393. } else {
  394. sshKeyRing = ssh.NewNopSSHKeyRing()
  395. }
  396. logger.Log("host", restClientConfig.Host, "version", clusterVersion)
  397. kubectl := *kubernetesKubectl
  398. if kubectl == "" {
  399. kubectl, err = exec.LookPath("kubectl")
  400. } else {
  401. _, err = os.Stat(kubectl)
  402. }
  403. if err != nil {
  404. logger.Log("err", err)
  405. os.Exit(1)
  406. }
  407. logger.Log("kubectl", kubectl)
  408. client := kubernetes.MakeClusterClientset(clientset, dynamicClientset, fhrClientset, hrClientset, discoClientset)
  409. kubectlApplier := kubernetes.NewKubectl(kubectl, restClientConfig)
  410. allowedNamespaces := append(*k8sNamespaceWhitelist, *k8sAllowNamespace...)
  411. k8sInst := kubernetes.NewCluster(client, kubectlApplier, sshKeyRing, logger, allowedNamespaces, *registryExcludeImage)
  412. k8sInst.GC = *syncGC
  413. k8sInst.DryGC = *dryGC
  414. if err := k8sInst.Ping(); err != nil {
  415. logger.Log("ping", err)
  416. } else {
  417. logger.Log("ping", true)
  418. }
  419. k8s = k8sInst
  420. imageCreds = k8sInst.ImagesToFetch
  421. // There is only one way we currently interpret a repo of
  422. // files as manifests, and that's as Kubernetes yamels.
  423. namespacer, err := kubernetes.NewNamespacer(discoClientset)
  424. if err != nil {
  425. logger.Log("err", err)
  426. os.Exit(1)
  427. }
  428. k8sManifests = kubernetes.NewManifests(namespacer, logger)
  429. }
  430. // Wrap the procedure for collecting images to scan
  431. {
  432. awsConf := registry.AWSRegistryConfig{
  433. Regions: *registryAWSRegions,
  434. AccountIDs: *registryAWSAccountIDs,
  435. BlockIDs: *registryAWSBlockAccountIDs,
  436. }
  437. awsPreflight, credsWithAWSAuth := registry.ImageCredsWithAWSAuth(imageCreds, log.With(logger, "component", "aws"), awsConf)
  438. if mandatoryRegistry.has(RequireECR) {
  439. if err := awsPreflight(); err != nil {
  440. logger.Log("error", "AWS API required (due to --registry-require=ecr), but not available", "err", err)
  441. os.Exit(1)
  442. }
  443. }
  444. imageCreds = credsWithAWSAuth
  445. if *dockerConfig != "" {
  446. credsWithDefaults, err := registry.ImageCredsWithDefaults(imageCreds, *dockerConfig)
  447. if err != nil {
  448. logger.Log("warning", "--docker-config not used; pre-flight check failed", "err", err)
  449. } else {
  450. imageCreds = credsWithDefaults
  451. }
  452. }
  453. }
  454. // Registry components
  455. var cacheRegistry registry.Registry
  456. var cacheWarmer *cache.Warmer
  457. {
  458. // Cache client, for use by registry and cache warmer
  459. var cacheClient cache.Client
  460. var memcacheClient *registryMemcache.MemcacheClient
  461. memcacheConfig := registryMemcache.MemcacheConfig{
  462. Host: *memcachedHostname,
  463. Service: *memcachedService,
  464. Timeout: *memcachedTimeout,
  465. UpdateInterval: 1 * time.Minute,
  466. Logger: log.With(logger, "component", "memcached"),
  467. MaxIdleConns: *registryBurst,
  468. }
  469. // if no memcached service is specified use the ClusterIP name instead of SRV records
  470. if *memcachedService == "" {
  471. memcacheClient = registryMemcache.NewFixedServerMemcacheClient(memcacheConfig,
  472. fmt.Sprintf("%s:%d", *memcachedHostname, *memcachedPort))
  473. } else {
  474. memcacheClient = registryMemcache.NewMemcacheClient(memcacheConfig)
  475. }
  476. defer memcacheClient.Stop()
  477. cacheClient = cache.InstrumentClient(memcacheClient)
  478. cacheRegistry = &cache.Cache{
  479. Reader: cacheClient,
  480. Decorators: []cache.Decorator{
  481. cache.TimestampLabelWhitelist(*registryUseLabels),
  482. },
  483. }
  484. cacheRegistry = registry.NewInstrumentedRegistry(cacheRegistry)
  485. // Remote client, for warmer to refresh entries
  486. registryLogger := log.With(logger, "component", "registry")
  487. registryLimits := &registryMiddleware.RateLimiters{
  488. RPS: *registryRPS,
  489. Burst: *registryBurst,
  490. Logger: log.With(logger, "component", "ratelimiter"),
  491. }
  492. remoteFactory := &registry.RemoteClientFactory{
  493. Logger: registryLogger,
  494. Limiters: registryLimits,
  495. Trace: *registryTrace,
  496. InsecureHosts: *registryInsecure,
  497. }
  498. // Warmer
  499. var err error
  500. cacheWarmer, err = cache.NewWarmer(remoteFactory, cacheClient, *registryBurst)
  501. if err != nil {
  502. logger.Log("err", err)
  503. os.Exit(1)
  504. }
  505. }
  506. // Checkpoint: we want to include the fact of whether the daemon
  507. // was given a Git repo it could clone; but the expected scenario
  508. // is that it will have been set up already, and we don't want to
  509. // report anything before seeing if it works. So, don't start
  510. // until we have failed or succeeded.
  511. updateCheckLogger := log.With(logger, "component", "checkpoint")
  512. checkpointFlags := map[string]string{
  513. "cluster-version": clusterVersion,
  514. "git-configured": strconv.FormatBool(*gitURL != ""),
  515. }
  516. checkpoint.CheckForUpdates(product, version, checkpointFlags, updateCheckLogger)
  517. gitRemote := git.Remote{URL: *gitURL}
  518. gitConfig := git.Config{
  519. Paths: *gitPath,
  520. Branch: *gitBranch,
  521. NotesRef: *gitNotesRef,
  522. UserName: *gitUser,
  523. UserEmail: *gitEmail,
  524. SigningKey: *gitSigningKey,
  525. SetAuthor: *gitSetAuthor,
  526. SkipMessage: *gitSkipMessage,
  527. GitSecret: *gitSecret,
  528. }
  529. repo := git.NewRepo(gitRemote, git.PollInterval(*gitPollInterval), git.Timeout(*gitTimeout), git.Branch(*gitBranch), git.IsReadOnly(*gitReadonly))
  530. {
  531. shutdownWg.Add(1)
  532. go func() {
  533. err := repo.Start(shutdown, shutdownWg)
  534. if err != nil {
  535. errc <- err
  536. }
  537. }()
  538. }
  539. logger.Log(
  540. "url", *gitURL,
  541. "user", *gitUser,
  542. "email", *gitEmail,
  543. "signing-key", *gitSigningKey,
  544. "verify-signatures", *gitVerifySignatures,
  545. "sync-tag", *gitSyncTag,
  546. "state", *syncState,
  547. "readonly", *gitReadonly,
  548. "notes-ref", *gitNotesRef,
  549. "set-author", *gitSetAuthor,
  550. "git-secret", *gitSecret,
  551. )
  552. var jobs *job.Queue
  553. {
  554. jobs = job.NewQueue(shutdown, shutdownWg)
  555. }
  556. var syncProvider fluxsync.State
  557. switch *syncState {
  558. case fluxsync.NativeStateMode:
  559. namespace, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
  560. if err != nil {
  561. logger.Log("err", err)
  562. os.Exit(1)
  563. }
  564. syncProvider, err = fluxsync.NewNativeSyncProvider(
  565. string(namespace),
  566. *k8sSecretName,
  567. )
  568. if err != nil {
  569. logger.Log("err", err)
  570. os.Exit(1)
  571. }
  572. case fluxsync.GitTagStateMode:
  573. syncProvider, err = fluxsync.NewGitTagSyncProvider(
  574. repo,
  575. *gitSyncTag,
  576. *gitSigningKey,
  577. *gitVerifySignatures,
  578. gitConfig,
  579. )
  580. if err != nil {
  581. logger.Log("err", err)
  582. os.Exit(1)
  583. }
  584. default:
  585. logger.Log("error", "unknown sync state mode", "mode", *syncState)
  586. os.Exit(1)
  587. }
  588. daemon := &daemon.Daemon{
  589. V: version,
  590. Cluster: k8s,
  591. Manifests: k8sManifests,
  592. Registry: cacheRegistry,
  593. ImageRefresh: make(chan image.Name, 100), // size chosen by fair dice roll
  594. Repo: repo,
  595. GitConfig: gitConfig,
  596. Jobs: jobs,
  597. JobStatusCache: &job.StatusCache{Size: 100},
  598. Logger: log.With(logger, "component", "daemon"),
  599. ManifestGenerationEnabled: *manifestGeneration,
  600. LoopVars: &daemon.LoopVars{
  601. SyncInterval: *syncInterval,
  602. SyncState: syncProvider,
  603. AutomationInterval: *automationInterval,
  604. GitTimeout: *gitTimeout,
  605. GitVerifySignatures: *gitVerifySignatures,
  606. },
  607. }
  608. {
  609. // Connect to fluxsvc if given an upstream address
  610. if *upstreamURL != "" {
  611. upstreamLogger := log.With(logger, "component", "upstream")
  612. upstreamLogger.Log("URL", *upstreamURL)
  613. upstream, err := daemonhttp.NewUpstream(
  614. &http.Client{Timeout: 10 * time.Second},
  615. fmt.Sprintf("fluxd/%v", version),
  616. client.Token(*token),
  617. transport.NewUpstreamRouter(),
  618. *upstreamURL,
  619. remote.NewErrorLoggingServer(daemon, upstreamLogger),
  620. *rpcTimeout,
  621. upstreamLogger,
  622. )
  623. if err != nil {
  624. logger.Log("err", err)
  625. os.Exit(1)
  626. }
  627. daemon.EventWriter = upstream
  628. go func() {
  629. <-shutdown
  630. upstream.Close()
  631. }()
  632. } else {
  633. logger.Log("upstream", "no upstream URL given")
  634. }
  635. }
  636. shutdownWg.Add(1)
  637. go daemon.Loop(shutdown, shutdownWg, log.With(logger, "component", "sync-loop"))
  638. cacheWarmer.Notify = daemon.AskForAutomatedWorkloadImageUpdates
  639. cacheWarmer.Priority = daemon.ImageRefresh
  640. cacheWarmer.Trace = *registryTrace
  641. shutdownWg.Add(1)
  642. go cacheWarmer.Loop(log.With(logger, "component", "warmer"), shutdown, shutdownWg, imageCreds)
  643. go func() {
  644. mux := http.DefaultServeMux
  645. // Serve /metrics alongside API
  646. if *listenMetricsAddr == "" {
  647. mux.Handle("/metrics", promhttp.Handler())
  648. }
  649. handler := daemonhttp.NewHandler(daemon, daemonhttp.NewRouter())
  650. mux.Handle("/api/flux/", http.StripPrefix("/api/flux", handler))
  651. logger.Log("addr", *listenAddr)
  652. errc <- http.ListenAndServe(*listenAddr, mux)
  653. }()
  654. if *listenMetricsAddr != "" {
  655. go func() {
  656. mux := http.NewServeMux()
  657. mux.Handle("/metrics", promhttp.Handler())
  658. logger.Log("metrics-addr", *listenMetricsAddr)
  659. errc <- http.ListenAndServe(*listenMetricsAddr, mux)
  660. }()
  661. }
  662. // wait here until stopping.
  663. logger.Log("exiting", <-errc)
  664. close(shutdown)
  665. shutdownWg.Wait()
  666. }
  667. func homeDir() string {
  668. // nix
  669. if h := os.Getenv("HOME"); h != "" {
  670. return h
  671. }
  672. // windows
  673. if h := os.Getenv("USERPROFILE"); h != "" {
  674. return h
  675. }
  676. return ""
  677. }