From f2326f2469847bf8772889ea185d78218d564aab Mon Sep 17 00:00:00 2001
From: Catherine Lee
Date: Mon, 9 Sep 2024 14:45:06 -0700
Subject: [PATCH 1/2] tc

---
 .../clickhouse_queries/issue_query/query.sql  |   7 +-
 .../recent_pr_workflows_query/query.sql       |  20 +-
 torchci/lib/bot/utils.ts                      |   7 +
 torchci/lib/drciUtils.ts                      |  60 +++---
 torchci/lib/fetchIssuesByLabel.ts             |  10 +
 torchci/lib/fetchRecentWorkflows.ts           |  79 +------
 torchci/lib/jobUtils.ts                       |  61 +++---
 torchci/lib/types.ts                          |  21 +-
 torchci/pages/api/drci/drci.ts                | 204 ++++++++----------
 9 files changed, 188 insertions(+), 281 deletions(-)

diff --git a/torchci/clickhouse_queries/issue_query/query.sql b/torchci/clickhouse_queries/issue_query/query.sql
index cb5471885f..cc0ee8537b 100644
--- a/torchci/clickhouse_queries/issue_query/query.sql
+++ b/torchci/clickhouse_queries/issue_query/query.sql
@@ -1,4 +1,3 @@
--- !!! Query is not converted to CH syntax yet. Delete this line when it gets converted
 SELECT
     issue.number,
     issue.title,
@@ -8,7 +7,7 @@
     issue.updated_at,
     issue.author_association,
 FROM
-    issues AS issue
-    CROSS JOIN UNNEST(issue.labels AS label) AS labels
+    issues AS issue final
+    array join issue.labels AS label
 WHERE
-    labels.label.name =: label
+    label.name = {label: String}
diff --git a/torchci/clickhouse_queries/recent_pr_workflows_query/query.sql b/torchci/clickhouse_queries/recent_pr_workflows_query/query.sql
index 114b5ab358..8dbcaea540 100644
--- a/torchci/clickhouse_queries/recent_pr_workflows_query/query.sql
+++ b/torchci/clickhouse_queries/recent_pr_workflows_query/query.sql
@@ -10,19 +10,27 @@ WITH relevant_shas as (
   from default.pull_request pr final
   where pr.number = {prNumber: Int64}
 ),
+relevant_pushes as (
+  -- optimization because push is currently ordered by timestamp
+  select push.head_commit.'timestamp' as timestamp, push.head_commit.'id' as id
+  from default.push final
+  where push.head_commit.'timestamp' in (select timestamp from materialized_views.push_by_sha where id in relevant_shas)
+),
 recent_prs AS (
   SELECT
     distinct pull_request.head.'sha' AS sha,
     pull_request.number AS number,
-    push.head_commit.'timestamp' AS timestamp
+    push.timestamp AS timestamp
   FROM
     relevant_shas r
     JOIN default.pull_request pull_request final ON r.head_sha = pull_request.head.'sha'
     -- Do a left join here because the push table won't have any information about
     -- commits from forked repo
-    LEFT JOIN default.push push final ON r.head_sha = push.head_commit.'id'
+    LEFT JOIN relevant_pushes push ON r.head_sha = push.id
   WHERE
     pull_request.base.'repo'.'full_name' = {repo: String}
+    -- Filter pull request table to be smaller to make query faster
+    and pull_request.number in (select number from materialized_views.pr_by_sha where head_sha in (select head_sha from relevant_shas))
 )
 SELECT
   w.id AS workflowId,
@@ -56,15 +64,15 @@ where
   and j.id in (select id from materialized_views.workflow_job_by_head_sha where head_sha in (select sha from recent_prs))
 UNION all
 SELECT
-  null AS workflowId,
+  0 AS workflowId,
   w.workflow_id as workflowUniqueId,
   w.id as id,
-  null AS runnerName,
+  '' AS runnerName,
   w.head_commit.'author'.'email' as authorEmail,
   w.name AS name,
   w.name AS jobName,
   w.conclusion as conclusion,
-  null as completed_at,
+  toDateTime64(0, 9) as completed_at,
   w.html_url as html_url,
   w.head_branch as head_branch,
   recent_prs.number AS pr_number,
@@ -81,3 +89,5 @@ WHERE
   w.id in (select id from materialized_views.workflow_run_by_head_sha where head_sha in (select sha from recent_prs))
 ORDER BY
   time DESC
+-- Non experimental analyzer has problems with final...
+SETTINGS allow_experimental_analyzer=1
diff --git a/torchci/lib/bot/utils.ts b/torchci/lib/bot/utils.ts
index 994ff2e228..ce748b1e21 100644
--- a/torchci/lib/bot/utils.ts
+++ b/torchci/lib/bot/utils.ts
@@ -1,7 +1,14 @@
+import dayjs from "dayjs";
 import { Octokit } from "octokit";
 import { Context, Probot } from "probot";
 import urllib from "urllib";
 
+export function isTime0(time: string): boolean {
+  return dayjs.utc(time) == dayjs.unix(0)
+}
+
+export const TIME_0 = "1970-01-01 00:00:00.000000000";
+
 export function repoKey(
   context: Context | Context<"pull_request.labeled">
 ): string {
diff --git a/torchci/lib/drciUtils.ts b/torchci/lib/drciUtils.ts
index be7c9e8a0e..f02c3fe976 100644
--- a/torchci/lib/drciUtils.ts
+++ b/torchci/lib/drciUtils.ts
@@ -1,5 +1,6 @@
 import { Client } from "@opensearch-project/opensearch";
 import dayjs from "dayjs";
+import utc from "dayjs/plugin/utc";
 import { isEligibleCommitForSimilarFailureCheck } from "lib/commitUtils";
 import fetchIssuesByLabel from "lib/fetchIssuesByLabel";
 import {
@@ -12,8 +13,9 @@ import { MAX_SIZE, OLDEST_FIRST, querySimilarFailures } from "lib/searchUtils";
 import { RecentWorkflowsData } from "lib/types";
 import _ from "lodash";
 import { Octokit } from "octokit";
-import { isDrCIEnabled, isPyTorchPyTorch } from "./bot/utils";
+import { isDrCIEnabled, isPyTorchPyTorch, isTime0, TIME_0 } from "./bot/utils";
 import { IssueData } from "./types";
+dayjs.extend(utc);
 
 export const NUM_MINUTES = 30;
 export const REPO: string = "pytorch";
@@ -247,23 +249,16 @@ export async function hasSimilarFailures(
   // NB: Using the job completed_at timestamp has many false positives, so it's
   // better that we only enable this feature when the head commit timestamp is
   // available and use it as the end date
-  if (
-    job.head_sha_timestamp === undefined ||
-    job.head_sha_timestamp === null ||
-    job.head_sha_timestamp === "" ||
-    job.head_sha_timestamp === "0"
-  ) {
+  if (isTime0(job.head_sha_timestamp)) {
     return;
   }
 
   // NB: Use the commit timestamp here instead of the job timestamp to avoid using
   // the wrong end date when a PR is reverted and the job reruns
-  const endDate = dayjs(job.head_sha_timestamp);
-  const startDate = dayjs(
-    baseCommitDate !== "" && baseCommitDate !== "0"
-      ? baseCommitDate
-      : job.head_sha_timestamp
-  ).subtract(lookbackPeriodInHours, "hour");
+  const endDate = dayjs.utc(job.head_sha_timestamp);
+  const startDate = dayjs
+    .utc(!isTime0(baseCommitDate) ? baseCommitDate : job.head_sha_timestamp)
+    .subtract(lookbackPeriodInHours, "hour");
 
   if (
     endDate.diff(startDate, "hour") >
@@ -297,9 +292,10 @@ export async function hasSimilarFailures(
   let foundSimilarFailure;
   for (const record of records) {
     // Convert the result in JobData to RecentWorkflowsData used by Dr.CI
+    // TODO remove `as any` when CH migration is complete?
     const failure: RecentWorkflowsData = {
-      workflowId: record.workflowId,
-      id: record.id as string,
+      workflowId: record.workflowId as any as number,
+      id: record.id as any as number,
       jobName: record.jobName as string,
       name: record.name as string,
       conclusion: record.conclusion as string,
@@ -308,9 +304,12 @@ export async function hasSimilarFailures(
       head_sha: record.sha as string,
       head_branch: record.branch as string,
       failure_captures: record.failureCaptures as string[],
-      failure_lines: record.failureLines,
-      failure_context: record.failureContext,
+      failure_lines: record.failureLines as string[],
+      failure_context: record.failureContext as string[],
       authorEmail: record.authorEmail,
+      workflowUniqueId: 0,
+      head_sha_timestamp: TIME_0,
+      pr_number: 0,
     };
 
     const isEligibleCommit = await isEligibleCommitForSimilarFailureCheck(
@@ -367,14 +366,9 @@ export function isInfraFlakyJob(job: RecentWorkflowsData): boolean {
   // was allowed to go through as flaky
   return (
     job.conclusion === "failure" &&
-    job.workflowId !== null &&
-    job.workflowId !== undefined &&
-    (job.failure_lines === null ||
-      job.failure_lines === undefined ||
-      job.failure_lines.join("") === "") &&
-    (job.runnerName === null ||
-      job.runnerName === undefined ||
-      job.runnerName === "")
+    job.workflowId !== 0 &&
+    (job.failure_lines.length == 0 || job.failure_lines.join("") === "") &&
+    job.runnerName === ""
   );
 }
 
@@ -383,15 +377,13 @@ export async function isLogClassifierFailed(
 ): Promise<boolean> {
   // Having no workflow ID means that this is a workflow run, not a workflow job.
   // We don't want to apply the log classifier check for a workflow run
-  if (job.workflowId === null || job.workflowId === undefined) {
+  if (job.workflowId === 0) {
     return false;
   }
 
   // This covers the case when there is no log on S3 or log classifier fails to triggered
   const hasFailureLines =
-    job.failure_lines !== null &&
-    job.failure_lines !== undefined &&
-    job.failure_lines.join("") !== "";
+    job.failure_lines.length !== 0 && job.failure_lines.join("") !== "";
 
   const hasLog = await hasS3Log(job);
   return job.conclusion === "failure" && (!hasFailureLines || !hasLog);
@@ -404,7 +396,7 @@ export function isExcludedFromFlakiness(job: RecentWorkflowsData): boolean {
     _.find(
       EXCLUDED_FROM_FLAKINESS,
       (exclude: string) =>
-        job.name !== undefined &&
+        job.name !== "" &&
        job.name.toLowerCase().includes(exclude.toLowerCase())
     ) !== undefined
   );
@@ -433,11 +425,7 @@ export function getSuppressedLabels(
   job: RecentWorkflowsData,
   labels: string[]
 ): string[] {
-  if (
-    job.jobName === undefined ||
-    job.jobName === null ||
-    !(job.jobName in SUPPRESSED_JOB_BY_LABELS)
-  ) {
+  if (job.jobName === "" || !(job.jobName in SUPPRESSED_JOB_BY_LABELS)) {
     return [];
   }
 
@@ -447,7 +435,7 @@
 export function isExcludedFromSimilarityPostProcessing(
   job: RecentWorkflowsData
 ): boolean {
-  if (job.failure_captures === null || job.failure_captures === undefined) {
+  if (job.failure_captures.length === 0) {
     return false;
   }
 
diff --git a/torchci/lib/fetchIssuesByLabel.ts b/torchci/lib/fetchIssuesByLabel.ts
index 489dab61b4..5ac8852b5e 100644
--- a/torchci/lib/fetchIssuesByLabel.ts
+++ b/torchci/lib/fetchIssuesByLabel.ts
@@ -1,7 +1,17 @@
 import rocksetVersions from "rockset/prodVersions.json";
+import { queryClickhouseSaved } from "./clickhouse";
 import getRocksetClient from "./rockset";
 import { IssueData } from "./types";
 
+export async function fetchIssuesByLabelCH(
+  label: string
+): Promise<IssueData[]> {
+  // Uses CH and doesn't gate on env var, for use with Dr. CI
+  return await queryClickhouseSaved("issue_query", {
+    label,
+  });
+}
+
 export default async function fetchIssuesByLabel(
   label: string
 ): Promise<IssueData[]> {
diff --git a/torchci/lib/fetchRecentWorkflows.ts b/torchci/lib/fetchRecentWorkflows.ts
index 8bdac73be7..6841b562a4 100644
--- a/torchci/lib/fetchRecentWorkflows.ts
+++ b/torchci/lib/fetchRecentWorkflows.ts
@@ -1,6 +1,4 @@
-import rocksetVersions from "rockset/prodVersions.json";
-import { enableClickhouse, queryClickhouseSaved } from "./clickhouse";
-import getRocksetClient from "./rockset";
+import { queryClickhouseSaved } from "./clickhouse";
 import { RecentWorkflowsData } from "./types";
 
 export async function fetchRecentWorkflows(
@@ -8,75 +6,18 @@
   prNumber: string = "0",
   numMinutes: string = "30"
 ): Promise<RecentWorkflowsData[]> {
-  if (enableClickhouse()) {
-    const res = await queryClickhouseSaved("recent_pr_workflows_query", {
-      numMinutes,
-      prNumber,
-      repo,
-    });
-    for (const row of res) {
-      // Check for time 0 since CH uses default value
-      if (row["head_sha_timestamp"] == "1970-01-01 00:00:00.000000000") {
-        row["head_sha_timestamp"] = null;
-      }
-      if (row["completed_at"] == "1970-01-01 00:00:00.000000000") {
-        row["completed_at"] = null;
-      }
-    }
-    return res;
-  }
-  const rocksetClient = getRocksetClient();
-  const recentWorkflowsQuery =
-    await rocksetClient.queryLambdas.executeQueryLambda(
-      "commons",
-      "recent_pr_workflows_query",
-      rocksetVersions.commons.recent_pr_workflows_query,
-      {
-        parameters: [
-          {
-            name: "numMinutes",
-            type: "int",
-            value: numMinutes,
-          },
-          {
-            name: "prNumber",
-            type: "int",
-            value: prNumber,
-          },
-          {
-            name: "repo",
-            type: "string",
-            value: repo,
-          },
-        ],
-      }
-    );
-  return recentWorkflowsQuery.results ?? [];
+  const res = await queryClickhouseSaved("recent_pr_workflows_query", {
+    numMinutes,
+    prNumber,
+    repo,
+  });
+  return res;
 }
 
 export async function fetchFailedJobsFromCommits(
   shas: string[]
 ): Promise<RecentWorkflowsData[]> {
-  if (enableClickhouse()) {
-    return await queryClickhouseSaved("commit_failed_jobs", {
-      shas,
-    });
-  }
-  const rocksetClient = getRocksetClient();
-  const commitFailedJobsQuery =
-    await rocksetClient.queryLambdas.executeQueryLambda(
-      "commons",
-      "commit_failed_jobs",
-      rocksetVersions.commons.commit_failed_jobs,
-      {
-        parameters: [
-          {
-            name: "shas",
-            type: "string",
-            value: shas.join(","),
-          },
-        ],
-      }
-    );
-  return commitFailedJobsQuery.results ?? [];
+  return await queryClickhouseSaved("commit_failed_jobs", {
+    shas,
+  });
 }
diff --git a/torchci/lib/jobUtils.ts b/torchci/lib/jobUtils.ts
index d69324891a..784ae2db19 100644
--- a/torchci/lib/jobUtils.ts
+++ b/torchci/lib/jobUtils.ts
@@ -11,6 +11,7 @@ import {
 import _, { isEqual } from "lodash";
 import rocksetVersions from "rockset/prodVersions.json";
 import TrieSearch from "trie-search";
+import { queryClickhouse } from "./clickhouse";
 import getRocksetClient from "./rockset";
 
 export const REMOVE_JOB_NAME_SUFFIX_REGEX = new RegExp(
@@ -168,7 +169,7 @@ export function getDisabledTestIssues(
   job: RecentWorkflowsData,
   disabledTestIssues: IssueData[]
 ): IssueData[] {
-  if (!job.name || !job.failure_captures || job.failure_captures.length === 0) {
+  if (job.name == "" || job.failure_captures.length === 0) {
     return [];
   }
 
@@ -205,7 +206,7 @@ export function getDisabledTestIssues(
         platformsMatch.groups.platforms
           .split(",")
           .map((platform) => platform.trim()),
-        (platform) => job.name!.includes(platform)
+        (platform) => job.name.includes(platform)
       );
     });
 
@@ -370,54 +371,40 @@ export async function isSameAuthor(
 export async function getPRMergeCommits(
   owner: string,
   repo: string,
-  prNumber: number
-): Promise<string[]> {
+  prNumbers: number[]
+): Promise<Map<number, string[]>> {
   // Sort by comment ID desc because we don't want to depend on _event_time in
   // general
   const query = `
 SELECT
+  pr_num,
   merge_commit_sha,
 FROM
-  commons.merges
+  default.merges
 WHERE
-  pr_num = :pr_num
-  AND owner = :owner
-  AND project = :project
+  pr_num in {pr_nums: Array(Int64)}
+  AND owner = {owner: String}
+  AND project = {project: String}
   AND merge_commit_sha != ''
 ORDER BY
  comment_id DESC
 `;
-  const rocksetClient = getRocksetClient();
-  const results = (
-    await rocksetClient.queries.query({
-      sql: {
-        query: query,
-        parameters: [
-          {
-            name: "pr_num",
-            type: "int",
-            value: prNumber.toString(),
-          },
-          {
-            name: "owner",
-            type: "string",
-            value: owner,
-          },
-          {
-            name: "project",
-            type: "string",
-            value: repo,
-          },
-        ],
-      },
-    })
-  ).results;
+  const results = await queryClickhouse(query, {
+    pr_nums: prNumbers,
+    owner,
+    project: repo,
+  });
 
-  // If the result is empty, the PR hasn't been merged yet
-  return results !== undefined
-    ? _.map(results, (record) => record.merge_commit_sha)
-    : [];
+  // If a PR has no entry in the map, it hasn't been merged yet
+  return results.reduce((acc: Map<number, string[]>, row: any) => {
+    if (!acc.has(row.pr_num)) {
+      acc.set(row.pr_num, []);
+    }
+    acc.get(row.pr_num)!.push(row.merge_commit_sha);
+    return acc;
+  }, new Map<number, string[]>());
 }
 
 export function isFailureFromPrevMergeCommit(
diff --git a/torchci/lib/types.ts b/torchci/lib/types.ts
index ef18c549d5..5f22b077db 100644
--- a/torchci/lib/types.ts
+++ b/torchci/lib/types.ts
@@ -34,23 +34,24 @@ export interface JobData extends BasicJobData {
 // Used by Dr.CI
 export interface RecentWorkflowsData extends BasicJobData {
   // only included if this is a job and not a workflow, if it is a workflow, the name is in the name field
-  workflowId?: string;
+  name: string; // In BasicJobData, but required here
+  workflowId: number;
   // Each workflow file has an id. In rockset this is workflow_run.workflow_id.
   // This can be used to group normal workflows (ex trunk) and those that failed
   // to run (ex .github/workflows/trunk.yml) together even when they have
   // different names.
-  workflowUniqueId?: number;
-  jobName?: string;
-  id: string;
-  completed_at: string | null;
+  workflowUniqueId: number;
+  jobName: string;
+  id: number;
+  completed_at: string;
   html_url: string;
   head_sha: string;
-  head_sha_timestamp?: string;
-  head_branch?: string | null;
-  pr_number?: number;
+  head_sha_timestamp: string;
+  head_branch: string;
+  pr_number: number;
   failure_captures: string[];
-  failure_lines?: string[] | null;
-  failure_context?: string[] | null;
+  failure_lines: string[];
+  failure_context: string[];
 }
 
 export interface Artifact {
diff --git a/torchci/pages/api/drci/drci.ts b/torchci/pages/api/drci/drci.ts
index 097219cce1..b57bfcb7fc 100644
--- a/torchci/pages/api/drci/drci.ts
+++ b/torchci/pages/api/drci/drci.ts
@@ -1,5 +1,8 @@
 import { PutObjectCommand } from "@aws-sdk/client-s3";
-import { fetchJSON } from "lib/bot/utils";
+import dayjs from "dayjs";
+import utc from "dayjs/plugin/utc";
+import { fetchJSON, isTime0 } from "lib/bot/utils";
+import { queryClickhouse } from "lib/clickhouse";
 import {
   CANCELLED_STEP_ERROR,
   fetchIssueLabels,
@@ -20,7 +23,7 @@ import {
   OWNER,
 } from "lib/drciUtils";
 import { fetchCommitTimestamp } from "lib/fetchCommit";
-import fetchIssuesByLabel from "lib/fetchIssuesByLabel";
+import { fetchIssuesByLabelCH } from "lib/fetchIssuesByLabel";
 import fetchPR from "lib/fetchPR";
 import {
   fetchFailedJobsFromCommits,
@@ -40,13 +43,12 @@ import {
   removeCancelledJobAfterRetry,
   removeJobNameSuffix,
 } from "lib/jobUtils";
-import getRocksetClient from "lib/rockset";
 import { getS3Client } from "lib/s3";
 import { IssueData, PRandJobs, RecentWorkflowsData } from "lib/types";
 import _ from "lodash";
 import type { NextApiRequest, NextApiResponse } from "next";
 import { Octokit } from "octokit";
-
+dayjs.extend(utc);
 export interface FlakyRule {
   name: string;
   captures: string[];
@@ -99,15 +101,20 @@ export async function updateDrciComments(
   );
   const head = get_head_branch(repo);
   await addMergeBaseCommits(octokit, repo, head, workflowsByPR);
-  const sevs = getActiveSEVs(await fetchIssuesByLabel("ci: sev"));
+  const sevs = getActiveSEVs(await fetchIssuesByLabelCH("ci: sev"));
   const flakyRules: FlakyRule[] = (await fetchJSON(FLAKY_RULES_JSON)) || [];
-  const unstableIssues: IssueData[] = await fetchIssuesByLabel("unstable");
-  const disabledTestIssues: IssueData[] = await fetchIssuesByLabel("skipped");
+  const unstableIssues: IssueData[] = await fetchIssuesByLabelCH("unstable");
+  const disabledTestIssues: IssueData[] = await fetchIssuesByLabelCH("skipped");
   const baseCommitJobs = await getBaseCommitJobs(workflowsByPR);
   const existingDrCiComments = await getExistingDrCiComments(
     `${OWNER}/${repo}`,
     workflowsByPR
   );
+  const prMergeCommits = await getPRMergeCommits(
+    OWNER,
+    repo,
+    Array.from(workflowsByPR.keys())
+  );
 
   // Return the list of all failed jobs grouped by their classification
   const failures: { [pr: number]: { [cat: string]: RecentWorkflowsData[] } } =
@@ -117,11 +124,7 @@
     workflowsByPR,
     async (pr_info: PRandJobs) => {
       // Find the merge commits of the PR to check if it has already been merged before
-      const mergeCommits = await getPRMergeCommits(
-        OWNER,
-        repo,
-        pr_info.pr_number
-      );
+      const mergeCommits = prMergeCommits.get(pr_info.pr_number) || [];
 
       const labels = await fetchIssueLabels(
         octokit,
@@ -270,43 +273,28 @@ select
 from
   merge_bases
 where
-  ARRAY_CONTAINS(SPLIT(:shas, ','), sha)
-  and merge_base_commit_date is not null
-  and repo = :repo
+  sha in {shas: Array(String)}
+  and merge_base_commit_date != 0
+  and repo = {repo: String}
 `;
-  const rocksetClient = getRocksetClient();
   const s3client = getS3Client();
-  const rocksetMergeBases = new Map(
+  const chMergeBases = new Map(
     (
-      await rocksetClient.queries.query({
-        sql: {
-          query: mergeBasesQuery,
-          parameters: [
-            {
-              name: "shas",
-              type: "string",
-              value: Array.from(workflowsByPR.values())
-                .map((v) => v.head_sha)
-                .join(","),
-            },
-            {
-              name: "repo",
-              type: "string",
-              value: `${OWNER}/${repo}`,
-            },
-          ],
-        },
+      await queryClickhouse(mergeBasesQuery, {
+        shas: Array.from(workflowsByPR.values()).map((v) => v.head_sha),
+        repo: `${OWNER}/${repo}`,
       })
-    ).results?.map((v) => [v.head_sha, v])
+    )?.map((v) => [v.head_sha, v])
   );
 
   await forAllPRs(
     workflowsByPR,
     async (pr_info: PRandJobs) => {
-      const rocksetMergeBase = rocksetMergeBases.get(pr_info.head_sha);
-      if (rocksetMergeBase === undefined) {
-        // Not found in rockset, ask github instead, then put into rockset
+      const chMergeBase = chMergeBases.get(pr_info.head_sha);
+      if (chMergeBase === undefined) {
+        // Not found on CH, ask github instead, then put into dynamo, which will
+        // get synced with CH
        const diff = await octokit.rest.repos.compareCommits({
          owner: OWNER,
          repo: repo,
@@ -329,7 +317,7 @@ where
           sha: pr_info.head_sha,
           merge_base: pr_info.merge_base,
           changed_files: diffWithMergeBase.data.files?.map((e) => e.filename),
-          merge_base_commit_date: pr_info.merge_base_date ?? "",
+          merge_base_commit_date: pr_info.merge_base_date,
           repo: `${OWNER}/${repo}`,
           _id: `${OWNER}-${repo}-${pr_info.head_sha}`,
         };
@@ -345,8 +333,8 @@ where
           console.error("Failed to upload to S3", e);
         }
       } else {
-        pr_info.merge_base = rocksetMergeBase.merge_base;
-        pr_info.merge_base_date = rocksetMergeBase.merge_base_commit_date;
+        pr_info.merge_base = chMergeBase.merge_base;
+        pr_info.merge_base_date = chMergeBase.merge_base_commit_date;
       }
     },
     // NB (huydhn): This function couldn't find merge base for ghstack PR and
@@ -383,8 +371,8 @@ export async function getBaseCommitJobs(
       if (!jobsBySha.has(job.head_sha)) {
         jobsBySha.set(job.head_sha, new Map());
       }
-      const existing_job = jobsBySha.get(job.head_sha).get(job.name!);
-      if (!existing_job || existing_job.id < job.id!) {
+      const existing_job = jobsBySha.get(job.head_sha).get(job.name);
+      if (!existing_job || existing_job.id < job.id) {
         // if rerun, choose the job with the larger id as that is more recent
         jobsBySha.get(job.head_sha).set(job.name, job);
       }
@@ -423,34 +411,22 @@ async function getExistingDrCiComments(
 select
   id,
   body,
-  issue_url,
+  issue_url
 from
-  commons.issue_comment i
+  default.issue_comment final
 where
-  i.body like '%%'
-  and ARRAY_CONTAINS(SPLIT(:prUrls, ','), issue_url)
+  body like '%%'
+  and issue_url in {prUrls: Array(String)}
 `;
-  const rocksetClient = getRocksetClient();
   return new Map(
     (
-      await rocksetClient.queries.query({
-        sql: {
-          query: existingCommentsQuery,
-          parameters: [
-            {
-              name: "prUrls",
-              type: "string",
-              value: Array.from(workflowsByPR.keys())
-                .map(
-                  (prNumber) =>
-                    `https://api.github.com/repos/${repoFullName}/issues/${prNumber}`
-                )
-                .join(","),
-            },
-          ],
-        },
+      await queryClickhouse(existingCommentsQuery, {
+        prUrls: Array.from(workflowsByPR.keys()).map(
+          (prNumber) =>
+            `https://api.github.com/repos/${repoFullName}/issues/${prNumber}`
+        ),
       })
-    ).results?.map((v) => [
+    )?.map((v) => [
      parseInt(v.issue_url.split("/").pop()),
      { id: parseInt(v.id), body: v.body },
    ])
   );
 }
@@ -467,9 +443,9 @@ function constructResultsJobsSections(
   jobs: RecentWorkflowsData[],
   suggestion?: string,
   collapsed: boolean = false,
-  relatedJobs: Map<string, RecentWorkflowsData> = new Map(),
-  relatedIssues: Map<string, IssueData[]> = new Map(),
-  relatedInfo: Map<string, string> = new Map()
+  relatedJobs: Map<number, RecentWorkflowsData> = new Map(),
+  relatedIssues: Map<number, IssueData[]> = new Map(),
+  relatedInfo: Map<number, string> = new Map()
 ): string {
   if (jobs.length === 0) {
     return "";
@@ -485,7 +461,7 @@ function constructResultsJobsSections(
   output += "<p>\n\n"; // Two newlines are needed for bullets below to be formatted correctly
 
   const hudPrUrl = `${hudBaseUrl}/pr/${owner}/${repo}/${prNumber}`;
-  const jobsSorted = jobs.sort((a, b) => a.name!.localeCompare(b.name!));
+  const jobsSorted = jobs.sort((a, b) => a.name.localeCompare(b.name));
   for (const job of jobsSorted) {
     output += `* [${job.name}](${hudPrUrl}#${job.id}) ([gh](${job.html_url}))`;
@@ -553,9 +529,9 @@ export function constructResultsComment(
   flakyJobs: RecentWorkflowsData[],
   brokenTrunkJobs: RecentWorkflowsData[],
   unstableJobs: RecentWorkflowsData[],
-  relatedJobs: Map<string, RecentWorkflowsData>,
-  relatedIssues: Map<string, IssueData[]>,
-  relatedInfo: Map<string, string>,
+  relatedJobs: Map<number, RecentWorkflowsData>,
+  relatedIssues: Map<number, IssueData[]>,
+  relatedInfo: Map<number, string>,
   sha: string,
   merge_base: string,
   merge_base_date: string,
@@ -645,7 +621,7 @@ export function constructResultsComment(
   output += title;
   output += `\nAs of commit ${sha} with merge base ${merge_base}`;
 
-  const timestamp = Math.floor(new Date(merge_base_date).valueOf() / 1000);
+  const timestamp = dayjs.utc(merge_base_date).unix();
   if (!isNaN(timestamp)) {
     output += ` (image)`;
   }
@@ -764,17 +740,14 @@ function isFlaky(
   const jobNameRegex = new RegExp(flakyRule.name);
 
   return (
-    job.name!.match(jobNameRegex) &&
+    job.name.match(jobNameRegex) &&
     flakyRule.captures.every((capture: string) => {
       const captureRegex = new RegExp(capture);
-      const matchFailureCaptures: boolean =
-        job.failure_captures &&
-        job.failure_captures.some((failureCapture) =>
-          failureCapture.match(captureRegex)
-        );
+      const matchFailureCaptures: boolean = job.failure_captures.some(
+        (failureCapture) => failureCapture.match(captureRegex)
+      );
       const matchFailureLine: boolean =
-        job.failure_lines != null &&
-        job.failure_lines[0] != null &&
+        job.failure_lines.length > 0 &&
         job.failure_lines[0].match(captureRegex) != null;
 
       // Accept both failure captures array and failure line string to make sure
@@ -789,7 +762,7 @@ function getTrunkFailure(
   job: RecentWorkflowsData,
   baseJobs: Map<string, RecentWorkflowsData[]>
 ): RecentWorkflowsData | undefined {
-  const jobNameNoSuffix = removeJobNameSuffix(job.name!);
+  const jobNameNoSuffix = removeJobNameSuffix(job.name);
 
   // This job doesn't exist in the base commit, thus not a broken trunk failure
   if (!baseJobs.has(jobNameNoSuffix)) {
@@ -815,9 +788,9 @@ export async function getWorkflowJobsStatuses(
   flakyJobs: RecentWorkflowsData[];
   brokenTrunkJobs: RecentWorkflowsData[];
   unstableJobs: RecentWorkflowsData[];
-  relatedJobs: Map<string, RecentWorkflowsData>;
-  relatedIssues: Map<string, IssueData[]>;
-  relatedInfo: Map<string, string>;
+  relatedJobs: Map<number, RecentWorkflowsData>;
+  relatedIssues: Map<number, IssueData[]>;
+  relatedInfo: Map<number, string>;
 }> {
   let pending = 0;
   const preprocessFailedJobs: RecentWorkflowsData[] = [];
@@ -828,36 +801,25 @@ export async function getWorkflowJobsStatuses(
 
   // This map holds the list of the base failures for broken trunk jobs or the similar
   // failures for flaky jobs
-  const relatedJobs: Map<string, RecentWorkflowsData> = new Map();
-  // And this holds the string pointing to the associated unstable issue that disables a job
-  const relatedIssues: Map<string, IssueData[]> = new Map();
+  const relatedJobs: Map<number, RecentWorkflowsData> = new Map();
+  // Maps job id -> associated unstable issue that disables a job
+  const relatedIssues: Map<number, IssueData[]> = new Map();
   // Any additional information about the job classification can be kept here
-  const relatedInfo: Map<string, string> = new Map();
+  const relatedInfo: Map<number, string> = new Map();
 
   for (const job of prInfo.jobs) {
-    if (
-      (job.conclusion === undefined ||
-        job.conclusion === null ||
-        job.conclusion === "") &&
-      (job.completed_at === undefined ||
-        job.completed_at === null ||
-        job.completed_at === "" ||
-        job.completed_at === "1970-01-01 00:00:00.000000000") // Time 0
-    ) {
+    if (job.conclusion === "" && isTime0(job.completed_at)) {
       pending++;
     } else if (job.conclusion === "failure" || job.conclusion === "cancelled") {
       const suppressedLabels = await getSuppressedLabels(job, labels);
-      if (
-        prInfo.repo === "pytorch" &&
-        suppressedLabels &&
-        suppressedLabels.length !== 0
-      ) {
+      if (prInfo.repo === "pytorch" && suppressedLabels.length !== 0) {
        flakyJobs.push(job);
        relatedInfo.set(job.id, `suppressed by ${suppressedLabels.join(", ")}`);
        continue;
      }
 
-      if (isUnstableJob(job, unstableIssues)) {
+      // TODO: remove the `as any` cast when CH migration is complete
+      if (isUnstableJob(job as any, unstableIssues)) {
        unstableJobs.push(job);
        relatedIssues.set(
          job.id,
@@ -909,7 +871,6 @@ export async function getWorkflowJobsStatuses(
        disabledTestIssues
       );
       if (
-        matchDisabledTestIssues !== undefined &&
        matchDisabledTestIssues.length !== 0 &&
        isRecentlyCloseDisabledTest(
          matchDisabledTestIssues,
@@ -937,7 +898,6 @@ export async function getWorkflowJobsStatuses(
      }
 
      if (
-        matchDisabledTestIssues !== undefined &&
        matchDisabledTestIssues.length !== 0 &&
        isDisabledTest(matchDisabledTestIssues)
      ) {
@@ -1049,15 +1009,15 @@ export async function reorganizeWorkflows(
   const headShaTimestamps: Map<string, string> = new Map();
 
   for (const workflow of recentWorkflows) {
-    const prNumber = workflow.pr_number!;
+    const prNumber = workflow.pr_number;
     if (!workflowsByPR.has(prNumber)) {
       let headShaTimestamp = workflow.head_sha_timestamp;
-      // NB: The head SHA timestamp is currently used as the end date when searching
-      // for similar failures. However, it's not available on Rockset for commits
-      // from forked PRs before a ciflow ref is pushed. In such case, the head SHA
-      // timestamp will be undefined and we will make an additional query to GitHub
-      // to get the value
-      if (octokit && !headShaTimestamp) {
+      // NB: The head SHA timestamp is currently used as the end date when
+      // searching for similar failures. However, it's not available on CH for
+      // commits from forked PRs before a ciflow ref is pushed. In such case,
+      // the head SHA timestamp will be time 0 and we will make an additional
+      // query to GitHub to get the value
+      if (octokit && isTime0(headShaTimestamp)) {
        headShaTimestamp = await fetchCommitTimestamp(
          octokit,
          owner,
@@ -1094,7 +1054,11 @@ export async function reorganizeWorkflows(
     }
 
     const headShaTimestamp = headShaTimestamps.get(workflow.head_sha);
-    if (!workflow.head_sha_timestamp && headShaTimestamp) {
+    if (
+      isTime0(workflow.head_sha_timestamp) &&
+      headShaTimestamp &&
+      !isTime0(headShaTimestamp)
+    ) {
       workflow.head_sha_timestamp = headShaTimestamp;
     }
 
@@ -1106,16 +1070,16 @@ export async function reorganizeWorkflows(
   for (const [, prInfo] of workflowsByPR) {
     const [workflows, jobs] = _.partition(
       prInfo.jobs,
-      (job) => job.workflowId === null || job.workflowId === undefined
+      (job) => job.workflowId === 0
     );
-    // Get most recent workflow run based on workflowUniqueId (workflow_id in rockset)
+    // Get most recent workflow run based on workflowUniqueId (workflow_id in webhooks)
     const recentWorkflows: Map<number, RecentWorkflowsData> = new Map();
     for (const workflow of workflows) {
       // Check that this is a workflow, not a job
-      const workflowUniqueId = workflow.workflowUniqueId!;
+      const workflowUniqueId = workflow.workflowUniqueId;
       const existingWorkflowId = recentWorkflows.get(workflowUniqueId)?.id;
-      if (!existingWorkflowId || existingWorkflowId! < workflow.id!) {
+      if (!existingWorkflowId || existingWorkflowId < workflow.id) {
        recentWorkflows.set(workflowUniqueId, workflow);
      }
    }
@@ -1131,7 +1095,7 @@ export async function reorganizeWorkflows(
       // This belongs to an older run of the workflow
       continue;
     }
-    const key = job.name!;
+    const key = job.name;
     const existing_job = removeRetries.get(key);
     if (!existing_job || existing_job.id < job.id!) {
       removeRetries.set(key, job);

From b21443c1a55dbded0048e0dad62b134f97133a55 Mon Sep 17 00:00:00 2001
From: Catherine Lee
Date: Mon, 9 Sep 2024 15:11:26 -0700
Subject: [PATCH 2/2] tc

---
 torchci/lib/bot/utils.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/torchci/lib/bot/utils.ts b/torchci/lib/bot/utils.ts
index ce748b1e21..7c928f55a3 100644
--- a/torchci/lib/bot/utils.ts
+++ b/torchci/lib/bot/utils.ts
@@ -4,7 +4,7 @@ import { Context, Probot } from "probot";
 import urllib from "urllib";
 
 export function isTime0(time: string): boolean {
-  return dayjs.utc(time) == dayjs.unix(0)
+  return dayjs.utc(time).valueOf() == 0;
 }
 
 export const TIME_0 = "1970-01-01 00:00:00.000000000";