Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ch] Random helper scripts #5641

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions tools/rockset_migration/compare_keys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""
Helper script to compare dynamo keys present between Rockset and Clickhouse, and
upload missing keys to Clickhouse if any are missing
"""
from argparse import ArgumentParser
from functools import lru_cache
from typing import Any, List
import os
import rockset

from dynamo2ch import (
ADAPTERS,
get_dynamo_client,
unmarshal,
upload_to_clickhouse,
get_clickhouse_client,
)


# ClickHouse connection settings, overridable via environment variables.
# Defaults target a local dev instance.
# NOTE(review): these are not referenced elsewhere in this file -- presumably
# consumed by dynamo2ch helpers; confirm before removing.
CLICKHOUSE_ENDPOINT = os.environ.get("CLICKHOUSE_ENDPOINT", "localhost")
CLICKHOUSE_USERNAME = os.environ.get("CLICKHOUSE_USERNAME", "default")
CLICKHOUSE_PASSWORD = os.environ.get("CLICKHOUSE_PASSWORD", "default")


@lru_cache
def get_rockset_client():
    """Return a memoized Rockset API client built from ROCKSET_API_KEY."""
    api_key = os.environ["ROCKSET_API_KEY"]
    return rockset.RocksetClient(host="api.usw2a1.rockset.com", api_key=api_key)


# Maps each ClickHouse table name to the DynamoDB table that is its source of
# truth; used to look up missing records by dynamoKey.
CLICKHOUSE_TABLE_TO_DYNAMO_TABLE = {
    "issue_comment": "torchci-issue-comment",
    "issues": "torchci-issues",
    "pull_request": "torchci-pull-request",
    "push": "torchci-push",
    "workflow_run": "torchci-workflow-run",
    "workflow_job": "torchci-workflow-job",
    "pull_request_review": "torchci-pull-request-review",
    "pull_request_review_comment": "torchci-pull-request-review-comment",
}


def insert_missing_keys(ch_table: str, keys: List[str]) -> None:
    """Backfill ClickHouse with records that exist in DynamoDB but not ClickHouse.

    Each key is fetched in full from the DynamoDB table mapped to ``ch_table``,
    unmarshalled, run through the table's adapter (if any), and uploaded.

    Args:
        ch_table: ClickHouse table name (a key of CLICKHOUSE_TABLE_TO_DYNAMO_TABLE).
        keys: dynamoKey values that are missing from ClickHouse.
    """
    if not keys:
        # Nothing to backfill; skip the upload call entirely.
        return
    dynamo_table = CLICKHOUSE_TABLE_TO_DYNAMO_TABLE[ch_table]
    # Adapter reshapes the raw record for ClickHouse; identity if none registered.
    adapter = ADAPTERS.get(dynamo_table, lambda x: x)
    records = []
    for key in keys:
        res = get_dynamo_client().query(
            TableName=dynamo_table,
            KeyConditionExpression="dynamoKey = :dynamoKey",
            ExpressionAttributeValues={":dynamoKey": {"S": key}},
        )
        items = res["Items"]
        if not items:
            # Previously this indexed Items[0] unconditionally and crashed with
            # IndexError when the record had vanished from DynamoDB; skip instead.
            print(f"Key {key} not found in DynamoDB table {dynamo_table}, skipping")
            continue
        records.append(adapter(unmarshal({"M": items[0]})))
    upload_to_clickhouse(records, ch_table)


def parse_args() -> Any:
    """Parse command-line arguments.

    Returns:
        A namespace with a single attribute, ``table``: the name of the
        ClickHouse table (and its mapped DynamoDB table) to compare.
    """
    parser = ArgumentParser("Copy dynamoDB table to ClickHouse")
    parser.add_argument(
        "--table",
        type=str,
        required=True,
        # Fixed typo in user-facing help text ("cmopare" -> "compare").
        help="the names of the tables to compare",
    )

    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    # Collect every dynamoKey currently in ClickHouse.  `final` forces merge-time
    # deduplication so replacing-engine duplicates are collapsed at read time.
    clickhouse_ids = []
    with get_clickhouse_client().query_rows_stream(
        f"select dynamoKey from {args.table} final"
    ) as stream:
        count = 0  # NOTE(review): counted but never read afterwards
        for s in stream:
            count += 1
            clickhouse_ids.append(s[0])

    # Collect every dynamoKey from the Rockset collection of the same name.
    rockset_ids = []
    for rockset_id in (
        get_rockset_client().sql(f"select dynamoKey from {args.table}").results
    ):
        rockset_ids.append(rockset_id["dynamoKey"])

    # Summary of row counts, unique keys, and duplicates on each side.
    print(
        f"ClickHouse has {len(clickhouse_ids)} rows, {len(set(clickhouse_ids))} unique keys, "
        f"num dups: {len(clickhouse_ids) - len(set(clickhouse_ids))}\n"
        f"Rockset has {len(rockset_ids)} rows, {len(set(rockset_ids))} unique keys, "
        f"num dups: {len(rockset_ids) - len(set(rockset_ids))}\n"
        f"Unique keys, clickhouse - rockset: {len(set(clickhouse_ids)) - len(set(rockset_ids))}"
    )

    # difference = set(clickhouse_ids) - set(rockset_ids)
    # for key in difference:
    #     print(f"Key {key} in ClickHouse but not in Rockset")

    # Keys present in Rockset but missing from ClickHouse get backfilled.
    other_difference = set(rockset_ids) - set(clickhouse_ids)
    # for key in other_difference:
    #     print(f"Key {key} in Rockset but not in ClickHouse")
    insert_missing_keys(args.table, list(other_difference))
190 changes: 190 additions & 0 deletions tools/rockset_migration/create_clickhouse_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
"""
Use this to generate a Clickhouse schema from a Rockset table. It still requires
some manual work to verify and fill in some types if the script cannot infer
them.
"""
import re
from typing import Dict, List
from rockset_queries import get_query_lambdas
from torchci.rockset_utils import query_rockset


class Field:
    """A single (possibly nested) column discovered via Rockset ``describe``.

    Attributes (class-level annotations below):
        full_name -- path components of the field, e.g. ["actor", "id"].
        type -- Rockset type names observed for this field; a field can have
            several (e.g. both "int" and "string").
        used -- whether some query lambda references this field.
        nullable -- True if Rockset ever reported "null" for this field.
        nested_fields -- children keyed by their last path component; array
            element types live under the pseudo-key "*".
    """

    full_name: List[str]
    type: List[str]
    used: bool = False
    nullable: bool = False
    nested_fields: Dict[str, "Field"]

    def __init__(self, full_name: List[str]):
        self.full_name = full_name
        self.type = []
        self.nested_fields = {}

    def short_name(self) -> str:
        """Return the dot-joined path, e.g. "actor.id"."""
        return ".".join(self.full_name)

    def __str__(self):
        return f"{self.short_name()} {self.type} {self.nullable} {self.used} {self.nested_fields}"

    def __repr__(self):
        return str(self)

    def get_clickhouse_type(self, allow_nullable: bool = False) -> str:
        """Map the observed Rockset type(s) to a ClickHouse type expression.

        Anything unmappable is rendered containing the marker "InvalidType" so
        a human reviewing the generated schema can spot and fix it.  Multiple
        observed types are emitted as ``Variant(...)``.

        NOTE(review): ``allow_nullable`` is only honored at this level; nested
        Tuple/Array element types are rendered without Nullable even when it
        is set -- confirm this is intended.
        """
        rockset_type_to_clickhouse_type_map = {
            "string": "String",
            "int": "Int64",
            "object": "Tuple",
            "array": "Array",
            "bool": "Bool",
            "datetime": "DateTime64(9)",
            "float": "Float32",
        }
        if len(self.type) == 0:
            return "InvalidType"
        types = []
        # Loop variable renamed from `type`, which shadowed the builtin.
        for rockset_type in self.type:
            clickhouse_type = rockset_type_to_clickhouse_type_map.get(rockset_type)
            if clickhouse_type is None:
                clickhouse_type = f"InvalidType {rockset_type}"
            elif clickhouse_type == "Array":
                # The element type of an array is recorded under the "*" child.
                if "*" not in self.nested_fields:
                    clickhouse_type = "Array(InvalidType cannot find * nested field)"
                else:
                    clickhouse_type = (
                        f"Array({self.nested_fields['*'].get_clickhouse_type()})"
                    )
            elif clickhouse_type == "Tuple":
                children_types = ", ".join(
                    [
                        f"{f.full_name[-1]} {f.get_clickhouse_type()}"
                        for f in self.nested_fields.values()
                    ]
                )
                clickhouse_type = f"Tuple({children_types})"
            types.append(clickhouse_type)
        if len(types) > 1:
            final_type = f"Variant({', '.join(types)})"
        else:
            final_type = types[0]
        if self.nullable and allow_nullable:
            return f"Nullable({final_type})"
        return final_type


def get_rockset_schema(table_name: str, allow_nullable=False):
    """
    Query the Rockset API to get the schema of a table. Returns a dictionary of the form
    field: type
    where field is formatted as a dot-separated string of the field names if it is a nested field.
    For example:
    {
        "name": ["string"],
        "actor.id": ["int"],
    }

    NOTE(review): `allow_nullable` is accepted but never read here -- confirm
    whether it was meant to be threaded through.
    """
    schema_as_dict = {}
    for row in query_rockset(f"describe {table_name}"):
        candidate = Field(row["field"])
        # Reuse the existing Field for this path if we've already seen it.
        field = schema_as_dict.setdefault(candidate.short_name(), candidate)
        if row["type"] == "null":
            # A "null" row only tells us the field is nullable, not a type.
            field.nullable = True
        else:
            field.type.append(row["type"])
    return schema_as_dict


def get_table_field_usages(table_name: str):
    """Return the set of fields of ``table_name`` referenced by query lambdas.

    Runs ``explain`` on every query lambda whose SQL mentions the table and
    scrapes field names out of the plan lines for that table.  Internal
    ``_event_time`` fields are removed from the result.

    Raises:
        Exception: re-raises any failure after printing debugging context.
    """
    lambdas = get_query_lambdas()
    fields = set()
    for lambda_name, lambda_info in lambdas.items():
        # Defined before the try so the except handler can always print them;
        # previously they could be unbound (NameError) if the failure happened
        # before their assignment.
        sql = None
        params = None
        try:
            # NOTE(review): these lambdas are skipped without explanation in
            # the original -- presumably they break `explain`; confirm.
            if lambda_name in [
                "commons.flaky_test_history",
                "commons.flaky_tests_per_job",
                "commons.original_pr_hud_query",
            ]:
                continue
            sql = lambda_info.sql["query"]
            params = {}
            for param in lambda_info.sql["default_parameters"]:
                param_type = param["type"]
                val = param["value"]
                name = param["name"]
                if param_type == "int":
                    val = int(val)
                params[name] = val
            if len(params) == 0:
                params = None
            if table_name in sql:
                explain = query_rockset(f"explain {sql}", params=params)[0]["EXPLAIN"]
                for line in explain.split("\n"):
                    if f"{table_name}:" not in line:
                        continue
                    # Plan lines look like "...=field.name..."; pull out each name.
                    field_match = re.findall(r"=([\w\.]+)", line)
                    for f in field_match:
                        fields.add(f)
        except Exception as e:
            print(e)
            print(lambda_name)
            print(sql)
            print(params)
            print(table_name)
            # Bare raise preserves the original traceback ("raise e" reset it).
            raise

    # Remove all _event_time fields
    fields = {f for f in fields if not f.endswith("_event_time")}
    return fields


def nest_fields(schema_as_dict: Dict[str, "Field"]) -> Dict[str, "Field"]:
    """Convert a flat dot-path keyed field dict into a tree of Fields.

    Ancestor path components with no entry of their own get a synthetic
    Field of type "object" so the tree is fully connected.
    """
    root: Dict[str, "Field"] = {}
    for field in schema_as_dict.values():
        *ancestors, leaf = field.full_name
        level = root
        for depth, part in enumerate(ancestors):
            if part not in level:
                placeholder = Field(field.full_name[: depth + 1])
                placeholder.type.append("object")
                level[part] = placeholder
            level = level[part].nested_fields
        level[leaf] = field
    return root


def gen_schema(fields: Dict[str, "Field"], allow_nullable=False) -> str:
    """Render the top-level fields as ClickHouse column definitions.

    Each entry becomes "`name` Type"; definitions are joined with ",\\n".
    """
    columns = [
        f"`{field.short_name()}` {field.get_clickhouse_type(allow_nullable)}"
        for field in fields.values()
    ]
    return ",\n".join(columns)


def parse_args():
    """Parse CLI args: a positional Rockset table name and --allow-nullable."""
    import argparse

    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        "table_name", type=str, help="The name of the table in Rockset"
    )
    arg_parser.add_argument(
        "--allow-nullable", action="store_true", help="Allow nullable fields"
    )
    return arg_parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    table_name = args.table_name
    # NOTE(review): computed but never consumed below -- presumably kept for
    # manual inspection while building the schema; confirm before removing.
    used_fields = get_table_field_usages(table_name)
    schema_as_dict = get_rockset_schema(table_name)

    # Fold the flat dot-path schema into a tree, then print it as ClickHouse
    # column definitions for manual review.
    nested_fields = nest_fields(schema_as_dict)
    print(gen_schema(nested_fields, args.allow_nullable))
Loading
Loading