Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Vary sampling rate in image metadata cache, and back off on HTTP 429 #1354

Merged
merged 4 commits into from
Sep 18, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Change image cache expiry to adaptive scheme
To drive the refresh of cache data, we expire entries after a
configurable duration (default of 1 hour).

Because many image tags never change what they point to, this means
the image cache does thousands of needless fetches every hour (or
whatever the configured duration is). On the other hand, if you set it
to much longer (say 24h), any changes to a tag won't be noticed for
that much longer.

This commit gives each manifest its own, adaptive schedule for being
refreshed. It does this by doubling the period when a freshly fetched
manifest does not differ from the existing entry, and halving it when
it does differ. (The refresh period is clipped to `[minRefresh,
maxRefresh]`)

A tag that doesn't change will end up being polled infrequently. A tag
that changes occasionally will alternate between a number of values,
depending on how regular the changes are.

There are surely more accurate ways of arriving at a good estimate for
the polling frequency based on the past samples; this one has the
advantage of being very simple, and requiring little state to be kept.

Note that the expiry of the keys is now distinct from the deadline for
being refreshed. These were previously conflated, with the refresh
"timed" so it would fetch new metadata just before an entry was due to
expire. The expiry is set so that there is a generous grace period
after the deadline for a new value to be fetched and written.
  • Loading branch information
squaremo committed Sep 12, 2018
commit 847ef25a0af9262e634fe41dcab3cc2d8f44ef79
5 changes: 3 additions & 2 deletions cmd/fluxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func main() {
memcachedHostname = fs.String("memcached-hostname", "memcached", "Hostname for memcached service.")
memcachedTimeout = fs.Duration("memcached-timeout", time.Second, "Maximum time to wait before giving up on memcached requests.")
memcachedService = fs.String("memcached-service", "memcached", "SRV service used to discover memcache servers.")
registryCacheExpiry = fs.Duration("registry-cache-expiry", 1*time.Hour, "Duration to keep cached image info. Must be < 1 month.")
registryPollInterval = fs.Duration("registry-poll-interval", 5*time.Minute, "period at which to check for updated images")
registryRPS = fs.Int("registry-rps", 200, "maximum registry requests per second per host")
registryBurst = fs.Int("registry-burst", defaultRemoteConnections, "maximum number of warmer connections to remote and memcache")
Expand All @@ -119,7 +118,10 @@ func main() {
token = fs.String("token", "", "Authentication token for upstream service")

dockerConfig = fs.String("docker-config", "", "path to a docker config to use for image registry credentials")

_ = fs.Duration("registry-cache-expiry", 0, "")
)
fs.MarkDeprecated("registry-cache-expiry", "no longer used; cache entries are expired adaptively according to how often they change")

err := fs.Parse(os.Args[1:])
switch {
Expand Down Expand Up @@ -281,7 +283,6 @@ func main() {
memcacheClient := registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{
Host: *memcachedHostname,
Service: *memcachedService,
Expiry: *registryCacheExpiry,
Timeout: *memcachedTimeout,
UpdateInterval: 1 * time.Minute,
Logger: log.With(logger, "component", "memcached"),
Expand Down
36 changes: 26 additions & 10 deletions image/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ type Info struct {
ImageID string `json:",omitempty"`
// the time at which the image pointed at was created
CreatedAt time.Time `json:",omitempty"`
// the last time this image manifest was fetched
LastFetched time.Time `json:",omitempty"`
}

// MarshalJSON returns the Info value in JSON (as bytes). It is
Expand All @@ -245,14 +247,18 @@ type Info struct {
// detect.
func (im Info) MarshalJSON() ([]byte, error) {
type InfoAlias Info // alias to shed existing MarshalJSON implementation
var t string
var ca, lf string
if !im.CreatedAt.IsZero() {
t = im.CreatedAt.UTC().Format(time.RFC3339Nano)
ca = im.CreatedAt.UTC().Format(time.RFC3339Nano)
}
if !im.LastFetched.IsZero() {
lf = im.LastFetched.UTC().Format(time.RFC3339Nano)
}
encode := struct {
InfoAlias
CreatedAt string `json:",omitempty"`
}{InfoAlias(im), t}
CreatedAt string `json:",omitempty"`
LastFetched string `json:",omitempty"`
}{InfoAlias(im), ca, lf}
return json.Marshal(encode)
}

Expand All @@ -262,18 +268,28 @@ func (im *Info) UnmarshalJSON(b []byte) error {
type InfoAlias Info
unencode := struct {
InfoAlias
CreatedAt string `json:",omitempty"`
CreatedAt string `json:",omitempty"`
LastFetched string `json:",omitempty"`
}{}
json.Unmarshal(b, &unencode)
*im = Info(unencode.InfoAlias)
if unencode.CreatedAt == "" {
im.CreatedAt = time.Time{}

var err error
if err = decodeTime(unencode.CreatedAt, &im.CreatedAt); err == nil {
err = decodeTime(unencode.LastFetched, &im.LastFetched)
}
return err
}

func decodeTime(s string, t *time.Time) error {
if s == "" {
*t = time.Time{}
} else {
t, err := time.Parse(time.RFC3339, unencode.CreatedAt)
var err error
*t, err = time.Parse(time.RFC3339, s)
if err != nil {
return err
}
im.CreatedAt = t.UTC()
}
return nil
}
Expand Down Expand Up @@ -311,7 +327,7 @@ func NewerBySemver(lhs, rhs *Info) bool {
}

// Sort orders the given image infos according to `newer` func.
func Sort(infos []Info, newer func (a, b *Info) bool) {
func Sort(infos []Info, newer func(a, b *Info) bool) {
if newer == nil {
newer = NewerByCreated
}
Expand Down
8 changes: 3 additions & 5 deletions image/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package image
import (
"encoding/json"
"fmt"
"reflect"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -149,9 +148,11 @@ func mustMakeInfo(ref string, created time.Time) Info {

func TestImageInfoSerialisation(t *testing.T) {
t0 := time.Now().UTC() // UTC so it has nil location, otherwise it won't compare
t1 := time.Now().Add(5 * time.Minute).UTC()
info := mustMakeInfo("my/image:tag", t0)
info.Digest = "sha256:digest"
info.ImageID = "sha256:layerID"
info.LastFetched = t1
bytes, err := json.Marshal(info)
if err != nil {
t.Fatal(err)
Expand All @@ -160,9 +161,7 @@ func TestImageInfoSerialisation(t *testing.T) {
if err = json.Unmarshal(bytes, &info1); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(info, info1) {
t.Errorf("roundtrip serialisation failed:\n original: %#v\nroundtripped: %#v", info, info1)
}
assert.Equal(t, info, info1)
}

func TestImageInfoCreatedAtZero(t *testing.T) {
Expand Down Expand Up @@ -248,4 +247,3 @@ func reverse(imgs []Info) {
imgs[i], imgs[opp] = imgs[opp], imgs[i]
}
}

4 changes: 3 additions & 1 deletion registry/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
)

type Reader interface {
// GetKey gets the value at a key, along with its refresh deadline
GetKey(k Keyer) ([]byte, time.Time, error)
}

type Writer interface {
SetKey(k Keyer, v []byte) error
// SetKey sets the value at a key, along with its refresh deadline
SetKey(k Keyer, deadline time.Time, v []byte) error
}

type Client interface {
Expand Down
58 changes: 29 additions & 29 deletions registry/cache/memcached/memcached.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
/* This package implements an image DB cache using memcached.

Items are given an expiry based on their refresh deadline, with a
minimum duration to try and ensure things will expire well after they
would have been refreshed (i.e., only if they truly need garbage
collection).

memcached will still evict things when under memory pressure. We can
recover from that -- we'll just get a cache miss, and fetch it again.

*/
package memcached

import (
Expand All @@ -16,7 +27,8 @@ import (
)

const (
DefaultExpiry = time.Hour
// The minimum expiry given to an entry.
MinExpiry = time.Hour
)

// MemcacheClient is a memcache client that gets its server list from SRV
Expand All @@ -26,7 +38,6 @@ type MemcacheClient struct {
serverList *memcache.ServerList
hostname string
service string
ttl time.Duration
logger log.Logger

quit chan struct{}
Expand All @@ -37,7 +48,6 @@ type MemcacheClient struct {
type MemcacheConfig struct {
Host string
Service string
Expiry time.Duration
Timeout time.Duration
UpdateInterval time.Duration
Logger log.Logger
Expand All @@ -55,15 +65,10 @@ func NewMemcacheClient(config MemcacheConfig) *MemcacheClient {
serverList: &servers,
hostname: config.Host,
service: config.Service,
ttl: config.Expiry,
logger: config.Logger,
quit: make(chan struct{}),
}

if newClient.ttl == 0 {
newClient.ttl = DefaultExpiry
}

err := newClient.updateMemcacheServers()
if err != nil {
config.Logger.Log("err", errors.Wrapf(err, "Error setting memcache servers to '%v'", config.Host))
Expand All @@ -86,25 +91,14 @@ func NewFixedServerMemcacheClient(config MemcacheConfig, addresses ...string) *M
serverList: &servers,
hostname: config.Host,
service: config.Service,
ttl: config.Expiry,
logger: config.Logger,
quit: make(chan struct{}),
}

if newClient.ttl == 0 {
newClient.ttl = DefaultExpiry
}

return newClient
}

// The memcached client does not report the expiry when you GET a
// value, but we do want to know it, so we can refresh items that are
// soon to expire (and ignore items that are not). For that reason, we
// prepend the expiry to the value when setting, and read it back when
// getting.

// GetKey gets the value and its expiry time from the cache.
// GetKey gets the value and its refresh deadline from the cache.
func (c *MemcacheClient) GetKey(k cache.Keyer) ([]byte, time.Time, error) {
cacheItem, err := c.client.Get(k.Key())
if err != nil {
Expand All @@ -116,19 +110,25 @@ func (c *MemcacheClient) GetKey(k cache.Keyer) ([]byte, time.Time, error) {
return []byte{}, time.Time{}, err
}
}
exTime := binary.BigEndian.Uint32(cacheItem.Value)
return cacheItem.Value[4:], time.Unix(int64(exTime), 0), nil
deadlineTime := binary.BigEndian.Uint32(cacheItem.Value)
return cacheItem.Value[4:], time.Unix(int64(deadlineTime), 0), nil
}

// SetKey sets the value at a key.
func (c *MemcacheClient) SetKey(k cache.Keyer, v []byte) error {
exTime := time.Now().Add(c.ttl).Unix()
exBytes := make([]byte, 4, 4)
binary.BigEndian.PutUint32(exBytes, uint32(exTime))
// SetKey sets the value and its refresh deadline at a key. NB the key
// expiry is set _longer_ than the deadline, to give us a grace period
// in which to refresh the value.
func (c *MemcacheClient) SetKey(k cache.Keyer, refreshDeadline time.Time, v []byte) error {
expiry := refreshDeadline.Sub(time.Now()) * 2
if expiry < MinExpiry {
expiry = MinExpiry
}

deadlineBytes := make([]byte, 4, 4)
binary.BigEndian.PutUint32(deadlineBytes, uint32(refreshDeadline.Unix()))
if err := c.client.Set(&memcache.Item{
Key: k.Key(),
Value: append(exBytes, v...),
Expiration: int32(exTime),
Value: append(deadlineBytes, v...),
Expiration: int32(expiry.Seconds()),
}); err != nil {
c.logger.Log("err", errors.Wrap(err, "storing in memcache"))
return err
Expand Down
12 changes: 5 additions & 7 deletions registry/cache/memcached/memcached_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,18 @@ func TestMemcache_ExpiryReadWrite(t *testing.T) {
}, strings.Fields(*memcachedIPs)...)

// Set some dummy data
err := mc.SetKey(key, val)
now := time.Now().Round(time.Second)
err := mc.SetKey(key, now, val)
if err != nil {
t.Fatal(err)
}

cached, expiry, err := mc.GetKey(key)
cached, deadline, err := mc.GetKey(key)
if err != nil {
t.Fatal(err)
}
if expiry.IsZero() {
t.Fatal("Time should not be zero")
}
if expiry.Before(time.Now()) {
t.Fatal("Expiry should be in the future")
if !deadline.Equal(now) {
t.Fatalf("Deadline should be %s, but is %s", now.String(), deadline.String())
}

if string(cached) != string(val) {
Expand Down
4 changes: 2 additions & 2 deletions registry/cache/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ func (i *instrumentedClient) GetKey(k Keyer) (_ []byte, ex time.Time, err error)
return i.next.GetKey(k)
}

func (i *instrumentedClient) SetKey(k Keyer, v []byte) (err error) {
func (i *instrumentedClient) SetKey(k Keyer, d time.Time, v []byte) (err error) {
defer func(begin time.Time) {
cacheRequestDuration.With(
fluxmetrics.LabelMethod, "SetKey",
fluxmetrics.LabelSuccess, fmt.Sprint(err == nil),
).Observe(time.Since(begin).Seconds())
}(time.Now())
return i.next.SetKey(k, v)
return i.next.SetKey(k, d, v)
}
Loading