Skip to content

Commit

Permalink
Merge pull request go-graphite#123 from Civil/carbonserver_improvemen…
Browse files Browse the repository at this point in the history
…ts_fixes

WIP: Carbonserver improvements fixes
  • Loading branch information
lomik committed Nov 14, 2016
2 parents c210d86 + b70e358 commit 2a26b84
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 139 deletions.
2 changes: 1 addition & 1 deletion carbon/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (app *App) Start() (err error) {
return
}

carbonserver := carbonserver.NewCarbonserverListener(core)
carbonserver := carbonserver.NewCarbonserverListener(core.Get)
carbonserver.SetWhisperData(conf.Whisper.DataDir)
carbonserver.SetMaxGlobs(conf.Carbonserver.MaxGlobs)
carbonserver.SetBuckets(conf.Carbonserver.Buckets)
Expand Down
133 changes: 53 additions & 80 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/dgryski/httputil"
"github.com/gogo/protobuf/proto"
pickle "github.com/kisielk/og-rek"
"github.com/lomik/go-carbon/cache"
"github.com/lomik/go-carbon/helper"
"github.com/lomik/go-carbon/points"
whisper "github.com/lomik/go-whisper"
Expand All @@ -56,9 +55,8 @@ type metricStruct struct {
FindZero uint64
InfoRequests uint64
InfoErrors uint64
CachePartialHit uint64
CacheHit uint64
CacheMiss uint64
CacheOnlyHit uint64
CacheRequestsTotal uint64
CacheWaitTimeNS uint64
CacheWaitTimeOverheadNS uint64
Expand All @@ -70,7 +68,7 @@ type metricStruct struct {

type CarbonserverListener struct {
helper.Stoppable
cache *cache.Cache
cacheGet func (key string) []points.Point
readTimeout time.Duration
writeTimeout time.Duration
whisperData string
Expand All @@ -92,11 +90,11 @@ type fileIndex struct {
files []string
}

func NewCarbonserverListener(cache *cache.Cache) *CarbonserverListener {
func NewCarbonserverListener(cacheGetFunc func(key string) []points.Point) *CarbonserverListener {
return &CarbonserverListener{
// Config variables
metricsAsCounters: false,
cache: cache,
cacheGet: cacheGetFunc,
}
}

Expand Down Expand Up @@ -387,34 +385,24 @@ func (listener *CarbonserverListener) findHandler(wr http.ResponseWriter, req *h
return
}

func fetchCachedData(data []points.Point, fetchFromTime, fetchUntilTime, step int32) ([]float64, int32, int32) {
var cacheFromTime, cacheUntilTime int32
cachedValues := make([]float64, 0)
prevTs := int32(0)
type cachedData map[int32]float64

func fetchCachedData(data []points.Point, fetchFromTime, fetchUntilTime, step int32) cachedData {
cachedData := make(cachedData)
if len(data) == 0 {
return cachedValues, cacheFromTime, cacheUntilTime
return cachedData
}

for _, item := range data {
ts := int32(item.Timestamp) + (step - int32(item.Timestamp)%step)
ts := int32(item.Timestamp) - int32(item.Timestamp)%step
// Data not from requested range, we don't need it.
if ts > fetchUntilTime || ts < fetchFromTime || prevTs == ts {
if ts > fetchUntilTime || ts < fetchFromTime {
continue
}
for i := prevTs; prevTs != 0 && i < ts-step; i += step {
cachedValues = append(cachedValues, math.NaN())
}
prevTs = ts
cachedValues = append(cachedValues, item.Value)
if cacheFromTime == 0 || cacheFromTime > ts {
cacheFromTime = ts
}
if cacheUntilTime == 0 || cacheUntilTime < ts {
cacheUntilTime = ts
}
cachedData[ts] = item.Value
}

return cachedValues, cacheFromTime, cacheUntilTime
return cachedData
}

func (listener *CarbonserverListener) fetchHandler(wr http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -537,7 +525,6 @@ func (listener *CarbonserverListener) fetchSingleMetric(metric string, fromTime,
var step int32
var cacheFromTime int32
var cacheUntilTime int32
cacheGotEverything := false

// We need to obtain the metadata from whisper file anyway.
path := listener.whisperData + "/" + strings.Replace(metric, ".", "/", -1) + ".wsp"
Expand Down Expand Up @@ -571,38 +558,27 @@ func (listener *CarbonserverListener) fetchSingleMetric(metric string, fromTime,
}
atomic.AddUint64(&listener.metrics.MetricsReturned, 1)

cachedValues := make([]float64, 0)
var cachedValues cachedData
var cacheData []points.Point

cacheStartTime := time.Now()
if step != bestStep {
logger.Debugf("[carbonserver] Cache is not supported for this query (required step != best step). path=%q fromTime=%v untilTime=%v step=%v bestStep=%v", path, fromTime, untilTime, step, bestStep)
} else {
// query cache
cacheData = listener.cache.Get(metric)
cacheData = listener.cacheGet(metric)
waitTime := uint64(time.Since(cacheStartTime).Nanoseconds())
atomic.AddUint64(&listener.metrics.CacheWaitTimeOverheadNS, waitTime)
}

if cacheData != nil {
atomic.AddUint64(&listener.metrics.CacheRequestsTotal, 1)

cachedValues, cacheFromTime, cacheUntilTime = fetchCachedData(cacheData, fetchFromTime, fetchUntilTime, step)
cachedValues = fetchCachedData(cacheData, fetchFromTime, fetchUntilTime, step)
logger.Debugf("[carbonserver] fetched cached metric=%v from=%v until=%v", metric, cacheFromTime, cacheUntilTime)

if cacheFromTime != 0 {
if cacheFromTime <= fetchFromTime {
for fetchUntilTime > cacheUntilTime {
cachedValues = append(cachedValues, math.NaN())
cacheUntilTime += step
}
fetchUntilTime = cacheUntilTime
fetchFromTime = cacheFromTime
cacheGotEverything = true
atomic.AddUint64(&listener.metrics.CacheOnlyHit, 1)
} else {
atomic.AddUint64(&listener.metrics.CachePartialHit, 1)
}
if cachedValues != nil {
atomic.AddUint64(&listener.metrics.CacheHit, 1)
} else {
atomic.AddUint64(&listener.metrics.CacheMiss, 1)
}
Expand All @@ -612,62 +588,60 @@ func (listener *CarbonserverListener) fetchSingleMetric(metric string, fromTime,

// End of cache query
var values []float64
if !cacheGotEverything {
atomic.AddUint64(&listener.metrics.DiskRequests, 1)
diskStartTime := time.Now()
logger.Debugf("[carbonserver] fetching disk metric=%v from=%v until=%v", metric, fetchFromTime, fetchUntilTime)

points, err := w.Fetch(int(fetchFromTime), int(fetchUntilTime))
w.Close()
if err != nil {
atomic.AddUint64(&listener.metrics.RenderErrors, 1)
logger.Infof("[carbonserver] failed to fetch points from %s: %s", path, err)
return nil, errors.New("failed to fetch points")
}

if points == nil {
atomic.AddUint64(&listener.metrics.NotFound, 1)
logger.Debugf("[carbonserver] Metric time range not found: metric=%s from=%d to=%d ", metric, fromTime, untilTime)
return nil, errors.New("time range not found")
}
values = points.Values()
atomic.AddUint64(&listener.metrics.DiskRequests, 1)
diskStartTime := time.Now()
logger.Debugf("[carbonserver] fetching disk metric=%v from=%v until=%v", metric, fetchFromTime, fetchUntilTime)

fetchFromTime = int32(points.FromTime())
fetchUntilTime = int32(points.UntilTime())
step = int32(points.Step())
points, err := w.Fetch(int(fetchFromTime), int(fetchUntilTime))
w.Close()
if err != nil {
atomic.AddUint64(&listener.metrics.RenderErrors, 1)
logger.Infof("[carbonserver] failed to fetch points from %s: %s", path, err)
return nil, errors.New("failed to fetch points")
}

waitTime := uint64(time.Since(diskStartTime).Nanoseconds())
atomic.AddUint64(&listener.metrics.DiskWaitTimeNS, waitTime)
} else {
values = make([]float64, 0)
w.Close()
if points == nil {
atomic.AddUint64(&listener.metrics.NotFound, 1)
logger.Debugf("[carbonserver] Metric time range not found: metric=%s from=%d to=%d ", metric, fromTime, untilTime)
return nil, errors.New("time range not found")
}
values = points.Values()

fetchFromTime = int32(points.FromTime())
fetchUntilTime = int32(points.UntilTime())
step = int32(points.Step())

waitTime := uint64(time.Since(diskStartTime).Nanoseconds())
atomic.AddUint64(&listener.metrics.DiskWaitTimeNS, waitTime)

startTime := fetchFromTime
stopTime := fetchUntilTime
points := (stopTime - startTime) / step
atomic.AddUint64(&listener.metrics.PointsReturned, uint64(points))
amountOfPoints := (stopTime - startTime) / step
atomic.AddUint64(&listener.metrics.PointsReturned, uint64(amountOfPoints))
response := pb.FetchResponse{
Name: proto.String(metric),
StartTime: &startTime,
StopTime: &stopTime,
StepTime: &step,
Values: make([]float64, points),
IsAbsent: make([]bool, points),
Values: make([]float64, amountOfPoints),
IsAbsent: make([]bool, amountOfPoints),
}

ts := startTime
cacheCursor := 0
diskCursor := 0
var ok bool
for i := range response.Values {
p := math.NaN()
if cacheFromTime != 0 && ts >= cacheFromTime && ts <= cacheUntilTime {
p = cachedValues[cacheCursor]
cacheCursor++
} else if fetchFromTime != 0 && ts >= fetchFromTime && ts <= fetchUntilTime {
if cachedValues != nil {
p, ok = cachedValues[ts]
if !ok {
p = values[diskCursor]
}
} else {
p = values[diskCursor]
diskCursor++
}
diskCursor++

if math.IsNaN(p) {
response.Values[i] = 0
Expand Down Expand Up @@ -787,9 +761,8 @@ func (listener *CarbonserverListener) Stat(send helper.StatCallback) {
sender("find_requests", &listener.metrics.FindRequests, send)
sender("find_errors", &listener.metrics.FindErrors, send)
sender("find_zero", &listener.metrics.FindZero, send)
sender("cache_partial_hit", &listener.metrics.CachePartialHit, send)
sender("cache_hit", &listener.metrics.CacheHit, send)
sender("cache_miss", &listener.metrics.CacheMiss, send)
sender("cache_only_hit", &listener.metrics.CacheOnlyHit, send)
sender("cache_wait_time_ns", &listener.metrics.CacheWaitTimeNS, send)
sender("cache_wait_time_overhead_ns", &listener.metrics.CacheWaitTimeOverheadNS, send)
sender("cache_requests", &listener.metrics.CacheRequestsTotal, send)
Expand Down
Loading

0 comments on commit 2a26b84

Please sign in to comment.