Skip to content

Commit

Permalink
implemented graphite handler
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisenuf committed Jul 31, 2015
1 parent 7dd97c8 commit 7936887
Showing 1 changed file with 82 additions and 7 deletions.
89 changes: 82 additions & 7 deletions src/fullerite/handler/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package handler

import (
"fullerite/metric"
"log"
"time"
"fmt"
"net"
)

// Graphite type
type Graphite struct {
BaseHandler
server string
port string
}

// NewGraphite returns a new Graphite handler.
Expand All @@ -19,17 +24,87 @@ func NewGraphite() *Graphite {
return g
}

// Configure accepts the different configuration options
func (g Graphite) Configure(conf *map[string]string) {
// TODO: implement

// Configure : accepts the different configuration options for the graphite handler
func (g *Graphite) Configure(config *map[string]string) {
asmap := *config
var exists bool
g.server, exists = asmap["server"]
if !exists {
log.Println("There was no server specified for the Graphite Handler, there won't be any emissions")
}

g.port, exists = asmap["port"]
if !exists {
log.Println("There was no port specified for the Graphite Handler, there won't be any emissions")
}
}

// Run sends metrics in the channel to the graphite server.
func (g Graphite) Run() {
// TODO: check interval and queue size and metrics.
func (g *Graphite) Run() {
lastEmission := time.Now()
metrics := make([]string, 0, g.maxBufferSize)


for metric := range g.channel {
// TODO: Actually send to graphite server
log.Debug("Sending metric to Graphite:", metric)
log.Println("Sending metric to Graphite:", metric)
datapoints := g.convertToGraphite(&metric)

//if the datapoints from metric would overflow the buffer, flush it and then add the new datapoints
if time.Since(lastEmission).Seconds() >= float64(s.interval) || len(metrics) + len(datapoints) > s.maxBufferSize {
s.emitMetrics(metrics)
lastEmission = time.Now()
metrics = make([]string, 0, g.maxBufferSize)
}
metrics = append(metrics, datapoints...)
}

}

func (g *Graphite) convertToGraphite(metric *metric.Metric) *[]string{
outname := g.Prefix() + (*metric).Name
datapoints := make([]string, 0, len(metric.getDimensions()) + 1)
// for key in dimensions, generate a new metric data point, add to a list, return
//what timestamp to use?
datapoints = append(datapoints, fmt.Sprintf("%s %f %s\n", outname, metric.Value, time.Now())) //find out what time to use

dimensions := metric.getDimensions())
for key, value := range dimensions {
//create a list of datapoints for this metric, then append that list the a global list
datapoints = append(datapoints, fmt.Sprintf("%s.%s %f %s\n", outname, key, value, time.Now()))
}

return &datapoints
}

func (g *Graphite) emitMetrics(datapoints []*string) {
log.Info("Starting to emit ", len(datapoints), " datapoints")

if len(datapoints) == 0 {
log.Warn("Skipping send because of an empty payload")
return
}

conn, _ := net.Dial("tcp", fmt.Sprintf("%s:%s", g.server, g.port))
for data := range datapoints {
fmt.Fprintf(conn, data)
}
}


















0 comments on commit 7936887

Please sign in to comment.