From 59027192a97ad10821d79764493baec2d5b9d0f9 Mon Sep 17 00:00:00 2001 From: Fred Li Date: Fri, 7 Jul 2023 10:22:10 -0700 Subject: [PATCH 01/18] Automation test for spark job with managed vnet --- ...mit_spark_standalone_jobs_managed_vnet.yml | 80 +++++++++++++ sdk/python/jobs/spark/setup_spark.sh | 113 +++++++++++------- ...t_spark_standalone_jobs_managed_vnet.ipynb | 4 +- sdk/python/readme.py | 1 - 4 files changed, 154 insertions(+), 44 deletions(-) create mode 100644 .github/workflows/sdk-jobs-spark-submit_spark_standalone_jobs_managed_vnet.yml diff --git a/.github/workflows/sdk-jobs-spark-submit_spark_standalone_jobs_managed_vnet.yml b/.github/workflows/sdk-jobs-spark-submit_spark_standalone_jobs_managed_vnet.yml new file mode 100644 index 0000000000..89d7aa0dc6 --- /dev/null +++ b/.github/workflows/sdk-jobs-spark-submit_spark_standalone_jobs_managed_vnet.yml @@ -0,0 +1,80 @@ +# This code is autogenerated. +# Code is generated by running custom script: python3 readme.py +# Any manual changes to this file may cause incorrect behavior. +# Any manual changes will be overwritten if the code is regenerated. + +name: sdk-jobs-spark-submit_spark_standalone_jobs_managed_vnet +# This file is created by sdk/python/readme.py. +# Please do not edit directly. +on: + workflow_dispatch: + schedule: + - cron: "14 1/12 * * *" + pull_request: + branches: + - main + paths: + - sdk/python/jobs/spark/** + - .github/workflows/sdk-jobs-spark-submit_spark_standalone_jobs_managed_vnet.yml + - sdk/python/dev-requirements.txt + - infra/bootstrapping/** + - sdk/python/setup.sh +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: check out repo + uses: actions/checkout@v2 + - name: setup python + uses: actions/setup-python@v2 + with: + python-version: "3.8" + - name: pip install notebook reqs + run: pip install -r sdk/python/dev-requirements.txt + - name: azure login + uses: azure/login@v1 + with: + creds: ${{secrets.AZUREML_CREDENTIALS}} + - name: bootstrap resources + run: | + echo '${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}'; + bash bootstrap.sh + working-directory: infra/bootstrapping + continue-on-error: false + - name: setup SDK + run: | + source "${{ github.workspace }}/infra/bootstrapping/sdk_helpers.sh"; + source "${{ github.workspace }}/infra/bootstrapping/init_environment.sh"; + bash setup.sh + working-directory: sdk/python + continue-on-error: true + - name: setup-cli + run: | + source "${{ github.workspace }}/infra/bootstrapping/sdk_helpers.sh"; + source "${{ github.workspace }}/infra/bootstrapping/init_environment.sh"; + bash setup.sh + working-directory: cli + continue-on-error: true + - name: setup spark resources + run: | + bash -x jobs/spark/setup_spark.sh jobs/spark/ jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb + working-directory: sdk/python + continue-on-error: true + - name: run jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb + run: | + source "${{ github.workspace }}/infra/bootstrapping/sdk_helpers.sh"; + source "${{ github.workspace }}/infra/bootstrapping/init_environment.sh"; + bash "${{ github.workspace }}/infra/bootstrapping/sdk_helpers.sh" generate_workspace_config "../../.azureml/config.json"; + bash "${{ github.workspace }}/infra/bootstrapping/sdk_helpers.sh" replace_template_values "submit_spark_standalone_jobs_managed_vnet.ipynb"; + [ -f "../../.azureml/config" ] && cat 
"../../.azureml/config"; + papermill -k python submit_spark_standalone_jobs_managed_vnet.ipynb submit_spark_standalone_jobs_managed_vnet.output.ipynb + working-directory: sdk/python/jobs/spark + - name: upload notebook's working folder as an artifact + if: ${{ always() }} + uses: actions/upload-artifact@v2 + with: + name: submit_spark_standalone_jobs_managed_vnet + path: sdk/python/jobs/spark diff --git a/sdk/python/jobs/spark/setup_spark.sh b/sdk/python/jobs/spark/setup_spark.sh index f03786ceac..953cfd852c 100644 --- a/sdk/python/jobs/spark/setup_spark.sh +++ b/sdk/python/jobs/spark/setup_spark.sh @@ -17,9 +17,8 @@ SQL_ADMIN_LOGIN_PASSWORD="auto123!" SPARK_POOL_NAME="automationpool" SPARK_POOL_ADMIN_ROLE_ID="6e4bf58a-b8e1-4cc3-bbf9-d73143322b78" USER_IDENTITY_YML="jobs/spark/user-assigned-identity.yml" -#OUTBOUND_RULE_NAME="automationtestrule" -#KEY_VAULT_NAME=$(az ml workspace show --query key_vault -o tsv | cut -d'/' -f9-) -#ACCESS_KEY_SECRET_NAME="automationsecret" +AZURE_REGION_NAME=${LOCATION} +OUTBOUND_RULE_NAME="automationtestrule" # if [[ "$2" == *"resources/compute"* ]] @@ -44,52 +43,84 @@ az identity create --name $AML_USER_MANAGED_ID --resource-group $RESOURCE_GROUP AML_USER_MANAGED_ID_OID=$(az identity show --resource-group $RESOURCE_GROUP -n $AML_USER_MANAGED_ID --query principalId -o tsv) # -# -az storage account create --name $GEN2_STORAGE_NAME --resource-group $RESOURCE_GROUP --location $LOCATION --sku Standard_LRS --kind StorageV2 --enable-hierarchical-namespace true -az storage fs create -n $GEN2_FILE_SYSTEM --account-name $GEN2_STORAGE_NAME -az synapse workspace create --name $SYNAPSE_WORKSPACE_NAME --resource-group $RESOURCE_GROUP --storage-account $GEN2_STORAGE_NAME --file-system $GEN2_FILE_SYSTEM --sql-admin-login-user $SQL_ADMIN_LOGIN_USER --sql-admin-login-password $SQL_ADMIN_LOGIN_PASSWORD --location $LOCATION -az role assignment create --role "Storage Blob Data Owner" --assignee $AML_USER_MANAGED_ID_OID --scope /subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.Storage/storageAccounts/$GEN2_STORAGE_NAME/blobServices/default/containers/$GEN2_FILE_SYSTEM -az synapse spark pool create --name $SPARK_POOL_NAME --workspace-name $SYNAPSE_WORKSPACE_NAME --resource-group $RESOURCE_GROUP --spark-version 3.2 --node-count 3 --node-size Medium --min-node-count 3 --max-node-count 10 --enable-auto-scale true -az synapse workspace firewall-rule create --name allowAll --workspace-name $SYNAPSE_WORKSPACE_NAME --resource-group $RESOURCE_GROUP --start-ip-address 0.0.0.0 --end-ip-address 255.255.255.255 -# -sed -i "s//$SUBSCRIPTION_ID/g; - s//$RESOURCE_GROUP/g; - s//$AML_USER_MANAGED_ID/g;" $USER_IDENTITY_YML +# +if [[ "$2" == *"managed_vnet"* ]] +then + AML_WORKSPACE_NAME=${AML_WORKSPACE_NAME}vnet + AZURE_STORAGE_ACCOUNT="blobstoragevnet" + BLOB_CONTAINER_NAME="blobstoragevnetcontainer" + GEN2_STORAGE_ACCOUNT_NAME="gen2storagevnet" + ADLS_CONTAINER_NAME="gen2containervnet" + az storage account create -n $AZURE_STORAGE_ACCOUNT -g $RESOURCE_GROUP -l $LOCATION --sku Standard_LRS + az storage container create -n $BLOB_CONTAINER_NAME --account-name $AZURE_STORAGE_ACCOUNT -# -az ml workspace update --subscription $SUBSCRIPTION_ID --resource-group $RESOURCE_GROUP --name $AML_WORKSPACE_NAME --file $USER_IDENTITY_YML -# + az storage account create --name $GEN2_STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --location $LOCATION --sku Standard_LRS --kind StorageV2 --enable-hierarchical-namespace true + az storage container create -n $ADLS_CONTAINER_NAME 
--account-name $GEN2_STORAGE_ACCOUNT_NAME -# -sed -i "s//$SUBSCRIPTION_ID/g; - s//$RESOURCE_GROUP/g; - s//$AML_WORKSPACE_NAME/g; - s//$ATTACHED_SPARK_POOL_NAME/g; - s//$SYNAPSE_WORKSPACE_NAME/g; - s//$SPARK_POOL_NAME/g; - s//$AML_USER_MANAGED_ID/g; - s//$ATTACHED_SPARK_POOL_NAME_UAI/g; - s//$AML_USER_MANAGED_ID/g;" $ATTACH_SPARK_PY + ACCOUNT_KEY=$(az storage account keys list --account-name $AZURE_STORAGE_ACCOUNT --query "[0].value" -o tsv) + ACCESS_KEY_SECRET_NAME="autotestaccountkey" + KEY_VAULT=$(az ml workspace show -g feli1devrg -n automation-eus --query key_vault -o tsv) + KEY_VAULT_NAME=$(basename "$KEY_VAULT") + az keyvault secret set --name $ACCESS_KEY_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $ACCOUNT_KEY -python $ATTACH_SPARK_PY -# + sed -i "s//$AZURE_REGION_NAME/g; + s//$AZURE_STORAGE_ACCOUNT/g; + s//$OUTBOUND_RULE_NAME/g; + s//$KEY_VAULT_NAME/g; + s//$ACCESS_KEY_SECRET_NAME/g; + s//$BLOB_CONTAINER_NAME/g; + s//$GEN2_STORAGE_ACCOUNT_NAME/g; + s//$ADLS_CONTAINER_NAME/g;" $2 +# +else + # + az storage account create --name $GEN2_STORAGE_NAME --resource-group $RESOURCE_GROUP --location $LOCATION --sku Standard_LRS --kind StorageV2 --enable-hierarchical-namespace true + az storage fs create -n $GEN2_FILE_SYSTEM --account-name $GEN2_STORAGE_NAME + az synapse workspace create --name $SYNAPSE_WORKSPACE_NAME --resource-group $RESOURCE_GROUP --storage-account $GEN2_STORAGE_NAME --file-system $GEN2_FILE_SYSTEM --sql-admin-login-user $SQL_ADMIN_LOGIN_USER --sql-admin-login-password $SQL_ADMIN_LOGIN_PASSWORD --location $LOCATION + az role assignment create --role "Storage Blob Data Owner" --assignee $AML_USER_MANAGED_ID_OID --scope /subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.Storage/storageAccounts/$GEN2_STORAGE_NAME/blobServices/default/containers/$GEN2_FILE_SYSTEM + az synapse spark pool create --name $SPARK_POOL_NAME --workspace-name $SYNAPSE_WORKSPACE_NAME --resource-group $RESOURCE_GROUP --spark-version 3.2 --node-count 3 --node-size Medium --min-node-count 3 --max-node-count 10 --enable-auto-scale true + az synapse workspace firewall-rule create --name allowAll --workspace-name $SYNAPSE_WORKSPACE_NAME --resource-group $RESOURCE_GROUP --start-ip-address 0.0.0.0 --end-ip-address 255.255.255.255 + # -COMPUTE_MANAGED_IDENTITY=$(az ml compute show --name $ATTACHED_SPARK_POOL_NAME --resource-group $RESOURCE_GROUP --workspace-name $AML_WORKSPACE_NAME --query identity.principal_id --out tsv) + sed -i "s//$SUBSCRIPTION_ID/g; + s//$RESOURCE_GROUP/g; + s//$AML_USER_MANAGED_ID/g;" $USER_IDENTITY_YML -if [[ ! -z "$COMPUTE_MANAGED_IDENTITY" ]] -then - az synapse role assignment create --workspace-name $SYNAPSE_WORKSPACE_NAME --role $SPARK_POOL_ADMIN_ROLE_ID --assignee $COMPUTE_MANAGED_IDENTITY -fi + # + az ml workspace update --subscription $SUBSCRIPTION_ID --resource-group $RESOURCE_GROUP --name $AML_WORKSPACE_NAME --file $USER_IDENTITY_YML + # -COMPUTE_MANAGED_IDENTITY=$(az ml compute show --name $ATTACHED_SPARK_POOL_NAME_UAI --resource-group $RESOURCE_GROUP --workspace-name $AML_WORKSPACE_NAME --query identity.principal_id --out tsv) + # + sed -i "s//$SUBSCRIPTION_ID/g; + s//$RESOURCE_GROUP/g; + s//$AML_WORKSPACE_NAME/g; + s//$ATTACHED_SPARK_POOL_NAME/g; + s//$SYNAPSE_WORKSPACE_NAME/g; + s//$SPARK_POOL_NAME/g; + s//$AML_USER_MANAGED_ID/g; + s//$ATTACHED_SPARK_POOL_NAME_UAI/g; + s//$AML_USER_MANAGED_ID/g;" $ATTACH_SPARK_PY -if [[ ! 
-z "$COMPUTE_MANAGED_IDENTITY" ]] -then - az synapse role assignment create --workspace-name $SYNAPSE_WORKSPACE_NAME --role $SPARK_POOL_ADMIN_ROLE_ID --assignee $COMPUTE_MANAGED_IDENTITY -fi + python $ATTACH_SPARK_PY + # -az synapse role assignment create --workspace-name $SYNAPSE_WORKSPACE_NAME --role $SPARK_POOL_ADMIN_ROLE_ID --assignee $AML_USER_MANAGED_ID + COMPUTE_MANAGED_IDENTITY=$(az ml compute show --name $ATTACHED_SPARK_POOL_NAME --resource-group $RESOURCE_GROUP --workspace-name $AML_WORKSPACE_NAME --query identity.principal_id --out tsv) + + if [[ ! -z "$COMPUTE_MANAGED_IDENTITY" ]] + then + az synapse role assignment create --workspace-name $SYNAPSE_WORKSPACE_NAME --role $SPARK_POOL_ADMIN_ROLE_ID --assignee $COMPUTE_MANAGED_IDENTITY + fi + + COMPUTE_MANAGED_IDENTITY=$(az ml compute show --name $ATTACHED_SPARK_POOL_NAME_UAI --resource-group $RESOURCE_GROUP --workspace-name $AML_WORKSPACE_NAME --query identity.principal_id --out tsv) + + if [[ ! -z "$COMPUTE_MANAGED_IDENTITY" ]] + then + az synapse role assignment create --workspace-name $SYNAPSE_WORKSPACE_NAME --role $SPARK_POOL_ADMIN_ROLE_ID --assignee $COMPUTE_MANAGED_IDENTITY + fi + + az synapse role assignment create --workspace-name $SYNAPSE_WORKSPACE_NAME --role $SPARK_POOL_ADMIN_ROLE_ID --assignee $AML_USER_MANAGED_ID +fi # sed -i "s//$SUBSCRIPTION_ID/g; @@ -101,4 +132,4 @@ sed -i "s//$SUBSCRIPTION_ID/g; s//$AML_USER_MANAGED_ID/g; s//$ATTACHED_SPARK_POOL_NAME_UAI/g; s//$AML_USER_MANAGED_ID/g;" $2 -# +# \ No newline at end of file diff --git a/sdk/python/jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb b/sdk/python/jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb index 088a07d41c..eed1e62aa2 100644 --- a/sdk/python/jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb +++ b/sdk/python/jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb @@ -592,7 +592,7 @@ "\n", "# This will add a new outbound rule to existing rules\n", "rule_name = \"\" # This name should be unique\n", - "adls_storage_account = \"\"\n", + "adls_storage_account = \"\"\n", "service_resource_id = f\"/subscriptions/{subscription_id}/resourceGroups/{resource_group}/providers/Microsoft.Storage/storageAccounts/{adls_storage_account}\"\n", "subresource_target = \"dfs\"\n", "spark_enabled = True\n", @@ -747,7 +747,7 @@ "# Enter the Azure Data Lake Storage (ADLS) Gen2 account name and container name.\n", "# The file `titanic.csv` should be placed inside folder `data`\n", "# created in the Azure Data Lake Storage (ADLS) Gen2 container.\n", - "adls_storage_account = \"\"\n", + "adls_storage_account = \"\"\n", "container_name = \"\"\n", "\n", "spark_job = spark(\n", diff --git a/sdk/python/readme.py b/sdk/python/readme.py index c4042abbbc..9dacd9cf1b 100644 --- a/sdk/python/readme.py +++ b/sdk/python/readme.py @@ -19,7 +19,6 @@ "train-hyperparameter-tune-deploy-with-keras", "train-hyperparameter-tune-deploy-with-tensorflow", "interactive_data_wrangling", - "submit_spark_standalone_jobs_managed_vnet", # mlflow SDK samples notebooks "mlflow_sdk_online_endpoints_progresive", "mlflow_sdk_online_endpoints", From 181185c31a9a34b87b182fbfe348a4139cbb31c3 Mon Sep 17 00:00:00 2001 From: Fred Li Date: Fri, 7 Jul 2023 11:41:52 -0700 Subject: [PATCH 02/18] Update to keyword arguments in provision vnet --- .../jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb 
b/sdk/python/jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb index eed1e62aa2..03319209a6 100644 --- a/sdk/python/jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb +++ b/sdk/python/jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb @@ -183,7 +183,7 @@ "# Provisioning managed VNet with Spark support\n", "include_spark = True\n", "provision_network_result = ml_client.workspaces.begin_provision_network(\n", - " ws_name, include_spark\n", + " workspace_name=ws_name, include_spark=include_spark\n", ").result()" ] }, From ef90e342b6f663d5ea7179f2e9dd707b87ca415b Mon Sep 17 00:00:00 2001 From: Fred Li Date: Fri, 7 Jul 2023 13:04:40 -0700 Subject: [PATCH 03/18] Add test for data wrangling interactive notebook --- ...spark-run_interactive_session_notebook.yml | 80 +++++++++++++++++++ .../run_interactive_session_notebook.ipynb | 75 +++++++++++++++++ 2 files changed, 155 insertions(+) create mode 100644 .github/workflows/sdk-jobs-spark-run_interactive_session_notebook.yml create mode 100644 sdk/python/jobs/spark/run_interactive_session_notebook.ipynb diff --git a/.github/workflows/sdk-jobs-spark-run_interactive_session_notebook.yml b/.github/workflows/sdk-jobs-spark-run_interactive_session_notebook.yml new file mode 100644 index 0000000000..474544653c --- /dev/null +++ b/.github/workflows/sdk-jobs-spark-run_interactive_session_notebook.yml @@ -0,0 +1,80 @@ +# This code is autogenerated. +# Code is generated by running custom script: python3 readme.py +# Any manual changes to this file may cause incorrect behavior. +# Any manual changes will be overwritten if the code is regenerated. + +name: sdk-jobs-spark-run_interactive_session_notebook +# This file is created by sdk/python/readme.py. +# Please do not edit directly. +on: + workflow_dispatch: + schedule: + - cron: "30 11/12 * * *" + pull_request: + branches: + - main + paths: + - sdk/python/jobs/spark/** + - .github/workflows/sdk-jobs-spark-run_interactive_session_notebook.yml + - sdk/python/dev-requirements.txt + - infra/bootstrapping/** + - sdk/python/setup.sh +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: check out repo + uses: actions/checkout@v2 + - name: setup python + uses: actions/setup-python@v2 + with: + python-version: "3.8" + - name: pip install notebook reqs + run: pip install -r sdk/python/dev-requirements.txt + - name: azure login + uses: azure/login@v1 + with: + creds: ${{secrets.AZUREML_CREDENTIALS}} + - name: bootstrap resources + run: | + echo '${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}'; + bash bootstrap.sh + working-directory: infra/bootstrapping + continue-on-error: false + - name: setup SDK + run: | + source "${{ github.workspace }}/infra/bootstrapping/sdk_helpers.sh"; + source "${{ github.workspace }}/infra/bootstrapping/init_environment.sh"; + bash setup.sh + working-directory: sdk/python + continue-on-error: true + - name: setup-cli + run: | + source "${{ github.workspace }}/infra/bootstrapping/sdk_helpers.sh"; + source "${{ github.workspace }}/infra/bootstrapping/init_environment.sh"; + bash setup.sh + working-directory: cli + continue-on-error: true + - name: setup spark resources + run: | + bash -x jobs/spark/setup_spark.sh jobs/spark/ jobs/spark/run_interactive_session_notebook.ipynb + working-directory: sdk/python + continue-on-error: true + - name: run jobs/spark/run_interactive_session_notebook.ipynb + run: | + source 
"${{ github.workspace }}/infra/bootstrapping/sdk_helpers.sh"; + source "${{ github.workspace }}/infra/bootstrapping/init_environment.sh"; + bash "${{ github.workspace }}/infra/bootstrapping/sdk_helpers.sh" generate_workspace_config "../../.azureml/config.json"; + bash "${{ github.workspace }}/infra/bootstrapping/sdk_helpers.sh" replace_template_values "run_interactive_session_notebook.ipynb"; + [ -f "../../.azureml/config" ] && cat "../../.azureml/config"; + papermill -k python run_interactive_session_notebook.ipynb run_interactive_session_notebook.output.ipynb + working-directory: sdk/python/jobs/spark + - name: upload notebook's working folder as an artifact + if: ${{ always() }} + uses: actions/upload-artifact@v2 + with: + name: run_interactive_session_notebook + path: sdk/python/jobs/spark diff --git a/sdk/python/jobs/spark/run_interactive_session_notebook.ipynb b/sdk/python/jobs/spark/run_interactive_session_notebook.ipynb new file mode 100644 index 0000000000..ba3eab9a9d --- /dev/null +++ b/sdk/python/jobs/spark/run_interactive_session_notebook.ipynb @@ -0,0 +1,75 @@ +{ + "cells": [ + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Use an attached Synapse Spark pool" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You should have an attached Synapse Spark pool available in your workspace. Please see documentation page: [Attach and manage a Synapse Spark pool in Azure Machine Learning (preview)](https://learn.microsoft.com/azure/machine-learning/how-to-manage-synapse-spark-pool) for more details.\n", + "\n", + "**Note** - To ensure successful execution of Spark job, the identity being used for the Spark job should be assigned **Contributor** and **Storage Blob Data Contributor** roles on the Azure storage account used for data input and output." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azure.ai.ml import MLClient, spark, Input, Output\n", + "from azure.identity import DefaultAzureCredential\n", + "\n", + "subscription_id = \"\"\n", + "resource_group = \"\"\n", + "workspace = \"\"\n", + "ml_client = MLClient(\n", + " DefaultAzureCredential(), subscription_id, resource_group, workspace\n", + ")\n", + "\n", + "spark_job = spark(\n", + " display_name=\"interactive_data_wrangling\",\n", + " code=\"../../data-wrangling\",\n", + " entry={\"file\": \"interactive_data_wrangling.ipynb\"},\n", + " driver_cores=1,\n", + " driver_memory=\"2g\",\n", + " executor_cores=2,\n", + " executor_memory=\"2g\",\n", + " executor_instances=2,\n", + " compute=\"\",\n", + ")\n", + "\n", + "returned_spark_job = ml_client.jobs.create_or_update(spark_job)\n", + "\n", + "# Wait until the job completes\n", + "ml_client.jobs.stream(returned_spark_job.name)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3.10 - SDK V2", + "language": "python", + "name": "python310-sdkv2" + }, + "language_info": { + "name": "python", + "version": "3.7.10" + }, + "orig_nbformat": 4, + "vscode": { + "interpreter": { + "hash": "6aeff17a1aa7735c2f7cb3a6d691fe1b4d4c3b8d2d650f644ad0f24e1b8e3f3f" + } + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} From 3449368b695e9ef25c1cd0393a0b5c2937240f68 Mon Sep 17 00:00:00 2001 From: Fred Li Date: Fri, 7 Jul 2023 13:57:33 -0700 Subject: [PATCH 04/18] Add permanent delete to worksapce cleanup --- .../jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb b/sdk/python/jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb index 03319209a6..346a3151dd 100644 --- a/sdk/python/jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb +++ b/sdk/python/jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb @@ -862,7 +862,7 @@ }, "outputs": [], "source": [ - "ml_client.workspaces.begin_delete(name=ws_name, delete_dependent_resources=True)" + "ml_client.workspaces.begin_delete(name=ws_name, permanently_delete=True, delete_dependent_resources=True)" ] } ], From 59358d4f2b7297f9b2b41fb18ba9a72d63acd8af Mon Sep 17 00:00:00 2001 From: Fred Li Date: Mon, 10 Jul 2023 09:48:18 -0700 Subject: [PATCH 05/18] Rename the vnet workspace --- sdk/python/jobs/spark/setup_spark.sh | 5 ++++- .../spark/submit_spark_standalone_jobs_managed_vnet.ipynb | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/sdk/python/jobs/spark/setup_spark.sh b/sdk/python/jobs/spark/setup_spark.sh index 953cfd852c..0f89c2ce3d 100644 --- a/sdk/python/jobs/spark/setup_spark.sh +++ b/sdk/python/jobs/spark/setup_spark.sh @@ -43,11 +43,14 @@ az identity create --name $AML_USER_MANAGED_ID --resource-group $RESOURCE_GROUP AML_USER_MANAGED_ID_OID=$(az identity show --resource-group $RESOURCE_GROUP -n $AML_USER_MANAGED_ID --query principalId -o tsv) # +# +# +ipython nbconvert --to script ../../data-wrangling/run_interactive_session_notebook.ipynb # if [[ "$2" == *"managed_vnet"* ]] then - AML_WORKSPACE_NAME=${AML_WORKSPACE_NAME}vnet + AML_WORKSPACE_NAME=${AML_WORKSPACE_NAME}-vnet AZURE_STORAGE_ACCOUNT="blobstoragevnet" BLOB_CONTAINER_NAME="blobstoragevnetcontainer" GEN2_STORAGE_ACCOUNT_NAME="gen2storagevnet" diff --git a/sdk/python/jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb 
b/sdk/python/jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb index 346a3151dd..efdd4ecea9 100644 --- a/sdk/python/jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb +++ b/sdk/python/jobs/spark/submit_spark_standalone_jobs_managed_vnet.ipynb @@ -862,7 +862,9 @@ }, "outputs": [], "source": [ - "ml_client.workspaces.begin_delete(name=ws_name, permanently_delete=True, delete_dependent_resources=True)" + "ml_client.workspaces.begin_delete(\n", + " name=ws_name, permanently_delete=True, delete_dependent_resources=True\n", + ")" ] } ], From 3e8257acd1b2fef7191d67d795cd7a0c0eb3103b Mon Sep 17 00:00:00 2001 From: Fred Li Date: Mon, 10 Jul 2023 11:35:38 -0700 Subject: [PATCH 06/18] Support interactive session test --- .../interactive_data_wrangling.ipynb | 6 +- ...b => run_interactive_data_wrangling.ipynb} | 7 ++- sdk/python/jobs/spark/setup_spark.sh | 57 +++++++++++++++++-- 3 files changed, 60 insertions(+), 10 deletions(-) rename sdk/python/jobs/spark/{run_interactive_session_notebook.ipynb => run_interactive_data_wrangling.ipynb} (91%) diff --git a/sdk/python/data-wrangling/interactive_data_wrangling.ipynb b/sdk/python/data-wrangling/interactive_data_wrangling.ipynb index f5939089b2..f4da9fe0dd 100644 --- a/sdk/python/data-wrangling/interactive_data_wrangling.ipynb +++ b/sdk/python/data-wrangling/interactive_data_wrangling.ipynb @@ -237,7 +237,7 @@ "from pyspark.ml.feature import Imputer\n", "\n", "df = pd.read_csv(\n", - " \"abfss://@.dfs.core.windows.net/data/titanic.csv\",\n", + " \"abfss://@.dfs.core.windows.net/data/titanic.csv\",\n", " index_col=\"PassengerId\",\n", ")\n", "imputer = Imputer(inputCols=[\"Age\"], outputCol=\"Age\").setStrategy(\n", @@ -356,7 +356,7 @@ "from pyspark.ml.feature import Imputer\n", "\n", "df = pd.read_csv(\n", - " \"abfss://@.dfs.core.windows.net/data/titanic.csv\",\n", + " \"abfss://@.dfs.core.windows.net/data/titanic.csv\",\n", " index_col=\"PassengerId\",\n", ")\n", "imputer = Imputer(inputCols=[\"Age\"], outputCol=\"Age\").setStrategy(\n", @@ -367,7 +367,7 @@ ") # Fill Cabin column with value \"None\" if missing\n", "df.dropna(inplace=True) # Drop the rows which still have any missing value\n", "df.to_csv(\n", - " \"abfss://@.dfs.core.windows.net/data/wrangled\",\n", + " \"abfss://@.dfs.core.windows.net/data/wrangled\",\n", " index_col=\"PassengerId\",\n", ")" ] diff --git a/sdk/python/jobs/spark/run_interactive_session_notebook.ipynb b/sdk/python/jobs/spark/run_interactive_data_wrangling.ipynb similarity index 91% rename from sdk/python/jobs/spark/run_interactive_session_notebook.ipynb rename to sdk/python/jobs/spark/run_interactive_data_wrangling.ipynb index ba3eab9a9d..4450a4fdd1 100644 --- a/sdk/python/jobs/spark/run_interactive_session_notebook.ipynb +++ b/sdk/python/jobs/spark/run_interactive_data_wrangling.ipynb @@ -37,13 +37,16 @@ "spark_job = spark(\n", " display_name=\"interactive_data_wrangling\",\n", " code=\"../../data-wrangling\",\n", - " entry={\"file\": \"interactive_data_wrangling.ipynb\"},\n", + " entry={\"file\": \"interactive_data_wrangling.py\"},\n", " driver_cores=1,\n", " driver_memory=\"2g\",\n", " executor_cores=2,\n", " executor_memory=\"2g\",\n", " executor_instances=2,\n", - " compute=\"\",\n", + " resources={\n", + " \"instance_type\": \"Standard_E8S_V3\",\n", + " \"runtime_version\": \"3.2.0\",\n", + " },\n", ")\n", "\n", "returned_spark_job = ml_client.jobs.create_or_update(spark_job)\n", diff --git a/sdk/python/jobs/spark/setup_spark.sh b/sdk/python/jobs/spark/setup_spark.sh index 0f89c2ce3d..ca4a9fbd54 
100644 --- a/sdk/python/jobs/spark/setup_spark.sh +++ b/sdk/python/jobs/spark/setup_spark.sh @@ -43,10 +43,6 @@ az identity create --name $AML_USER_MANAGED_ID --resource-group $RESOURCE_GROUP AML_USER_MANAGED_ID_OID=$(az identity show --resource-group $RESOURCE_GROUP -n $AML_USER_MANAGED_ID --query principalId -o tsv) # -# - -# -ipython nbconvert --to script ../../data-wrangling/run_interactive_session_notebook.ipynb # if [[ "$2" == *"managed_vnet"* ]] then @@ -63,7 +59,7 @@ then ACCOUNT_KEY=$(az storage account keys list --account-name $AZURE_STORAGE_ACCOUNT --query "[0].value" -o tsv) ACCESS_KEY_SECRET_NAME="autotestaccountkey" - KEY_VAULT=$(az ml workspace show -g feli1devrg -n automation-eus --query key_vault -o tsv) + KEY_VAULT=$(az ml workspace show -g $RESOURCE_GROUP -n $AML_WORKSPACE_NAME --query key_vault -o tsv) KEY_VAULT_NAME=$(basename "$KEY_VAULT") az keyvault secret set --name $ACCESS_KEY_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $ACCOUNT_KEY @@ -76,6 +72,57 @@ then s//$GEN2_STORAGE_ACCOUNT_NAME/g; s//$ADLS_CONTAINER_NAME/g;" $2 # +# +elif [[ "$2" == *"interactive_data_wrangling"* ]] +then + NOTEBOOK_TO_CONVERT="../../data-wrangling/interactive_data_wrangling.ipynb" + jupyter nbconvert $NOTEBOOK_TO_CONVERT --to script + + ACCOUNT_KEY=$(az storage account keys list --account-name $AZURE_STORAGE_ACCOUNT --query "[0].value" -o tsv) + ACCESS_KEY_SECRET_NAME="autotestaccountkey" + KEY_VAULT=$(az ml workspace show -g $RESOURCE_GROUP -n $AML_WORKSPACE_NAME --query key_vault -o tsv) + KEY_VAULT_NAME=$(basename "$KEY_VAULT") + NOTEBOOK_PY="../../data-wrangling/interactive_data_wrangling.py" + az keyvault secret set --name $ACCESS_KEY_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $ACCOUNT_KEY + + END_TIME=`date -u -d "60 minutes" '+%Y-%m-%dT%H:%MZ'` + SAS_TOKEN=`az storage container generate-sas -n $BLOB_CONTAINER_NAME --account-name $AZURE_STORAGE_ACCOUNT --https-only --permissions dlrw --expiry $end -o tsv` + SAS_TOKEN_SECRET_NAME="autotestsastoken" + az keyvault secret set --name $SAS_TOKEN_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $SAS_TOKEN + + GEN2_STORAGE_ACCOUNT_NAME=${RESOURCE_GROUP}gen2 + FILE_SYSTEM_NAME=${RESOURCE_GROUP}file + az storage account create --name $GEN2_STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --location $LOCATION --sku Standard_LRS --kind StorageV2 --enable-hierarchical-namespace true + az storage fs create -n $FILE_SYSTEM_NAME --account-name $GEN2_STORAGE_ACCOUNT_NAME + az role assignment create --role "Storage Blob Data Contributor" --assignee $AML_USER_MANAGED_ID_OID --scope /subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.Storage/storageAccounts/$GEN2_STORAGE_ACCOUNT_NAME/blobServices/default/containers/$FILE_SYSTEM_NAME + + SERVICE_PRINCIPAL_NAME="${RESOURCE_GROUP}sp" + az ad sp create-for-rbac --name $SERVICE_PRINCIPAL_NAME + LIST_SP_DETAILS=$(az ad sp list --display-name $SERVICE_PRINCIPAL_NAME) + SP_APPID=$(echo $LIST_SP_DETAILS | jq -r '[0].appId') + SP_OBJECTID=$(echo $LIST_SP_DETAILS | jq -r '[0].id') + SP_TENANTID=$(echo $LIST_SP_DETAILS | jq -r '[0].appOwnerOrganizationId') + SPA_SP_SECRET=$(az ad sp credential reset --id $SP_OBJECTID --query "password") + + CLIENT_ID_SECRET_NAME="autotestspsecretclient" + TENANT_ID_SECRET_NAME="autotestspsecrettenant" + CLIENT_SECRET_NAME="autotestspsecret" + az keyvault secret set --name $CLIENT_ID_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $SP_APPID + az keyvault secret set --name $TENANT_ID_SECRET_NAME --vault-name $KEY_VAULT_NAME --value 
$SP_TENANTID + az keyvault secret set --name $CLIENT_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $SPA_SP_SECRET + az role assignment create --role "Storage Blob Data Contributor" --assignee $SP_APPID --scope /subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.Storage/storageAccounts/$GEN2_STORAGE_ACCOUNT_NAME/blobServices/default/containers/$FILE_SYSTEM_NAME + + sed -i "s//$KEY_VAULT_NAME/g; + s//$ACCESS_KEY_SECRET_NAME/g; + s//$AZURE_STORAGE_ACCOUNT/g; + s//$BLOB_CONTAINER_NAME/g + s//$SAS_TOKEN_SECRET_NAME/g; + s//$GEN2_STORAGE_ACCOUNT_NAME/g + s//$FILE_SYSTEM_NAME/g; + s//$CLIENT_ID_SECRET_NAME/g; + s//$TENANT_ID_SECRET_NAME/g; + s//$CLIENT_SECRET_NAME/g;" $NOTEBOOK_PY +# else # az storage account create --name $GEN2_STORAGE_NAME --resource-group $RESOURCE_GROUP --location $LOCATION --sku Standard_LRS --kind StorageV2 --enable-hierarchical-namespace true From 2cc1ab2991dd2766fee42a5f40d1361e716796a0 Mon Sep 17 00:00:00 2001 From: Fred Li Date: Mon, 10 Jul 2023 11:59:46 -0700 Subject: [PATCH 07/18] rename run session file notebook --- ...a_wrangling.ipynb => run_interactive_session_notebook.ipynb} | 0 sdk/python/jobs/spark/setup_spark.sh | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename sdk/python/jobs/spark/{run_interactive_data_wrangling.ipynb => run_interactive_session_notebook.ipynb} (100%) diff --git a/sdk/python/jobs/spark/run_interactive_data_wrangling.ipynb b/sdk/python/jobs/spark/run_interactive_session_notebook.ipynb similarity index 100% rename from sdk/python/jobs/spark/run_interactive_data_wrangling.ipynb rename to sdk/python/jobs/spark/run_interactive_session_notebook.ipynb diff --git a/sdk/python/jobs/spark/setup_spark.sh b/sdk/python/jobs/spark/setup_spark.sh index ca4a9fbd54..cf79600599 100644 --- a/sdk/python/jobs/spark/setup_spark.sh +++ b/sdk/python/jobs/spark/setup_spark.sh @@ -73,7 +73,7 @@ then s//$ADLS_CONTAINER_NAME/g;" $2 # # -elif [[ "$2" == *"interactive_data_wrangling"* ]] +elif [[ "$2" == *"run_interactive_session_notebook"* ]] then NOTEBOOK_TO_CONVERT="../../data-wrangling/interactive_data_wrangling.ipynb" jupyter nbconvert $NOTEBOOK_TO_CONVERT --to script From c31a656007552406075c7de9bd7c780b49397a2a Mon Sep 17 00:00:00 2001 From: Fred Li Date: Mon, 10 Jul 2023 13:39:29 -0700 Subject: [PATCH 08/18] Update to use ipython --- sdk/python/jobs/spark/setup_spark.sh | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdk/python/jobs/spark/setup_spark.sh b/sdk/python/jobs/spark/setup_spark.sh index cf79600599..99929c18c7 100644 --- a/sdk/python/jobs/spark/setup_spark.sh +++ b/sdk/python/jobs/spark/setup_spark.sh @@ -76,12 +76,14 @@ then elif [[ "$2" == *"run_interactive_session_notebook"* ]] then NOTEBOOK_TO_CONVERT="../../data-wrangling/interactive_data_wrangling.ipynb" - jupyter nbconvert $NOTEBOOK_TO_CONVERT --to script + ipython nbconvert $NOTEBOOK_TO_CONVERT --to script ACCOUNT_KEY=$(az storage account keys list --account-name $AZURE_STORAGE_ACCOUNT --query "[0].value" -o tsv) ACCESS_KEY_SECRET_NAME="autotestaccountkey" - KEY_VAULT=$(az ml workspace show -g $RESOURCE_GROUP -n $AML_WORKSPACE_NAME --query key_vault -o tsv) - KEY_VAULT_NAME=$(basename "$KEY_VAULT") + + KEY_VAULT_NAME="autotestsparkkv" + az keyvault create -n $KEY_VAULT_NAME -g $RESOURCE_GROUP + NOTEBOOK_PY="../../data-wrangling/interactive_data_wrangling.py" az keyvault secret set --name $ACCESS_KEY_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $ACCOUNT_KEY From 8704d86a6ab86435ae24a004f92dd7d65ef4ce70 Mon Sep 17 00:00:00 
2001 From: Fred Li Date: Mon, 10 Jul 2023 14:24:21 -0700 Subject: [PATCH 09/18] Add py file for notebook session --- .../interactive_data_wrangling.py | 247 ++++++++++++++++++ sdk/python/jobs/spark/setup_spark.sh | 11 +- 2 files changed, 253 insertions(+), 5 deletions(-) create mode 100644 sdk/python/data-wrangling/interactive_data_wrangling.py diff --git a/sdk/python/data-wrangling/interactive_data_wrangling.py b/sdk/python/data-wrangling/interactive_data_wrangling.py new file mode 100644 index 0000000000..e46df05e75 --- /dev/null +++ b/sdk/python/data-wrangling/interactive_data_wrangling.py @@ -0,0 +1,247 @@ +## Interactive Data Wrangling using Apache Spark in Azure Machine Learning. Before executing these sample codes in an Azure Machine Learning Notebook, select **Serverless Spark Compute** under **Azure Machine Learning Serverless Spark** or select an attached Synapse Spark pool under **Synapse Spark pools** from the **Compute** selection menu. It is highly recommened to follow the documentation page: [Interactive data wrangling with Apache Spark in Azure Machine Learning](https://learn.microsoft.com/azure/machine-learning/interactive-data-wrangling-with-apache-spark-azure-ml) for more details related to the code samples provided in this notebook. + +### Access and wrangle Azure Blob storage data using Access Key + +#### First, Set the access key as configuration property `fs.azure.account.key..blob.core.windows.net`. + +from pyspark.sql import SparkSession + +sc = SparkSession.builder.getOrCreate() +token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary +access_key = token_library.getSecret("", "") +sc._jsc.hadoopConfiguration().set( + "fs.azure.account.key..blob.core.windows.net", access_key +) + +#### Access data using `wasbs://` URI and perform data wrangling. +import pyspark.pandas as pd +from pyspark.ml.feature import Imputer + +df = pd.read_csv( + "wasbs://@.blob.core.windows.net/data/titanic.csv", + index_col="PassengerId", +) +imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( + "mean" +) # Replace missing values in Age column with the mean value +df.fillna( + value={"Cabin": "None"}, inplace=True +) # Fill Cabin column with value "None" if missing +df.dropna(inplace=True) # Drop the rows which still have any missing value +df.to_csv( + "wasbs://@.blob.core.windows.net/data/wrangled", + index_col="PassengerId", +) + +### Access and wrangle Azure Blob storage data using SAS token + +#### First, set the SAS token as configuration property `fs.azure.sas...blob.core.windows.net`. +from pyspark.sql import SparkSession + +sc = SparkSession.builder.getOrCreate() +token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary +sas_token = token_library.getSecret("", "") +sc._jsc.hadoopConfiguration().set( + "fs.azure.sas...blob.core.windows.net", + sas_token, +) + +#### Access data using `wasbs://` URI and perform data wrangling. 
+import pyspark.pandas as pd +from pyspark.ml.feature import Imputer + +df = pd.read_csv( + "wasbs://@.blob.core.windows.net/data/titanic.csv", + index_col="PassengerId", +) +imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( + "mean" +) # Replace missing values in Age column with the mean value +df.fillna( + value={"Cabin": "None"}, inplace=True +) # Fill Cabin column with value "None" if missing +df.dropna(inplace=True) # Drop the rows which still have any missing value +df.to_csv( + "wasbs://@.blob.core.windows.net/data/wrangled", + index_col="PassengerId", +) + +### Access and wrangle ADLS Gen 2 data using User Identity passthrough + +#### - To enable read and write access, assign **Contributor** and **Storage Blob Data Contributor** roles to the user identity. +#### - Access data using `abfss://` URI and perform data wrangling. +import pyspark.pandas as pd +from pyspark.ml.feature import Imputer + +df = pd.read_csv( + "abfss://@.dfs.core.windows.net/data/titanic.csv", + index_col="PassengerId", +) +imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( + "mean" +) # Replace missing values in Age column with the mean value +df.fillna( + value={"Cabin": "None"}, inplace=True +) # Fill Cabin column with value "None" if missing +df.dropna(inplace=True) # Drop the rows which still have any missing value +df.to_csv( + "abfss://@.dfs.core.windows.net/data/wrangled", + index_col="PassengerId", +) + +### Access and wrangle ADLS Gen 2 data using Service Principal + +#### - To enable read and write access, assign **Contributor** and **Storage Blob Data Contributor** roles to the user identity. +#### - Set configuration properties as follows: +#### - Client ID property: `fs.azure.account.oauth2.client.id..dfs.core.windows.net` +#### - Client secret property: `fs.azure.account.oauth2.client.secret..dfs.core.windows.net` +#### - Tenant ID property: `fs.azure.account.oauth2.client.endpoint..dfs.core.windows.net` +#### - Tenant ID value: `https://login.microsoftonline.com//oauth2/token` +from pyspark.sql import SparkSession + +sc = SparkSession.builder.getOrCreate() +token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary + +# Set up service principal tenant ID, client ID and secret from Azure Key Vault +client_id = token_library.getSecret("", "") +tenant_id = token_library.getSecret("", "") +client_secret = token_library.getSecret("", "") + +# Set up service principal which has access of the data +sc._jsc.hadoopConfiguration().set( + "fs.azure.account.auth.type..dfs.core.windows.net", "OAuth" +) +sc._jsc.hadoopConfiguration().set( + "fs.azure.account.oauth.provider.type..dfs.core.windows.net", + "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", +) +sc._jsc.hadoopConfiguration().set( + "fs.azure.account.oauth2.client.id..dfs.core.windows.net", + client_id, +) +sc._jsc.hadoopConfiguration().set( + "fs.azure.account.oauth2.client.secret..dfs.core.windows.net", + client_secret, +) +sc._jsc.hadoopConfiguration().set( + "fs.azure.account.oauth2.client.endpoint..dfs.core.windows.net", + "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token", +) + +#### - Access data using `abfss://` URI and perform data wrangling. 
+import pyspark.pandas as pd +from pyspark.ml.feature import Imputer + +df = pd.read_csv( + "abfss://@.dfs.core.windows.net/data/titanic.csv", + index_col="PassengerId", +) +imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( + "mean" +) # Replace missing values in Age column with the mean value +df.fillna( + value={"Cabin": "None"}, inplace=True +) # Fill Cabin column with value "None" if missing +df.dropna(inplace=True) # Drop the rows which still have any missing value +df.to_csv( + "abfss://@.dfs.core.windows.net/data/wrangled", + index_col="PassengerId", +) + +### Access and wrangle data using credentialed AzureML Blob Datastore +#### - Access data using `azureml://` URI and perform data wrangling. +import pyspark.pandas as pd +from pyspark.ml.feature import Imputer + +df = pd.read_csv( + "azureml://datastores/workspaceblobstore/paths/data/titanic.csv", + index_col="PassengerId", +) +imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( + "mean" +) # Replace missing values in Age column with the mean value +df.fillna( + value={"Cabin": "None"}, inplace=True +) # Fill Cabin column with value "None" if missing +df.dropna(inplace=True) # Drop the rows which still have any missing value +df.to_csv( + "azureml://datastores/workspaceblobstore/paths/data/wrangled", + index_col="PassengerId", +) + +### Access and wrangle data using credentialless AzureML Blob Datastore +#### - To enable read and write access, assign **Contributor** and **Storage Blob Data Contributor** roles to the user identity on the Azure Blob storage account that the datastore points to. +#### - Access data using `azureml://` URI and perform data wrangling. +import pyspark.pandas as pd +from pyspark.ml.feature import Imputer + +df = pd.read_csv( + "azureml://datastores/credlessblobdatastore/paths/data/titanic.csv", + index_col="PassengerId", +) +imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( + "mean" +) # Replace missing values in Age column with the mean value +df.fillna( + value={"Cabin": "None"}, inplace=True +) # Fill Cabin column with value "None" if missing +df.dropna(inplace=True) # Drop the rows which still have any missing value +df.to_csv( + "azureml://datastores/credlessblobdatastore/paths/data/wrangled", + index_col="PassengerId", +) + +### Access credentialed AzureML ADLS Gen 2 Datastore +#### - To enable read and write access, assign **Contributor** and **Storage Blob Data Contributor** roles to the service principal used by datastore. +#### - Access data using `azureml://` URI and perform data wrangling. +import pyspark.pandas as pd +from pyspark.ml.feature import Imputer + +df = pd.read_csv( + "azureml://datastores/adlsg2datastore/paths/data/titanic.csv", + index_col="PassengerId", +) +imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( + "mean" +) # Replace missing values in Age column with the mean value +df.fillna( + value={"Cabin": "None"}, inplace=True +) # Fill Cabin column with value "None" if missing +df.dropna(inplace=True) # Drop the rows which still have any missing value +df.to_csv( + "azureml://datastores/credadlsg2datastore/paths/data/wrangled", + index_col="PassengerId", +) + +### Access and wrangle data using credentialless AzureML ADLS Gen 2 Datastore +#### - To enable read and write access, assign **Contributor** and **Storage Blob Data Contributor** roles to the user identity on the Azure Data Lake Storage (ADLS) Gen 2 storage account that the datastore points to. 
+#### - Access data using `azureml://` URI and perform data wrangling. +import pyspark.pandas as pd +from pyspark.ml.feature import Imputer + +df = pd.read_csv( + "azureml://datastores/credlessadlsg2datastore/paths/data/titanic.csv", + index_col="PassengerId", +) +imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( + "mean" +) # Replace missing values in Age column with the mean value +df.fillna( + value={"Cabin": "None"}, inplace=True +) # Fill Cabin column with value "None" if missing +df.dropna(inplace=True) # Drop the rows which still have any missing value +df.to_csv( + "azureml://datastores/credlessadlsg2datastore/paths/data/wrangled", + index_col="PassengerId", +) + +### Access mounted File Share +#### Access data on mounted File share by constructing absolute path. +import os +import pyspark.pandas as pd + +abspath = os.path.abspath(".") +file = "file://" + abspath + "/Users//data/titanic.csv" +print(file) +df = pd.read_csv(file) +df.head() diff --git a/sdk/python/jobs/spark/setup_spark.sh b/sdk/python/jobs/spark/setup_spark.sh index 99929c18c7..93e76d69fe 100644 --- a/sdk/python/jobs/spark/setup_spark.sh +++ b/sdk/python/jobs/spark/setup_spark.sh @@ -75,8 +75,8 @@ then # elif [[ "$2" == *"run_interactive_session_notebook"* ]] then - NOTEBOOK_TO_CONVERT="../../data-wrangling/interactive_data_wrangling.ipynb" - ipython nbconvert $NOTEBOOK_TO_CONVERT --to script + #NOTEBOOK_TO_CONVERT="../../data-wrangling/interactive_data_wrangling.ipynb" + #ipython nbconvert $NOTEBOOK_TO_CONVERT --to script ACCOUNT_KEY=$(az storage account keys list --account-name $AZURE_STORAGE_ACCOUNT --query "[0].value" -o tsv) ACCESS_KEY_SECRET_NAME="autotestaccountkey" @@ -88,7 +88,7 @@ then az keyvault secret set --name $ACCESS_KEY_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $ACCOUNT_KEY END_TIME=`date -u -d "60 minutes" '+%Y-%m-%dT%H:%MZ'` - SAS_TOKEN=`az storage container generate-sas -n $BLOB_CONTAINER_NAME --account-name $AZURE_STORAGE_ACCOUNT --https-only --permissions dlrw --expiry $end -o tsv` + SAS_TOKEN=`az storage container generate-sas -n $AZUREML_DEFAULT_CONTAINER --account-name $AZURE_STORAGE_ACCOUNT --https-only --permissions dlrw --expiry $end -o tsv` SAS_TOKEN_SECRET_NAME="autotestsastoken" az keyvault secret set --name $SAS_TOKEN_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $SAS_TOKEN @@ -105,7 +105,7 @@ then SP_OBJECTID=$(echo $LIST_SP_DETAILS | jq -r '[0].id') SP_TENANTID=$(echo $LIST_SP_DETAILS | jq -r '[0].appOwnerOrganizationId') SPA_SP_SECRET=$(az ad sp credential reset --id $SP_OBJECTID --query "password") - + USER="azuremlsdk" CLIENT_ID_SECRET_NAME="autotestspsecretclient" TENANT_ID_SECRET_NAME="autotestspsecrettenant" CLIENT_SECRET_NAME="autotestspsecret" @@ -123,7 +123,8 @@ then s//$FILE_SYSTEM_NAME/g; s//$CLIENT_ID_SECRET_NAME/g; s//$TENANT_ID_SECRET_NAME/g; - s//$CLIENT_SECRET_NAME/g;" $NOTEBOOK_PY + s//$CLIENT_SECRET_NAME/g; + s//$USER/g;" $NOTEBOOK_PY # else # From d96170ff7146dde3b996a407669f302629f04ed7 Mon Sep 17 00:00:00 2001 From: Fred Li Date: Tue, 11 Jul 2023 11:39:51 -0700 Subject: [PATCH 10/18] Update relative path to py file --- sdk/python/jobs/spark/setup_spark.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/jobs/spark/setup_spark.sh b/sdk/python/jobs/spark/setup_spark.sh index 93e76d69fe..daaa5dfff1 100644 --- a/sdk/python/jobs/spark/setup_spark.sh +++ b/sdk/python/jobs/spark/setup_spark.sh @@ -84,7 +84,7 @@ then KEY_VAULT_NAME="autotestsparkkv" az keyvault create -n $KEY_VAULT_NAME -g $RESOURCE_GROUP - 
NOTEBOOK_PY="../../data-wrangling/interactive_data_wrangling.py" + NOTEBOOK_PY="./data-wrangling/interactive_data_wrangling.py" az keyvault secret set --name $ACCESS_KEY_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $ACCOUNT_KEY END_TIME=`date -u -d "60 minutes" '+%Y-%m-%dT%H:%MZ'` From cee56e8c27ceadd1fdb26d85e3962cfe3b0276f0 Mon Sep 17 00:00:00 2001 From: Fred Li Date: Tue, 11 Jul 2023 11:59:51 -0700 Subject: [PATCH 11/18] Update continaer value --- sdk/python/jobs/spark/setup_spark.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/jobs/spark/setup_spark.sh b/sdk/python/jobs/spark/setup_spark.sh index daaa5dfff1..044eb639c7 100644 --- a/sdk/python/jobs/spark/setup_spark.sh +++ b/sdk/python/jobs/spark/setup_spark.sh @@ -117,7 +117,7 @@ then sed -i "s//$KEY_VAULT_NAME/g; s//$ACCESS_KEY_SECRET_NAME/g; s//$AZURE_STORAGE_ACCOUNT/g; - s//$BLOB_CONTAINER_NAME/g + s//$AZUREML_DEFAULT_CONTAINER/g s//$SAS_TOKEN_SECRET_NAME/g; s//$GEN2_STORAGE_ACCOUNT_NAME/g s//$FILE_SYSTEM_NAME/g; From 37cec950926e5c31220a87a671b2cd42bd6b9878 Mon Sep 17 00:00:00 2001 From: Fred Li Date: Tue, 11 Jul 2023 12:46:07 -0700 Subject: [PATCH 12/18] Update expiry time --- sdk/python/jobs/spark/setup_spark.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/jobs/spark/setup_spark.sh b/sdk/python/jobs/spark/setup_spark.sh index 044eb639c7..a542fac74b 100644 --- a/sdk/python/jobs/spark/setup_spark.sh +++ b/sdk/python/jobs/spark/setup_spark.sh @@ -88,7 +88,7 @@ then az keyvault secret set --name $ACCESS_KEY_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $ACCOUNT_KEY END_TIME=`date -u -d "60 minutes" '+%Y-%m-%dT%H:%MZ'` - SAS_TOKEN=`az storage container generate-sas -n $AZUREML_DEFAULT_CONTAINER --account-name $AZURE_STORAGE_ACCOUNT --https-only --permissions dlrw --expiry $end -o tsv` + SAS_TOKEN=`az storage container generate-sas -n $AZUREML_DEFAULT_CONTAINER --account-name $AZURE_STORAGE_ACCOUNT --https-only --permissions dlrw --expiry $END_TIME -o tsv` SAS_TOKEN_SECRET_NAME="autotestsastoken" az keyvault secret set --name $SAS_TOKEN_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $SAS_TOKEN From de90d64d6a2ba7bac6ecb9bd9271595aa5a13f99 Mon Sep 17 00:00:00 2001 From: Fred Li Date: Tue, 11 Jul 2023 13:58:47 -0700 Subject: [PATCH 13/18] upload wrangling data to gen2 storage --- sdk/python/data-wrangling/interactive_data_wrangling.ipynb | 2 +- sdk/python/data-wrangling/interactive_data_wrangling.py | 4 ++-- sdk/python/jobs/spark/run_interactive_session_notebook.ipynb | 3 ++- sdk/python/jobs/spark/setup_spark.sh | 4 ++++ 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/sdk/python/data-wrangling/interactive_data_wrangling.ipynb b/sdk/python/data-wrangling/interactive_data_wrangling.ipynb index f4da9fe0dd..8abdb7b00f 100644 --- a/sdk/python/data-wrangling/interactive_data_wrangling.ipynb +++ b/sdk/python/data-wrangling/interactive_data_wrangling.ipynb @@ -272,7 +272,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "- To enable read and write access, assign **Contributor** and **Storage Blob Data Contributor** roles to the user identity.\n", + "- To enable read and write access, assign **Contributor** and **Storage Blob Data Contributor** roles to the Service Principal.\n", "- Set configuration properties as follows:\n", " - Client ID property: `fs.azure.account.oauth2.client.id..dfs.core.windows.net`\n", " - Client secret property: `fs.azure.account.oauth2.client.secret..dfs.core.windows.net`\n", diff --git 
a/sdk/python/data-wrangling/interactive_data_wrangling.py b/sdk/python/data-wrangling/interactive_data_wrangling.py index e46df05e75..edfa031307 100644 --- a/sdk/python/data-wrangling/interactive_data_wrangling.py +++ b/sdk/python/data-wrangling/interactive_data_wrangling.py @@ -85,13 +85,13 @@ ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( - "abfss://@.dfs.core.windows.net/data/wrangled", + "abfss://@.dfs.core.windows.net/data/wrangled", index_col="PassengerId", ) ### Access and wrangle ADLS Gen 2 data using Service Principal -#### - To enable read and write access, assign **Contributor** and **Storage Blob Data Contributor** roles to the user identity. +#### - To enable read and write access, assign **Contributor** and **Storage Blob Data Contributor** roles to the Service Principal. #### - Set configuration properties as follows: #### - Client ID property: `fs.azure.account.oauth2.client.id..dfs.core.windows.net` #### - Client secret property: `fs.azure.account.oauth2.client.secret..dfs.core.windows.net` diff --git a/sdk/python/jobs/spark/run_interactive_session_notebook.ipynb b/sdk/python/jobs/spark/run_interactive_session_notebook.ipynb index 4450a4fdd1..6db7bd28c4 100644 --- a/sdk/python/jobs/spark/run_interactive_session_notebook.ipynb +++ b/sdk/python/jobs/spark/run_interactive_session_notebook.ipynb @@ -5,7 +5,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Use an attached Synapse Spark pool" + "## Use a serverless Spark compute" ] }, { @@ -51,6 +51,7 @@ "\n", "returned_spark_job = ml_client.jobs.create_or_update(spark_job)\n", "\n", + "print(returned_spark_job.id)\n", "# Wait until the job completes\n", "ml_client.jobs.stream(returned_spark_job.name)" ] diff --git a/sdk/python/jobs/spark/setup_spark.sh b/sdk/python/jobs/spark/setup_spark.sh index a542fac74b..648033029a 100644 --- a/sdk/python/jobs/spark/setup_spark.sh +++ b/sdk/python/jobs/spark/setup_spark.sh @@ -98,6 +98,9 @@ then az storage fs create -n $FILE_SYSTEM_NAME --account-name $GEN2_STORAGE_ACCOUNT_NAME az role assignment create --role "Storage Blob Data Contributor" --assignee $AML_USER_MANAGED_ID_OID --scope /subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.Storage/storageAccounts/$GEN2_STORAGE_ACCOUNT_NAME/blobServices/default/containers/$FILE_SYSTEM_NAME + TITANIC_DATA_FILE="titanic.csv" + az storage fs file upload --file-system $FILE_SYSTEM_NAME --source ./data-wrangling/data/$TITANIC_DATA_FILE --path data/$TITANIC_DATA_FILE --account-name $GEN2_STORAGE_ACCOUNT_NAME + SERVICE_PRINCIPAL_NAME="${RESOURCE_GROUP}sp" az ad sp create-for-rbac --name $SERVICE_PRINCIPAL_NAME LIST_SP_DETAILS=$(az ad sp list --display-name $SERVICE_PRINCIPAL_NAME) @@ -113,6 +116,7 @@ then az keyvault secret set --name $TENANT_ID_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $SP_TENANTID az keyvault secret set --name $CLIENT_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $SPA_SP_SECRET az role assignment create --role "Storage Blob Data Contributor" --assignee $SP_APPID --scope /subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.Storage/storageAccounts/$GEN2_STORAGE_ACCOUNT_NAME/blobServices/default/containers/$FILE_SYSTEM_NAME + az role assignment create --role "Contributor" --assignee $SP_APPID --scope 
/subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.Storage/storageAccounts/$GEN2_STORAGE_ACCOUNT_NAME/blobServices/default/containers/$FILE_SYSTEM_NAME sed -i "s//$KEY_VAULT_NAME/g; s//$ACCESS_KEY_SECRET_NAME/g; From 17e5805605fd9086d93dba0dacca6d2baf85c1e1 Mon Sep 17 00:00:00 2001 From: Fred Li Date: Thu, 13 Jul 2023 12:41:21 -0700 Subject: [PATCH 14/18] Remove gen2 using service principal --- .../interactive_data_wrangling.py | 190 +++++++++--------- .../create_credential_less_data_store.yml | 6 + sdk/python/jobs/spark/setup_spark.sh | 44 ++-- 3 files changed, 125 insertions(+), 115 deletions(-) create mode 100644 sdk/python/jobs/spark/create_credential_less_data_store.yml diff --git a/sdk/python/data-wrangling/interactive_data_wrangling.py b/sdk/python/data-wrangling/interactive_data_wrangling.py index edfa031307..3974bef94b 100644 --- a/sdk/python/data-wrangling/interactive_data_wrangling.py +++ b/sdk/python/data-wrangling/interactive_data_wrangling.py @@ -97,56 +97,56 @@ #### - Client secret property: `fs.azure.account.oauth2.client.secret..dfs.core.windows.net` #### - Tenant ID property: `fs.azure.account.oauth2.client.endpoint..dfs.core.windows.net` #### - Tenant ID value: `https://login.microsoftonline.com//oauth2/token` -from pyspark.sql import SparkSession - -sc = SparkSession.builder.getOrCreate() -token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary - -# Set up service principal tenant ID, client ID and secret from Azure Key Vault -client_id = token_library.getSecret("", "") -tenant_id = token_library.getSecret("", "") -client_secret = token_library.getSecret("", "") - -# Set up service principal which has access of the data -sc._jsc.hadoopConfiguration().set( - "fs.azure.account.auth.type..dfs.core.windows.net", "OAuth" -) -sc._jsc.hadoopConfiguration().set( - "fs.azure.account.oauth.provider.type..dfs.core.windows.net", - "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", -) -sc._jsc.hadoopConfiguration().set( - "fs.azure.account.oauth2.client.id..dfs.core.windows.net", - client_id, -) -sc._jsc.hadoopConfiguration().set( - "fs.azure.account.oauth2.client.secret..dfs.core.windows.net", - client_secret, -) -sc._jsc.hadoopConfiguration().set( - "fs.azure.account.oauth2.client.endpoint..dfs.core.windows.net", - "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token", -) - -#### - Access data using `abfss://` URI and perform data wrangling. 
-import pyspark.pandas as pd -from pyspark.ml.feature import Imputer - -df = pd.read_csv( - "abfss://@.dfs.core.windows.net/data/titanic.csv", - index_col="PassengerId", -) -imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( - "mean" -) # Replace missing values in Age column with the mean value -df.fillna( - value={"Cabin": "None"}, inplace=True -) # Fill Cabin column with value "None" if missing -df.dropna(inplace=True) # Drop the rows which still have any missing value -df.to_csv( - "abfss://@.dfs.core.windows.net/data/wrangled", - index_col="PassengerId", -) +# from pyspark.sql import SparkSession + +# sc = SparkSession.builder.getOrCreate() +# token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary + +# # Set up service principal tenant ID, client ID and secret from Azure Key Vault +# client_id = token_library.getSecret("", "") +# tenant_id = token_library.getSecret("", "") +# client_secret = token_library.getSecret("", "") + +# # Set up service principal which has access of the data +# sc._jsc.hadoopConfiguration().set( +# "fs.azure.account.auth.type..dfs.core.windows.net", "OAuth" +# ) +# sc._jsc.hadoopConfiguration().set( +# "fs.azure.account.oauth.provider.type..dfs.core.windows.net", +# "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", +# ) +# sc._jsc.hadoopConfiguration().set( +# "fs.azure.account.oauth2.client.id..dfs.core.windows.net", +# client_id, +# ) +# sc._jsc.hadoopConfiguration().set( +# "fs.azure.account.oauth2.client.secret..dfs.core.windows.net", +# client_secret, +# ) +# sc._jsc.hadoopConfiguration().set( +# "fs.azure.account.oauth2.client.endpoint..dfs.core.windows.net", +# "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token", +# ) + +# #### - Access data using `abfss://` URI and perform data wrangling. +# import pyspark.pandas as pd +# from pyspark.ml.feature import Imputer + +# df = pd.read_csv( +# "abfss://@.dfs.core.windows.net/data/titanic.csv", +# index_col="PassengerId", +# ) +# imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( +# "mean" +# ) # Replace missing values in Age column with the mean value +# df.fillna( +# value={"Cabin": "None"}, inplace=True +# ) # Fill Cabin column with value "None" if missing +# df.dropna(inplace=True) # Drop the rows which still have any missing value +# df.to_csv( +# "abfss://@.dfs.core.windows.net/data/wrangled", +# index_col="PassengerId", +# ) ### Access and wrangle data using credentialed AzureML Blob Datastore #### - Access data using `azureml://` URI and perform data wrangling. @@ -191,51 +191,51 @@ index_col="PassengerId", ) -### Access credentialed AzureML ADLS Gen 2 Datastore -#### - To enable read and write access, assign **Contributor** and **Storage Blob Data Contributor** roles to the service principal used by datastore. -#### - Access data using `azureml://` URI and perform data wrangling. 
-import pyspark.pandas as pd -from pyspark.ml.feature import Imputer - -df = pd.read_csv( - "azureml://datastores/adlsg2datastore/paths/data/titanic.csv", - index_col="PassengerId", -) -imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( - "mean" -) # Replace missing values in Age column with the mean value -df.fillna( - value={"Cabin": "None"}, inplace=True -) # Fill Cabin column with value "None" if missing -df.dropna(inplace=True) # Drop the rows which still have any missing value -df.to_csv( - "azureml://datastores/credadlsg2datastore/paths/data/wrangled", - index_col="PassengerId", -) - -### Access and wrangle data using credentialless AzureML ADLS Gen 2 Datastore -#### - To enable read and write access, assign **Contributor** and **Storage Blob Data Contributor** roles to the user identity on the Azure Data Lake Storage (ADLS) Gen 2 storage account that the datastore points to. -#### - Access data using `azureml://` URI and perform data wrangling. -import pyspark.pandas as pd -from pyspark.ml.feature import Imputer - -df = pd.read_csv( - "azureml://datastores/credlessadlsg2datastore/paths/data/titanic.csv", - index_col="PassengerId", -) -imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( - "mean" -) # Replace missing values in Age column with the mean value -df.fillna( - value={"Cabin": "None"}, inplace=True -) # Fill Cabin column with value "None" if missing -df.dropna(inplace=True) # Drop the rows which still have any missing value -df.to_csv( - "azureml://datastores/credlessadlsg2datastore/paths/data/wrangled", - index_col="PassengerId", -) - -### Access mounted File Share +# ## Access credentialed AzureML ADLS Gen 2 Datastore +# ### - To enable read and write access, assign **Contributor** and **Storage Blob Data Contributor** roles to the service principal used by datastore. +# ### - Access data using `azureml://` URI and perform data wrangling. +# import pyspark.pandas as pd +# from pyspark.ml.feature import Imputer + +# df = pd.read_csv( +# "azureml://datastores/adlsg2datastore/paths/data/titanic.csv", +# index_col="PassengerId", +# ) +# imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( +# "mean" +# ) # Replace missing values in Age column with the mean value +# df.fillna( +# value={"Cabin": "None"}, inplace=True +# ) # Fill Cabin column with value "None" if missing +# df.dropna(inplace=True) # Drop the rows which still have any missing value +# df.to_csv( +# "azureml://datastores/credadlsg2datastore/paths/data/wrangled", +# index_col="PassengerId", +# ) + +# ### Access and wrangle data using credentialless AzureML ADLS Gen 2 Datastore +# #### - To enable read and write access, assign **Contributor** and **Storage Blob Data Contributor** roles to the user identity on the Azure Data Lake Storage (ADLS) Gen 2 storage account that the datastore points to. +# #### - Access data using `azureml://` URI and perform data wrangling. 
+# import pyspark.pandas as pd +# from pyspark.ml.feature import Imputer + +# df = pd.read_csv( +# "azureml://datastores/credlessadlsg2datastore/paths/data/titanic.csv", +# index_col="PassengerId", +# ) +# imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( +# "mean" +# ) # Replace missing values in Age column with the mean value +# df.fillna( +# value={"Cabin": "None"}, inplace=True +# ) # Fill Cabin column with value "None" if missing +# df.dropna(inplace=True) # Drop the rows which still have any missing value +# df.to_csv( +# "azureml://datastores/credlessadlsg2datastore/paths/data/wrangled", +# index_col="PassengerId", +# ) + +# ## Access mounted File Share #### Access data on mounted File share by constructing absolute path. import os import pyspark.pandas as pd diff --git a/sdk/python/jobs/spark/create_credential_less_data_store.yml b/sdk/python/jobs/spark/create_credential_less_data_store.yml new file mode 100644 index 0000000000..4c56659dcb --- /dev/null +++ b/sdk/python/jobs/spark/create_credential_less_data_store.yml @@ -0,0 +1,6 @@ +$schema: https://azuremlschemas.azureedge.net/latest/azureBlob.schema.json +name: +type: azure_blob +description: Credential-less datastore pointing to a blob container. +account_name: +container_name: \ No newline at end of file diff --git a/sdk/python/jobs/spark/setup_spark.sh b/sdk/python/jobs/spark/setup_spark.sh index 648033029a..0309251741 100644 --- a/sdk/python/jobs/spark/setup_spark.sh +++ b/sdk/python/jobs/spark/setup_spark.sh @@ -17,6 +17,7 @@ SQL_ADMIN_LOGIN_PASSWORD="auto123!" SPARK_POOL_NAME="automationpool" SPARK_POOL_ADMIN_ROLE_ID="6e4bf58a-b8e1-4cc3-bbf9-d73143322b78" USER_IDENTITY_YML="jobs/spark/user-assigned-identity.yml" +CREAT_CREDENTIAL_LESS_DS_YML="jobs/spark/create_credential_less_data_store.yml" AZURE_REGION_NAME=${LOCATION} OUTBOUND_RULE_NAME="automationtestrule" # @@ -101,22 +102,28 @@ then TITANIC_DATA_FILE="titanic.csv" az storage fs file upload --file-system $FILE_SYSTEM_NAME --source ./data-wrangling/data/$TITANIC_DATA_FILE --path data/$TITANIC_DATA_FILE --account-name $GEN2_STORAGE_ACCOUNT_NAME - SERVICE_PRINCIPAL_NAME="${RESOURCE_GROUP}sp" - az ad sp create-for-rbac --name $SERVICE_PRINCIPAL_NAME - LIST_SP_DETAILS=$(az ad sp list --display-name $SERVICE_PRINCIPAL_NAME) - SP_APPID=$(echo $LIST_SP_DETAILS | jq -r '[0].appId') - SP_OBJECTID=$(echo $LIST_SP_DETAILS | jq -r '[0].id') - SP_TENANTID=$(echo $LIST_SP_DETAILS | jq -r '[0].appOwnerOrganizationId') - SPA_SP_SECRET=$(az ad sp credential reset --id $SP_OBJECTID --query "password") - USER="azuremlsdk" - CLIENT_ID_SECRET_NAME="autotestspsecretclient" - TENANT_ID_SECRET_NAME="autotestspsecrettenant" - CLIENT_SECRET_NAME="autotestspsecret" - az keyvault secret set --name $CLIENT_ID_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $SP_APPID - az keyvault secret set --name $TENANT_ID_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $SP_TENANTID - az keyvault secret set --name $CLIENT_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $SPA_SP_SECRET - az role assignment create --role "Storage Blob Data Contributor" --assignee $SP_APPID --scope /subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.Storage/storageAccounts/$GEN2_STORAGE_ACCOUNT_NAME/blobServices/default/containers/$FILE_SYSTEM_NAME - az role assignment create --role "Contributor" --assignee $SP_APPID --scope 
/subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.Storage/storageAccounts/$GEN2_STORAGE_ACCOUNT_NAME/blobServices/default/containers/$FILE_SYSTEM_NAME + # SERVICE_PRINCIPAL_NAME="${RESOURCE_GROUP}sp" + # az ad sp create-for-rbac --name $SERVICE_PRINCIPAL_NAME + # LIST_SP_DETAILS=$(az ad sp list --display-name $SERVICE_PRINCIPAL_NAME) + # SP_APPID=$(echo $LIST_SP_DETAILS | jq -r '[0].appId') + # SP_OBJECTID=$(echo $LIST_SP_DETAILS | jq -r '[0].id') + # SP_TENANTID=$(echo $LIST_SP_DETAILS | jq -r '[0].appOwnerOrganizationId') + # SPA_SP_SECRET=$(az ad sp credential reset --id $SP_OBJECTID --query "password") + # USER="azuremlsdk" + # CLIENT_ID_SECRET_NAME="autotestspsecretclient" + # TENANT_ID_SECRET_NAME="autotestspsecrettenant" + # CLIENT_SECRET_NAME="autotestspsecret" + # az keyvault secret set --name $CLIENT_ID_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $SP_APPID + # az keyvault secret set --name $TENANT_ID_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $SP_TENANTID + # az keyvault secret set --name $CLIENT_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $SPA_SP_SECRET + # az role assignment create --role "Storage Blob Data Contributor" --assignee $SP_APPID --scope /subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.Storage/storageAccounts/$GEN2_STORAGE_ACCOUNT_NAME/blobServices/default/containers/$FILE_SYSTEM_NAME + # az role assignment create --role "Contributor" --assignee $SP_APPID --scope /subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.Storage/storageAccounts/$GEN2_STORAGE_ACCOUNT_NAME/blobServices/default/containers/$FILE_SYSTEM_NAME + + CREDENTIAL_LESS_DATA_STORE_NAME="credlessblobdatastore" + sed -i "s//$AZURE_STORAGE_ACCOUNT/g; + s//$AZUREML_DEFAULT_CONTAINER/g + s//$CREDENTIAL_LESS_DATA_STORE_NAME/g;" $CREAT_CREDENTIAL_LESS_DS_YML + az ml datastore create --file $CREAT_CREDENTIAL_LESS_DS_YML --resource-group $RESOURCE_GROUP --workspace-name $AML_WORKSPACE_NAME sed -i "s//$KEY_VAULT_NAME/g; s//$ACCESS_KEY_SECRET_NAME/g; @@ -125,9 +132,6 @@ then s//$SAS_TOKEN_SECRET_NAME/g; s//$GEN2_STORAGE_ACCOUNT_NAME/g s//$FILE_SYSTEM_NAME/g; - s//$CLIENT_ID_SECRET_NAME/g; - s//$TENANT_ID_SECRET_NAME/g; - s//$CLIENT_SECRET_NAME/g; s//$USER/g;" $NOTEBOOK_PY # else @@ -176,7 +180,7 @@ else az synapse role assignment create --workspace-name $SYNAPSE_WORKSPACE_NAME --role $SPARK_POOL_ADMIN_ROLE_ID --assignee $COMPUTE_MANAGED_IDENTITY fi - az synapse role assignment create --workspace-name $SYNAPSE_WORKSPACE_NAME --role $SPARK_POOL_ADMIN_ROLE_ID --assignee $AML_USER_MANAGED_ID + az synapse role assignment create --workspace-name $SYNAPSE_WORKSPACE_NAME --role $SPARK_POOL_ADMIN_ROLE_ID --assignee $AML_USER_MANAGED_ID_OID fi # From bc3f29255b18c7fe4bb1dc07534ab0fb9c4820a3 Mon Sep 17 00:00:00 2001 From: Fred Li Date: Thu, 13 Jul 2023 13:02:18 -0700 Subject: [PATCH 15/18] Remove session mount script --- .../interactive_data_wrangling.py | 18 +++++++++--------- sdk/python/jobs/spark/setup_spark.sh | 7 +++---- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/sdk/python/data-wrangling/interactive_data_wrangling.py b/sdk/python/data-wrangling/interactive_data_wrangling.py index 3974bef94b..51db0b2592 100644 --- a/sdk/python/data-wrangling/interactive_data_wrangling.py +++ b/sdk/python/data-wrangling/interactive_data_wrangling.py @@ -235,13 +235,13 @@ # index_col="PassengerId", # ) -# ## Access mounted File Share -#### Access data on mounted File share by constructing absolute path. 
-import os
-import pyspark.pandas as pd
+# # ## Access mounted File Share
+# #### Access data on mounted File share by constructing absolute path.
+# import os
+# import pyspark.pandas as pd
-abspath = os.path.abspath(".")
-file = "file://" + abspath + "/Users//data/titanic.csv"
-print(file)
-df = pd.read_csv(file)
-df.head()
+# abspath = os.path.abspath(".")
+# file = "file://" + abspath + "/Users//data/titanic.csv"
+# print(file)
+# df = pd.read_csv(file)
+# df.head()
diff --git a/sdk/python/jobs/spark/setup_spark.sh b/sdk/python/jobs/spark/setup_spark.sh
index 0309251741..e713e4b26f 100644
--- a/sdk/python/jobs/spark/setup_spark.sh
+++ b/sdk/python/jobs/spark/setup_spark.sh
@@ -109,7 +109,7 @@ then
 # SP_OBJECTID=$(echo $LIST_SP_DETAILS | jq -r '[0].id')
 # SP_TENANTID=$(echo $LIST_SP_DETAILS | jq -r '[0].appOwnerOrganizationId')
 # SPA_SP_SECRET=$(az ad sp credential reset --id $SP_OBJECTID --query "password")
- # USER="azuremlsdk"
+
 # CLIENT_ID_SECRET_NAME="autotestspsecretclient"
 # TENANT_ID_SECRET_NAME="autotestspsecrettenant"
 # CLIENT_SECRET_NAME="autotestspsecret"
@@ -124,15 +124,14 @@ then
 s//$AZUREML_DEFAULT_CONTAINER/g
 s//$CREDENTIAL_LESS_DATA_STORE_NAME/g;" $CREAT_CREDENTIAL_LESS_DS_YML
 az ml datastore create --file $CREAT_CREDENTIAL_LESS_DS_YML --resource-group $RESOURCE_GROUP --workspace-name $AML_WORKSPACE_NAME
-
+ # USER="azuremlsdk"
 sed -i "s//$KEY_VAULT_NAME/g;
 s//$ACCESS_KEY_SECRET_NAME/g;
 s//$AZURE_STORAGE_ACCOUNT/g;
 s//$AZUREML_DEFAULT_CONTAINER/g
 s//$SAS_TOKEN_SECRET_NAME/g;
 s//$GEN2_STORAGE_ACCOUNT_NAME/g
- s//$FILE_SYSTEM_NAME/g;
- s//$USER/g;" $NOTEBOOK_PY
+ s//$FILE_SYSTEM_NAME/g;" $NOTEBOOK_PY
 #
 else
 #
From ae90c4161211119274a1ccc722673c5cdc213df9 Mon Sep 17 00:00:00 2001
From: Fred Li
Date: Fri, 14 Jul 2023 12:06:15 -0700
Subject: [PATCH 16/18] Move test file into folder and update variables

---
 .../interactive_data_wrangling.ipynb | 68 +++++---
 .../interactive_data_wrangling.py | 151 ++++--------
 .../create_credential_less_data_store.yml | 0
 .../run_interactive_session_notebook.ipynb | 2 +-
 .../user-assigned-identity.yml | 0
 sdk/python/jobs/spark/setup_spark.sh | 36 +++--
 .../spark/submit_spark_pipeline_jobs.ipynb | 9 +-
 .../spark/submit_spark_standalone_jobs.ipynb | 9 +-
 8 files changed, 110 insertions(+), 165 deletions(-)
 rename sdk/python/jobs/spark/{ => automation}/create_credential_less_data_store.yml (100%)
 rename sdk/python/jobs/spark/{ => automation}/run_interactive_session_notebook.ipynb (98%)
 rename sdk/python/jobs/spark/{ => automation}/user-assigned-identity.yml (100%)

diff --git a/sdk/python/data-wrangling/interactive_data_wrangling.ipynb b/sdk/python/data-wrangling/interactive_data_wrangling.ipynb
index 8abdb7b00f..3e23bf6461 100644
--- a/sdk/python/data-wrangling/interactive_data_wrangling.ipynb
+++ b/sdk/python/data-wrangling/interactive_data_wrangling.ipynb
@@ -49,11 +49,15 @@
 "source": [
 "from pyspark.sql import SparkSession\n",
 "\n",
+ "key_vault_name = \"\"\n",
+ "access_key_secret_name = \"\"\n",
+ "storage_account_name = \"\"\n",
+ "\n",
 "sc = SparkSession.builder.getOrCreate()\n",
 "token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary\n",
- "access_key = token_library.getSecret(\"\", \"\")\n",
+ "access_key = token_library.getSecret(key_vault_name, access_key_secret_name)\n",
 "sc._jsc.hadoopConfiguration().set(\n",
- " \"fs.azure.account.key..blob.core.windows.net\", access_key\n",
+ " f\"fs.azure.account.key.{storage_account_name}.blob.core.windows.net\", access_key\n",
 ")"
 ]
 },
@@ -84,8 +88,11 @@
 "import
pyspark.pandas as pd\n", "from pyspark.ml.feature import Imputer\n", "\n", + "blob_container_name = \"\"\n", + "storage_account_name = \"\"\n", + "\n", "df = pd.read_csv(\n", - " \"wasbs://@.blob.core.windows.net/data/titanic.csv\",\n", + " f\"wasbs://{blob_container_name}@{storage_account_name}.blob.core.windows.net/data/titanic.csv\",\n", " index_col=\"PassengerId\",\n", ")\n", "imputer = Imputer(inputCols=[\"Age\"], outputCol=\"Age\").setStrategy(\n", @@ -96,7 +103,7 @@ ") # Fill Cabin column with value \"None\" if missing\n", "df.dropna(inplace=True) # Drop the rows which still have any missing value\n", "df.to_csv(\n", - " \"wasbs://@.blob.core.windows.net/data/wrangled\",\n", + " f\"wasbs://{blob_container_name}@{storage_account_name}.blob.core.windows.net/data/wrangled\",\n", " index_col=\"PassengerId\",\n", ")" ] @@ -141,11 +148,16 @@ "source": [ "from pyspark.sql import SparkSession\n", "\n", + "key_vault_name = \"\"\n", + "sas_token_secret_name = \"\"\n", + "blob_container_name = \"\"\n", + "storage_account_name = \"\"\n", + "\n", "sc = SparkSession.builder.getOrCreate()\n", "token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary\n", - "sas_token = token_library.getSecret(\"\", \"\")\n", + "sas_token = token_library.getSecret(key_vault_name, sas_token_secret_name)\n", "sc._jsc.hadoopConfiguration().set(\n", - " \"fs.azure.sas...blob.core.windows.net\",\n", + " f\"fs.azure.sas.{blob_container_name}.{storage_account_name}.blob.core.windows.net\",\n", " sas_token,\n", ")" ] @@ -177,8 +189,11 @@ "import pyspark.pandas as pd\n", "from pyspark.ml.feature import Imputer\n", "\n", + "blob_container_name = \"\"\n", + "blob_container_name = \"\"\n", + "\n", "df = pd.read_csv(\n", - " \"wasbs://@.blob.core.windows.net/data/titanic.csv\",\n", + " f\"wasbs://{blob_container_name}@{blob_container_name}.blob.core.windows.net/data/titanic.csv\",\n", " index_col=\"PassengerId\",\n", ")\n", "imputer = Imputer(inputCols=[\"Age\"], outputCol=\"Age\").setStrategy(\n", @@ -189,7 +204,7 @@ ") # Fill Cabin column with value \"None\" if missing\n", "df.dropna(inplace=True) # Drop the rows which still have any missing value\n", "df.to_csv(\n", - " \"wasbs://@.blob.core.windows.net/data/wrangled\",\n", + " f\"wasbs://{blob_container_name}@{blob_container_name}.blob.core.windows.net/data/wrangled\",\n", " index_col=\"PassengerId\",\n", ")" ] @@ -236,8 +251,11 @@ "import pyspark.pandas as pd\n", "from pyspark.ml.feature import Imputer\n", "\n", + "file_system_name = \"\"\n", + "gen2_storage_account_name = \"\"\n", + "\n", "df = pd.read_csv(\n", - " \"abfss://@.dfs.core.windows.net/data/titanic.csv\",\n", + " f\"abfss://{file_system_name}@{gen2_storage_account_name}.dfs.core.windows.net/data/titanic.csv\",\n", " index_col=\"PassengerId\",\n", ")\n", "imputer = Imputer(inputCols=[\"Age\"], outputCol=\"Age\").setStrategy(\n", @@ -248,7 +266,7 @@ ") # Fill Cabin column with value \"None\" if missing\n", "df.dropna(inplace=True) # Drop the rows which still have any missing value\n", "df.to_csv(\n", - " \"abfss://@.dfs.core.windows.net/data/wrangled\",\n", + " f\"abfss://{file_system_name}@{gen2_storage_account_name}.dfs.core.windows.net/data/wrangled\",\n", " index_col=\"PassengerId\",\n", ")" ] @@ -298,32 +316,39 @@ "source": [ "from pyspark.sql import SparkSession\n", "\n", + "key_vault_name = \"\"\n", + "client_id_secret_name = \"\"\n", + "tenant_id_secret_name = \"\"\n", + "client_secret_name = \"\"\n", + "gen2_storage_account_name = \"\"\n", + "\n", "sc = 
SparkSession.builder.getOrCreate()\n", "token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary\n", "\n", "# Set up service principal tenant ID, client ID and secret from Azure Key Vault\n", - "client_id = token_library.getSecret(\"\", \"\")\n", - "tenant_id = token_library.getSecret(\"\", \"\")\n", - "client_secret = token_library.getSecret(\"\", \"\")\n", + "client_id = token_library.getSecret(key_vault_name, client_id_secret_name)\n", + "tenant_id = token_library.getSecret(key_vault_name, tenant_id_secret_name)\n", + "client_secret = token_library.getSecret(key_vault_name, client_secret_name)\n", "\n", "# Set up service principal which has access of the data\n", "sc._jsc.hadoopConfiguration().set(\n", - " \"fs.azure.account.auth.type..dfs.core.windows.net\", \"OAuth\"\n", + " f\"fs.azure.account.auth.type.{gen2_storage_account_name}.dfs.core.windows.net\",\n", + " \"OAuth\",\n", ")\n", "sc._jsc.hadoopConfiguration().set(\n", - " \"fs.azure.account.oauth.provider.type..dfs.core.windows.net\",\n", + " f\"fs.azure.account.oauth.provider.type.{gen2_storage_account_name}.dfs.core.windows.net\",\n", " \"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider\",\n", ")\n", "sc._jsc.hadoopConfiguration().set(\n", - " \"fs.azure.account.oauth2.client.id..dfs.core.windows.net\",\n", + " f\"fs.azure.account.oauth2.client.id.{gen2_storage_account_name}.dfs.core.windows.net\",\n", " client_id,\n", ")\n", "sc._jsc.hadoopConfiguration().set(\n", - " \"fs.azure.account.oauth2.client.secret..dfs.core.windows.net\",\n", + " f\"fs.azure.account.oauth2.client.secret.{gen2_storage_account_name}.dfs.core.windows.net\",\n", " client_secret,\n", ")\n", "sc._jsc.hadoopConfiguration().set(\n", - " \"fs.azure.account.oauth2.client.endpoint..dfs.core.windows.net\",\n", + " f\"fs.azure.account.oauth2.client.endpoint.{gen2_storage_account_name}.dfs.core.windows.net\",\n", " \"https://login.microsoftonline.com/\" + tenant_id + \"/oauth2/token\",\n", ")" ] @@ -355,8 +380,11 @@ "import pyspark.pandas as pd\n", "from pyspark.ml.feature import Imputer\n", "\n", + "file_system_name = \"\"\n", + "gen2_storage_account_name = \"\"\n", + "\n", "df = pd.read_csv(\n", - " \"abfss://@.dfs.core.windows.net/data/titanic.csv\",\n", + " f\"abfss://{file_system_name}@{gen2_storage_account_name}.dfs.core.windows.net/data/titanic.csv\",\n", " index_col=\"PassengerId\",\n", ")\n", "imputer = Imputer(inputCols=[\"Age\"], outputCol=\"Age\").setStrategy(\n", @@ -367,7 +395,7 @@ ") # Fill Cabin column with value \"None\" if missing\n", "df.dropna(inplace=True) # Drop the rows which still have any missing value\n", "df.to_csv(\n", - " \"abfss://@.dfs.core.windows.net/data/wrangled\",\n", + " f\"abfss://{file_system_name}@{gen2_storage_account_name}.dfs.core.windows.net/data/wrangled\",\n", " index_col=\"PassengerId\",\n", ")" ] diff --git a/sdk/python/data-wrangling/interactive_data_wrangling.py b/sdk/python/data-wrangling/interactive_data_wrangling.py index 51db0b2592..05c07f54d7 100644 --- a/sdk/python/data-wrangling/interactive_data_wrangling.py +++ b/sdk/python/data-wrangling/interactive_data_wrangling.py @@ -6,19 +6,26 @@ from pyspark.sql import SparkSession +key_vault_name = "" +access_key_secret_name = "" +storage_account_name = "" + sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary -access_key = token_library.getSecret("", "") +access_key = token_library.getSecret(key_vault_name, access_key_secret_name) sc._jsc.hadoopConfiguration().set( - 
"fs.azure.account.key..blob.core.windows.net", access_key + f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", access_key ) #### Access data using `wasbs://` URI and perform data wrangling. import pyspark.pandas as pd from pyspark.ml.feature import Imputer +blob_container_name = "" +storage_account_name = "" + df = pd.read_csv( - "wasbs://@.blob.core.windows.net/data/titanic.csv", + f"wasbs://{blob_container_name}@{storage_account_name}.blob.core.windows.net/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( @@ -29,7 +36,7 @@ ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( - "wasbs://@.blob.core.windows.net/data/wrangled", + f"wasbs://{blob_container_name}@{storage_account_name}.blob.core.windows.net/data/wrangled", index_col="PassengerId", ) @@ -38,11 +45,16 @@ #### First, set the SAS token as configuration property `fs.azure.sas...blob.core.windows.net`. from pyspark.sql import SparkSession +key_vault_name = "" +sas_token_secret_name = "" +blob_container_name = "" +storage_account_name = "" + sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary -sas_token = token_library.getSecret("", "") +sas_token = token_library.getSecret(key_vault_name, sas_token_secret_name) sc._jsc.hadoopConfiguration().set( - "fs.azure.sas...blob.core.windows.net", + f"fs.azure.sas.{blob_container_name}.{storage_account_name}.blob.core.windows.net", sas_token, ) @@ -50,8 +62,11 @@ import pyspark.pandas as pd from pyspark.ml.feature import Imputer +blob_container_name = "" +blob_container_name = "" + df = pd.read_csv( - "wasbs://@.blob.core.windows.net/data/titanic.csv", + f"wasbs://{blob_container_name}@{blob_container_name}.blob.core.windows.net/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( @@ -62,7 +77,7 @@ ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( - "wasbs://@.blob.core.windows.net/data/wrangled", + f"wasbs://{blob_container_name}@{blob_container_name}.blob.core.windows.net/data/wrangled", index_col="PassengerId", ) @@ -73,8 +88,11 @@ import pyspark.pandas as pd from pyspark.ml.feature import Imputer +file_system_name = "" +gen2_storage_account_name = "" + df = pd.read_csv( - "abfss://@.dfs.core.windows.net/data/titanic.csv", + f"abfss://{file_system_name}@{gen2_storage_account_name}.dfs.core.windows.net/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( @@ -85,68 +103,10 @@ ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( - "abfss://@.dfs.core.windows.net/data/wrangled", + f"abfss://{file_system_name}@{gen2_storage_account_name}.dfs.core.windows.net/data/wrangled", index_col="PassengerId", ) -### Access and wrangle ADLS Gen 2 data using Service Principal - -#### - To enable read and write access, assign **Contributor** and **Storage Blob Data Contributor** roles to the Service Principal. 
-#### - Set configuration properties as follows: -#### - Client ID property: `fs.azure.account.oauth2.client.id..dfs.core.windows.net` -#### - Client secret property: `fs.azure.account.oauth2.client.secret..dfs.core.windows.net` -#### - Tenant ID property: `fs.azure.account.oauth2.client.endpoint..dfs.core.windows.net` -#### - Tenant ID value: `https://login.microsoftonline.com//oauth2/token` -# from pyspark.sql import SparkSession - -# sc = SparkSession.builder.getOrCreate() -# token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary - -# # Set up service principal tenant ID, client ID and secret from Azure Key Vault -# client_id = token_library.getSecret("", "") -# tenant_id = token_library.getSecret("", "") -# client_secret = token_library.getSecret("", "") - -# # Set up service principal which has access of the data -# sc._jsc.hadoopConfiguration().set( -# "fs.azure.account.auth.type..dfs.core.windows.net", "OAuth" -# ) -# sc._jsc.hadoopConfiguration().set( -# "fs.azure.account.oauth.provider.type..dfs.core.windows.net", -# "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", -# ) -# sc._jsc.hadoopConfiguration().set( -# "fs.azure.account.oauth2.client.id..dfs.core.windows.net", -# client_id, -# ) -# sc._jsc.hadoopConfiguration().set( -# "fs.azure.account.oauth2.client.secret..dfs.core.windows.net", -# client_secret, -# ) -# sc._jsc.hadoopConfiguration().set( -# "fs.azure.account.oauth2.client.endpoint..dfs.core.windows.net", -# "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token", -# ) - -# #### - Access data using `abfss://` URI and perform data wrangling. -# import pyspark.pandas as pd -# from pyspark.ml.feature import Imputer - -# df = pd.read_csv( -# "abfss://@.dfs.core.windows.net/data/titanic.csv", -# index_col="PassengerId", -# ) -# imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( -# "mean" -# ) # Replace missing values in Age column with the mean value -# df.fillna( -# value={"Cabin": "None"}, inplace=True -# ) # Fill Cabin column with value "None" if missing -# df.dropna(inplace=True) # Drop the rows which still have any missing value -# df.to_csv( -# "abfss://@.dfs.core.windows.net/data/wrangled", -# index_col="PassengerId", -# ) ### Access and wrangle data using credentialed AzureML Blob Datastore #### - Access data using `azureml://` URI and perform data wrangling. @@ -190,58 +150,3 @@ "azureml://datastores/credlessblobdatastore/paths/data/wrangled", index_col="PassengerId", ) - -# ## Access credentialed AzureML ADLS Gen 2 Datastore -# ### - To enable read and write access, assign **Contributor** and **Storage Blob Data Contributor** roles to the service principal used by datastore. -# ### - Access data using `azureml://` URI and perform data wrangling. 
-# import pyspark.pandas as pd -# from pyspark.ml.feature import Imputer - -# df = pd.read_csv( -# "azureml://datastores/adlsg2datastore/paths/data/titanic.csv", -# index_col="PassengerId", -# ) -# imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( -# "mean" -# ) # Replace missing values in Age column with the mean value -# df.fillna( -# value={"Cabin": "None"}, inplace=True -# ) # Fill Cabin column with value "None" if missing -# df.dropna(inplace=True) # Drop the rows which still have any missing value -# df.to_csv( -# "azureml://datastores/credadlsg2datastore/paths/data/wrangled", -# index_col="PassengerId", -# ) - -# ### Access and wrangle data using credentialless AzureML ADLS Gen 2 Datastore -# #### - To enable read and write access, assign **Contributor** and **Storage Blob Data Contributor** roles to the user identity on the Azure Data Lake Storage (ADLS) Gen 2 storage account that the datastore points to. -# #### - Access data using `azureml://` URI and perform data wrangling. -# import pyspark.pandas as pd -# from pyspark.ml.feature import Imputer - -# df = pd.read_csv( -# "azureml://datastores/credlessadlsg2datastore/paths/data/titanic.csv", -# index_col="PassengerId", -# ) -# imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( -# "mean" -# ) # Replace missing values in Age column with the mean value -# df.fillna( -# value={"Cabin": "None"}, inplace=True -# ) # Fill Cabin column with value "None" if missing -# df.dropna(inplace=True) # Drop the rows which still have any missing value -# df.to_csv( -# "azureml://datastores/credlessadlsg2datastore/paths/data/wrangled", -# index_col="PassengerId", -# ) - -# # ## Access mounted File Share -# #### Access data on mounted File share by constructing absolute path. -# import os -# import pyspark.pandas as pd - -# abspath = os.path.abspath(".") -# file = "file://" + abspath + "/Users//data/titanic.csv" -# print(file) -# df = pd.read_csv(file) -# df.head() diff --git a/sdk/python/jobs/spark/create_credential_less_data_store.yml b/sdk/python/jobs/spark/automation/create_credential_less_data_store.yml similarity index 100% rename from sdk/python/jobs/spark/create_credential_less_data_store.yml rename to sdk/python/jobs/spark/automation/create_credential_less_data_store.yml diff --git a/sdk/python/jobs/spark/run_interactive_session_notebook.ipynb b/sdk/python/jobs/spark/automation/run_interactive_session_notebook.ipynb similarity index 98% rename from sdk/python/jobs/spark/run_interactive_session_notebook.ipynb rename to sdk/python/jobs/spark/automation/run_interactive_session_notebook.ipynb index 6db7bd28c4..1d87178f33 100644 --- a/sdk/python/jobs/spark/run_interactive_session_notebook.ipynb +++ b/sdk/python/jobs/spark/automation/run_interactive_session_notebook.ipynb @@ -36,7 +36,7 @@ "\n", "spark_job = spark(\n", " display_name=\"interactive_data_wrangling\",\n", - " code=\"../../data-wrangling\",\n", + " code=\"../../../data-wrangling\",\n", " entry={\"file\": \"interactive_data_wrangling.py\"},\n", " driver_cores=1,\n", " driver_memory=\"2g\",\n", diff --git a/sdk/python/jobs/spark/user-assigned-identity.yml b/sdk/python/jobs/spark/automation/user-assigned-identity.yml similarity index 100% rename from sdk/python/jobs/spark/user-assigned-identity.yml rename to sdk/python/jobs/spark/automation/user-assigned-identity.yml diff --git a/sdk/python/jobs/spark/setup_spark.sh b/sdk/python/jobs/spark/setup_spark.sh index e713e4b26f..4c9c98ddca 100644 --- a/sdk/python/jobs/spark/setup_spark.sh +++ 
b/sdk/python/jobs/spark/setup_spark.sh @@ -16,8 +16,8 @@ SQL_ADMIN_LOGIN_USER="automation" SQL_ADMIN_LOGIN_PASSWORD="auto123!" SPARK_POOL_NAME="automationpool" SPARK_POOL_ADMIN_ROLE_ID="6e4bf58a-b8e1-4cc3-bbf9-d73143322b78" -USER_IDENTITY_YML="jobs/spark/user-assigned-identity.yml" -CREAT_CREDENTIAL_LESS_DS_YML="jobs/spark/create_credential_less_data_store.yml" +USER_IDENTITY_YML=$1"automation/user-assigned-identity.yml" +CREAT_CREDENTIAL_LESS_DS_YML=$1"automation/create_credential_less_data_store.yml" AZURE_REGION_NAME=${LOCATION} OUTBOUND_RULE_NAME="automationtestrule" # @@ -64,7 +64,11 @@ then KEY_VAULT_NAME=$(basename "$KEY_VAULT") az keyvault secret set --name $ACCESS_KEY_SECRET_NAME --vault-name $KEY_VAULT_NAME --value $ACCOUNT_KEY - sed -i "s//$AZURE_REGION_NAME/g; + # + sed -i "s//$SUBSCRIPTION_ID/g; + s//$RESOURCE_GROUP/g; + s//$AML_WORKSPACE_NAME/g; + s//$AZURE_REGION_NAME/g; s//$AZURE_STORAGE_ACCOUNT/g; s//$OUTBOUND_RULE_NAME/g; s//$KEY_VAULT_NAME/g; @@ -132,6 +136,10 @@ then s//$SAS_TOKEN_SECRET_NAME/g; s//$GEN2_STORAGE_ACCOUNT_NAME/g s//$FILE_SYSTEM_NAME/g;" $NOTEBOOK_PY + + sed -i "s//$SUBSCRIPTION_ID/g; + s//$RESOURCE_GROUP/g; + s//$AML_WORKSPACE_NAME/g;" $2 # else # @@ -153,14 +161,14 @@ else # sed -i "s//$SUBSCRIPTION_ID/g; - s//$RESOURCE_GROUP/g; - s//$AML_WORKSPACE_NAME/g; - s//$ATTACHED_SPARK_POOL_NAME/g; - s//$SYNAPSE_WORKSPACE_NAME/g; - s//$SPARK_POOL_NAME/g; - s//$AML_USER_MANAGED_ID/g; - s//$ATTACHED_SPARK_POOL_NAME_UAI/g; - s//$AML_USER_MANAGED_ID/g;" $ATTACH_SPARK_PY + s//$RESOURCE_GROUP/g; + s//$AML_WORKSPACE_NAME/g; + s//$ATTACHED_SPARK_POOL_NAME/g; + s//$SYNAPSE_WORKSPACE_NAME/g; + s//$SPARK_POOL_NAME/g; + s//$AML_USER_MANAGED_ID/g; + s//$ATTACHED_SPARK_POOL_NAME_UAI/g; + s//$AML_USER_MANAGED_ID/g;" $ATTACH_SPARK_PY python $ATTACH_SPARK_PY # @@ -180,10 +188,8 @@ else fi az synapse role assignment create --workspace-name $SYNAPSE_WORKSPACE_NAME --role $SPARK_POOL_ADMIN_ROLE_ID --assignee $AML_USER_MANAGED_ID_OID -fi -# -sed -i "s//$SUBSCRIPTION_ID/g; + sed -i "s//$SUBSCRIPTION_ID/g; s//$RESOURCE_GROUP/g; s//$AML_WORKSPACE_NAME/g; s//$SYNAPSE_WORKSPACE_NAME/g; @@ -192,4 +198,4 @@ sed -i "s//$SUBSCRIPTION_ID/g; s//$AML_USER_MANAGED_ID/g; s//$ATTACHED_SPARK_POOL_NAME_UAI/g; s//$AML_USER_MANAGED_ID/g;" $2 -# \ No newline at end of file +fi diff --git a/sdk/python/jobs/spark/submit_spark_pipeline_jobs.ipynb b/sdk/python/jobs/spark/submit_spark_pipeline_jobs.ipynb index 0c3594274a..8f7b672f80 100644 --- a/sdk/python/jobs/spark/submit_spark_pipeline_jobs.ipynb +++ b/sdk/python/jobs/spark/submit_spark_pipeline_jobs.ipynb @@ -55,6 +55,7 @@ "subscription_id = \"\"\n", "resource_group = \"\"\n", "workspace = \"\"\n", + "attached_spark_pool_name = \"\"\n", "ml_client = MLClient(\n", " DefaultAzureCredential(), subscription_id, resource_group, workspace\n", ")\n", @@ -91,7 +92,7 @@ " )\n", " spark_step.outputs.wrangled_data.mode = InputOutputModes.DIRECT\n", " spark_step.identity = ManagedIdentityConfiguration()\n", - " spark_step.compute = \"\"\n", + " spark_step.compute = attached_spark_pool_name\n", "\n", "\n", "pipeline = spark_pipeline(\n", @@ -132,6 +133,7 @@ "subscription_id = \"\"\n", "resource_group = \"\"\n", "workspace = \"\"\n", + "attached_spark_pool_name_uai = \"\"\n", "ml_client = MLClient(\n", " DefaultAzureCredential(), subscription_id, resource_group, workspace\n", ")\n", @@ -168,7 +170,7 @@ " )\n", " spark_step.outputs.wrangled_data.mode = InputOutputModes.DIRECT\n", " spark_step.identity = UserIdentityConfiguration()\n", - " spark_step.compute = \"\"\n", 
+ " spark_step.compute = attached_spark_pool_name_uai\n", "\n", "\n", "pipeline = spark_pipeline(\n", @@ -209,6 +211,7 @@ "subscription_id = \"\"\n", "resource_group = \"\"\n", "workspace = \"\"\n", + "attached_spark_pool_name = \"\"\n", "ml_client = MLClient(\n", " DefaultAzureCredential(), subscription_id, resource_group, workspace\n", ")\n", @@ -242,7 +245,7 @@ " path=\"azureml://datastores/workspaceblobstore/paths/data/wrangled/\",\n", " )\n", " spark_step.outputs.wrangled_data.mode = InputOutputModes.DIRECT\n", - " spark_step.compute = \"\"\n", + " spark_step.compute = attached_spark_pool_name\n", "\n", "\n", "pipeline = spark_pipeline(\n", diff --git a/sdk/python/jobs/spark/submit_spark_standalone_jobs.ipynb b/sdk/python/jobs/spark/submit_spark_standalone_jobs.ipynb index b202b67b7a..a435b35d42 100644 --- a/sdk/python/jobs/spark/submit_spark_standalone_jobs.ipynb +++ b/sdk/python/jobs/spark/submit_spark_standalone_jobs.ipynb @@ -54,6 +54,7 @@ "subscription_id = \"\"\n", "resource_group = \"\"\n", "workspace = \"\"\n", + "attached_spark_pool_name = \"\"\n", "ml_client = MLClient(\n", " DefaultAzureCredential(), subscription_id, resource_group, workspace\n", ")\n", @@ -67,7 +68,7 @@ " executor_cores=2,\n", " executor_memory=\"2g\",\n", " executor_instances=2,\n", - " compute=\"\",\n", + " compute=attached_spark_pool_name\n", " inputs={\n", " \"titanic_data\": Input(\n", " type=\"uri_file\",\n", @@ -113,6 +114,7 @@ "subscription_id = \"\"\n", "resource_group = \"\"\n", "workspace = \"\"\n", + "attached_spark_pool_name = \"\"\n", "ml_client = MLClient(\n", " DefaultAzureCredential(), subscription_id, resource_group, workspace\n", ")\n", @@ -126,7 +128,7 @@ " executor_cores=2,\n", " executor_memory=\"2g\",\n", " executor_instances=2,\n", - " compute=\"\",\n", + " compute=attached_spark_pool_name,\n", " inputs={\n", " \"titanic_data\": Input(\n", " type=\"uri_file\",\n", @@ -172,6 +174,7 @@ "subscription_id = \"\"\n", "resource_group = \"\"\n", "workspace = \"\"\n", + "attached_spark_pool_name = \"\"\n", "ml_client = MLClient(\n", " DefaultAzureCredential(), subscription_id, resource_group, workspace\n", ")\n", @@ -185,7 +188,7 @@ " executor_cores=2,\n", " executor_memory=\"2g\",\n", " executor_instances=2,\n", - " compute=\"\",\n", + " compute=attached_spark_pool_name,\n", " inputs={\n", " \"titanic_data\": Input(\n", " type=\"uri_file\",\n", From 094b81bb6816b16be329d6e45b770903d7cbbd45 Mon Sep 17 00:00:00 2001 From: Fred Li Date: Fri, 14 Jul 2023 12:24:10 -0700 Subject: [PATCH 17/18] Update to new workflow --- ...utomation-run_interactive_session_notebook.yml} | 14 +++++++------- .../jobs/spark/submit_spark_standalone_jobs.ipynb | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) rename .github/workflows/{sdk-jobs-spark-run_interactive_session_notebook.yml => sdk-jobs-spark-automation-run_interactive_session_notebook.yml} (86%) diff --git a/.github/workflows/sdk-jobs-spark-run_interactive_session_notebook.yml b/.github/workflows/sdk-jobs-spark-automation-run_interactive_session_notebook.yml similarity index 86% rename from .github/workflows/sdk-jobs-spark-run_interactive_session_notebook.yml rename to .github/workflows/sdk-jobs-spark-automation-run_interactive_session_notebook.yml index 474544653c..8a2aa179ee 100644 --- a/.github/workflows/sdk-jobs-spark-run_interactive_session_notebook.yml +++ b/.github/workflows/sdk-jobs-spark-automation-run_interactive_session_notebook.yml @@ -3,7 +3,7 @@ # Any manual changes to this file may cause incorrect behavior. 
# Any manual changes will be overwritten if the code is regenerated. -name: sdk-jobs-spark-run_interactive_session_notebook +name: sdk-jobs-spark-automation-run_interactive_session_notebook # This file is created by sdk/python/readme.py. # Please do not edit directly. on: @@ -14,8 +14,8 @@ on: branches: - main paths: - - sdk/python/jobs/spark/** - - .github/workflows/sdk-jobs-spark-run_interactive_session_notebook.yml + - sdk/python/jobs/spark/automation/** + - .github/workflows/sdk-jobs-spark-automation-run_interactive_session_notebook.yml - sdk/python/dev-requirements.txt - infra/bootstrapping/** - sdk/python/setup.sh @@ -60,10 +60,10 @@ jobs: continue-on-error: true - name: setup spark resources run: | - bash -x jobs/spark/setup_spark.sh jobs/spark/ jobs/spark/run_interactive_session_notebook.ipynb + bash -x jobs/spark/setup_spark.sh jobs/spark/ jobs/spark/automation/run_interactive_session_notebook.ipynb working-directory: sdk/python continue-on-error: true - - name: run jobs/spark/run_interactive_session_notebook.ipynb + - name: run jobs/spark/automation/run_interactive_session_notebook.ipynb run: | source "${{ github.workspace }}/infra/bootstrapping/sdk_helpers.sh"; source "${{ github.workspace }}/infra/bootstrapping/init_environment.sh"; @@ -71,10 +71,10 @@ jobs: bash "${{ github.workspace }}/infra/bootstrapping/sdk_helpers.sh" replace_template_values "run_interactive_session_notebook.ipynb"; [ -f "../../.azureml/config" ] && cat "../../.azureml/config"; papermill -k python run_interactive_session_notebook.ipynb run_interactive_session_notebook.output.ipynb - working-directory: sdk/python/jobs/spark + working-directory: sdk/python/jobs/spark/automation - name: upload notebook's working folder as an artifact if: ${{ always() }} uses: actions/upload-artifact@v2 with: name: run_interactive_session_notebook - path: sdk/python/jobs/spark + path: sdk/python/jobs/spark/automation diff --git a/sdk/python/jobs/spark/submit_spark_standalone_jobs.ipynb b/sdk/python/jobs/spark/submit_spark_standalone_jobs.ipynb index a435b35d42..522a421ad7 100644 --- a/sdk/python/jobs/spark/submit_spark_standalone_jobs.ipynb +++ b/sdk/python/jobs/spark/submit_spark_standalone_jobs.ipynb @@ -68,7 +68,7 @@ " executor_cores=2,\n", " executor_memory=\"2g\",\n", " executor_instances=2,\n", - " compute=attached_spark_pool_name\n", + " compute=attached_spark_pool_name,\n", " inputs={\n", " \"titanic_data\": Input(\n", " type=\"uri_file\",\n", From cdf3acde3c7c09494e7732c0cb6fd51ed58aa3ef Mon Sep 17 00:00:00 2001 From: Fred Li Date: Fri, 14 Jul 2023 12:51:52 -0700 Subject: [PATCH 18/18] Update blob storage name --- sdk/python/data-wrangling/interactive_data_wrangling.ipynb | 6 +++--- sdk/python/data-wrangling/interactive_data_wrangling.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/python/data-wrangling/interactive_data_wrangling.ipynb b/sdk/python/data-wrangling/interactive_data_wrangling.ipynb index 3e23bf6461..14e34f4b68 100644 --- a/sdk/python/data-wrangling/interactive_data_wrangling.ipynb +++ b/sdk/python/data-wrangling/interactive_data_wrangling.ipynb @@ -190,10 +190,10 @@ "from pyspark.ml.feature import Imputer\n", "\n", "blob_container_name = \"\"\n", - "blob_container_name = \"\"\n", + "storage_account_name = \"\"\n", "\n", "df = pd.read_csv(\n", - " f\"wasbs://{blob_container_name}@{blob_container_name}.blob.core.windows.net/data/titanic.csv\",\n", + " f\"wasbs://{blob_container_name}@{storage_account_name}.blob.core.windows.net/data/titanic.csv\",\n", " 
index_col=\"PassengerId\",\n", ")\n", "imputer = Imputer(inputCols=[\"Age\"], outputCol=\"Age\").setStrategy(\n", @@ -204,7 +204,7 @@ ") # Fill Cabin column with value \"None\" if missing\n", "df.dropna(inplace=True) # Drop the rows which still have any missing value\n", "df.to_csv(\n", - " f\"wasbs://{blob_container_name}@{blob_container_name}.blob.core.windows.net/data/wrangled\",\n", + " f\"wasbs://{blob_container_name}@{storage_account_name}.blob.core.windows.net/data/wrangled\",\n", " index_col=\"PassengerId\",\n", ")" ] diff --git a/sdk/python/data-wrangling/interactive_data_wrangling.py b/sdk/python/data-wrangling/interactive_data_wrangling.py index 05c07f54d7..6d30261ef8 100644 --- a/sdk/python/data-wrangling/interactive_data_wrangling.py +++ b/sdk/python/data-wrangling/interactive_data_wrangling.py @@ -63,10 +63,10 @@ from pyspark.ml.feature import Imputer blob_container_name = "" -blob_container_name = "" +storage_account_name = "" df = pd.read_csv( - f"wasbs://{blob_container_name}@{blob_container_name}.blob.core.windows.net/data/titanic.csv", + f"wasbs://{blob_container_name}@{storage_account_name}.blob.core.windows.net/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( @@ -77,7 +77,7 @@ ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( - f"wasbs://{blob_container_name}@{blob_container_name}.blob.core.windows.net/data/wrangled", + f"wasbs://{blob_container_name}@{storage_account_name}.blob.core.windows.net/data/wrangled", index_col="PassengerId", )