Commit: update docs
windisch committed Dec 15, 2023
1 parent 5c119a3 commit 26be7a9
Showing 4 changed files with 198 additions and 31 deletions.
101 changes: 70 additions & 31 deletions docs/source/decorators.rst
@@ -11,8 +11,8 @@ Decorators are used for:
* Sorting tasks into `raw/interim/final` directory structure


Local targets
-------------

Here is a list of decorators for the output file of a task:

Generally, tasks with those decorators can persist their output with
@@ -28,8 +28,7 @@ output with `self.input().read()`
* :py:func:`~luisy.decorators.pickle_output`: For tasks whose output
  is a pickle. May be used to (de)serialize any Python object.
* :py:func:`~luisy.decorators.parquetdir_output`: For tasks whose output
  is a directory holding parquet files, like the result of a Spark computation.
* :py:func:`~luisy.decorators.make_directory_output`: Factory to create
  your own directory output decorator. It expects a function that tells
  luisy how to handle the files in your directory (see the sketch below).
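
For illustration, here is a minimal sketch of how such a decorator could be
created. The reader function and the way it is passed to the factory (a single
positional argument) are assumptions for this sketch, not the documented
signature:

.. code-block:: python

    import pandas as pd

    import luisy
    from luisy.decorators import make_directory_output

    def read_file(filepath):
        # Assumed contract: tells luisy how to read a file of the directory.
        return pd.read_csv(filepath)

    # Assumption: the factory takes the file handler as its only argument.
    csv_directory_output = make_directory_output(read_file)

    @luisy.raw
    @csv_directory_output
    class SomeDirectoryData(luisy.ExternalTask):
        pass
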
@@ -39,56 +38,96 @@ output with `self.input().read()`
Parquet dir output
~~~~~~~~~~~~~~~~~~

When persisting the results of a Spark computation, the preferred output
format is parquet files. This may look as follows:

.. code-block:: python

    import luisy

    @luisy.raw
    @luisy.requires(SomeTask)
    @luisy.parquet_output('some_dir/spark_result')
    class SomePySparkResult(luisy.SparkTask):

        def run(self):
            df = self.read_input()
            # do something
            self.write(df)

If the resulting path needs to be parametrized, the method
:code:`get_folder_name` needs to be implemented:

.. code-block:: python

    import luisy

    @luisy.raw
    @luisy.requires(SomeTask)
    @luisy.parquet_output
    class SomePySparkResult(luisy.SparkTask):

        partition = luigi.Parameter()

        def run(self):
            df = self.read_input()
            # do something
            self.write(df)

        def get_folder_name(self):
            return f"some_dir/spark_result/Partition={self.partition}"

In some cases, only the results of a Spark pipeline triggered externally
need to be processed further. These files are external inputs to the
pipeline, and :py:func:`~luisy.decorators.parquetdir_output` together with
the cloud synchronisation can be used to download these files automatically
and process them locally:

.. code-block:: python

    import luisy

    @luisy.raw
    @luisy.parquetdir_output('some_dir/some_pyspark_output')
    class SomePySparkResult(luisy.ExternalTask):
        pass

This task points to the folder
:code:`[project_name]/raw/some_dir/some_pyspark_output/` in the Blob storage
holding multiple parquet-files.

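A downstream task can then consume these files like any other local target.
The following sketch (the task name and its default output are illustrative)
reads the downloaded parquet files and persists its own result:

.. code-block:: python

    @luisy.interim
    @luisy.requires(SomePySparkResult)
    class ProcessedResult(luisy.Task):

        def run(self):
            df = self.input().read()
            # do something
            self.write(df)

Invoking :code:`luisy --module [project_name].[module] ProcessedResult --download`
first downloads the parquet files from the Blob storage and then runs the
task locally.
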
Cloud targets
-------------

The following targets can be used in combination with a
:py:class:`~luisy.tasks.base.SparkTask`:

* :py:func:`~luisy.decorators.deltatable_output`: For spark tasks whose
  output should be saved in a delta table.
* :py:func:`~luisy.decorators.azure_blob_storage_output`: For spark tasks whose
  output should be saved in an Azure Blob Storage.

See :ref:`databricks` for more details on how to use these targets.

Cloud inputs
------------

To ease the development of cloud pipelines with
:py:class:`~luisy.tasks.base.SparkTask` that access data already persisted in
cloud storages, we provide input decorators that render the usage of
:py:class:`~luisy.tasks.base.ExternalTask` unnecessary (see the sketch after
this list):

* :py:func:`~luisy.decorators.deltatable_input`: For spark tasks that should access
  data saved in a delta table.
* :py:func:`~luisy.decorators.azure_blob_storage_input`: For spark tasks that should access
  data saved in an Azure Blob Storage.

See :ref:`databricks` for more details on how to implement pipelines
using these inputs.
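
For illustration, here is a minimal sketch of a
:py:class:`~luisy.tasks.base.SparkTask` that reads from and writes to delta
tables directly through these decorators, without any
:py:class:`~luisy.tasks.base.ExternalTask`, mirroring the example in
:ref:`databricks`; schema, catalog, and table names are placeholders:

.. code-block:: python

    import luisy
    from luisy.tasks.base import SparkTask

    @luisy.deltatable_input(schema='my_schema', catalog='my_catalog', table_name='raw')
    @luisy.deltatable_output(schema='my_schema', catalog='my_catalog', table_name='cleaned')
    class CleanData(SparkTask):

        def run(self):
            df = self.input().read()
            df = df.dropna()
            self.write(df)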

Directory structure
-------------------
1 change: 1 addition & 0 deletions docs/source/index.rst
@@ -63,6 +63,7 @@ Learn more about luisy in our :doc:`Tutorials <./tutorials/getting_started>`.
Directory Output <tutorials/multi_file>
Up- and Downloading Files <tutorials/cloud>
Trigger reruns by changing code <tutorials/reruns>
Interact with Databricks <tutorials/databricks>


.. toctree::
123 changes: 123 additions & 0 deletions docs/source/tutorials/databricks.rst
@@ -0,0 +1,123 @@

.. _databricks:

Interaction with Databricks
===========================

.. note::

This is an experimental feature. Expect sharp edges and bugs.


Overview
--------

The preferred way to interact with Databricks objects like a pyspark
cluster or delta tables is to use luisy within a Databricks notebook. It is
also possible to connect from a local session using
:py:mod:`databricks_connect` (see :ref:`databricks-connect`).


Example pipeline
----------------

.. note::
    All tasks have to be implemented in a Python
    package; only the execution is done via a Databricks notebook.

This is what a cloud pipeline may look like:

.. code-block:: python

    import luisy
    from luisy.tasks.base import SparkTask

    @luisy.deltatable_input(schema='my_schema', catalog='my_catalog', table_name='raw')
    @luisy.deltatable_output(schema='my_schema', catalog='my_catalog', table_name='interim')
    class TaskA(SparkTask):

        def run(self):
            df = self.input().read()
            df = df.drop('c')
            self.write(df)


    @luisy.requires(TaskA)
    @luisy.deltatable_output(schema='my_schema', catalog='my_catalog', table_name='final')
    class TaskB(SparkTask):

        def run(self):
            df = self.input().read()
            df = df.withColumn('f', 2*df.a)
            self.write(df)


    @luisy.requires(TaskB)
    @luisy.final
    @luisy.pickle_output
    class TaskC(SparkTask):

        def run(self):
            df = self.input().read()
            self.write(df)

Here, :code:`TaskA` and :code:`TaskB` read and write their data from
and to delta tables and process them with Spark. :code:`TaskC`,
however, persists its output into a pickle file stored in :code:`dbfs`.

Running a pipeline
------------------

First, the working dir needs to be set. Here, we can use the Databricks
file system (:code:`dbfs`), which allows running the pipeline completely in
the cloud. The :py:class:`pyspark.SparkContext` is automatically
propagated to :py:mod:`luisy` from the active session:

.. code-block:: python

    working_dir = "/dbfs/FileStore/my_working_dir"
    Config().set_param("working_dir", working_dir)

A given pipeline can be executed as follows:

.. code-block:: python

    build(SomeTask(), cloud_mode=True)

Here, all :py:class:`~luisy.tasks.base.SparkTask` objects use the
pyspark cluster of the databricks instance.

.. _databricks-connect:

Using databricks connect
------------------------

Using :py:mod:`databricks-connect`, cloud pipelines can be triggered
from Python sessions outside of Databricks. A local proxy for the remote
Spark session on Databricks is then created in the local session. First,
databricks-connect needs to be installed:

.. code-block:: bash

    pip install databricks-connect

Make sure that the version of databricks-connect is compatible with
the spark version in the databricks cluster.

To run the cloud pipelines locally, the following parameters need to
be set:

.. code-block:: python

    from databricks.connect import DatabricksSession

    spark = DatabricksSession.builder.remote(
        host="https://adb-<...>.azuredatabricks.net",
        token="<your secret token>",
        cluster_id="<cluster id>",
    ).getOrCreate()

    Config().set_param('spark', spark)

From there, everything works as in a databricks notebook.
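
Putting the pieces together, a local session that triggers a cloud pipeline
might look as follows; host, token, cluster id, and :code:`SomeTask` are
placeholders, and :code:`Config` and :code:`build` are used exactly as in the
snippets above (their import statements are omitted here as well):

.. code-block:: python

    from databricks.connect import DatabricksSession

    # Create the remote session proxy and hand it to luisy.
    spark = DatabricksSession.builder.remote(
        host="https://adb-<...>.azuredatabricks.net",
        token="<your secret token>",
        cluster_id="<cluster id>",
    ).getOrCreate()

    Config().set_param('spark', spark)

    # Run the pipeline against the Databricks cluster.
    build(SomeTask(), cloud_mode=True)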

.. note::

The unity catalog needs to be enabled in your databricks instance.
4 changes: 4 additions & 0 deletions luisy/__init__.py
@@ -37,6 +37,10 @@
from luisy.decorators import hdf_output # noqa
from luisy.decorators import pickle_output # noqa
from luisy.decorators import parquetdir_output # noqa
from luisy.decorators import deltatable_output # noqa
from luisy.decorators import deltatable_input # noqa
from luisy.decorators import azure_blob_storage_output # noqa
from luisy.decorators import azure_blob_storage_input # noqa

from luigi import Parameter # noqa
from luigi import IntParameter # noqa
