Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add search pipeline README #9825

Prev Previous commit
Next Next commit
initial feedback
Signed-off-by: Stephen Crawford <steecraw@amazon.com>
  • Loading branch information
stephen-crawford committed Sep 8, 2023
commit b27ed5a5d656694ca05f9ead7f8cab2bb78a4032
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,23 @@ You can create many search pipelines by combining search processors in various o
1. search request processors which transform a request _before_ it is executed;
2. search response processors which transform the output of a request _after_ it is executed.

Currently, there are two search request and two search response processors.
You can find all existing search processors registered in `SearchPipelineCommonModulePlugin.java` and described on the documentation website.

### Creating a search processor

New search processors can be created by following these steps:
New search processors can be created in two different ways.

Generally, a search processor can be created in your own `SearchPipelinePlugin`. This method is best for when you are creating a unique search
processor for your niche application. This method should also be used when your processor relies on an outside service. For an example of a
processor which was implemented in this manner, you can reference the [`personalized_search_ranking` processor](https://github.com/opensearch-project/search-processor/blob/7e56847fa9d9e6e8201eb92d91802ec54abedcaf/amazon-personalize-ranking/src/main/java/org/opensearch/search/relevance/AmazonPersonalizeRankingPlugin.java).

Alternatively, if you think your processor may be valuable to _all_ OpenSearch users you can follow these steps:

1. Create a new class in `org.opensearch.search.pipeline.common`, this class will hold your new processor and should include whether it is a request or response processor. For example, a response processor which deleted a target field could be called `DeleteFieldResponseProcessor`.
stephen-crawford marked this conversation as resolved.
Show resolved Hide resolved

2. Make the class extend the generic `AbstractProcessor` class as well as implement either the `SearchRequestProcessor` or `SearchResponseProcessor` class depending on what type of processor it is. In the `DeleteFieldResponseProcessor` example, this would look like:


public class DeleteFieldResponseProcessor extends AbstractProcessor implements SearchResponseProcessor
```public class DeleteFieldResponseProcessor extends AbstractProcessor implements SearchResponseProcessor```

3. Create the main functionality of your processor and implement the methods required by the implemented interface. This will be `SearchRequest processRequest(SearchRequest request) throws Exception;` for a search request processor or `SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception;` for a search response processor.

Expand All @@ -43,41 +48,84 @@ For the example field `DeleteFieldResponseProcessor`, this will look like:
```
@Override
public SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception {
boolean foundField = false;

SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
boolean foundField = false;
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {

// Process each hit as desired
}
// Process each hit as desired

if (hit.hasSource()) {
// Change hit source if needed
);

Map<String, Object> sourceAsMap = typeAndSourceMap.v2();
if (sourceAsMap.containsKey(field)) {
// Handle source as map
}
}

if (!foundField && !ignoreMissing) {
// Handle error scenarios
}

return response;
}
```

if (hit.hasSource()) {

// Change hit source if needed
);
4. Create a factory to parse processor-specific JSON configurations. These are used for constructing a processor instance.

Map<String, Object> sourceAsMap = typeAndSourceMap.v2();
if (sourceAsMap.containsKey(field)) {

// Handle source as map
}
}
In the `DeleteFieldResponseProcessor`, this would look something like:

if (!foundField && !ignoreMissing) {

// Handle error scenarios
}
}

return response;
```
public static final class Factory implements Processor.Factory<SearchResponseProcessor> {

/**
* Constructor for factory
*/
Factory() {}

@Override
public DeleteFieldResponseProcessor create(
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
String tag,
String description,
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
return new DeleteFieldResponseProcessor(tag, description, ignoreFailure, field, ignoreMissing);
}
}
```

4. Create a factory to encapsulate the behavior of the class so that processors can be handled uniformly.
In this example, we provide specific configurations for which field should be deleted and whether the processor should ignore attempts to remove a non-existent field.

5. Add the newly added search processor to the `SearchPieplineCommonModulePlugin` getter for the corresponding processor type.

6. After creating a search processor, the operator can use it in their search pipeline.
For the `DeleteFieldResponseProcessor`, you would modify the response processor getter to have:

```
@Override
public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Parameters parameters) {
return Map.of(
RenameFieldResponseProcessor.TYPE,
new RenameFieldResponseProcessor.Factory(),
DeleteFieldResponseProcessor.TYPE,
new DeleteFieldResponseProcessor.Factory()
);
}
```

6. After creating a search processor, the processor is ready to be tested in a search pipeline.

To test your new search processor, you can make use of the test [`SearchPipelineCommonYamlTestSuiteIT`](../../../../../../../yamlRestTest/java/org/opensearch/search/pipeline/common).

Following the format of the YAML files in [`rest-api-spec.test.search_pipeline`](../../../../../../../yamlRestTest/resources/rest-api-spec/test/search_pipeline), you should be able to create your own YAML test file to exercise your new processor.

stephen-crawford marked this conversation as resolved.
Show resolved Hide resolved
7. Finally, the processor is ready to used in a cluster.

To use the new processor, make sure the cluster is reloaded and that the new processor is accessible.
stephen-crawford marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -117,3 +165,19 @@ PUT /_search/pipeline/my_pipeline
]
}
```

Alternatively, if you want to use just the `DeleteFieldResponseProcessor` created before, you would use:

```
PUT /_search/pipeline/my_pipeline2

{
"response_processors": [
{
"delete_field": {
"field": "message"
}
}
]
}
```
stephen-crawford marked this conversation as resolved.
Show resolved Hide resolved
Loading