
Add the option to disable deep copying of large MemoryDataSet objects #1258

Closed
jstammers opened this issue Feb 16, 2022 · 4 comments

Labels
Community (Issue/PR opened by the open-source community), Issue: Feature Request (New feature or improvement to existing feature)

Comments

@jstammers
Contributor

Description

I have a suite of unit tests for a pipeline which test the functionality of each node in isolation. This includes a final test that runs the entire pipeline on a small set of input data. I've noticed that this final test runs much slower than the others, which I found to be the result of some deepcopy operations.

In my pipeline, I think this is due to a spacy model that gets loaded in one node, with additional nodes that add components to the model. Looking into the code for SequentialRunner, I can see that the default behaviour for datasets not in the catalog is to create a MemoryDataSet using its default parameters:

def create_default_data_set(self, ds_name: str) -> AbstractDataSet:
    """Factory method for creating the default data set for the runner.

    Args:
        ds_name: Name of the missing data set

    Returns:
        An instance of an implementation of AbstractDataSet to be used
        for all unregistered data sets.
    """
    return MemoryDataSet()

For objects that are pandas DataFrames or numpy arrays, this copies the object on every save and load.
It would be useful to have an option to override this default in cases where run time or memory usage is a concern.
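For reference, here is a minimal sketch of the difference the copy mode makes, assuming kedro's MemoryDataSet, whose copy_mode accepts "deepcopy", "copy" and "assign":

import pandas as pd
from kedro.io import MemoryDataSet

df = pd.DataFrame({"value": [0, 1, 2]})

# Default behaviour: the data is copied on save and load,
# so downstream nodes receive an independent object.
copied = MemoryDataSet(data=df)
assert copied.load() is not df

# copy_mode="assign" stores and returns the same object reference,
# avoiding the copy overhead for large objects such as models.
assigned = MemoryDataSet(data=df, copy_mode="assign")
assert assigned.load() is df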

Context

Copying MemoryDataSets is useful in cases where two nodes receive the same input and each returns a mutated version of it, e.g.

import numpy as np
import pandas as pd

from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline, node
from kedro.runner import SequentialRunner


def add_gaussian_noise(x: pd.DataFrame) -> pd.DataFrame:
    x["noise"] = x["value"] + np.random.random(len(x))
    return x

def add_uniform_noise(x: pd.DataFrame) -> pd.DataFrame:
    x["noise"] = x["value"] + np.random.uniform(-1, 1, len(x))
    return x


data = pd.DataFrame({"value": [0, 1, 2, 3, 4, 5]})

catalog = DataCatalog({"input": MemoryDataSet(data)})

pipeline = Pipeline(
    [
        node(add_gaussian_noise, "input", "gaussian_noise"),
        node(add_uniform_noise, "input", "uniform_noise"),
    ]
)

runner = SequentialRunner()
runner.run(pipeline, catalog)

But when a large Python object is passed between nodes, the copy can add significant runtime and memory overhead:

import spacy


def load_model():
    return spacy.load("en_core_web_lg")

def add_custom_pipeline(nlp):
    nlp.add_pipe(...)
    return nlp

def process_docs(nlp, docs):
    return [nlp(doc) for doc in docs]


data = ["foo", "bar", "baz"]

catalog = DataCatalog({"input": MemoryDataSet(data)})

pipeline = Pipeline(
    [
        node(load_model, inputs=None, outputs="loaded_model"),
        node(add_custom_pipeline, inputs="loaded_model", outputs="nlp_model"),
        node(process_docs, inputs=["nlp_model", "input"], outputs="processed_docs"),
    ]
)

runner = SequentialRunner()
runner.run(pipeline, catalog)  # performs a deep copy, so 'loaded_model' and 'nlp_model' exist in memory as separate objects

Possible Implementation

As a workaround for my use case, I've implemented a TestRunner class that changes the default MemoryDataSet created by the runner:

class TestRunner(SequentialRunner):
    def create_default_data_set(self, ds_name: str):
        return MemoryDataSet(copy_mode="assign")

This has reduced the run time of my slow test from 17s to 1s.
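For completeness, a sketch of how the workaround is used; the pipeline and catalog are assumed to be the spacy example above:

# Hypothetical usage of the TestRunner workaround; 'pipeline' and 'catalog'
# are assumed to be the spacy example from the description above.
runner = TestRunner()
runner.run(pipeline, catalog)  # unregistered datasets are now shared by reference rather than copied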

jstammers added the Issue: Feature Request (New feature or improvement to existing feature) label on Feb 16, 2022
@datajoely
Contributor

Hi @jstammers, thank you for the detailed issue and solution.

My initial reaction when I saw your title was to make sure you were aware of the copy_mode functionality in the MemoryDataSet, so it's great you're already there!

You are able to do this as well:

catalog = DataCatalog({"input": MemoryDataSet(copy_mode="assign", data=data)})

I'd be interested to see if other users in the community think this would be useful as a native runner. Alternatively, we could include an example in the docs, like we do for the DryRunner.

@jstammers
Contributor Author

Hi @datajoely, thanks for the quick response.

In my use case, the object is created as an output of one node and used as an input to a subsequent one, so I didn't bother to define it explicitly in my catalog.

If I include it in the catalog as you suggest

catalog = DataCatalog({..., "model": MemoryDataSet(copy_mode="assign", data=None)})

then it does indeed avoid the deep copy, although this could get quite verbose for pipelines with many intermediate datasets that remain in memory.
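One way to reduce that verbosity (just a sketch, not an existing kedro feature; the dataset names here are hypothetical) is to build the assign-mode entries programmatically:

from kedro.io import DataCatalog, MemoryDataSet

data = ["foo", "bar", "baz"]

# Hypothetical list of intermediate dataset names that should stay in memory
# and be passed between nodes by reference rather than copied.
in_memory_names = ["loaded_model", "nlp_model"]

catalog = DataCatalog(
    {
        "input": MemoryDataSet(data),
        **{name: MemoryDataSet(copy_mode="assign") for name in in_memory_names},
    }
)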

@datajoely
Contributor

Ah understood - you can also use the MemoryDataSet(...) constructor within the return statement of your node, in case that helps.

merelcht added the Community (Issue/PR opened by the open-source community) label on Mar 7, 2022
@merelcht
Member

Hi @jstammers, this behaviour is already possible if you use copy_mode="assign" as @datajoely suggested, or by using a custom DataCatalog, so we won't add a further implementation for it. I hope that makes sense!
