
Ingestion is not ignoring unknown feature in streaming source #99

Closed
pradithya opened this issue Jan 23, 2019 · 11 comments

@pradithya
Collaborator

Expected Behavior

Ingestion should ignore feature ID in FeatureRow that was not specified in Import spec

Current Behavior

Ingestion tries to ingest the unknown feature and throws the following exception:

"transform":"Convert feature types","message":"Unknown feature myentity.none.unknown_feature, spec was not initialized","stackTrace":"java.lang.IllegalArgumentException: Unknown feature myentity.none.unknown_feature, spec was not initialized\n\tat com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)\n\tat feast.ingestion.model.Specs.getFeatureSpec(Specs.java:148)\n\tat feast.ingestion.transform.fn.ConvertTypesDoFn.processElementImpl(ConvertTypesDoFn.java:44)\n\tat feast.ingestion.transform.fn.BaseFeatureDoFn.baseProcessElement(BaseFeatureDoFn.java:41)\n\tat feast.ingestion.transform.fn.ConvertTypesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)\n\tat org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)\n\tat org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:325)\n\tat org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)\n\tat org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)\n\tat org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)\n\tat org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)\n\tat org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)\n\tat org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)\n\tat org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)\n\tat

Steps to reproduce

Run ingestion with a streaming source (PubSub / Kafka) and publish a FeatureRow containing an unknown feature to the stream.
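
For illustration, a minimal sketch of the reproduction step, assuming the early Feast FeatureRow/Feature protos and the Google Cloud PubSub client; the topic name and feature id are placeholders:

  import com.google.cloud.pubsub.v1.Publisher;
  import com.google.pubsub.v1.PubsubMessage;
  import feast.types.FeatureProto.Feature;
  import feast.types.FeatureRowProto.FeatureRow;

  public class PublishUnknownFeature {
    public static void main(String[] args) throws Exception {
      // Build a row whose feature id is absent from the running import spec.
      FeatureRow row =
          FeatureRow.newBuilder()
              .setEntityName("myentity")
              .addFeatures(Feature.newBuilder().setId("myentity.none.unknown_feature"))
              .build();

      // "projects/my-project/topics/feast-features" is a placeholder topic.
      Publisher publisher =
          Publisher.newBuilder("projects/my-project/topics/feast-features").build();
      publisher.publish(PubsubMessage.newBuilder().setData(row.toByteString()).build());
      publisher.shutdown();
    }
  }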

@tims
Contributor

tims commented Jan 24, 2019

To clarify: do you mean it discards the whole row into the errors sink, or are you saying it crashes the job?

@tims
Contributor

tims commented Jan 24, 2019

Copied from my PR comment.

It is not expected behaviour to ignore unknown features.

We need errors so that we know what's happening; we don't want to silently discard them.

So what you seem to want is that, instead of throwing the whole row into the errors pile, we accept as many features as possible and only throw errors for the unknown ones.

The only place that needs to do that is the ConvertTypesDoFn. I suggest we instead emit the row without the suspect features, while also emitting an error row (to the errors tag) with just the suspect features.

Also, if these features proceed past ConvertTypesDoFn, it's a bug, not something we should ignore. So we definitely shouldn't ignore them later in the pipeline. We should treat everything as valid from there and throw exceptions if it's not.
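
A minimal sketch of that shape as a Beam multi-output DoFn. Note that containsFeatureSpec is a hypothetical lookup (the real Specs only exposes getFeatureSpec, which throws for unknown ids), and the builder calls assume the Feast protos:

  import feast.ingestion.model.Specs;
  import feast.types.FeatureProto.Feature;
  import feast.types.FeatureRowProto.FeatureRow;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.TupleTag;

  public class SplitUnknownFeaturesDoFn extends DoFn<FeatureRow, FeatureRow> {
    public static final TupleTag<FeatureRow> MAIN = new TupleTag<FeatureRow>() {};
    public static final TupleTag<FeatureRow> ERRORS = new TupleTag<FeatureRow>() {};

    private final Specs specs;

    public SplitUnknownFeaturesDoFn(Specs specs) {
      this.specs = specs;
    }

    @ProcessElement
    public void processElement(@Element FeatureRow row, MultiOutputReceiver out) {
      FeatureRow.Builder known = row.toBuilder().clearFeatures();
      FeatureRow.Builder suspect = row.toBuilder().clearFeatures();
      for (Feature feature : row.getFeaturesList()) {
        // containsFeatureSpec is assumed for illustration.
        if (specs.containsFeatureSpec(feature.getId())) {
          known.addFeatures(feature);
        } else {
          suspect.addFeatures(feature);
        }
      }
      // Emit the trimmed row on the main output, and an error row holding
      // only the unknown features on the errors tag.
      out.get(MAIN).output(known.build());
      if (suspect.getFeaturesCount() > 0) {
        out.get(ERRORS).output(suspect.build());
      }
    }
  }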

@zhilingc
Collaborator

And from mine:

The issue is that they're not necessarily errors. We want to be able to selectively ingest features from a given feature row in the streaming case, similar to how we do it in batch.

An example: if we had a feature creation pipeline creating rows of features A, B, and C, but we don't want to ingest B and C yet, the ingestion job is run with only feature A declared in the schema. I don't think it's useful to log an error for every row as a result of this.

@tims
Contributor

tims commented Jan 24, 2019

Do we need an option to turn off throwing errors for this then? Because I expect it will be very common to forget to add a feature to the import spec and not realise why data isn't showing up.

@tims
Contributor

tims commented Jan 24, 2019

I understand why you want the streaming and batch experiences to be the same, though. At the moment the source decides what it selects.

So it would be a setting for the PubSub or Kafka sources.

@tims
Contributor

tims commented Jan 24, 2019

So I'd suggest that, rather than handling this at conversion, we have a setting in the sources for whether to ignore unspecified fields. Nothing else should change; we should still throw errors in the convert DoFn and beyond.

@pradithya
Collaborator Author

pradithya commented Jan 24, 2019

Do you mean we add something like

  public static class KafkaReadOptions implements Options {
    // when true, silently drop any feature not declared in the import spec
    public boolean ignoreUnspecifiedFeature = true;
    @NotEmpty
    public String server;
    @NotEmpty
    public String topics;
  }

And the behaviour is as follows:

  • true --> filter out and ignore any unspecified feature in the FeatureRow
  • false --> discard the whole FeatureRow if it contains an unspecified feature (current behaviour)

This will also be applicable to PubSub.
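
A rough sketch of where that flag could take effect: a filter applied right after the source read, before ConvertTypesDoFn, using Beam's MapElements and TypeDescriptor. Here rows, options, and specs are assumed to be in scope (and serializable), and containsFeatureSpec is again hypothetical:

  PCollection<FeatureRow> filtered =
      rows.apply(
          "Drop unspecified features",
          MapElements.into(TypeDescriptor.of(FeatureRow.class))
              .via(
                  (FeatureRow row) -> {
                    // false: pass the row through untouched, so the convert
                    // DoFn still errors on unknown features (current behaviour).
                    if (!options.ignoreUnspecifiedFeature) {
                      return row;
                    }
                    // true: keep only features declared in the import spec.
                    FeatureRow.Builder kept = row.toBuilder().clearFeatures();
                    for (Feature feature : row.getFeaturesList()) {
                      if (specs.containsFeatureSpec(feature.getId())) {
                        kept.addFeatures(feature);
                      }
                    }
                    return kept.build();
                  }));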

@pradithya
Collaborator Author

> To clarify: do you mean it discards the whole row into the errors sink, or are you saying it crashes the job?

It currently discards the whole row into the error sink.

@tims
Contributor

tims commented Jan 24, 2019

Yep, I think this is good.

Maybe discardUnknownFeatures reads better?

I agree that:

  • If it's true, it throws away those features silently.
  • If it's false, it should throw the whole row into the errors pile.

Alternatively, when it's false you could throw errors for the unknown features but still accept the others. But start with what's easy, I guess.

I personally would prefer it to default to false, but happy to be outvoted.

@tims
Contributor

tims commented Jan 24, 2019

It is probably worth emitting a special metric for this using:

FeastMetrics.inc("feature:{featureId}:discarded")

This, however, trusts that we won't receive random feature ids that pollute the metrics space too much.
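
If FeastMetrics is a thin wrapper over Beam's metrics API (an assumption), the counter could be emitted with org.apache.beam.sdk.metrics.Metrics directly, from inside whatever DoFn drops the feature; the "feast" namespace is illustrative:

  import org.apache.beam.sdk.metrics.Metrics;

  // Count each discarded feature id; called once per dropped feature.
  Metrics.counter("feast", "feature:" + feature.getId() + ":discarded").inc();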

@tims
Contributor

tims commented Jan 24, 2019

Regarding the default value.

I think if you get new features that aren't specced in the import, they should be errors by default.
It might seem weird that it's different to batch, I guess, but for batch you don't actually have a feature row in the source; you're making them, so the sources don't know which columns might be features. Streaming does, so it's not the same.

Basically, the default should make it easy to work out why your data isn't showing up.

The issue is if you add a new feature and forget to add it to the import spec, or forget to restart the job. If we default to discarding, it makes it hard to tell what you've forgotten to do.

So I suggest we default to not discarding features, so that new unknown features cause errors instead. It's very easy to add a flag when you know you want it; it's harder for new people to know.
