Skip to content

Commit

Permalink
feat: pass otel through msgqueue (hatchet-dev#802)
Browse files Browse the repository at this point in the history
* feat: pass otel through msgqueue

* feat: more spans on scheduling

* otel increase batch size
  • Loading branch information
abelanger5 committed Aug 28, 2024
1 parent f6e937f commit 263eaf0
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 31 deletions.
32 changes: 18 additions & 14 deletions cmd/hatchet-admin/cli/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,24 +139,26 @@ func runK8sQuickstart() error {
if k8sConfigResourceType == "secret" {
secret, err := clientset.CoreV1().Secrets(namespace).Get(context.Background(), k8sQuickstartConfigName, metav1.GetOptions{})

if err != nil && !errors.IsNotFound(err) {
switch {
case err != nil && !errors.IsNotFound(err):
return fmt.Errorf("error getting secret: %w", err)
} else if err != nil {
case err != nil:
exists = false
c = newFromSecret(nil, k8sQuickstartConfigName)
} else {
default:
exists = secret != nil
c = newFromSecret(secret, k8sQuickstartConfigName)
}
} else {
configMap, err := clientset.CoreV1().ConfigMaps(namespace).Get(context.Background(), k8sQuickstartConfigName, metav1.GetOptions{})

if err != nil && !errors.IsNotFound(err) {
switch {
case err != nil && !errors.IsNotFound(err):
return fmt.Errorf("error getting configmap: %w", err)
} else if err != nil {
case err != nil:
exists = false
c = newFromConfigMap(nil, k8sQuickstartConfigName)
} else {
default:
exists = configMap != nil
c = newFromConfigMap(configMap, k8sQuickstartConfigName)
}
Expand Down Expand Up @@ -270,24 +272,26 @@ func runCreateWorkerToken() error {
if k8sConfigResourceType == "secret" {
secret, err := clientset.CoreV1().Secrets(namespace).Get(context.Background(), k8sClientConfigName, metav1.GetOptions{})

if err != nil && !errors.IsNotFound(err) {
return err
} else if err != nil {
switch {
case err != nil && !errors.IsNotFound(err):
return fmt.Errorf("error getting secret: %w", err)
case err != nil:
exists = false
c = newFromSecret(nil, k8sClientConfigName)
} else {
default:
exists = secret != nil
c = newFromSecret(secret, k8sClientConfigName)
}
} else {
configMap, err := clientset.CoreV1().ConfigMaps(namespace).Get(context.Background(), k8sClientConfigName, metav1.GetOptions{})

if err != nil && !errors.IsNotFound(err) {
return err
} else if err != nil {
switch {
case err != nil && !errors.IsNotFound(err):
return fmt.Errorf("error getting configmap: %w", err)
case err != nil:
exists = false
c = newFromConfigMap(nil, k8sClientConfigName)
} else {
default:
exists = configMap != nil
c = newFromConfigMap(configMap, k8sClientConfigName)
}
Expand Down
3 changes: 3 additions & 0 deletions internal/msgqueue/msgqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ type Message struct {

// RetryDelay is the delay between retries.
RetryDelay int `json:"retry_delay"`

// OtelCarrier is the OpenTelemetry carrier for the task.
OtelCarrier map[string]string `json:"otel_carrier"`
}

func (t *Message) TenantID() string {
Expand Down
6 changes: 6 additions & 0 deletions internal/msgqueue/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/rs/zerolog"

"github.com/hatchet-dev/hatchet/internal/msgqueue"
"github.com/hatchet-dev/hatchet/internal/telemetry"
"github.com/hatchet-dev/hatchet/pkg/logger"
"github.com/hatchet-dev/hatchet/pkg/random"
)
Expand Down Expand Up @@ -185,6 +186,11 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) {

// AddMessage adds a msg to the queue.
func (t *MessageQueueImpl) AddMessage(ctx context.Context, q msgqueue.Queue, msg *msgqueue.Message) error {
// inject otel carrier into the message
if msg.OtelCarrier == nil {
msg.OtelCarrier = telemetry.GetCarrier(ctx)
}

t.msgs <- &msgWithQueue{
Message: msg,
q: q,
Expand Down
20 changes: 10 additions & 10 deletions internal/services/controllers/jobs/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (ec *JobsControllerImpl) handleTask(ctx context.Context, task *msgqueue.Mes
}

func (ec *JobsControllerImpl) handleJobRunQueued(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-job-run-queued")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-job-run-queued", task.OtelCarrier)
defer span.End()

payload := tasktypes.JobRunQueuedTaskPayload{}
Expand Down Expand Up @@ -340,7 +340,7 @@ func (ec *JobsControllerImpl) handleJobRunQueued(ctx context.Context, task *msgq
}

func (ec *JobsControllerImpl) handleJobRunCancelled(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-job-run-cancelled")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-job-run-cancelled", task.OtelCarrier)
defer span.End()

payload := tasktypes.JobRunCancelledTaskPayload{}
Expand Down Expand Up @@ -397,7 +397,7 @@ func (ec *JobsControllerImpl) handleJobRunCancelled(ctx context.Context, task *m
}

func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-retry")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-retry", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunRetryTaskPayload{}
Expand Down Expand Up @@ -469,7 +469,7 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgq
// handleStepRunReplay replays a step run from scratch - it resets the workflow run state, job run state, and
// all cancelled step runs which are children of the step run being replayed.
func (ec *JobsControllerImpl) handleStepRunReplay(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-replay")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-replay", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunReplayTaskPayload{}
Expand Down Expand Up @@ -593,7 +593,7 @@ func (ec *JobsControllerImpl) handleStepRunReplay(ctx context.Context, task *msg
}

func (ec *JobsControllerImpl) handleStepRunQueued(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-queued")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-queued", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunTaskPayload{}
Expand Down Expand Up @@ -934,7 +934,7 @@ func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId
}

func (ec *JobsControllerImpl) handleStepRunStarted(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-started")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-started", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunStartedTaskPayload{}
Expand Down Expand Up @@ -980,7 +980,7 @@ func (ec *JobsControllerImpl) handleStepRunStarted(ctx context.Context, task *ms
}

func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-finished")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-finished", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunFinishedTaskPayload{}
Expand Down Expand Up @@ -1072,7 +1072,7 @@ func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *m
}

func (ec *JobsControllerImpl) handleStepRunFailed(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-failed")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-failed", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunFailedTaskPayload{}
Expand Down Expand Up @@ -1202,7 +1202,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun
}

func (ec *JobsControllerImpl) handleStepRunTimedOut(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-timed-out")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-timed-out", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunTimedOutTaskPayload{}
Expand All @@ -1224,7 +1224,7 @@ func (ec *JobsControllerImpl) handleStepRunTimedOut(ctx context.Context, task *m
}

func (ec *JobsControllerImpl) handleStepRunCancel(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-cancel")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-cancel", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunCancelTaskPayload{}
Expand Down
2 changes: 1 addition & 1 deletion internal/services/controllers/jobs/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (q *queue) handleTask(ctx context.Context, task *msgqueue.Message) (err err
}

func (q *queue) handleCheckQueue(ctx context.Context, task *msgqueue.Message) error {
_, span := telemetry.NewSpan(ctx, "handle-check-queue")
_, span := telemetry.NewSpanWithCarrier(ctx, "handle-check-queue", task.OtelCarrier)
defer span.End()

metadata := tasktypes.CheckTenantQueueMetadata{}
Expand Down
6 changes: 3 additions & 3 deletions internal/services/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (d *DispatcherImpl) handleTask(ctx context.Context, task *msgqueue.Message)
}

func (d *DispatcherImpl) handleGroupKeyActionAssignedTask(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "group-key-action-assigned")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "group-key-action-assigned", task.OtelCarrier)
defer span.End()

payload := tasktypes.GroupKeyActionAssignedTaskPayload{}
Expand Down Expand Up @@ -427,7 +427,7 @@ func (d *DispatcherImpl) handleGroupKeyActionAssignedTask(ctx context.Context, t
}

func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "step-run-assigned")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "step-run-assigned", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunAssignedTaskPayload{}
Expand Down Expand Up @@ -528,7 +528,7 @@ func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *ms
}

func (d *DispatcherImpl) handleStepRunCancelled(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "step-run-cancelled")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "step-run-cancelled", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunCancelledTaskPayload{}
Expand Down
2 changes: 0 additions & 2 deletions internal/services/dispatcher/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,6 @@ func (s *DispatcherImpl) subscribeToWorkflowEventsByAdditionalMeta(key string, v
tenant := stream.Context().Value("tenant").(*dbsqlc.Tenant)
tenantId := sqlchelpers.UUIDToStr(tenant.ID)

s.l.Error().Msgf("Received subscribe request for additional meta key-value: {%s: %s}", key, value)

q, err := msgqueue.TenantEventConsumerQueue(tenantId)
if err != nil {
return err
Expand Down
27 changes: 26 additions & 1 deletion internal/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -78,7 +79,11 @@ func InitTracer(opts *TracerOpts) (func(context.Context) error, error) {
otel.SetTracerProvider(
sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.TraceIDRatioBased(traceIdRatio)),
sdktrace.WithBatcher(exporter),
sdktrace.WithBatcher(
exporter,
sdktrace.WithMaxQueueSize(sdktrace.DefaultMaxQueueSize*10),
sdktrace.WithMaxExportBatchSize(sdktrace.DefaultMaxExportBatchSize*10),
),
sdktrace.WithResource(resources),
),
)
Expand All @@ -91,6 +96,26 @@ func NewSpan(ctx context.Context, name string) (context.Context, trace.Span) {
return ctx, span
}

func NewSpanWithCarrier(ctx context.Context, name string, carrier map[string]string) (context.Context, trace.Span) {
propagator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})

otelCarrier := propagation.MapCarrier(carrier)
parentCtx := propagator.Extract(ctx, otelCarrier)

ctx, span := otel.Tracer("").Start(parentCtx, prefixSpanKey(name))
return ctx, span
}

func GetCarrier(ctx context.Context) map[string]string {
propgator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})

// Serialize the context into carrier
carrier := propagation.MapCarrier{}
propgator.Inject(ctx, carrier)

return carrier
}

type AttributeKey string

// AttributeKV is a wrapper for otel attributes KV
Expand Down
4 changes: 4 additions & 0 deletions pkg/repository/prisma/step_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,9 @@ func UniqueSet[T any](i []T, keyFunc func(T) string) map[string]struct{} {
}

func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId string) (repository.QueueStepRunsResult, error) {
ctx, span := telemetry.NewSpan(ctx, "queue-step-runs-database")
defer span.End()

startedAt := time.Now().UTC()

emptyRes := repository.QueueStepRunsResult{
Expand Down Expand Up @@ -1048,6 +1051,7 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st
}

plan, err := scheduling.GeneratePlan(
ctx,
slots,
uniqueActionsArr,
queueItems,
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduling/scheduling.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package scheduling

import (
"context"
"time"

"github.com/jackc/pgx/v5/pgtype"

"github.com/hatchet-dev/hatchet/internal/telemetry"
"github.com/hatchet-dev/hatchet/pkg/repository"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers"
Expand All @@ -17,6 +19,7 @@ type QueueItemWithOrder struct {
}

func GeneratePlan(
ctx context.Context,
slots []*dbsqlc.ListSemaphoreSlotsToAssignRow,
uniqueActionsArr []string,
queueItems []*QueueItemWithOrder,
Expand All @@ -25,6 +28,8 @@ func GeneratePlan(
workerLabels map[string][]*dbsqlc.GetWorkerLabelsRow,
stepDesiredLabels map[string][]*dbsqlc.GetDesiredLabelsRow,
) (SchedulePlan, error) {
_, span := telemetry.NewSpan(ctx, "generate-scheduling-plan")
defer span.End()

plan := SchedulePlan{
StepRunIds: make([]pgtype.UUID, 0),
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduling/scheduling_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package scheduling

import (
"context"
"encoding/json"
"fmt"
"os"
Expand Down Expand Up @@ -277,6 +278,7 @@ func TestGeneratePlan(t *testing.T) {
}

got, err := GeneratePlan(
context.Background(),
fixtureData.Slots,
fixtureData.UniqueActionsArr,
fixtureData.QueueItems,
Expand All @@ -303,6 +305,7 @@ func BenchmarkGeneratePlan(b *testing.B) {

for i := 0; i < b.N; i++ {
_, _ = GeneratePlan(
context.Background(),
fixtureData.Slots,
fixtureData.UniqueActionsArr,
fixtureData.QueueItems,
Expand Down

0 comments on commit 263eaf0

Please sign in to comment.