Skip to content

Commit

Permalink
Merge pull request #9 from cycloidio/fg-retries-clean
Browse files Browse the repository at this point in the history
  • Loading branch information
xescugc committed Jul 19, 2021
2 parents 7835396 + f213a0e commit 46e3911
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 10 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
## [Unreleased]

### Added

- Flag `-max-age-retries` to remove retried failed jobs after that duration
([PR #9](https://github.com/cycloidio/goworker/pull/9))

## [0.1.8] _2021-06-11_

### Added
Expand Down
39 changes: 30 additions & 9 deletions failure.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,36 @@
package goworker

import (
"time"
import "time"

// Time layouts used to convert the failure timestamps from/to their
// string representation.
const (
// retriedAtLayout is the layout used to parse failure.RetriedAt.
retriedAtLayout = "2006/01/02 15:04:05"
// failedAtLayout is the layout used to format failure.FailedAt.
failedAtLayout = "2006/01/02 15:04:05 -07:00"
)

// failure holds the data describing a failed job. It is serialized
// to/from JSON using the names given in the field tags.
type failure struct {
FailedAt time.Time `json:"failed_at"`
Payload Payload `json:"payload"`
Exception string `json:"exception"`
Error string `json:"error"`
Backtrace []string `json:"backtrace"`
Worker *worker `json:"worker"`
Queue string `json:"queue"`
FailedAt string `json:"failed_at"`
Payload Payload `json:"payload"`
Exception string `json:"exception"`
Error string `json:"error"`
Backtrace []string `json:"backtrace"`
Worker *worker `json:"worker"`
Queue string `json:"queue"`
RetriedAt string `json:"retried_at"`
}

// GetRetriedAtTime parses RetriedAt with retriedAtLayout and returns
// the resulting time.Time. When RetriedAt is empty it returns the zero
// time.Time and a nil error.
func (f *failure) GetRetriedAtTime() (time.Time, error) {
	var unset time.Time
	if raw := f.RetriedAt; raw != "" {
		return time.Parse(retriedAtLayout, raw)
	}
	return unset, nil
}

// SetFailedAt stores t in FailedAt, rendered as a string using
// failedAtLayout.
func (f *failure) SetFailedAt(t time.Time) {
	formatted := t.Format(failedAtLayout)
	f.FailedAt = formatted
}
9 changes: 9 additions & 0 deletions flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@
// encoded in scientific notation, losing
// precision. This will default to true soon.
//
// -max-age-retries=1s
// — This flag will enable a feature to automatically
// clean the retried failed jobs older than the
// specified max age/duration (time.Duration).
// It is disabled by default; when enabled, it'll
// check every 1m for old retries.
//
// You can also configure your own flags for use
// within your workers. Be sure to set them
// before calling goworker.Main(). It is okay to
Expand Down Expand Up @@ -128,6 +135,8 @@ func init() {
flag.BoolVar(&workerSettings.UseNumber, "use-number", false, "use json.Number instead of float64 when decoding numbers in JSON. will default to true soon")

flag.BoolVar(&workerSettings.SkipTLSVerify, "insecure-tls", false, "skip TLS validation")

flag.DurationVar(&workerSettings.MaxAgeRetries, "max-age-retries", 0, "max age of the retried failed jobs before cleaning them")
}

func flags() error {
Expand Down
78 changes: 78 additions & 0 deletions goworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package goworker
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"os"
Expand All @@ -25,6 +26,14 @@ var (
initialized bool
)

const (
// keyForCleaningExpiredRetries is appended to the namespace to build the
// key used as a lock while the expired retried failures are being cleaned.
keyForCleaningExpiredRetries = "cleaning_expired_retried_in_progress"
)

var (
// cleaningExpiredRetriesInterval is the period of the ticker that triggers
// cleanExpiredRetries; half of it is used as the expiration of the
// cleaning lock.
cleaningExpiredRetriesInterval = time.Minute
)

var workerSettings WorkerSettings

type WorkerSettings struct {
Expand All @@ -41,6 +50,7 @@ type WorkerSettings struct {
UseNumber bool
SkipTLSVerify bool
TLSCertPath string
MaxAgeRetries time.Duration

closed chan struct{}
}
Expand Down Expand Up @@ -173,7 +183,75 @@ func Work() error {
worker.work(jobs, &monitor)
}

if hasToCleanRetries() {
cleanExpiredRetryTicker := time.NewTicker(cleaningExpiredRetriesInterval)
waitChan := make(chan struct{})
go func() {
monitor.Wait()
close(waitChan)
}()
for {
select {
case <-cleanExpiredRetryTicker.C:
cleanExpiredRetries()
case <-waitChan:
cleanExpiredRetryTicker.Stop()
return nil
}
}
}

monitor.Wait()

return nil
}

// hasToCleanRetries reports whether the cleaning of expired retried
// failures is enabled, which is the case when
// workerSettings.MaxAgeRetries is not zero.
func hasToCleanRetries() bool {
	maxAge := workerSettings.MaxAgeRetries
	return maxAge != 0
}

func cleanExpiredRetries() {
// This is used to set a lock so this operation is not done by more than 1 worker at the same time
ok, err := client.SetNX(fmt.Sprintf("%s%s", workerSettings.Namespace, keyForCleaningExpiredRetries), os.Getpid(), cleaningExpiredRetriesInterval/2).Result()
if err != nil {
logger.Criticalf("Error on setting lock to clean retries: %v", err)
return
}

if !ok {
return
}

failures, err := client.LRange(fmt.Sprintf("%sfailed", workerSettings.Namespace), 0, -1).Result()
if err != nil {
logger.Criticalf("Error on getting list of all failed jobs: %v", err)
return
}

for i, fail := range failures {
var f failure
err = json.Unmarshal([]byte(fail), &f)
if err != nil {
logger.Criticalf("Error on unmarshaling failure: %v", err)
return
}
ra, err := f.GetRetriedAtTime()
if err != nil {
logger.Criticalf("Error on GetRetriedAtTime of failure job %q: %v", fail, err)
return
}
if ra == *new(time.Time) {
continue
}

// If the RetryAt has exceeded the MaxAgeRetries then we'll
// remove the job from the list of failed jobs
if ra.Add(workerSettings.MaxAgeRetries).Before(time.Now()) {
hopefullyUniqueValueWeCanUseToDeleteJob := ""
// This logic what it does it replace first the value (with the LSet) and then remove the first
// occurrence on the failed queue of the replaced value. This value is the 'hopefullyUniqueValueWeCanUseToDeleteJob'
client.LSet(fmt.Sprintf("%sfailed", workerSettings.Namespace), int64(i), hopefullyUniqueValueWeCanUseToDeleteJob)
client.LRem(fmt.Sprintf("%sfailed", workerSettings.Namespace), 1, hopefullyUniqueValueWeCanUseToDeleteJob)
}
}
}
20 changes: 19 additions & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,28 @@ func newWorker(id string, queues []string) (*worker, error) {
}, nil
}

// MarshalJSON encodes the worker as a JSON string holding the value
// returned by its String method.
func (w *worker) MarshalJSON() ([]byte, error) {
	repr := w.String()
	return json.Marshal(repr)
}

// invalidWorkerError is returned by (*worker).UnmarshalJSON when the decoded
// string does not have the "hostname:pid-id:queue1,queue2" layout. Its value
// is the offending string.
type invalidWorkerError string

// Error implements the error interface.
func (e invalidWorkerError) Error() string {
	return "invalid worker identifier " + strconv.Quote(string(e))
}

// UnmarshalJSON decodes b, a JSON string with the layout
// "hostname:pid-id:queue1,queue2", and stores the parsed values into w.
//
// A JSON null is a no-op. It returns an error, instead of panicking, when b
// is not a JSON string, when the string does not follow the layout or when
// the pid is not a number.
func (w *worker) UnmarshalJSON(b []byte) error {
	// By convention Unmarshalers treat a JSON null as a no-op.
	if string(b) == "null" {
		return nil
	}

	// b is JSON, so it has to be decoded to drop the surrounding quotes and
	// resolve any escape sequence before it can be split.
	var s string
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}

	parts := strings.SplitN(s, ":", 3)
	if len(parts) != 3 {
		return invalidWorkerError(s)
	}
	// Only the first "-" separates the pid from the ID, as the ID may
	// contain "-" itself.
	pidAndID := strings.SplitN(parts[1], "-", 2)
	if len(pidAndID) != 2 {
		return invalidWorkerError(s)
	}
	pid, err := strconv.Atoi(pidAndID[0])
	if err != nil {
		return err
	}

	// Write through the receiver so the caller's value is the one populated;
	// assigning a new pointer to w would only change the local copy.
	w.process = process{
		Hostname: parts[0],
		Pid:      pid,
		ID:       pidAndID[1],
		Queues:   strings.Split(parts[2], ","),
	}
	return nil
}

func (w *worker) start(c *redis.Client, job *Job) error {
work := &work{
Queue: job.Queue,
Expand Down Expand Up @@ -162,13 +180,13 @@ func (w *worker) pruneDeadWorkers(c *redis.Client) {

func (w *worker) fail(c *redis.Client, job *Job, err error) error {
failure := &failure{
FailedAt: time.Now(),
Payload: job.Payload,
Exception: "Error",
Error: err.Error(),
Worker: w,
Queue: job.Queue,
}
failure.SetFailedAt(time.Now())
buffer, err := json.Marshal(failure)
if err != nil {
return err
Expand Down

0 comments on commit 46e3911

Please sign in to comment.