forked from evergreen-ci/evergreen
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cache_historical_task_data.go
160 lines (144 loc) · 4.77 KB
/
cache_historical_task_data.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package units
import (
"context"
"fmt"
"strings"
"time"
"github.com/evergreen-ci/evergreen"
"github.com/evergreen-ci/evergreen/model/taskstats"
"github.com/evergreen-ci/utility"
"github.com/mongodb/amboy"
"github.com/mongodb/amboy/job"
"github.com/mongodb/amboy/registry"
"github.com/mongodb/grip"
"github.com/mongodb/grip/message"
"github.com/pkg/errors"
)
// cacheHistoricalTaskDataName is the amboy job type name; it is used both
// for registry registration and as the prefix of each job's ID.
const cacheHistoricalTaskDataName = "cache-historical-task-data"

// init registers the job type with the amboy registry so queued jobs of
// this type can be deserialized and run.
func init() {
	registry.AddJobType(cacheHistoricalTaskDataName,
		func() amboy.Job { return makeCacheHistoricalTaskDataJob() })
}
// cacheHistoricalTaskDataJob updates the cached (pre-computed) daily task
// statistics for a single project.
type cacheHistoricalTaskDataJob struct {
	// ProjectID is the project whose daily task stats are updated.
	ProjectID string `bson:"project_id" json:"project_id" yaml:"project_id"`
	// Requesters limits the stats update to tasks created by these
	// requester types.
	Requesters []string `bson:"requesters" json:"requesters" yaml:"requesters"`
	job.Base   `bson:"job_base" json:"job_base" yaml:"job_base"`
}
// NewCacheHistoricalTaskDataJob returns a job that refreshes the cached
// daily task statistics for the given project. Only tasks created by the
// repotracker version requester are considered. The id argument
// distinguishes this run from other runs for the same project within the
// job's ID.
func NewCacheHistoricalTaskDataJob(id, projectID string) amboy.Job {
	cacheJob := makeCacheHistoricalTaskDataJob()
	cacheJob.Requesters = []string{evergreen.RepotrackerVersionRequester}
	cacheJob.ProjectID = projectID
	cacheJob.SetID(fmt.Sprintf("%s.%s.%s", cacheHistoricalTaskDataName, projectID, id))
	return cacheJob
}
// makeCacheHistoricalTaskDataJob constructs an unconfigured job instance
// with its amboy job type metadata populated; callers set ProjectID,
// Requesters, and the job ID before enqueueing.
func makeCacheHistoricalTaskDataJob() *cacheHistoricalTaskDataJob {
	return &cacheHistoricalTaskDataJob{
		Base: job.Base{
			JobType: amboy.JobType{
				Name:    cacheHistoricalTaskDataName,
				Version: 0,
			},
		},
	}
}
// Run updates the daily task statistics cache for the job's project. It
// computes the window of time whose finished tasks have not yet been
// processed, finds the stats that need updating inside that window,
// regenerates them, and finally records how far processing has advanced.
// Per-phase timing is logged when the job completes.
func (j *cacheHistoricalTaskDataJob) Run(ctx context.Context) {
	defer j.MarkComplete()

	startAt := time.Now()

	timingMsg := message.Fields{
		"job_id":       j.ID(),
		"project":      j.ProjectID,
		"job_type":     j.Type().Name,
		"message":      "timing-info",
		"run_start_at": startAt,
	}
	defer func() {
		timingMsg["has_errors"] = j.HasErrors()
		timingMsg["aborted"] = ctx.Err() != nil
		timingMsg["total"] = time.Since(startAt).Seconds()
		timingMsg["run_end_at"] = time.Now()
		grip.Info(timingMsg)
	}()

	flags, err := evergreen.GetServiceFlags(ctx)
	if err != nil {
		j.AddError(errors.Wrap(err, "getting service flags"))
		return
	}
	if flags.CacheStatsJobDisabled {
		j.AddError(errors.New("cache stats job is disabled"))
		return
	}

	var statsStatus taskstats.StatsStatus
	timingMsg["status_check"] = reportTiming(func() {
		statsStatus, err = taskstats.GetStatsStatus(j.ProjectID)
		j.AddError(errors.Wrap(err, "getting daily task stats status"))
	}).Seconds()
	if j.HasErrors() {
		return
	}

	// Calculate the window of time within which we would like to check for
	// stats to update, starting with ProcessedTasksUntil (the time before
	// which all finished tasks have been processed for this project) up
	// until now. The size of this window is capped at 24 hours to prevent
	// long-running jobs and overwhelming the database.
	updateWindowStart := statsStatus.ProcessedTasksUntil
	updateWindowEnd := time.Now()
	// Named "capped" rather than "max" to avoid shadowing the Go 1.21
	// built-in.
	if capped := updateWindowStart.Add(24 * time.Hour); updateWindowEnd.After(capped) {
		updateWindowEnd = capped
	}
	timingMsg["stats_update_window_start"] = updateWindowStart
	timingMsg["stats_update_window_end"] = updateWindowEnd

	var statsToUpdate []taskstats.StatsToUpdate
	timingMsg["find_task_stats_to_update"] = reportTiming(func() {
		statsToUpdate, err = taskstats.FindStatsToUpdate(taskstats.FindStatsToUpdateOptions{
			ProjectID:  j.ProjectID,
			Requesters: j.Requesters,
			Start:      updateWindowStart,
			End:        updateWindowEnd,
		})
		j.AddError(errors.Wrap(err, "finding daily task stats to update"))
	}).Seconds()
	if j.HasErrors() {
		return
	}

	timingMsg["update_daily_task_stats"] = reportTiming(func() {
		for _, toUpdate := range statsToUpdate {
			// Skip entries with no tasks; there is nothing to generate.
			if len(toUpdate.Tasks) == 0 {
				continue
			}
			err := errors.Wrap(taskstats.GenerateStats(ctx, taskstats.GenerateStatsOptions{
				ProjectID: j.ProjectID,
				Requester: toUpdate.Requester,
				Date:      toUpdate.Day,
				Tasks:     toUpdate.Tasks,
			}), "generating daily task stats")
			grip.Warning(message.WrapError(err, message.Fields{
				"job_id":         j.ID(),
				"project":        j.ProjectID,
				"job_type":       j.Type().Name,
				"job_start_time": startAt,
				"task_date":      utility.GetUTCDay(toUpdate.Day),
			}))
			if err != nil {
				j.AddError(err)
				return
			}
		}
	}).Seconds()
	if j.HasErrors() {
		errMsg := j.Error().Error()
		// The following errors are known to recur. In these cases we
		// continue to update the task stats status to prevent
		// re-processing of the same error.
		if !strings.Contains(errMsg, evergreen.KeyTooLargeToIndexError) && !strings.Contains(errMsg, evergreen.InvalidDivideInputError) {
			return
		}
	}

	timingMsg["save_stats_status"] = reportTiming(func() {
		j.AddError(errors.Wrap(taskstats.UpdateStatsStatus(j.ProjectID, startAt, updateWindowEnd, time.Since(startAt)), "updating daily task stats status"))
	}).Seconds()
}
func reportTiming(fn func()) time.Duration {
startAt := time.Now()
fn()
return time.Since(startAt)
}