GitOps for k8s

warming.go 13KB

package cache

import (
    "context"
    "encoding/json"
    "fmt"
    "net"
    "strings"
    "sync"
    "time"

    "github.com/go-kit/kit/log"
    "github.com/pkg/errors"

    "github.com/weaveworks/flux/image"
    "github.com/weaveworks/flux/registry"
)

const askForNewImagesInterval = time.Minute

// start off assuming an image will change about an hour from first
// seeing it
const initialRefresh = 1 * time.Hour

// never try to refresh a tag faster than this
const minRefresh = 5 * time.Minute

// never set a refresh deadline longer than this
const maxRefresh = 7 * 24 * time.Hour

// excluded images get a constant, fairly long refresh deadline; we
// don't expect them to become usable, e.g., by changing architecture.
const excludedRefresh = 24 * time.Hour

// the whole set of image manifests for a repo gets a long refresh; in
// general we write it back every time we go 'round the loop, so this
// is mainly for the effect of making garbage collection less likely.
const repoRefresh = maxRefresh

func clipRefresh(r time.Duration) time.Duration {
    if r > maxRefresh {
        return maxRefresh
    }
    if r < minRefresh {
        return minRefresh
    }
    return r
}
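
// A minimal sketch (not part of the original file) of how a tag's refresh
// interval evolves under clipRefresh. In warm below, an unchanged digest
// doubles the interval and a moved tag halves it, so assuming a tag whose
// digest never changes:
//
//    r := initialRefresh    // 1h
//    r = clipRefresh(r * 2) // 2h
//    r = clipRefresh(r * 2) // 4h, and so on, until the result is
//                           // clamped at maxRefresh (7 days); halving
//                           // bottoms out at minRefresh (5 minutes)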

// Warmer refreshes the information kept in the cache from remote
// registries.
type Warmer struct {
    clientFactory registry.ClientFactory
    cache         Client
    burst         int
    Trace         bool
    // Priority receives image names to be refreshed ahead of the backlog.
    Priority chan image.Name
    // Notify, if set, is called whenever new tags are detected for a repo.
    Notify func()
}

// NewWarmer creates a cache warmer that (when Loop is invoked) will
// periodically refresh the values kept in the cache.
func NewWarmer(cf registry.ClientFactory, cacheClient Client, burst int) (*Warmer, error) {
    if cf == nil || cacheClient == nil || burst <= 0 {
        return nil, errors.New("arguments must be non-nil (or > 0 in the case of burst)")
    }
    return &Warmer{
        clientFactory: cf,
        cache:         cacheClient,
        burst:         burst,
    }, nil
}
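
// A hedged construction sketch: `cf` and `cc` stand in for some concrete
// registry.ClientFactory and Client implementations, which are assumed
// here rather than defined in this file; the burst value is illustrative.
//
//    w, err := NewWarmer(cf, cc, 125)
//    if err != nil {
//        // cf and cc must be non-nil, and burst must be > 0
//    }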

// ... and this is what we keep in the backlog
type backlogItem struct {
    image.Name
    registry.Credentials
}

// Loop continuously gets the images to populate the cache with,
// and populates the cache with them.
func (w *Warmer) Loop(logger log.Logger, stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() registry.ImageCreds) {
    defer wg.Done()
    refresh := time.Tick(askForNewImagesInterval)
    imageCreds := imagesToFetchFunc()
    backlog := imageCredsToBacklog(imageCreds)

    // We have some fine control over how long to spend on each fetch
    // operation, since they are given a `context`. For now though,
    // just rattle through them one by one, however long they take.
    ctx := context.Background()

    // NB the implicit contract here is that the prioritised
    // image has to have been running the last time we
    // requested the credentials.
    priorityWarm := func(name image.Name) {
        logger.Log("priority", name.String())
        if creds, ok := imageCreds[name]; ok {
            w.warm(ctx, time.Now(), logger, name, creds)
        } else {
            logger.Log("priority", name.String(), "err", "no creds available")
        }
    }

    // This loop keeps a kind of priority queue, whereby image
    // names coming in on the `Priority` channel are looked up first.
    // If there are none, images used in the cluster are refreshed;
    // but no more often than once every `askForNewImagesInterval`,
    // since there is no effective back-pressure on cache refreshes
    // and it would spin freely otherwise.
    for {
        select {
        case <-stop:
            logger.Log("stopping", "true")
            return
        case name := <-w.Priority:
            priorityWarm(name)
            continue
        default:
        }

        if len(backlog) > 0 {
            im := backlog[0]
            backlog = backlog[1:]
            w.warm(ctx, time.Now(), logger, im.Name, im.Credentials)
        } else {
            select {
            case <-stop:
                logger.Log("stopping", "true")
                return
            case <-refresh:
                imageCreds = imagesToFetchFunc()
                backlog = imageCredsToBacklog(imageCreds)
            case name := <-w.Priority:
                priorityWarm(name)
            }
        }
    }
}
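
// A minimal sketch of driving Loop (the surrounding wiring is assumed, not
// shown in this file): the caller owns the stop channel and must Add to the
// WaitGroup before starting the goroutine, since Loop calls wg.Done().
//
//    stop := make(chan struct{})
//    wg := &sync.WaitGroup{}
//    wg.Add(1)
//    go w.Loop(logger, stop, wg, imagesToFetch)
//    // ... on shutdown:
//    close(stop)
//    wg.Wait()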

func imageCredsToBacklog(imageCreds registry.ImageCreds) []backlogItem {
    backlog := make([]backlogItem, len(imageCreds))
    var i int
    for name, cred := range imageCreds {
        backlog[i] = backlogItem{name, cred}
        i++
    }
    return backlog
}
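
// Note (an observation, not in the original): Go randomizes map iteration
// order, so the backlog built above comes out in no particular order. If a
// deterministic order were wanted, the slice could be sorted, e.g.:
//
//    sort.Slice(backlog, func(i, j int) bool {
//        return backlog[i].Name.String() < backlog[j].Name.String()
//    })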

func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id image.Name, creds registry.Credentials) {
    errorLogger := log.With(logger, "canonical_name", id.CanonicalName(), "auth", creds)

    client, err := w.clientFactory.ClientFor(id.CanonicalName(), creds)
    if err != nil {
        errorLogger.Log("err", err.Error())
        return
    }

    // This is what we're going to write back to the cache
    var repo ImageRepository
    repoKey := NewRepositoryKey(id.CanonicalName())
    bytes, _, err := w.cache.GetKey(repoKey)
    if err == nil {
        err = json.Unmarshal(bytes, &repo)
    } else if err == ErrNotCached {
        err = nil
    }
    if err != nil {
        errorLogger.Log("err", errors.Wrap(err, "fetching previous result from cache"))
        return
    }

    // Save for comparison later
    oldImages := repo.Images

    // Now we have the previous result; everything after will be
    // attempting to refresh that value. Whatever happens, at the end
    // we'll write something back.
    defer func() {
        bytes, err := json.Marshal(repo)
        if err == nil {
            err = w.cache.SetKey(repoKey, now.Add(repoRefresh), bytes)
        }
        if err != nil {
            errorLogger.Log("err", errors.Wrap(err, "writing result to cache"))
        }
    }()

    tags, err := client.Tags(ctx)
    if err != nil {
        if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) && !strings.Contains(err.Error(), "net/http: request canceled") {
            errorLogger.Log("err", errors.Wrap(err, "requesting tags"))
            repo.LastError = err.Error()
        }
        return
    }

    newImages := map[string]image.Info{}

    // Create a list of images that need updating
    type update struct {
        ref             image.Ref
        previousDigest  string
        previousRefresh time.Duration
    }
    var toUpdate []update

    // Counters for reporting what happened
    var missing, refresh int
    for _, tag := range tags {
        if tag == "" {
            errorLogger.Log("err", "empty tag in fetched tags", "tags", tags)
            repo.LastError = "empty tag in fetched tags"
            return // abort and let the error be written
        }

        // See if we have the manifest already cached
        newID := id.ToRef(tag)
        key := NewManifestKey(newID.CanonicalRef())
        bytes, deadline, err := w.cache.GetKey(key)
        // If err, then we don't have it yet. Update.
        switch {
        case err != nil: // by and large these are cache misses, but any error shall count as "not found"
            if err != ErrNotCached {
                errorLogger.Log("warning", "error from cache", "err", err, "ref", newID)
            }
            missing++
            toUpdate = append(toUpdate, update{ref: newID, previousRefresh: initialRefresh})
        case len(bytes) == 0:
            errorLogger.Log("warning", "empty result from cache", "ref", newID)
            missing++
            toUpdate = append(toUpdate, update{ref: newID, previousRefresh: initialRefresh})
        default:
            var entry registry.ImageEntry
            if err := json.Unmarshal(bytes, &entry); err == nil {
                if w.Trace {
                    errorLogger.Log("trace", "found cached manifest", "ref", newID, "last_fetched", entry.LastFetched.Format(time.RFC3339), "deadline", deadline.Format(time.RFC3339))
                }
                if entry.ExcludedReason == "" {
                    newImages[tag] = entry.Info
                    if now.After(deadline) {
                        previousRefresh := minRefresh
                        lastFetched := entry.Info.LastFetched
                        if !lastFetched.IsZero() {
                            previousRefresh = deadline.Sub(lastFetched)
                        }
                        toUpdate = append(toUpdate, update{ref: newID, previousRefresh: previousRefresh, previousDigest: entry.Info.Digest})
                        refresh++
                    }
                } else {
                    if w.Trace {
                        logger.Log("trace", "excluded in cache", "ref", newID, "reason", entry.ExcludedReason)
                    }
                    if now.After(deadline) {
                        toUpdate = append(toUpdate, update{ref: newID, previousRefresh: excludedRefresh})
                        refresh++
                    }
                }
            }
        }
    }

    var fetchMx sync.Mutex // also guards access to newImages
    var successCount int
    var manifestUnknownCount int
    if len(toUpdate) > 0 {
        logger.Log("info", "refreshing image", "image", id, "tag_count", len(tags), "to_update", len(toUpdate), "of_which_refresh", refresh, "of_which_missing", missing)

        // The upper bound for concurrent fetches against a single host is
        // w.burst, so limit the number of fetching goroutines to that.
        fetchers := make(chan struct{}, w.burst)
        awaitFetchers := &sync.WaitGroup{}

        ctxc, cancel := context.WithCancel(ctx)
        var once sync.Once
        defer cancel()

    updates:
        for _, up := range toUpdate {
            select {
            case <-ctxc.Done():
                break updates
            case fetchers <- struct{}{}:
            }
            awaitFetchers.Add(1)
            go func(update update) {
                defer func() { awaitFetchers.Done(); <-fetchers }()

                imageID := update.ref
                if w.Trace {
                    errorLogger.Log("trace", "refreshing manifest", "ref", imageID, "previous_refresh", update.previousRefresh.String())
                }

                // Get the image from the remote
                entry, err := client.Manifest(ctxc, imageID.Tag)
                if err != nil {
                    if err, ok := errors.Cause(err).(net.Error); ok && err.Timeout() {
                        // This was due to a context timeout, don't bother logging
                        return
                    }
                    switch {
                    case strings.Contains(err.Error(), "429"):
                        // abort the image tags fetching if we've been rate limited
                        once.Do(func() {
                            errorLogger.Log("warn", "aborting image tag fetching due to rate limiting, will try again later")
                            cancel()
                        })
                    case strings.Contains(err.Error(), "manifest unknown"):
                        // Registry is corrupted; keep going, this manifest may not be relevant for automatic updates
                        fetchMx.Lock()
                        manifestUnknownCount++
                        fetchMx.Unlock()
                        errorLogger.Log("warn", fmt.Sprintf("manifest for tag %s missing in registry %s", imageID.Tag, imageID.Name),
                            "impact", "flux will fail to auto-release workloads with matching images, ask the repository administrator to fix the inconsistency")
                    default:
                        errorLogger.Log("err", err, "ref", imageID)
                    }
                    return
                }

                refresh := update.previousRefresh
                reason := ""
                switch {
                case entry.ExcludedReason != "":
                    errorLogger.Log("excluded", entry.ExcludedReason, "ref", imageID)
                    refresh = excludedRefresh
                    reason = "image is excluded"
                case update.previousDigest == "":
                    entry.Info.LastFetched = now
                    refresh = update.previousRefresh
                    reason = "no prior cache entry for image"
                case entry.Info.Digest == update.previousDigest:
                    entry.Info.LastFetched = now
                    refresh = clipRefresh(refresh * 2)
                    reason = "image digest is same"
                default: // i.e., not excluded, but the digests differ -> the tag was moved
                    entry.Info.LastFetched = now
                    refresh = clipRefresh(refresh / 2)
                    reason = "image digest is different"
                }

                if w.Trace {
                    errorLogger.Log("trace", "caching manifest", "ref", imageID, "last_fetched", now.Format(time.RFC3339), "refresh", refresh.String(), "reason", reason)
                }

                key := NewManifestKey(imageID.CanonicalRef())
                // Write back to memcached
                val, err := json.Marshal(entry)
                if err != nil {
                    errorLogger.Log("err", err, "ref", imageID)
                    return
                }
                err = w.cache.SetKey(key, now.Add(refresh), val)
                if err != nil {
                    errorLogger.Log("err", err, "ref", imageID)
                    return
                }
                fetchMx.Lock()
                successCount++
                if entry.ExcludedReason == "" {
                    newImages[imageID.Tag] = entry.Info
                }
                fetchMx.Unlock()
            }(up)
        }
        awaitFetchers.Wait()
        logger.Log("updated", id.String(), "successful", successCount, "attempted", len(toUpdate))
    }

    // We managed to fetch new metadata for everything we needed.
    // Ratchet the result forward.
    if successCount+manifestUnknownCount == len(toUpdate) {
        repo = ImageRepository{
            LastUpdate: time.Now(),
            RepositoryMetadata: image.RepositoryMetadata{
                Images: newImages,
                Tags:   tags,
            },
        }
        // If we got through all that without bumping into `HTTP 429
        // Too Many Requests` (or other problems), we can potentially
        // creep the rate limit up
        w.clientFactory.Succeed(id.CanonicalName())
    }

    if w.Notify != nil {
        cacheTags := StringSet{}
        for t := range oldImages {
            cacheTags[t] = struct{}{}
        }

        // If there are more tags than there used to be, there must be
        // at least one new tag.
        if len(cacheTags) < len(tags) {
            w.Notify()
            return
        }
        // Otherwise, check whether there are any entries in the
        // fetched tags that aren't in the cached tags.
        tagSet := NewStringSet(tags)
        if !tagSet.Subset(cacheTags) {
            w.Notify()
        }
    }
}
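
// The fetching in warm bounds concurrency with a buffered channel used as
// a counting semaphore: sending acquires a slot, receiving releases it. A
// standalone sketch of the same idiom (illustrative only; `items`, `work`,
// and `burst` are assumed, not defined in this file):
//
//    sem := make(chan struct{}, burst)
//    var wg sync.WaitGroup
//    for _, item := range items {
//        sem <- struct{}{} // blocks while `burst` fetches are in flight
//        wg.Add(1)
//        go func(it string) {
//            defer func() { wg.Done(); <-sem }() // release the slot
//            work(it)
//        }(item)
//    }
//    wg.Wait()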

// StringSet is a set of strings.
type StringSet map[string]struct{}

// NewStringSet returns a StringSet containing exactly the strings
// given as arguments.
func NewStringSet(ss []string) StringSet {
    res := StringSet{}
    for _, s := range ss {
        res[s] = struct{}{}
    }
    return res
}

// Subset returns true if `s` is a subset of `t` (including the case
// of having the same members).
func (s StringSet) Subset(t StringSet) bool {
    for k := range s {
        if _, ok := t[k]; !ok {
            return false
        }
    }
    return true
}
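
// A small worked example (values illustrative) of how warm uses Subset to
// decide whether to notify:
//
//    cached := NewStringSet([]string{"v1.0", "v1.1"})
//    fetched := NewStringSet([]string{"v1.0", "v1.1", "v1.2"})
//    fetched.Subset(cached) // false: "v1.2" is new, so Notify would fire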