forked from hatchet-dev/hatchet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
workflow_run.go
444 lines (336 loc) · 14.4 KB
/
workflow_run.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
package repository
import (
"context"
"fmt"
"time"
"github.com/hatchet-dev/hatchet/internal/datautils"
"github.com/hatchet-dev/hatchet/pkg/random"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/db"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers"
)
// CreateWorkflowRunOpts holds the options used to create a workflow run.
// Each constructor in this file fills in one kind of trigger (ManualTriggerInput,
// TriggeringEventId, Cron together with CronParentId, or ScheduledWorkflowId);
// the validate tags mark those trigger kinds as excluded with one another.
type CreateWorkflowRunOpts struct {
// (optional) the workflow run display name
DisplayName *string
// (required) the workflow version id
WorkflowVersionId string `validate:"required,uuid"`
// (optional) the input of a manually triggered run, as a string; the
// constructors in this file set it for manual and parent-triggered runs
ManualTriggerInput *string `validate:"omitnil,required_without=TriggeringEventId,required_without=Cron,required_without=ScheduledWorkflowId,excluded_with=TriggeringEventId,excluded_with=Cron,excluded_with=ScheduledWorkflowId"`
// (optional) the event id that triggered the workflow run
TriggeringEventId *string `validate:"omitnil,uuid,required_without=ManualTriggerInput,required_without=Cron,required_without=ScheduledWorkflowId,excluded_with=ManualTriggerInput,excluded_with=Cron,excluded_with=ScheduledWorkflowId"`
// (optional) the cron schedule that triggered the workflow run
Cron *string `validate:"omitnil,cron,required_without=ManualTriggerInput,required_without=TriggeringEventId,required_without=ScheduledWorkflowId,excluded_with=ManualTriggerInput,excluded_with=TriggeringEventId,excluded_with=ScheduledWorkflowId"`
// (optional) the cron parent id; set together with Cron by
// GetCreateWorkflowRunOptsFromCron
CronParentId *string `validate:"omitnil,uuid,required_without=ManualTriggerInput,required_without=TriggeringEventId,required_without=ScheduledWorkflowId,excluded_with=ManualTriggerInput,excluded_with=TriggeringEventId,excluded_with=ScheduledWorkflowId"`
// (optional) the scheduled trigger
ScheduledWorkflowId *string `validate:"omitnil,uuid,required_without=ManualTriggerInput,required_without=TriggeringEventId,required_without=Cron,excluded_with=ManualTriggerInput,excluded_with=TriggeringEventId,excluded_with=Cron"`
// the input payload of the workflow run; the constructors in this file
// replace a nil input with "{}"
InputData []byte
// what triggered the workflow run; the constructors in this file set it from
// the datautils.TriggeredBy* values
TriggeredBy string
// (optional) options for the group key run; the constructors in this file set
// it only when the workflow version's ConcurrencyLimitStrategy is valid
GetGroupKeyRun *CreateGroupKeyRunOpts `validate:"omitempty"`
// (optional) the parent workflow run which this workflow run was triggered from
ParentId *string `validate:"omitempty,uuid"`
// (optional) the parent step run id which this workflow run was triggered from
ParentStepRunId *string `validate:"omitempty,uuid"`
// (optional) the child key of the workflow run, if this is a child run of a different workflow
ChildKey *string
// (optional) the child index of the workflow run, if this is a child run of a different workflow
ChildIndex *int
// (optional) additional metadata for the workflow run
AdditionalMetadata map[string]interface{} `validate:"omitempty"`
// (optional) the desired worker id for sticky state
DesiredWorkerId *string `validate:"omitempty,uuid"`
// (optional) the deduplication value for the workflow run
DedupeValue *string `validate:"omitempty"`
// (optional) the priority of the workflow run; validated to be between 1 and 3
Priority *int32 `validate:"omitempty,min=1,max=3"`
}
// CreateGroupKeyRunOpts holds the options for a group key run. In this file it
// is only attached to CreateWorkflowRunOpts.GetGroupKeyRun, carrying the same
// input bytes as the workflow run.
type CreateGroupKeyRunOpts struct {
// (optional) the input data
Input []byte
}
// CreateWorkflowRunOpt is a functional option: a function that modifies a
// CreateWorkflowRunOpts in place.
type CreateWorkflowRunOpt func(*CreateWorkflowRunOpts)
// WithParent returns an option that marks a workflow run as a child of the
// given parent workflow run and parent step run.
//
// When applied, the option sets ParentId, ParentStepRunId, ChildIndex and
// ChildKey, and replaces AdditionalMetadata with a freshly allocated map that
// holds the entries of parentAdditionalMetadata overlaid with the entries of
// additionalMetadata (the child's value wins when a key appears in both).
// Neither input map is modified, and the resulting map is never nil.
func WithParent(
	parentId, parentStepRunId string,
	childIndex int,
	childKey *string,
	additionalMetadata map[string]interface{},
	parentAdditionalMetadata map[string]interface{},
) CreateWorkflowRunOpt {
	return func(opts *CreateWorkflowRunOpts) {
		opts.ParentId = &parentId
		opts.ParentStepRunId = &parentStepRunId
		opts.ChildIndex = &childIndex
		opts.ChildKey = childKey

		// Merge into a new map rather than assigning parentAdditionalMetadata
		// directly: maps are references, so writing the child's keys into the
		// parent's map would silently change the caller's data and leave the
		// options sharing storage with it.
		merged := make(map[string]interface{}, len(parentAdditionalMetadata)+len(additionalMetadata))

		for k, v := range parentAdditionalMetadata {
			merged[k] = v
		}

		for k, v := range additionalMetadata {
			merged[k] = v
		}

		opts.AdditionalMetadata = merged
	}
}
// GetCreateWorkflowRunOptsFromManual assembles the creation options for a
// manually triggered workflow run.
//
// A nil input is replaced with the empty JSON object "{}". The input is stored
// both as the manual trigger input (as a string) and as the run's input data,
// and it is also passed to the group key run when the workflow version has a
// valid concurrency limit strategy. The returned error is always nil.
func GetCreateWorkflowRunOptsFromManual(
	workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow,
	input []byte,
	additionalMetadata map[string]interface{},
) (*CreateWorkflowRunOpts, error) {
	payload := input
	if payload == nil {
		payload = []byte("{}")
	}

	displayName := getWorkflowRunDisplayName(workflowVersion.WorkflowName)
	versionId := sqlchelpers.UUIDToStr(workflowVersion.WorkflowVersion.ID)

	// Stays nil unless a concurrency limit strategy is configured.
	var groupKeyRun *CreateGroupKeyRunOpts
	if workflowVersion.ConcurrencyLimitStrategy.Valid {
		groupKeyRun = &CreateGroupKeyRunOpts{Input: payload}
	}

	return &CreateWorkflowRunOpts{
		DisplayName:        StringPtr(displayName),
		WorkflowVersionId:  versionId,
		ManualTriggerInput: StringPtr(string(payload)),
		TriggeredBy:        string(datautils.TriggeredByManual),
		InputData:          payload,
		AdditionalMetadata: additionalMetadata,
		GetGroupKeyRun:     groupKeyRun,
	}, nil
}
// GetCreateWorkflowRunOptsFromParent assembles the creation options for a
// workflow run spawned by a parent workflow run.
//
// A nil input is replaced with the empty JSON object "{}". The parent linkage
// (parent ids, child index and key) and the additional metadata are filled in
// by applying WithParent. The input is also passed to the group key run when
// the workflow version has a valid concurrency limit strategy. The returned
// error is always nil.
func GetCreateWorkflowRunOptsFromParent(
	workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow,
	input []byte,
	parentId, parentStepRunId string,
	childIndex int,
	childKey *string,
	additionalMetadata map[string]interface{},
	parentAdditionalMetadata map[string]interface{},
) (*CreateWorkflowRunOpts, error) {
	payload := input
	if payload == nil {
		payload = []byte("{}")
	}

	runOpts := &CreateWorkflowRunOpts{
		DisplayName:        StringPtr(getWorkflowRunDisplayName(workflowVersion.WorkflowName)),
		WorkflowVersionId:  sqlchelpers.UUIDToStr(workflowVersion.WorkflowVersion.ID),
		ManualTriggerInput: StringPtr(string(payload)),
		TriggeredBy:        string(datautils.TriggeredByParent),
		InputData:          payload,
	}

	linkToParent := WithParent(parentId, parentStepRunId, childIndex, childKey, additionalMetadata, parentAdditionalMetadata)
	linkToParent(runOpts)

	if workflowVersion.ConcurrencyLimitStrategy.Valid {
		runOpts.GetGroupKeyRun = &CreateGroupKeyRunOpts{Input: payload}
	}

	return runOpts, nil
}
// GetCreateWorkflowRunOptsFromEvent assembles the creation options for a
// workflow run triggered by the event with the given id.
//
// A nil input is replaced with the empty JSON object "{}". The input is also
// passed to the group key run when the workflow version has a valid
// concurrency limit strategy. The returned error is always nil.
func GetCreateWorkflowRunOptsFromEvent(
	eventId string,
	workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow,
	input []byte,
	additionalMetadata map[string]interface{},
) (*CreateWorkflowRunOpts, error) {
	payload := input
	if payload == nil {
		payload = []byte("{}")
	}

	displayName := getWorkflowRunDisplayName(workflowVersion.WorkflowName)
	versionId := sqlchelpers.UUIDToStr(workflowVersion.WorkflowVersion.ID)

	// Stays nil unless a concurrency limit strategy is configured.
	var groupKeyRun *CreateGroupKeyRunOpts
	if workflowVersion.ConcurrencyLimitStrategy.Valid {
		groupKeyRun = &CreateGroupKeyRunOpts{Input: payload}
	}

	return &CreateWorkflowRunOpts{
		DisplayName:        StringPtr(displayName),
		WorkflowVersionId:  versionId,
		TriggeringEventId:  &eventId,
		TriggeredBy:        string(datautils.TriggeredByEvent),
		InputData:          payload,
		AdditionalMetadata: additionalMetadata,
		GetGroupKeyRun:     groupKeyRun,
	}, nil
}
// GetCreateWorkflowRunOptsFromCron assembles the creation options for a
// workflow run triggered by a cron schedule, recording both the cron
// expression and the cron parent id.
//
// A nil input is replaced with the empty JSON object "{}". The input is also
// passed to the group key run when the workflow version has a valid
// concurrency limit strategy. The returned error is always nil.
func GetCreateWorkflowRunOptsFromCron(
	cron,
	cronParentId string,
	workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow,
	input []byte,
	additionalMetadata map[string]interface{},
) (*CreateWorkflowRunOpts, error) {
	payload := input
	if payload == nil {
		payload = []byte("{}")
	}

	displayName := getWorkflowRunDisplayName(workflowVersion.WorkflowName)
	versionId := sqlchelpers.UUIDToStr(workflowVersion.WorkflowVersion.ID)

	// Stays nil unless a concurrency limit strategy is configured.
	var groupKeyRun *CreateGroupKeyRunOpts
	if workflowVersion.ConcurrencyLimitStrategy.Valid {
		groupKeyRun = &CreateGroupKeyRunOpts{Input: payload}
	}

	return &CreateWorkflowRunOpts{
		DisplayName:        StringPtr(displayName),
		WorkflowVersionId:  versionId,
		Cron:               &cron,
		CronParentId:       &cronParentId,
		TriggeredBy:        string(datautils.TriggeredByCron),
		InputData:          payload,
		AdditionalMetadata: additionalMetadata,
		GetGroupKeyRun:     groupKeyRun,
	}, nil
}
// GetCreateWorkflowRunOptsFromSchedule assembles the creation options for a
// workflow run triggered by the scheduled workflow with the given id.
//
// A nil input is replaced with the empty JSON object "{}". The input is also
// passed to the group key run when the workflow version has a valid
// concurrency limit strategy. The functional options in fs are applied last,
// in order, so they can override any field set here. The returned error is
// always nil.
func GetCreateWorkflowRunOptsFromSchedule(
	scheduledWorkflowId string,
	workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow,
	input []byte,
	additionalMetadata map[string]interface{},
	fs ...CreateWorkflowRunOpt,
) (*CreateWorkflowRunOpts, error) {
	payload := input
	if payload == nil {
		payload = []byte("{}")
	}

	displayName := getWorkflowRunDisplayName(workflowVersion.WorkflowName)
	versionId := sqlchelpers.UUIDToStr(workflowVersion.WorkflowVersion.ID)

	// Stays nil unless a concurrency limit strategy is configured.
	var groupKeyRun *CreateGroupKeyRunOpts
	if workflowVersion.ConcurrencyLimitStrategy.Valid {
		groupKeyRun = &CreateGroupKeyRunOpts{Input: payload}
	}

	runOpts := &CreateWorkflowRunOpts{
		DisplayName:         StringPtr(displayName),
		WorkflowVersionId:   versionId,
		ScheduledWorkflowId: &scheduledWorkflowId,
		TriggeredBy:         string(datautils.TriggeredBySchedule),
		InputData:           payload,
		AdditionalMetadata:  additionalMetadata,
		GetGroupKeyRun:      groupKeyRun,
	}

	for i := range fs {
		fs[i](runOpts)
	}

	return runOpts, nil
}
// getWorkflowRunDisplayName returns workflowName followed by "-" and a suffix
// obtained from random.Generate(6).
func getWorkflowRunDisplayName(workflowName string) string {
	const suffixLength = 6

	// NOTE(review): the error from random.Generate is discarded, so on failure
	// whatever suffix was returned (possibly empty) is still appended.
	suffix, _ := random.Generate(suffixLength)

	return workflowName + "-" + suffix
}
// ListWorkflowRunsOpts holds the optional filters, pagination and ordering
// settings passed to ListWorkflowRuns. Every field is optional.
type ListWorkflowRunsOpts struct {
// (optional) the workflow id
WorkflowId *string `validate:"omitempty,uuid"`
// (optional) the workflow version id
WorkflowVersionId *string `validate:"omitempty,uuid"`
// (optional) a list of workflow run ids to filter by
Ids []string `validate:"omitempty,dive,uuid"`
// (optional) the parent workflow run id
ParentId *string `validate:"omitempty,uuid"`
// (optional) the parent step run id
ParentStepRunId *string `validate:"omitempty,uuid"`
// (optional) the event id that triggered the workflow run
EventId *string `validate:"omitempty,uuid"`
// (optional) the group key for the workflow run
GroupKey *string
// (optional) a list of workflow run statuses to filter by
Statuses *[]db.WorkflowRunStatus
// (optional) a list of kinds to filter by
Kinds *[]dbsqlc.WorkflowKind
// (optional) number of workflow runs to skip
Offset *int
// (optional) number of workflow runs to return
Limit *int
// (optional) the order by field; one of createdAt, finishedAt, startedAt, duration
OrderBy *string `validate:"omitempty,oneof=createdAt finishedAt startedAt duration"`
// (optional) the order direction; ASC or DESC
OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`
// (optional) a time after which the run was created
CreatedAfter *time.Time
// (optional) a time before which the run was created
CreatedBefore *time.Time
// (optional) a time after which the run was finished
FinishedAfter *time.Time
// (optional) exact metadata to filter by
AdditionalMetadata map[string]interface{} `validate:"omitempty"`
}
// WorkflowRunsMetricsOpts holds the optional filters passed to
// WorkflowRunMetricsCount. Every field is optional.
type WorkflowRunsMetricsOpts struct {
// (optional) the workflow id
WorkflowId *string `validate:"omitempty,uuid"`
// (optional) the workflow version id
WorkflowVersionId *string `validate:"omitempty,uuid"`
// (optional) the parent workflow run id
ParentId *string `validate:"omitempty,uuid"`
// (optional) the parent step run id
ParentStepRunId *string `validate:"omitempty,uuid"`
// (optional) the event id that triggered the workflow run
EventId *string `validate:"omitempty,uuid"`
// (optional) exact metadata to filter by
AdditionalMetadata map[string]interface{} `validate:"omitempty"`
// (optional) the time the workflow run was created before
CreatedBefore *time.Time `validate:"omitempty"`
// (optional) the time the workflow run was created after
CreatedAfter *time.Time `validate:"omitempty"`
}
// ListWorkflowRunsResult is the value returned by ListWorkflowRuns.
type ListWorkflowRunsResult struct {
// the workflow run rows returned by the query
Rows []*dbsqlc.ListWorkflowRunsRow
// NOTE(review): presumably the total number of matching runs, independent of
// Offset/Limit — confirm against the implementation
Count int
}
// CreateWorkflowRunPullRequestOpts describes a pull request: the repository it
// belongs to, its id, title and number, its head and base branches, and its state.
// NOTE(review): presumably used to attach a pull request to a workflow run; no
// caller is visible in this file — confirm.
type CreateWorkflowRunPullRequestOpts struct {
RepositoryOwner string
RepositoryName string
PullRequestID int
PullRequestTitle string
PullRequestNumber int
PullRequestHeadBranch string
PullRequestBaseBranch string
PullRequestState string
}
// ListPullRequestsForWorkflowRunOpts holds the options for listing the pull
// requests of a workflow run.
type ListPullRequestsForWorkflowRunOpts struct {
// NOTE(review): presumably an optional pull request state to filter by;
// no consumer is visible in this file — confirm
State *string
}
// ListWorkflowRunRoundRobinsOpts holds the optional filters and pagination
// settings for listing round-robin workflow runs. Every field is optional.
type ListWorkflowRunRoundRobinsOpts struct {
// (optional) the workflow id
WorkflowId *string `validate:"omitempty,uuid"`
// (optional) the workflow version id
WorkflowVersionId *string `validate:"omitempty,uuid"`
// (optional) the status of the workflow run
Status *db.WorkflowRunStatus
// (optional) number of workflow runs to skip
Offset *int
// (optional) number of workflow runs to return
Limit *int
}
// WorkflowRunMetricsCountOpts holds optional workflow and workflow version
// filters for a workflow run metrics count.
// NOTE(review): WorkflowRunMetricsCount in this file takes
// *WorkflowRunsMetricsOpts instead; no use of this type is visible here.
type WorkflowRunMetricsCountOpts struct {
// (optional) the workflow id
WorkflowId *string `validate:"omitempty,uuid"`
// (optional) the workflow version id
WorkflowVersionId *string `validate:"omitempty,uuid"`
}
// StepRunForJobRun is the element type returned by GetStepRunsForJobRuns: a
// step run row extended with a child workflow count.
type StepRunForJobRun struct {
// the embedded step run row; its fields are promoted onto StepRunForJobRun
*dbsqlc.GetStepRunsForJobRunsRow
// NOTE(review): presumably the number of child workflow runs spawned by this
// step run — confirm against the implementation
ChildWorkflowsCount int
}
// WorkflowRunAPIRepository declares the workflow run operations of the API-side
// repository (going by its name; the implementation is not in this file).
type WorkflowRunAPIRepository interface {
// RegisterCreateCallback registers a callback that receives a *dbsqlc.WorkflowRun.
// NOTE(review): presumably invoked after a workflow run is created — confirm
// against the implementation.
RegisterCreateCallback(callback Callback[*dbsqlc.WorkflowRun])
// ListWorkflowRuns returns the workflow runs of the given tenant that match opts.
ListWorkflowRuns(ctx context.Context, tenantId string, opts *ListWorkflowRunsOpts) (*ListWorkflowRunsResult, error)
// WorkflowRunMetricsCount returns workflow run counts by status for the given
// tenant, filtered by opts.
WorkflowRunMetricsCount(ctx context.Context, tenantId string, opts *WorkflowRunsMetricsOpts) (*dbsqlc.WorkflowRunsMetricsCountRow, error)
// GetWorkflowRunInputData returns the input data of a workflow run as a map.
// Unlike the other methods it does not take a context.
GetWorkflowRunInputData(tenantId, workflowRunId string) (map[string]interface{}, error)
// CreateNewWorkflowRun creates a new workflow run for a workflow version.
CreateNewWorkflowRun(ctx context.Context, tenantId string, opts *CreateWorkflowRunOpts) (*dbsqlc.WorkflowRun, error)
// GetWorkflowRunById returns a workflow run by id.
GetWorkflowRunById(ctx context.Context, tenantId, runId string) (*dbsqlc.GetWorkflowRunByIdRow, error)
// GetStepsForJobs returns the step rows for the given job ids.
GetStepsForJobs(ctx context.Context, tenantId string, jobIds []string) ([]*dbsqlc.GetStepsForJobsRow, error)
// GetStepRunsForJobRuns returns the step runs for the given job run ids, each
// wrapped in a StepRunForJobRun.
GetStepRunsForJobRuns(ctx context.Context, tenantId string, jobRunIds []string) ([]*StepRunForJobRun, error)
}
// ErrWorkflowRunNotFound is a package-level error value whose message is
// "workflow run not found". Which repository methods return it is not visible
// in this file.
var ErrWorkflowRunNotFound = fmt.Errorf("workflow run not found")
// ErrDedupeValueExists is an error type that carries a dedupe value and reports
// that a workflow run with that value already exists.
type ErrDedupeValueExists struct {
	// DedupeValue is the value interpolated into the error message.
	DedupeValue string
}

// Error implements the error interface, embedding DedupeValue in the message.
func (dedupeErr ErrDedupeValueExists) Error() string {
	const format = "workflow run with dedupe value %s already exists"

	return fmt.Sprintf(format, dedupeErr.DedupeValue)
}
// WorkflowRunEngineRepository declares the workflow run operations of the
// engine-side repository (going by its name; the implementation is not in this file).
type WorkflowRunEngineRepository interface {
// RegisterCreateCallback registers a callback that receives a *dbsqlc.WorkflowRun.
// NOTE(review): presumably invoked after a workflow run is created — confirm
// against the implementation.
RegisterCreateCallback(callback Callback[*dbsqlc.WorkflowRun])
// ListWorkflowRuns returns the workflow runs of the given tenant that match opts.
ListWorkflowRuns(ctx context.Context, tenantId string, opts *ListWorkflowRunsOpts) (*ListWorkflowRunsResult, error)
// GetChildWorkflowRun looks up a child workflow run by its parent workflow run,
// parent step run, child index and optional child key.
GetChildWorkflowRun(ctx context.Context, parentId, parentStepRunId string, childIndex int, childkey *string) (*dbsqlc.WorkflowRun, error)
// GetScheduledChildWorkflowRun looks up a scheduled trigger reference by its
// parent workflow run, parent step run, child index and optional child key.
GetScheduledChildWorkflowRun(ctx context.Context, parentId, parentStepRunId string, childIndex int, childkey *string) (*dbsqlc.WorkflowTriggerScheduledRef, error)
// PopWorkflowRunsRoundRobin returns workflow runs of the given workflow.
// NOTE(review): presumably dequeues up to maxRuns queued runs in round-robin
// order across groups — confirm against the implementation.
PopWorkflowRunsRoundRobin(ctx context.Context, tenantId, workflowId string, maxRuns int) ([]*dbsqlc.WorkflowRun, error)
// CreateNewWorkflowRun creates a new workflow run for a workflow version.
// NOTE(review): the returned string is presumably the new workflow run's id — confirm.
CreateNewWorkflowRun(ctx context.Context, tenantId string, opts *CreateWorkflowRunOpts) (string, error)
// GetWorkflowRunById returns a workflow run by id.
GetWorkflowRunById(ctx context.Context, tenantId, runId string) (*dbsqlc.GetWorkflowRunRow, error)
// GetWorkflowRunAdditionalMeta returns the additional metadata row of a workflow run.
GetWorkflowRunAdditionalMeta(ctx context.Context, tenantId, workflowRunId string) (*dbsqlc.GetWorkflowRunAdditionalMetaRow, error)
// ReplayWorkflowRun replays the given workflow run and returns a workflow run row.
// NOTE(review): the exact replay semantics are not visible in this file.
ReplayWorkflowRun(ctx context.Context, tenantId, workflowRunId string) (*dbsqlc.GetWorkflowRunRow, error)
// ListActiveQueuedWorkflowVersions returns the active queued workflow versions.
// It takes no tenant id.
ListActiveQueuedWorkflowVersions(ctx context.Context) ([]*dbsqlc.ListActiveQueuedWorkflowVersionsRow, error)
// SoftDeleteExpiredWorkflowRuns soft-deletes the tenant's workflow runs that
// have one of the given statuses and were created before the given time.
// NOTE(review): the previous comment described two returned counts, but the
// signature returns a single bool; presumably it reports whether more matching
// runs remain to be deleted — confirm against the implementation.
SoftDeleteExpiredWorkflowRuns(ctx context.Context, tenantId string, statuses []dbsqlc.WorkflowRunStatus, before time.Time) (bool, error)
}