[ch] Dr. CI to use CH Part 1 (#5634)
Moves most of the queries used by Dr. CI to CH. There are some one-off queries that I still need to track down.

* types changed: the CH query results don't have nulls, so I changed null
checks into checks for the corresponding default value (`""` for strings, `0`
for ints, time 0 for timestamps) and removed a lot of `!` non-null assertions.
This causes a mild conflict with the `JobData` struct, so there's some casting
that I don't think actually does anything. I'm also not sure how accurate the
typing was originally, since I think some ids were ints before too
  * is there a runtime type checker for TypeScript?
* I tried and failed to add a lint rule, like
https://typescript-eslint.io/rules/strict-boolean-expressions/, to make
sure the null/existence checks were correct. It would be nice to figure
this out
* CH returns timestamps without the "Z" timezone suffix, but they are all in
UTC, so use `dayjs.utc` to parse them correctly (`dayjs` parses in the local
timezone by default); see the sketch after this list
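
A minimal sketch of the difference (hypothetical timestamp value, not taken from the diff):

```typescript
import dayjs from "dayjs";
import utc from "dayjs/plugin/utc";

dayjs.extend(utc);

// CH returns timestamps in this shape: UTC, but with no timezone suffix.
const raw = "2024-09-11 17:00:00";

// dayjs() parses the string in the server's local timezone, so the resulting
// instant depends on where the code happens to run.
const local = dayjs(raw).toISOString();

// dayjs.utc() pins the parse to UTC, matching what CH actually stores.
const utcTime = dayjs.utc(raw).toISOString(); // "2024-09-11T17:00:00.000Z"
```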

I checked the outputs by cherry-picking csl/comp_orig
(4db3fa5) onto both this branch and
origin/main (origin/main also needs
055a919 to fix a bug), running
`curl "http://localhost:3000/api/drci/drci" --data 'repo=pytorch' >
../_logs/<new, orig>.json` against each, running the script below to
normalize the outputs,

<details><summary>Comparison script</summary>

```python
import json
import re

from library import REPO_ROOT


def failures(thing):
    thing = thing["failures"]
    for pr_num in thing:
        for classification in thing[pr_num]:
            thing[pr_num][classification] = sorted(
                thing[pr_num][classification], key=lambda x: x["id"]
            )
            for failure in thing[pr_num][classification]:
                # Normalize Rockset nulls to the CH defaults so the two
                # outputs can be diffed directly
                if failure["failure_captures"] is None:
                    failure["failure_captures"] = []
                if failure["failure_context"] is None:
                    failure["failure_context"] = []
                if failure["failure_lines"] is None:
                    failure["failure_lines"] = []
                if failure["runnerName"] is None:
                    failure["runnerName"] = ""
                # Replace timestamps with a placeholder since the two
                # databases format them differently
                for key, value in failure.items():
                    if re.match(r"(.*) (.*)\.000000000", str(value)):
                        failure[key] = "date"
                    if re.match(r"(\d\d\d\d-\d\d-\d\d)T(\d\d:\d\d:\d\d).*", str(value)):
                        failure[key] = "date"
    return thing


def comments(thing):
    thing = thing["comments"]
    s = "\n"
    for pr_num in sorted(thing.keys()):
        s += f"{pr_num}\n"
        s += f"{thing[pr_num]['new']}\n"
        s += f"{thing[pr_num]['body']}\n"
    return s


if __name__ == "__main__":
    orig_file = REPO_ROOT / "../test-infra/_logs/orig.json"
    new_file = REPO_ROOT / "../test-infra/_logs/new.json"

    with open(orig_file, "r") as f:
        orig = json.load(f)

    with open(new_file, "r") as f:
        new = json.load(f)

    o_thing = failures(orig)
    n_thing = failures(new)

    with open(orig_file, "w") as f:
        json.dump(o_thing, f, indent=2, sort_keys=True)
        print(comments(orig), file=f)

    with open(new_file, "w") as f:
        json.dump(n_thing, f, indent=2, sort_keys=True)
        print(comments(new), file=f)

```

</details>

and then diffing the two files.

There were small differences, which I then checked individually; they were
due to the databases not being exactly the same when I ran the curls. There
were also some small differences in which disable issues were used, but based
on the code for that, I think it's still technically correct.
clee2000 committed Sep 11, 2024
1 parent d7af4ff commit eb5fda4
Showing 17 changed files with 446 additions and 512 deletions.
7 changes: 3 additions & 4 deletions torchci/clickhouse_queries/issue_query/query.sql
```diff
@@ -1,4 +1,3 @@
---- !!! Query is not converted to CH syntax yet. Delete this line when it gets converted
 SELECT
     issue.number,
     issue.title,
@@ -8,7 +7,7 @@ SELECT
     issue.updated_at,
     issue.author_association,
 FROM
-    issues AS issue
-    CROSS JOIN UNNEST(issue.labels AS label) AS labels
+    issues AS issue final
+    array join issue.labels AS label
 WHERE
-    labels.label.name = :label
+    label.name = {label: String}
```
5 changes: 5 additions & 0 deletions torchci/clickhouse_queries/pr_merge_commits/params.json
```diff
@@ -0,0 +1,5 @@
+{
+    "pr_nums": "Array(String)",
+    "owner": "String",
+    "project": "String"
+}
```
12 changes: 12 additions & 0 deletions torchci/clickhouse_queries/pr_merge_commits/query.sql
```diff
@@ -0,0 +1,12 @@
+SELECT
+    pr_num,
+    merge_commit_sha,
+FROM
+    default.merges final
+WHERE
+    pr_num in {pr_nums: Array(Int64)}
+    AND owner = {owner: String}
+    AND project = {project: String}
+    AND merge_commit_sha != ''
+ORDER BY
+    comment_id DESC
```
20 changes: 15 additions & 5 deletions torchci/clickhouse_queries/recent_pr_workflows_query/query.sql
```diff
@@ -10,19 +10,27 @@ WITH relevant_shas as (
     from default.pull_request pr final
     where pr.number = {prNumber: Int64}
 ),
+relevant_pushes as (
+    -- optimization because push is currently ordered by timestamp
+    select push.head_commit.'timestamp' as timestamp, push.head_commit.'id' as id
+    from default.push final
+    where push.head_commit.'timestamp' in (select timestamp from materialized_views.push_by_sha where id in relevant_shas)
+),
 recent_prs AS (
     SELECT
         distinct pull_request.head.'sha' AS sha,
         pull_request.number AS number,
-        push.head_commit.'timestamp' AS timestamp
+        push.timestamp AS timestamp
     FROM
         relevant_shas r
         JOIN default.pull_request pull_request final ON r.head_sha = pull_request.head.'sha'
         -- Do a left join here because the push table won't have any information about
         -- commits from forked repo
-        LEFT JOIN default.push push final ON r.head_sha = push.head_commit.'id'
+        LEFT JOIN relevant_pushes push ON r.head_sha = push.id
     WHERE
         pull_request.base.'repo'.'full_name' = {repo: String}
+        -- Filter the pull request table to be smaller to make the query faster
+        and pull_request.number in (select number from materialized_views.pr_by_sha where head_sha in (select head_sha from relevant_shas))
 )
 SELECT
     w.id AS workflowId,
@@ -56,15 +64,15 @@ where
     and j.id in (select id from materialized_views.workflow_job_by_head_sha where head_sha in (select sha from recent_prs))
 UNION all
 SELECT
-    null AS workflowId,
+    0 AS workflowId,
     w.workflow_id as workflowUniqueId,
     w.id as id,
-    null AS runnerName,
+    '' AS runnerName,
     w.head_commit.'author'.'email' as authorEmail,
     w.name AS name,
     w.name AS jobName,
     w.conclusion as conclusion,
-    null as completed_at,
+    toDateTime64(0, 9) as completed_at,
     w.html_url as html_url,
     w.head_branch as head_branch,
     recent_prs.number AS pr_number,
@@ -81,3 +89,5 @@ WHERE
     w.id in (select id from materialized_views.workflow_run_by_head_sha where head_sha in (select sha from recent_prs))
 ORDER BY
     time DESC
+-- Non experimental analyzer has problems with final...
+SETTINGS allow_experimental_analyzer=1
```
7 changes: 7 additions & 0 deletions torchci/lib/bot/utils.ts
```diff
@@ -1,7 +1,14 @@
 import dayjs from "dayjs";
 import { Octokit } from "octokit";
 import { Context, Probot } from "probot";
 import urllib from "urllib";
+
+export function isTime0(time: string): boolean {
+  return dayjs.utc(time).valueOf() == 0;
+}
+
+export const TIME_0 = "1970-01-01 00:00:00.000000000";
+
 export function repoKey(
   context: Context | Context<"pull_request.labeled">
 ): string {
```
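
A minimal usage sketch for the new helpers (hypothetical caller, not part of this commit): since CH has no nulls, a missing timestamp comes back as time 0, and callers test for that instead of checking for null or undefined.

```typescript
import { isTime0, TIME_0 } from "./bot/utils";

// Hypothetical record: CH reports an unfinished job's completed_at as
// time 0 rather than null.
const completedAt: string = TIME_0;

if (isTime0(completedAt)) {
  // Treat as "not completed yet" -- the old code checked for null here.
}
```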
19 changes: 17 additions & 2 deletions torchci/lib/clickhouse.ts
```diff
@@ -1,5 +1,13 @@
+// This file can't be imported by files on the client side (ex .tsx) due to
+// lacking modules (ex fs) and required environment variables. Run `yarn run
+// build` to see the error and where it is imported from if vercel fails to
+// deploy.
 import { createClient } from "@clickhouse/client";
 import { readFileSync } from "fs";
+// Import itself to ensure that mocks can be applied, see
+// https://stackoverflow.com/questions/51900413/jest-mock-function-doesnt-work-while-it-was-called-in-the-other-function
+// https://stackoverflow.com/questions/45111198/how-to-mock-functions-in-the-same-module-using-jest
+import * as thisModule from "./clickhouse";
 
 export function getClickhouseClient() {
   return createClient({
@@ -38,7 +46,11 @@ export async function queryClickhouseSaved(
  * @param queryName: string, the name of the query, which is the name of the folder in clickhouse_queries
  * @param inputParams: Record<string, unknown>, the parameters to the query, an object where keys are the parameter names
  *
- * This function will filter the inputParams to only include the parameters that are in the query params json file
+ * This function will filter the inputParams to only include the parameters
+ * that are in the query params json file.
+ *
+ * During local development, if this fails due to "cannot find module ...
+ * params.json", delete the .next folder and try again.
  */
  const query = readFileSync(
    // https://stackoverflow.com/questions/74924100/vercel-error-enoent-no-such-file-or-directory
@@ -50,7 +62,10 @@
   const queryParams = new Map(
     Object.entries(paramsText).map(([key, _]) => [key, inputParams[key]])
   );
-  return await queryClickhouse(query, Object.fromEntries(queryParams));
+  return await thisModule.queryClickhouse(
+    query,
+    Object.fromEntries(queryParams)
+  );
 }
 
 export function enableClickhouse() {
```
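
A usage sketch of the filtering behavior the new doc comment describes (hypothetical call; the extra key is an assumption for illustration):

```typescript
import { queryClickhouseSaved } from "./clickhouse";

async function example() {
  // "extraneous" is dropped before the query runs because it is not declared
  // in clickhouse_queries/pr_merge_commits/params.json.
  const rows = await queryClickhouseSaved("pr_merge_commits", {
    pr_nums: [5634],
    owner: "pytorch",
    project: "test-infra",
    extraneous: "ignored",
  });
  return rows;
}
```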
89 changes: 51 additions & 38 deletions torchci/lib/drciUtils.ts
```diff
@@ -1,7 +1,8 @@
 import { Client } from "@opensearch-project/opensearch";
 import dayjs from "dayjs";
+import utc from "dayjs/plugin/utc";
 import { isEligibleCommitForSimilarFailureCheck } from "lib/commitUtils";
-import fetchIssuesByLabel from "lib/fetchIssuesByLabel";
+import { fetchIssuesByLabelCH } from "lib/fetchIssuesByLabel";
 import {
   hasS3Log,
   isFailureFromPrevMergeCommit,
@@ -12,8 +13,10 @@ import { MAX_SIZE, OLDEST_FIRST, querySimilarFailures } from "lib/searchUtils";
 import { RecentWorkflowsData } from "lib/types";
 import _ from "lodash";
 import { Octokit } from "octokit";
-import { isDrCIEnabled, isPyTorchPyTorch } from "./bot/utils";
+import { isDrCIEnabled, isPyTorchPyTorch, isTime0, TIME_0 } from "./bot/utils";
 import { queryClickhouseSaved } from "./clickhouse";
+import { IssueData } from "./types";
+dayjs.extend(utc);
 
 export const NUM_MINUTES = 30;
 export const REPO: string = "pytorch";
@@ -193,7 +196,7 @@ export async function upsertDrCiComment(
   );
   const existingDrciID = existingDrciData.id;
   const existingDrciComment = existingDrciData.body;
-  const sev = getActiveSEVs(await fetchIssuesByLabel("ci: sev"));
+  const sev = getActiveSEVs(await fetchIssuesByLabelCH("ci: sev"));
   const drciComment = formDrciComment(
     prNum,
     owner,
@@ -247,23 +250,16 @@ export async function hasSimilarFailures(
   // NB: Using the job completed_at timestamp has many false positives, so it's
   // better that we only enable this feature when the head commit timestamp is
   // available and use it as the end date
-  if (
-    job.head_sha_timestamp === undefined ||
-    job.head_sha_timestamp === null ||
-    job.head_sha_timestamp === "" ||
-    job.head_sha_timestamp === "0"
-  ) {
+  if (isTime0(job.head_sha_timestamp)) {
     return;
   }
 
   // NB: Use the commit timestamp here instead of the job timestamp to avoid using
   // the wrong end date when a PR is reverted and the job reruns
-  const endDate = dayjs(job.head_sha_timestamp);
-  const startDate = dayjs(
-    baseCommitDate !== "" && baseCommitDate !== "0"
-      ? baseCommitDate
-      : job.head_sha_timestamp
-  ).subtract(lookbackPeriodInHours, "hour");
+  const endDate = dayjs.utc(job.head_sha_timestamp);
+  const startDate = dayjs
+    .utc(!isTime0(baseCommitDate) ? baseCommitDate : job.head_sha_timestamp)
+    .subtract(lookbackPeriodInHours, "hour");
 
   if (
     endDate.diff(startDate, "hour") >
@@ -297,9 +293,10 @@
   let foundSimilarFailure;
   for (const record of records) {
     // Convert the result in JobData to RecentWorkflowsData used by Dr.CI
+    // TODO remove `as any` when CH migration is complete?
     const failure: RecentWorkflowsData = {
-      workflowId: record.workflowId,
-      id: record.id as string,
+      workflowId: record.workflowId as any as number,
+      id: record.id as any as number,
       jobName: record.jobName as string,
       name: record.name as string,
       conclusion: record.conclusion as string,
@@ -308,9 +305,12 @@
       head_sha: record.sha as string,
       head_branch: record.branch as string,
       failure_captures: record.failureCaptures as string[],
-      failure_lines: record.failureLines,
-      failure_context: record.failureContext,
+      failure_lines: record.failureLines as string[],
+      failure_context: record.failureContext as string[],
       authorEmail: record.authorEmail,
+      workflowUniqueId: 0,
+      head_sha_timestamp: TIME_0,
+      pr_number: 0,
     };
 
     const isEligibleCommit = await isEligibleCommitForSimilarFailureCheck(
@@ -367,14 +367,9 @@ export function isInfraFlakyJob(job: RecentWorkflowsData): boolean {
   // was allowed to go through as flaky
   return (
     job.conclusion === "failure" &&
-    job.workflowId !== null &&
-    job.workflowId !== undefined &&
-    (job.failure_lines === null ||
-      job.failure_lines === undefined ||
-      job.failure_lines.join("") === "") &&
-    (job.runnerName === null ||
-      job.runnerName === undefined ||
-      job.runnerName === "")
+    job.workflowId !== 0 &&
+    (job.failure_lines.length == 0 || job.failure_lines.join("") === "") &&
+    job.runnerName === ""
   );
 }
 
@@ -383,15 +378,13 @@ export async function isLogClassifierFailed(
 ): Promise<boolean> {
   // Having no workflow ID means that this is a workflow run, not a workflow job.
   // We don't want to apply the log classifier check for a workflow run
-  if (job.workflowId === null || job.workflowId === undefined) {
+  if (job.workflowId === 0) {
     return false;
   }
 
   // This covers the case when there is no log on S3 or the log classifier fails to trigger
   const hasFailureLines =
-    job.failure_lines !== null &&
-    job.failure_lines !== undefined &&
-    job.failure_lines.join("") !== "";
+    job.failure_lines.length !== 0 && job.failure_lines.join("") !== "";
   const hasLog = await hasS3Log(job);
 
   return job.conclusion === "failure" && (!hasFailureLines || !hasLog);
@@ -404,7 +397,7 @@ export function isExcludedFromFlakiness(job: RecentWorkflowsData): boolean {
     _.find(
       EXCLUDED_FROM_FLAKINESS,
       (exclude: string) =>
-        job.name !== undefined &&
+        job.name !== "" &&
         job.name.toLowerCase().includes(exclude.toLowerCase())
     ) !== undefined
   );
@@ -433,11 +426,7 @@ export function getSuppressedLabels(
   job: RecentWorkflowsData,
   labels: string[]
 ): string[] {
-  if (
-    job.jobName === undefined ||
-    job.jobName === null ||
-    !(job.jobName in SUPPRESSED_JOB_BY_LABELS)
-  ) {
+  if (job.jobName === "" || !(job.jobName in SUPPRESSED_JOB_BY_LABELS)) {
     return [];
   }
@@ -447,7 +436,7 @@
 export function isExcludedFromSimilarityPostProcessing(
   job: RecentWorkflowsData
 ): boolean {
-  if (job.failure_captures === null || job.failure_captures === undefined) {
+  if (job.failure_captures.length === 0) {
     return false;
   }
@@ -476,3 +465,27 @@ export function hasSimilarFailuresInSamePR(
 
   return;
 }
+
+export async function getPRMergeCommits(
+  owner: string,
+  repo: string,
+  prNumbers: number[]
+): Promise<Map<number, string[]>> {
+  // Sort by comment ID desc because we don't want to depend on _event_time in
+  // general
+  const results = await queryClickhouseSaved("pr_merge_commits", {
+    pr_nums: prNumbers,
+    owner,
+    project: repo,
+  });
+
+  // If the array is empty, the PR hasn't been merged yet
+  return results.reduce((acc: { [prNumber: number]: string[] }, row: any) => {
+    if (!acc[row.pr_num]) {
+      acc[row.pr_num] = [];
+    }
+
+    acc[row.pr_num].push(row.merge_commit_sha);
+    return acc;
+  }, new Map<number, string[]>());
+}
```
10 changes: 10 additions & 0 deletions torchci/lib/fetchIssuesByLabel.ts
```diff
@@ -1,7 +1,17 @@
 import rocksetVersions from "rockset/prodVersions.json";
+import { queryClickhouseSaved } from "./clickhouse";
 import getRocksetClient from "./rockset";
 import { IssueData } from "./types";
 
+export async function fetchIssuesByLabelCH(
+  label: string
+): Promise<IssueData[]> {
+  // Uses CH and doesn't gate on env var, for use with Dr. CI
+  return await queryClickhouseSaved("issue_query", {
+    label,
+  });
+}
+
 export default async function fetchIssuesByLabel(
   label: string
 ): Promise<IssueData[]> {
```
10 changes: 6 additions & 4 deletions torchci/lib/fetchPR.ts
```diff
@@ -7,9 +7,10 @@ import { PRData } from "./types";
 async function fetchHistoricalCommits(
   owner: string,
   repo: string,
-  prNumber: string
+  prNumber: string,
+  useClickhouse: boolean
 ) {
-  if (enableClickhouse()) {
+  if (useClickhouse || enableClickhouse()) {
     return await queryClickhouseSaved("pr_commits", {
       pr_num: prNumber,
       owner,
@@ -49,7 +50,8 @@ export default async function fetchPR(
   owner: string,
   repo: string,
   prNumber: string,
-  octokit: Octokit
+  octokit: Octokit,
+  useClickhouse: boolean = false
 ): Promise<PRData> {
   // We pull data from both Rockset and Github to get all commits, including
   // the ones that have been force merged out of the git history.
@@ -67,7 +69,7 @@
       pull_number: parseInt(prNumber),
       per_page: 100,
     }),
-    fetchHistoricalCommits(owner, repo, prNumber),
+    fetchHistoricalCommits(owner, repo, prNumber, useClickhouse),
   ]);
   const title = pull.data.title;
   const body = pull.data.body ?? "";
```
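
And a hypothetical caller for the new flag (not part of this commit), letting Dr. CI force the CH path without flipping the global `enableClickhouse()` gate:

```typescript
import { Octokit } from "octokit";
import fetchPR from "./fetchPR";

async function example() {
  const octokit = new Octokit({ auth: process.env.GITHUB_TOKEN });
  // Passing true forces fetchHistoricalCommits down the ClickHouse path
  // even when the enableClickhouse() env gate is off.
  return await fetchPR("pytorch", "pytorch", "5634", octokit, true);
}
```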