Skip to content

Commit

Permalink
chore: more visibility and debug lines for queues (hatchet-dev#836)
Browse files Browse the repository at this point in the history
* chore: more visibility and debug options for queues

* better debug lines on queue repo

* don't log so much in load test
  • Loading branch information
abelanger5 committed Aug 29, 2024
1 parent 96c040e commit b5014f6
Show file tree
Hide file tree
Showing 21 changed files with 1,234 additions and 95 deletions.
2 changes: 2 additions & 0 deletions cmd/hatchet-engine/engine/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ func RunWithConfig(ctx context.Context, sc *server.ServerConfig) ([]Teardown, er
jobs.WithRepository(sc.EngineRepository),
jobs.WithLogger(sc.Logger),
jobs.WithPartition(p),
jobs.WithQueueLoggerConfig(&sc.AdditionalLoggers.Queue),
jobs.WithPgxStatsLoggerConfig(&sc.AdditionalLoggers.PgxStats),
)

if err != nil {
Expand Down
113 changes: 68 additions & 45 deletions internal/services/controllers/jobs/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes"
"github.com/hatchet-dev/hatchet/internal/telemetry"
"github.com/hatchet-dev/hatchet/internal/telemetry/servertel"
"github.com/hatchet-dev/hatchet/pkg/config/shared"
hatcheterrors "github.com/hatchet-dev/hatchet/pkg/errors"
"github.com/hatchet-dev/hatchet/pkg/logger"
"github.com/hatchet-dev/hatchet/pkg/repository"
Expand All @@ -34,13 +35,15 @@ type JobsController interface {
}

type JobsControllerImpl struct {
mq msgqueue.MessageQueue
l *zerolog.Logger
repo repository.EngineRepository
dv datautils.DataDecoderValidator
s gocron.Scheduler
a *hatcheterrors.Wrapped
p *partition.Partition
mq msgqueue.MessageQueue
l *zerolog.Logger
queueLogger *zerolog.Logger
pgxStatsLogger *zerolog.Logger
repo repository.EngineRepository
dv datautils.DataDecoderValidator
s gocron.Scheduler
a *hatcheterrors.Wrapped
p *partition.Partition

reassignMutexes sync.Map
timeoutMutexes sync.Map
Expand All @@ -49,22 +52,29 @@ type JobsControllerImpl struct {
type JobsControllerOpt func(*JobsControllerOpts)

type JobsControllerOpts struct {
mq msgqueue.MessageQueue
l *zerolog.Logger
repo repository.EngineRepository
dv datautils.DataDecoderValidator
alerter hatcheterrors.Alerter
p *partition.Partition
mq msgqueue.MessageQueue
l *zerolog.Logger
repo repository.EngineRepository
dv datautils.DataDecoderValidator
alerter hatcheterrors.Alerter
p *partition.Partition
queueLogger *zerolog.Logger
pgxStatsLogger *zerolog.Logger
}

func defaultJobsControllerOpts() *JobsControllerOpts {
logger := logger.NewDefaultLogger("jobs-controller")
l := logger.NewDefaultLogger("jobs-controller")
alerter := hatcheterrors.NoOpAlerter{}

queueLogger := logger.NewDefaultLogger("queue")
pgxStatsLogger := logger.NewDefaultLogger("pgx-stats")

return &JobsControllerOpts{
l: &logger,
dv: datautils.NewDataDecoderValidator(),
alerter: alerter,
l: &l,
dv: datautils.NewDataDecoderValidator(),
alerter: alerter,
queueLogger: &queueLogger,
pgxStatsLogger: &pgxStatsLogger,
}
}

Expand All @@ -80,6 +90,20 @@ func WithLogger(l *zerolog.Logger) JobsControllerOpt {
}
}

func WithQueueLoggerConfig(lc *shared.LoggerConfigFile) JobsControllerOpt {
return func(opts *JobsControllerOpts) {
l := logger.NewStdErr(lc, "queue")
opts.queueLogger = &l
}
}

func WithPgxStatsLoggerConfig(lc *shared.LoggerConfigFile) JobsControllerOpt {
return func(opts *JobsControllerOpts) {
l := logger.NewStdErr(lc, "pgx-stats")
opts.pgxStatsLogger = &l
}
}

func WithAlerter(a hatcheterrors.Alerter) JobsControllerOpt {
return func(opts *JobsControllerOpts) {
opts.alerter = a
Expand Down Expand Up @@ -136,13 +160,15 @@ func New(fs ...JobsControllerOpt) (*JobsControllerImpl, error) {
a.WithData(map[string]interface{}{"service": "jobs-controller"})

return &JobsControllerImpl{
mq: opts.mq,
l: opts.l,
repo: opts.repo,
dv: opts.dv,
s: s,
a: a,
p: opts.p,
mq: opts.mq,
l: opts.l,
queueLogger: opts.queueLogger,
pgxStatsLogger: opts.pgxStatsLogger,
repo: opts.repo,
dv: opts.dv,
s: s,
a: a,
p: opts.p,
}, nil
}

Expand Down Expand Up @@ -211,7 +237,7 @@ func (jc *JobsControllerImpl) Start() (func() error, error) {
return nil, fmt.Errorf("could not subscribe to job processing queue: %w", err)
}

q, err := newQueue(jc.mq, jc.l, jc.repo, jc.dv, jc.a, jc.p)
q, err := newQueue(jc.mq, jc.l, jc.queueLogger, jc.repo, jc.dv, jc.a, jc.p)

if err != nil {
cancel()
Expand Down Expand Up @@ -618,26 +644,23 @@ func (jc *JobsControllerImpl) runPgStat() func() {
return func() {
s := jc.repo.Health().PgStat()

jc.l.Debug().Msg(fmt.Sprintf(
"Stat{\n"+
" Total Connections: %d\n"+
" Constructing: %d\n"+
" Acquired: %d\n"+
" Idle: %d\n"+
" Max: %d\n"+
" Acquire Duration: %s\n"+
" Empty Acquire Count: %d\n"+
" Canceled Acquire Count: %d\n"+
"}",
s.TotalConns(),
s.ConstructingConns(),
s.AcquireCount(),
s.IdleConns(),
s.MaxConns(),
s.AcquireDuration(),
s.EmptyAcquireCount(),
s.CanceledAcquireCount(),
))
jc.pgxStatsLogger.Info().Int32(
"total_connections", s.TotalConns(),
).Int32(
"constructing_connections", s.ConstructingConns(),
).Int64(
"acquired_connections", s.AcquireCount(),
).Int32(
"idle_connections", s.IdleConns(),
).Int32(
"max_connections", s.MaxConns(),
).Dur(
"acquire_duration", s.AcquireDuration(),
).Int64(
"empty_acquire_count", s.EmptyAcquireCount(),
).Int64(
"canceled_acquire_count", s.CanceledAcquireCount(),
).Msg("pgx stats")
}
}

Expand Down
27 changes: 22 additions & 5 deletions internal/services/controllers/jobs/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ type queue struct {
a *hatcheterrors.Wrapped
p *partition.Partition

// a custom queue logger
ql *zerolog.Logger

tenantOperations sync.Map
}

func newQueue(
mq msgqueue.MessageQueue,
l *zerolog.Logger,
ql *zerolog.Logger,
repo repository.EngineRepository,
dv datautils.DataDecoderValidator,
a *hatcheterrors.Wrapped,
Expand All @@ -55,6 +59,7 @@ func newQueue(
s: s,
a: a,
p: p,
ql: ql,
}, nil
}

Expand All @@ -63,13 +68,16 @@ type operation struct {
shouldContinue bool
isRunning bool
tenantId string
lastRun time.Time
lastSchedule time.Time
}

func (o *operation) run(l *zerolog.Logger, scheduler func(context.Context, string) (bool, error)) {
func (o *operation) run(l *zerolog.Logger, ql *zerolog.Logger, scheduler func(context.Context, string) (bool, error)) {
if o.getRunning() {
return
}

ql.Info().Str("tenant_id", o.tenantId).TimeDiff("last_run", time.Now(), o.lastRun).Msg("running tenant queue")
o.setRunning(true)

go func() {
Expand All @@ -78,6 +86,9 @@ func (o *operation) run(l *zerolog.Logger, scheduler func(context.Context, strin
}()

f := func() {
ql.Info().Str("tenant_id", o.tenantId).TimeDiff("last_schedule", time.Now(), o.lastSchedule).Msg("running scheduling")
o.lastSchedule = time.Now()

o.setContinue(false)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand Down Expand Up @@ -109,6 +120,10 @@ func (o *operation) setRunning(isRunning bool) {
o.mu.Lock()
defer o.mu.Unlock()

if isRunning {
o.lastRun = time.Now()
}

o.isRunning = isRunning
}

Expand Down Expand Up @@ -226,7 +241,7 @@ func (q *queue) handleCheckQueue(ctx context.Context, task *msgqueue.Message) er
op := opInt.(*operation)

op.setContinue(true)
op.run(q.l, q.scheduleStepRuns)
op.run(q.l, q.ql, q.scheduleStepRuns)
}

return nil
Expand All @@ -253,15 +268,17 @@ func (q *queue) runTenantQueues(ctx context.Context) func() {

if !ok {
op = &operation{
tenantId: tenantId,
tenantId: tenantId,
lastRun: time.Now(),
lastSchedule: time.Now(),
}

q.tenantOperations.Store(tenantId, op)
} else {
op = opInt.(*operation)
}

op.run(q.l, q.scheduleStepRuns)
op.run(q.l, q.ql, q.scheduleStepRuns)
}
}
}
Expand All @@ -273,7 +290,7 @@ func (q *queue) scheduleStepRuns(ctx context.Context, tenantId string) (bool, er
dbCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

queueResults, err := q.repo.StepRun().QueueStepRuns(dbCtx, tenantId)
queueResults, err := q.repo.StepRun().QueueStepRuns(dbCtx, q.ql, tenantId)

if err != nil {
return false, fmt.Errorf("could not queue step runs: %w", err)
Expand Down
8 changes: 6 additions & 2 deletions internal/testutils/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@ func Prepare(t *testing.T) {
_ = os.Setenv("SERVER_SECURITY_CHECK_ENABLED", "false")

_ = os.Setenv("SERVER_LOGGER_LEVEL", "error")
_ = os.Setenv("SERVER_LOGGER_FORMAT", "json")
_ = os.Setenv("SERVER_LOGGER_FORMAT", "console")
_ = os.Setenv("DATABASE_LOGGER_LEVEL", "error")
_ = os.Setenv("DATABASE_LOGGER_FORMAT", "json")
_ = os.Setenv("DATABASE_LOGGER_FORMAT", "console")
_ = os.Setenv("SERVER_ADDITIONAL_LOGGERS_QUEUE_LEVEL", "error")
_ = os.Setenv("SERVER_ADDITIONAL_LOGGERS_QUEUE_FORMAT", "console")
_ = os.Setenv("SERVER_ADDITIONAL_LOGGERS_PGXSTATS_LEVEL", "error")
_ = os.Setenv("SERVER_ADDITIONAL_LOGGERS_PGXSTATS_FORMAT", "console")

// read in the local config
configLoader := loader.NewConfigLoader(path.Join(testPath, baseDir, "generated"))
Expand Down
1 change: 1 addition & 0 deletions pkg/config/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ func GetServerConfigFromConfigfile(dc *database.Config, cf *server.ServerConfigF
Email: emailSvc,
TenantAlerter: alerting.New(dc.EngineRepository, encryptionSvc, cf.Runtime.ServerURL, emailSvc),
AdditionalOAuthConfigs: additionalOAuthConfigs,
AdditionalLoggers: cf.AdditionalLoggers,
EnableDataRetention: cf.EnableDataRetention,
}, nil
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/config/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type ServerConfigFile struct {

Logger shared.LoggerConfigFile `mapstructure:"logger" json:"logger,omitempty"`

AdditionalLoggers ConfigFileAdditionalLoggers `mapstructure:"additionalLoggers" json:"additionalLoggers,omitempty"`

OpenTelemetry shared.OpenTelemetryConfigFile `mapstructure:"otel" json:"otel,omitempty"`

SecurityCheck SecurityCheckConfigFile `mapstructure:"securityCheck" json:"securityCheck,omitempty"`
Expand All @@ -54,6 +56,14 @@ type ServerConfigFile struct {
Email ConfigFileEmail `mapstructure:"email" json:"email,omitempty"`
}

type ConfigFileAdditionalLoggers struct {
// Queue is a custom logger config for the queue service
Queue shared.LoggerConfigFile `mapstructure:"queue" json:"queue,omitempty"`

// PgxStats is a custom logger config for the pgx stats service
PgxStats shared.LoggerConfigFile `mapstructure:"pgxStats" json:"pgxStats,omitempty"`
}

// General server runtime options
type ConfigFileRuntime struct {
// Port is the port that the core server listens on
Expand Down Expand Up @@ -339,6 +349,8 @@ type ServerConfig struct {

Logger *zerolog.Logger

AdditionalLoggers ConfigFileAdditionalLoggers

TLSConfig *tls.Config

SessionStore *cookie.UserSessionStore
Expand Down Expand Up @@ -480,6 +492,12 @@ func BindAllEnv(v *viper.Viper) {
_ = v.BindEnv("logger.level", "SERVER_LOGGER_LEVEL")
_ = v.BindEnv("logger.format", "SERVER_LOGGER_FORMAT")

// additional logger options
_ = v.BindEnv("additionalLoggers.queue.level", "SERVER_ADDITIONAL_LOGGERS_QUEUE_LEVEL")
_ = v.BindEnv("additionalLoggers.queue.format", "SERVER_ADDITIONAL_LOGGERS_QUEUE_FORMAT")
_ = v.BindEnv("additionalLoggers.pgxStats.level", "SERVER_ADDITIONAL_LOGGERS_PGXSTATS_LEVEL")
_ = v.BindEnv("additionalLoggers.pgxStats.format", "SERVER_ADDITIONAL_LOGGERS_PGXSTATS_FORMAT")

// otel options
_ = v.BindEnv("otel.serviceName", "SERVER_OTEL_SERVICE_NAME")
_ = v.BindEnv("otel.collectorURL", "SERVER_OTEL_COLLECTOR_URL")
Expand Down
Loading

0 comments on commit b5014f6

Please sign in to comment.