Sdg pipeline #2496

Merged: 6 commits, Jul 27, 2023
Changes from 5 commits
3 changes: 2 additions & 1 deletion cli/monitoring/components/custom_preprocessing/src/run.py
@@ -12,6 +12,7 @@
from dateutil import parser
from pyspark.sql import SparkSession


def init_spark():
"""Get or create spark session."""
spark = SparkSession.builder.appName("AccessParquetFiles").getOrCreate()
@@ -68,7 +69,7 @@ def preprocess(
table = table.extract_columns_from_partition_format(partitionFormat)

# Filter on partitionFormat based on user data window
filterStr = f"PartitionDate >= datetime({start_datetime.year}, {start_datetime.month}, {start_datetime.day}, {start_datetime.hour}) and PartitionDate <= datetime({end_datetime.year}, {end_datetime.month}, {end_datetime.day}, {end_datetime.hour})" # noqa
filterStr = f"PartitionDate >= datetime({start_datetime.year}, {start_datetime.month}, {start_datetime.day}, {start_datetime.hour}) and PartitionDate <= datetime({end_datetime.year}, {end_datetime.month}, {end_datetime.day}, {end_datetime.hour})" # noqa
table = table.filter(filterStr)

# Data column is a list of objects, convert it into string because spark.read_json cannot read object
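For readers skimming this component, here is a minimal sketch of how the partition-date filter reformatted above is assembled before it is handed to the mltable filter. The window values are made up; in the component they come from parsed data-window arguments.

from datetime import datetime

# Hypothetical data window; the component derives these from its CLI arguments.
start_datetime = datetime(2023, 7, 1, 0)
end_datetime = datetime(2023, 7, 2, 0)

# Same expression the component builds: one clause per bound on PartitionDate.
filter_str = (
    f"PartitionDate >= datetime({start_datetime.year}, {start_datetime.month}, "
    f"{start_datetime.day}, {start_datetime.hour}) and "
    f"PartitionDate <= datetime({end_datetime.year}, {end_datetime.month}, "
    f"{end_datetime.day}, {end_datetime.hour})"
)

# Applied to the mltable Table exactly as in the diff:
# table = table.extract_columns_from_partition_format(partitionFormat)
# table = table.filter(filter_str)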
36 changes: 23 additions & 13 deletions cli/monitoring/components/custom_signal/src/run.py
@@ -8,6 +8,7 @@
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql.functions import stddev


def init_spark():
"""Get or create spark session."""
spark = SparkSession.builder.appName("AccessParquetFiles").getOrCreate()
@@ -23,7 +24,9 @@ def read_mltable_in_spark(mltable_path: str):

def save_spark_df_as_mltable(metrics_df, folder_path: str):
"""Save spark dataframe as mltable."""
metrics_df.write.option("output_format", "parquet").option('overwrite', True).mltable(folder_path)
metrics_df.write.option("output_format", "parquet").option(
"overwrite", True
).mltable(folder_path)


def _create_output_dataframe(data):
@@ -46,27 +49,32 @@ def _create_row(metric, group, group_pivot, value, threshold):
"group": group,
"metric_value": value,
"threshold_value": threshold,
"group_pivot": group_pivot
"group_pivot": group_pivot,
}


def _compute_max_standard_deviation(df, std_deviation_threshold: float):
standard_deviations = df.agg(*[stddev(column).alias(column) for column in df.columns]).collect()[0]
standard_deviations = df.agg(
*[stddev(column).alias(column) for column in df.columns]
).collect()[0]

rows = []
for feature in df.columns:

if feature == None:
continue

rows.append(_create_row(
metric="MaxStandardDeviation",
group=feature,
group_pivot="",
value=standard_deviations[feature],
threshold=std_deviation_threshold))
rows.append(
_create_row(
metric="MaxStandardDeviation",
group=feature,
group_pivot="",
value=standard_deviations[feature],
threshold=std_deviation_threshold,
)
)

return _create_output_dataframe(rows)
return _create_output_dataframe(rows)


def run():
@@ -80,11 +88,13 @@ def run():

df = read_mltable_in_spark(args.production_data)

signal_metrics = _compute_max_standard_deviation(df, float(args.std_deviation_threshold))
signal_metrics = _compute_max_standard_deviation(
df, float(args.std_deviation_threshold)
)
signal_metrics.show()

save_spark_df_as_mltable(signal_metrics, args.signal_metrics)


if __name__ == "__main__":
run()
run()
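As a standalone illustration of the aggregation pattern this signal component reformats across multiple lines, the following sketch computes one standard deviation per column in a single pass. The DataFrame contents and threshold are hypothetical stand-ins for the production data and the std_deviation_threshold argument.

from pyspark.sql import SparkSession
from pyspark.sql.functions import stddev

spark = SparkSession.builder.appName("MaxStdDevSketch").getOrCreate()

# Hypothetical production data with two numeric features.
df = spark.createDataFrame(
    [(1.0, 10.0), (2.0, 12.0), (3.0, 18.0)], ["feature_a", "feature_b"]
)

# One stddev per column, collected into a single Row,
# mirroring _compute_max_standard_deviation above.
standard_deviations = df.agg(
    *[stddev(column).alias(column) for column in df.columns]
).collect()[0]

std_deviation_threshold = 5.0  # hypothetical threshold argument
for feature in df.columns:
    print(feature, standard_deviations[feature], "threshold:", std_deviation_threshold)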
33 changes: 17 additions & 16 deletions tutorials/get-started-notebooks/explore-data.ipynb
@@ -165,9 +165,11 @@
"\n",
"The next notebook cell creates the data asset. The code sample uploads the raw data file to the designated cloud storage resource. \n",
"\n",
"Each time you create a data asset, you need a unique version for it. If the version already exists, you'll get an error. In this code, we're using time to generate a unique version each time the cell is run.\n",
"Each time you create a data asset, you need a unique version for it. If the version already exists, you'll get an error. In this code, we're using the \"initial\" for the first read of the data. If that version already exists, we'll skip creating it again.\n",
"\n",
"You can also omit the **version** parameter, and a version number is generated for you, starting with 1 and then incrementing from there. In this tutorial, we want to refer to specific version numbers, so we create a version number instead."
"You can also omit the **version** parameter, and a version number is generated for you, starting with 1 and then incrementing from there. \n",
"\n",
"In this tutorial, we use the name \"initial\" as the first version. The [Create production machine learning pipelines](pipeline.ipynb) tutorial will also use this version of the data, so here we are using a value that you'll see again in that tutorial."
]
},
{
@@ -191,15 +193,13 @@
"source": [
"from azure.ai.ml.entities import Data\n",
"from azure.ai.ml.constants import AssetTypes\n",
"import time\n",
"\n",
"# update the 'my_path' variable to match the location of where you downloaded the data on your\n",
"# local filesystem\n",
"\n",
"my_path = \"./data/default_of_credit_card_clients.csv\"\n",
"# set the version number of the data asset to the current UTC time\n",
"v1 = time.strftime(\"%Y.%m.%d.%H%M%S\", time.gmtime())\n",
"\n",
"# set the version number of the data asset\n",
"v1 = \"initial\"\n",
"\n",
"my_data = Data(\n",
" name=\"credit-card\",\n",
@@ -209,10 +209,15 @@
" type=AssetTypes.URI_FILE,\n",
")\n",
"\n",
"# create data asset\n",
"ml_client.data.create_or_update(my_data)\n",
"\n",
"print(f\"Data asset created. Name: {my_data.name}, version: {my_data.version}\")"
"## create data asset if it doesn't already exist:\n",
"try:\n",
" data_asset = ml_client.data.get(name=\"credit-card\", version=v1)\n",
" print(\n",
" f\"Data asset already exists. Name: {my_data.name}, version: {my_data.version}\"\n",
" )\n",
"except:\n",
" ml_client.data.create_or_update(my_data)\n",
" print(f\"Data asset created. Name: {my_data.name}, version: {my_data.version}\")"
]
},
{
@@ -398,11 +403,7 @@
"|X18-23 | Explanatory | Amount of previous payment (NT dollar) from April to September 2005. |\n",
"|Y | Response | Default payment (Yes = 1, No = 0) |\n",
"\n",
"Next, create a new _version_ of the data asset (the data automatically uploads to cloud storage):\n",
"\n",
"> [!NOTE]\n",
">\n",
"> This Python code cell sets **name** and **version** values for the data asset it creates. As a result, the code in this cell will fail if executed more than once, without a change to these values. Fixed **name** and **version** values offer a way to pass values that work for specific situations, without concern for auto-generated or randomly-generated values.\n"
"Next, create a new _version_ of the data asset (the data automatically uploads to cloud storage). For this version, we'll add a time value, so that each time this code is run, a different version number will be created.\n"
]
},
{
@@ -429,7 +430,7 @@
"import time\n",
"\n",
"# Next, create a new *version* of the data asset (the data is automatically uploaded to cloud storage):\n",
"v2 = v1 + \"_cleaned\"\n",
"v2 = \"cleaned\" + time.strftime(\"%Y.%m.%d.%H%M%S\", time.gmtime())\n",
"my_path = \"./data/cleaned-credit-card.parquet\"\n",
"\n",
"# Define the data asset, and use tags to make it clear the asset can be used in training\n",
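The notebook cells changed above adopt a get-or-create pattern for the "initial" data asset. A condensed sketch of that pattern follows, assuming an authenticated MLClient named ml_client and the local CSV path used earlier in the tutorial; catching Exception rather than the notebook's bare except keeps keyboard interrupts and system exits from being swallowed.

from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

v1 = "initial"
my_data = Data(
    name="credit-card",
    version=v1,
    path="./data/default_of_credit_card_clients.csv",
    type=AssetTypes.URI_FILE,
)

try:
    # Reuse the asset if this name/version pair was already registered.
    data_asset = ml_client.data.get(name="credit-card", version=v1)
    print(f"Data asset already exists. Name: {data_asset.name}, version: {data_asset.version}")
except Exception:
    # Not found: register it now.
    data_asset = ml_client.data.create_or_update(my_data)
    print(f"Data asset created. Name: {data_asset.name}, version: {data_asset.version}")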
54 changes: 13 additions & 41 deletions tutorials/get-started-notebooks/pipeline.ipynb
@@ -46,6 +46,8 @@
"\n",
"* If you're seeing this notebook elsewhere, complete [Create resources you need to get started](https://docs.microsoft.com/azure/machine-learning/quickstart-create-resources) to create an Azure Machine Learning workspace and a compute instance.\n",
"\n",
"* Complete the tutorial [Tutorial: Upload, access and explore your data](explore-data.ipynb) to create the data asset you need in this tutorial. Make sure you run all the code to create the initial data asset. You can optionally explore the data and revise it, but you'll only need the initial data to complete this tutorial.\n",
"\n",
"## Set your kernel\n",
"\n",
"* If your compute instance is stopped, start it now. \n",
@@ -120,45 +122,18 @@
"> [!NOTE]\n",
"> Creating MLClient will not connect to the workspace. The client initialization is lazy, it will wait for the first time it needs to make a call (this will happen when creating the `credit_data` data asset, two code cells from here).\n",
"\n",
"## Register data from an external url\n",
"## Access the registered data asset\n",
"\n",
"If you have been following along with the other tutorials in this series and already registered the data, you can fetch the same dataset from the workspace using `credit_dataset = ml_client.data.get(\"<DATA ASSET NAME>\", version='<VERSION>')`. Then you may skip this section. To learn about data more in depth or if you would rather complete the data tutorial first, see [Upload, access and explore your data in Azure Machine Learning](https://learn.microsoft.com/azure/machine-learning/tutorial-explore-data).\n",
"Start by getting the data that you previously registered in [Tutorial: Upload, access and explore your data](explore-data.ipynb).\n",
"\n",
"* Azure Machine Learning uses a `Data` object to register a reusable definition of data, and consume data within a pipeline. In the next section, you consume some data from web url as one example. Data from other sources can be created as well. `Data` assets from other sources can be created as well.\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "credit_data"
},
"outputs": [],
"source": [
"from azure.ai.ml.entities import Data\n",
"from azure.ai.ml.constants import AssetTypes\n",
"\n",
"web_path = \"https://archive.ics.uci.edu/ml/machine-learning-databases/00350/default%20of%20credit%20card%20clients.xls\"\n",
"\n",
"credit_data = Data(\n",
" name=\"creditcard_defaults\",\n",
" path=web_path,\n",
" type=AssetTypes.URI_FILE,\n",
" description=\"Dataset for credit card defaults\",\n",
" tags={\"source_type\": \"web\", \"source\": \"UCI ML Repo\"},\n",
" version=\"1.0.0\",\n",
")"
"* Azure Machine Learning uses a `Data` object to register a reusable definition of data, and consume data within a pipeline. In the next section, you consume some data from web url as one example. `Data` assets from other sources can be created as well."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This code just created a `Data` asset, ready to be consumed as an input by the pipeline that you'll define in the next sections. In addition, you can register the data to your workspace so it becomes reusable across pipelines.\n",
"\n",
"Since this is the first time that you're making a call to the workspace, you may be asked to authenticate. Once the authentication is complete, you then see the dataset registration completion message.\n",
"\n"
"Since this is the first time that you're making a call to the workspace, you may be asked to authenticate. Once the authentication is complete, you then see the dataset registration completion message."
]
},
{
Expand All @@ -175,19 +150,16 @@
},
"outputs": [],
"source": [
"credit_data = ml_client.data.create_or_update(credit_data)\n",
"print(\n",
" f\"Dataset with name {credit_data.name} was registered to workspace, the dataset version is {credit_data.version}\"\n",
")"
"# get a handle of the data asset and print the URI\n",
"credit_data = ml_client.data.get(name=\"credit-card\", version=\"initial\")\n",
"print(f\"Data asset URI: {credit_data.path}\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"In the future, you can fetch the same dataset from the workspace using `credit_dataset = ml_client.data.get(\"<DATA ASSET NAME>\", version='<VERSION>')`.\n",
"\n",
"## Create a compute resource to run your pipeline (Optional)\n",
"\n",
"You can **skip this step** if you want to use [serverless compute (preview)](https://learn.microsoft.com/azure/machine-learning/how-to-use-serverless-compute?view=azureml-api-2&tabs=python) to run the training job. Through serverless compute, Azure Machine Learning takes care of creating, scaling, deleting, patching and managing compute, along with providing managed network isolation, reducing the burden on you. \n",
@@ -310,8 +282,8 @@
" - pip:\n",
" - inference-schema[numpy-support]==1.3.0\n",
" - xlrd==2.0.1\n",
" - mlflow== 1.26.1\n",
" - azureml-mlflow==1.42.0"
" - mlflow== 2.4.1\n",
" - azureml-mlflow==1.51.0"
]
},
{
@@ -350,7 +322,7 @@
" tags={\"scikit-learn\": \"0.24.2\"},\n",
" conda_file=os.path.join(dependencies_dir, \"conda.yaml\"),\n",
" image=\"mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest\",\n",
" version=\"0.1.0\",\n",
" version=\"0.2.0\",\n",
")\n",
"pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)\n",
"\n",
@@ -459,7 +431,7 @@
"\n",
" print(\"input data:\", args.data)\n",
"\n",
" credit_df = pd.read_excel(args.data, header=1, index_col=0)\n",
" credit_df = pd.read_csv(args.data, header=1, index_col=0)\n",
"\n",
" mlflow.log_metric(\"num_samples\", credit_df.shape[0])\n",
" mlflow.log_metric(\"num_features\", credit_df.shape[1] - 1)\n",
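To connect the pipeline notebook back to the data tutorial, here is a minimal sketch of fetching the registered asset and passing it to a pipeline as a uri_file input. It assumes an authenticated MLClient named ml_client and that the "credit-card" asset was registered with version "initial" as shown above; the pipeline_input name is a placeholder.

from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes

# Fetch the asset registered in the explore-data tutorial.
credit_data = ml_client.data.get(name="credit-card", version="initial")
print(f"Data asset URI: {credit_data.path}")

# Wire the asset URI into the pipeline (or a component) as a uri_file input.
pipeline_input = Input(type=AssetTypes.URI_FILE, path=credit_data.path)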