Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

goworker: Added 'MaxAgeRetries' option to the Goworker #9

Merged
merged 1 commit into from
Jul 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
goworker: Added 'MaxAgeRetries' option to the Goworker
This option is useful to automatically remove retried failed jobs from the 'failed' queue that exceede that duration, this
check will be done every 1m.

Also changed the 'failed.FailedAt' and added 'failed.RetriedAt' and switched them to type string. The main reason is
that the Ruby lib is setting those values in an specific format and Ruby can read multiple formats into one, but GO
cannot and we need to actually use the same ones or the unmarshaler does not work so I decided to switch them to
'string' and add helpers to set/get the values that will directly convert them.

All the logic has more or less been ported from the Ruby version, on how to remove failed jobs and how the data
is stored, as the 'MaxAgeRetries' is something unique from this GO version
  • Loading branch information
xescugc committed Jul 19, 2021
commit f213a0e0e2fd63324530c23d0566ec017d444268
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
## [Unreleased]

### Added

- Flag `-max-age-retries` to remove retried failed jobs after that duration
([PR #9](https://github.com/cycloidio/goworker/pull/9))

## [0.1.8] _2021-06-11_

### Added
Expand Down
39 changes: 30 additions & 9 deletions failure.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,36 @@
package goworker

import (
"time"
import "time"

const (
retriedAtLayout = "2006/01/02 15:04:05"
failedAtLayout = "2006/01/02 15:04:05 -07:00"
)

type failure struct {
FailedAt time.Time `json:"failed_at"`
Payload Payload `json:"payload"`
Exception string `json:"exception"`
Error string `json:"error"`
Backtrace []string `json:"backtrace"`
Worker *worker `json:"worker"`
Queue string `json:"queue"`
FailedAt string `json:"failed_at"`
Payload Payload `json:"payload"`
Exception string `json:"exception"`
Error string `json:"error"`
Backtrace []string `json:"backtrace"`
Worker *worker `json:"worker"`
Queue string `json:"queue"`
RetriedAt string `json:"retried_at"`
}

// GetRetriedAtTime returns the RetriedAt as a time.Time
// converting it from the string. If it's not set it'll return
// an empty time.Time
func (f *failure) GetRetriedAtTime() (time.Time, error) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment / Test?

if f.RetriedAt == "" {
return time.Time{}, nil
}

return time.Parse(retriedAtLayout, f.RetriedAt)
}

// SetFailedAt will set the FailedAt value with t with
// the right format
func (f *failure) SetFailedAt(t time.Time) {
f.FailedAt = t.Format(failedAtLayout)
}
9 changes: 9 additions & 0 deletions flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@
// encoded in scientific notation, losing
// pecision. This will default to true soon.
//
// -max-age-retries=1s
// — This flag will enable a feature to automatically
// clean the retried failed jobs older than the
// specified max age/duration (time.Duration).
// By default is disabled if enabled it'll
// check every 1m for old retries.
//
// You can also configure your own flags for use
// within your workers. Be sure to set them
// before calling goworker.Main(). It is okay to
Expand Down Expand Up @@ -128,6 +135,8 @@ func init() {
flag.BoolVar(&workerSettings.UseNumber, "use-number", false, "use json.Number instead of float64 when decoding numbers in JSON. will default to true soon")

flag.BoolVar(&workerSettings.SkipTLSVerify, "insecure-tls", false, "skip TLS validation")

flag.DurationVar(&workerSettings.MaxAgeRetries, "max-age-retries", 0, "max age of the retried failed jobs before cleaning them")
}

func flags() error {
Expand Down
78 changes: 78 additions & 0 deletions goworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package goworker
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"os"
Expand All @@ -25,6 +26,14 @@ var (
initialized bool
)

const (
keyForCleaningExpiredRetries = "cleaning_expired_retried_in_progress"
)

var (
cleaningExpiredRetriesInterval = time.Minute
)

var workerSettings WorkerSettings

type WorkerSettings struct {
Expand All @@ -41,6 +50,7 @@ type WorkerSettings struct {
UseNumber bool
SkipTLSVerify bool
TLSCertPath string
MaxAgeRetries time.Duration

closed chan struct{}
}
Expand Down Expand Up @@ -173,7 +183,75 @@ func Work() error {
worker.work(jobs, &monitor)
}

if hasToCleanRetries() {
cleanExpiredRetryTicker := time.NewTicker(cleaningExpiredRetriesInterval)
waitChan := make(chan struct{})
go func() {
monitor.Wait()
close(waitChan)
}()
for {
select {
case <-cleanExpiredRetryTicker.C:
cleanExpiredRetries()
case <-waitChan:
cleanExpiredRetryTicker.Stop()
return nil
}
}
}

monitor.Wait()

return nil
}

func hasToCleanRetries() bool {
return workerSettings.MaxAgeRetries != 0
}

func cleanExpiredRetries() {
// This is used to set a lock so this operation is not done by more than 1 worker at the same time
ok, err := client.SetNX(fmt.Sprintf("%s%s", workerSettings.Namespace, keyForCleaningExpiredRetries), os.Getpid(), cleaningExpiredRetriesInterval/2).Result()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly does this do? And why not log something on !ok result?

if err != nil {
logger.Criticalf("Error on setting lock to clean retries: %v", err)
return
}

if !ok {
return
}

failures, err := client.LRange(fmt.Sprintf("%sfailed", workerSettings.Namespace), 0, -1).Result()
if err != nil {
logger.Criticalf("Error on getting list of all failed jobs: %v", err)
return
}

for i, fail := range failures {
var f failure
err = json.Unmarshal([]byte(fail), &f)
if err != nil {
logger.Criticalf("Error on unmarshaling failure: %v", err)
return
}
ra, err := f.GetRetriedAtTime()
if err != nil {
logger.Criticalf("Error on GetRetriedAtTime of failure job %q: %v", fail, err)
return
}
if ra == *new(time.Time) {
continue
}

// If the RetryAt has exceeded the MaxAgeRetries then we'll
// remove the job from the list of failed jobs
if ra.Add(workerSettings.MaxAgeRetries).Before(time.Now()) {
hopefullyUniqueValueWeCanUseToDeleteJob := ""
// This logic what it does it replace first the value (with the LSet) and then remove the first
// occurrence on the failed queue of the replaced value. This value is the 'hopefullyUniqueValueWeCanUseToDeleteJob'
client.LSet(fmt.Sprintf("%sfailed", workerSettings.Namespace), int64(i), hopefullyUniqueValueWeCanUseToDeleteJob)
client.LRem(fmt.Sprintf("%sfailed", workerSettings.Namespace), 1, hopefullyUniqueValueWeCanUseToDeleteJob)
Comment on lines +253 to +254
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic behind this is not obvious, why setting and then removing?

}
}
}
20 changes: 19 additions & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,28 @@ func newWorker(id string, queues []string) (*worker, error) {
}, nil
}

// MarshalJSON marshals the worker into a []byte
func (w *worker) MarshalJSON() ([]byte, error) {
return json.Marshal(w.String())
}

// UnmarshalJSON converts the b into a woker
func (w *worker) UnmarshalJSON(b []byte) error {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment / Test?

s := string(b)
parts := strings.Split(s, ":")
pidAndID := strings.Split(parts[1], "-")
pid, _ := strconv.Atoi(pidAndID[0])
w = &worker{
process: process{
Hostname: parts[0],
Pid: int(pid),
ID: pidAndID[1],
Queues: strings.Split(parts[2], ","),
},
}
return nil
}

func (w *worker) start(c *redis.Client, job *Job) error {
work := &work{
Queue: job.Queue,
Expand Down Expand Up @@ -162,13 +180,13 @@ func (w *worker) pruneDeadWorkers(c *redis.Client) {

func (w *worker) fail(c *redis.Client, job *Job, err error) error {
failure := &failure{
FailedAt: time.Now(),
Payload: job.Payload,
Exception: "Error",
Error: err.Error(),
Worker: w,
Queue: job.Queue,
}
failure.SetFailedAt(time.Now())
buffer, err := json.Marshal(failure)
if err != nil {
return err
Expand Down