Skip to content

Commit

Permalink
goworker: Added 'MaxAgeRetries' option to the Goworker
Browse files Browse the repository at this point in the history
This option is useful to automatically remove retried failed jobs from the 'failed' queue once they exceed that duration. This
check is done every 1m.

Also changed 'failed.FailedAt' and added 'failed.RetriedAt', both now of type string. The main reason is
that the Ruby lib sets those values in a specific format. Ruby can read multiple formats into one type, but Go
cannot, so we need to use exactly the same formats or the unmarshaler does not work. I therefore decided to switch them to
'string' and add helpers to set/get the values that convert them directly.

All the logic for how to remove failed jobs and how the data is stored has more or less been ported from the Ruby version;
only 'MaxAgeRetries' itself is unique to this Go version
  • Loading branch information
xescugc committed Jul 1, 2021
1 parent 7835396 commit 616f9b2
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 10 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
## [Unreleased]

### Added

- Flag `-max-age-retries` to remove retried failed jobs after that duration
([PR #9](https://github.com/cycloidio/goworker/pull/9))

## [0.1.8] _2021-06-11_

### Added
Expand Down
34 changes: 25 additions & 9 deletions failure.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,31 @@
package goworker

import (
"time"
import "time"

const (
retriedAtLayout = "2006/01/02 15:04:05"
failedAtLayout = "2006/01/02 15:04:05 -07:00"
)

// failure is the record describing a failed job. It is serialized to JSON
// under the keys failed_at, payload, exception, error, backtrace, worker,
// queue and retried_at.
//
// NOTE(review): this view lists FailedAt twice (once as time.Time, once as
// string), which looks like the removed and added lines of a diff shown
// together — confirm against the real file. The string-typed FailedAt is
// written by SetFailedAt using failedAtLayout, and RetriedAt is parsed by
// GetRetriedAtTime using retriedAtLayout.
type failure struct {
FailedAt time.Time `json:"failed_at"`
Payload Payload `json:"payload"`
Exception string `json:"exception"`
Error string `json:"error"`
Backtrace []string `json:"backtrace"`
Worker *worker `json:"worker"`
Queue string `json:"queue"`
FailedAt string `json:"failed_at"`
Payload Payload `json:"payload"`
Exception string `json:"exception"`
Error string `json:"error"`
Backtrace []string `json:"backtrace"`
Worker *worker `json:"worker"`
Queue string `json:"queue"`
RetriedAt string `json:"retried_at"`
}

// GetRetriedAtTime converts the RetriedAt string into a time.Time using
// retriedAtLayout. An empty RetriedAt yields the zero time.Time and a nil
// error; any other value that does not match the layout yields the error
// reported by time.Parse.
//
// NOTE(review): retriedAtLayout carries no zone information, so time.Parse
// interprets the value as UTC — confirm this matches how the writer of
// retried_at formats it.
func (f *failure) GetRetriedAtTime() (time.Time, error) {
	if f.RetriedAt != "" {
		return time.Parse(retriedAtLayout, f.RetriedAt)
	}

	var zero time.Time
	return zero, nil
}

// SetFailedAt stores t in FailedAt as a string rendered with failedAtLayout.
func (f *failure) SetFailedAt(t time.Time) {
	stamp := t.Format(failedAtLayout)
	f.FailedAt = stamp
}
9 changes: 9 additions & 0 deletions flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@
// encoded in scientific notation, losing
// precision. This will default to true soon.
//
// -max-age-retries=1s
// — This flag will enable a feature to automatically
// clean the retried failed jobs older than the
// specified max age/duration (time.Duration).
// It is disabled by default; when enabled, it
// checks every 1m for old retries.
//
// You can also configure your own flags for use
// within your workers. Be sure to set them
// before calling goworker.Main(). It is okay to
Expand Down Expand Up @@ -128,6 +135,8 @@ func init() {
flag.BoolVar(&workerSettings.UseNumber, "use-number", false, "use json.Number instead of float64 when decoding numbers in JSON. will default to true soon")

flag.BoolVar(&workerSettings.SkipTLSVerify, "insecure-tls", false, "skip TLS validation")

flag.DurationVar(&workerSettings.MaxAgeRetries, "max-age-retries", 0, "max age of the retried failed jobs before cleaning them")
}

func flags() error {
Expand Down
75 changes: 75 additions & 0 deletions goworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package goworker
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"os"
Expand All @@ -25,6 +26,14 @@ var (
initialized bool
)

const (
keyForCleaningExpiredRetries = "cleaning_expired_retried_in_progress"
)

var (
cleaningExpiredRetriesInterval = time.Minute
)

var workerSettings WorkerSettings

type WorkerSettings struct {
Expand All @@ -41,6 +50,7 @@ type WorkerSettings struct {
UseNumber bool
SkipTLSVerify bool
TLSCertPath string
MaxAgeRetries time.Duration

closed chan struct{}
}
Expand Down Expand Up @@ -173,7 +183,72 @@ func Work() error {
worker.work(jobs, &monitor)
}

if hasToCleanRetries() {
cleanExpiredRetryTicker := time.NewTicker(cleaningExpiredRetriesInterval)
waitChan := make(chan struct{})
go func() {
monitor.Wait()
close(waitChan)
}()
for {
select {
case <-cleanExpiredRetryTicker.C:
cleanExpiredRetries()
case <-waitChan:
cleanExpiredRetryTicker.Stop()
return nil
}
}
}

monitor.Wait()

return nil
}

// hasToCleanRetries reports whether the cleanup of expired retried jobs is
// enabled, i.e. whether workerSettings.MaxAgeRetries is non-zero.
func hasToCleanRetries() bool {
	if workerSettings.MaxAgeRetries == 0 {
		return false
	}
	return true
}

func cleanExpiredRetries() {
ok, err := client.SetNX(fmt.Sprintf("%s%s", workerSettings.Namespace, keyForCleaningExpiredRetries), os.Getpid(), cleaningExpiredRetriesInterval/2).Result()
if err != nil {
logger.Criticalf("Error on setting lock to clean retries: %v", err)
return
}

if !ok {
return
}

failures, err := client.LRange(fmt.Sprintf("%sfailed", workerSettings.Namespace), 0, -1).Result()
if err != nil {
logger.Criticalf("Error on getting list of all failed jobs: %v", err)
return
}

for i, fail := range failures {
var f failure
err = json.Unmarshal([]byte(fail), &f)
if err != nil {
logger.Criticalf("Error on unmarshaling failure: %v", err)
return
}
ra, err := f.GetRetriedAtTime()
if err != nil {
logger.Criticalf("Error on GetRetriedAtTime of failure job %q: %v", fail, err)
return
}
if ra == *new(time.Time) {
continue
}

// If the RetryAt has exceeded the MaxAgeRetries then we'll
// remove the job from the list of failed jobs
if ra.Add(workerSettings.MaxAgeRetries).Before(time.Now()) {
hopefullyUniqueValueWeCanUseToDeleteJob := ""
client.LSet(fmt.Sprintf("%sfailed", workerSettings.Namespace), int64(i), hopefullyUniqueValueWeCanUseToDeleteJob)
client.LRem(fmt.Sprintf("%sfailed", workerSettings.Namespace), 1, hopefullyUniqueValueWeCanUseToDeleteJob)
}
}
}
17 changes: 16 additions & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ func newWorker(id string, queues []string) (*worker, error) {
func (w *worker) MarshalJSON() ([]byte, error) {
return json.Marshal(w.String())
}
// UnmarshalJSON populates w from a JSON string of the form
// "hostname:pid-id:queue1,queue2" (presumably the format MarshalJSON emits
// via w.String() — confirm against process.String).
//
// The input is first decoded as a JSON string so the surrounding quotes and
// any escapes are not treated as part of the hostname or queue names. The
// "-id" suffix of the middle part is optional; when it is missing, ID is left
// empty instead of panicking. An error is returned when the value is not a
// JSON string, does not have three ':'-separated parts, or has a non-numeric
// pid.
func (w *worker) UnmarshalJSON(b []byte) error {
	var s string
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}

	// Limit to three parts so everything after the second ':' is kept as
	// the queue list.
	parts := strings.SplitN(s, ":", 3)
	if len(parts) != 3 {
		return fmt.Errorf("goworker: malformed worker %q: expected 3 ':'-separated parts", s)
	}

	pidAndID := strings.SplitN(parts[1], "-", 2)
	pid, err := strconv.Atoi(pidAndID[0])
	if err != nil {
		return fmt.Errorf("goworker: malformed worker %q: invalid pid: %v", s, err)
	}

	var id string
	if len(pidAndID) == 2 {
		id = pidAndID[1]
	}

	// Assign through the receiver; rebinding w itself would only change the
	// local pointer and leave the caller's value untouched.
	w.process = process{
		Hostname: parts[0],
		Pid:      pid,
		ID:       id,
		Queues:   strings.Split(parts[2], ","),
	}
	return nil
}

func (w *worker) start(c *redis.Client, job *Job) error {
work := &work{
Expand Down Expand Up @@ -162,13 +177,13 @@ func (w *worker) pruneDeadWorkers(c *redis.Client) {

func (w *worker) fail(c *redis.Client, job *Job, err error) error {
failure := &failure{
FailedAt: time.Now(),
Payload: job.Payload,
Exception: "Error",
Error: err.Error(),
Worker: w,
Queue: job.Queue,
}
failure.SetFailedAt(time.Now())
buffer, err := json.Marshal(failure)
if err != nil {
return err
Expand Down

0 comments on commit 616f9b2

Please sign in to comment.