[BEAM-647] Fault-tolerant sideInputs via Broadcast variables #1624
Conversation
@@ -324,7 +327,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
      LOG.info("Evaluating {}", transform);
      AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform();
      ctxt.setCurrentTransform(appliedTransform);
-     evaluator.evaluate(transform, ctxt);
+     evaluator.evaluate(transform, ctxt, pview);
Why pass the SparkPCollectionView when the PipelineVisitor traverses? This only happens once, on the first run. I'm also pretty sure this never happens again (such as on resume from checkpoint).
// Driver only
BroadcastHelper getPCollectionView(PCollectionView<?> view, JavaSparkContext context) {
  // initialize broadcastHelperMap if needed
  if (broadcastHelperMap == null) {
If you use double-checked locking, broadcastHelperMap should be volatile.
Right, fixed
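The pattern under discussion, as a minimal self-contained sketch (class and field names are illustrative, not the PR's actual code):

```java
import java.util.HashMap;
import java.util.Map;

// Double-checked locking for a lazily created map. The field must be
// volatile so that a thread passing the first (unsynchronized) check can
// never observe a partially constructed HashMap.
public class LazyMapHolder {
    private transient volatile Map<String, Object> broadcastHelperMap = null;

    Map<String, Object> getMap() {
        if (broadcastHelperMap == null) {              // first check, no lock
            synchronized (LazyMapHolder.class) {
                if (broadcastHelperMap == null) {      // second check, under lock
                    broadcastHelperMap = new HashMap<>();
                }
            }
        }
        return broadcastHelperMap;
    }

    public static void main(String[] args) {
        LazyMapHolder holder = new LazyMapHolder();
        System.out.println(holder.getMap() == holder.getMap()); // same instance
    }
}
```

Without volatile, the JVM is free to reorder the map's construction with the write to the field, which is exactly what the first unsynchronized check would then observe.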
import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.api.java.JavaSparkContext;
public class SparkPCollectionView implements Serializable {
This should probably host some kind of thread-safe, lazy Singleton. This might also save the trouble of passing this through the TransformEvaluator.
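One common way to get such a thread-safe, lazy singleton in Java without explicit locking is the initialization-on-demand holder idiom; a sketch with hypothetical names, not the PR's code:

```java
// Initialization-on-demand holder idiom: INSTANCE is created only when
// getInstance() first touches the Holder class, and the JVM guarantees
// class initialization is thread-safe, so no explicit locking is needed.
public class LazySingleton {
    private LazySingleton() {}

    private static class Holder {
        static final LazySingleton INSTANCE = new LazySingleton();
    }

    public static LazySingleton getInstance() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        System.out.println(LazySingleton.getInstance() == LazySingleton.getInstance()); // true
    }
}
```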
Force-pushed: Fix comments by Amit + rebase from master + checkstyle (228c613 to 0ea9f1a)
R: @amitsela
@@ -129,10 +128,6 @@ public void putDataset(PValue pvalue, Dataset dataset) {
    datasets.put((PValue) getOutput(transform), new UnboundedDataset<>(values, jssc, coder));
  }

  void putPView(PValue view, Iterable<? extends WindowedValue<?>> value) {
Why not keep put/get encapsulated here? Saves the ugly context.getPviews().putPViews().
Will be fixed
@@ -149,10 +144,6 @@ public Dataset borrowDataset(PValue pvalue) {
    return dataset;
  }

  <T> Iterable<? extends WindowedValue<?>> getPCollectionView(PCollectionView<T> view) {
Same as above, why not get the appropriate BroadcastHelper?
It remains the same as before; TranslationUtils.getSideInputs gets a new parameter "pviews" and is called inside the lambda. Everything else is the same.
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
Please remove empty line.
will be fixed
Build result: FAILURE
[...truncated 11169 lines...]
Caused by: org.apache.maven.plugin.MojoFailureException: You have 4 Checkstyle violations.
  at org.apache.maven.plugin.checkstyle.CheckstyleViolationCheckMojo.execute(CheckstyleViolationCheckMojo.java:588)
  ... 31 more
After correcting the problems, you can resume the build with the command: mvn -rf :beam-runners-spark
Setting status of d0815ea to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6361/
Build result: Failed Tests: 1 (beam_PreCommit_Java_MavenInstall / org.apache.beam:beam-runners-apex: 1)
// Holds the Actual data of the views in serialize form
private Map<PCollectionView<?>,
    Tuple2<byte[], Coder<Iterable<WindowedValue<?>>>>> pviews =
        Collections.synchronizedMap(
I don't think there's a real need for a synchronized map.
fixed
private transient volatile Map<PCollectionView<?>, BroadcastHelper> broadcastHelperMap = null;

// Holds the Actual data of the views in serialize form
private Map<PCollectionView<?>,
Since you rely on SparkPCollectionView being serialized as part of the DStream (checkpointing), and this Map being persisted along with it, it should be noted that this is sub-optimal, since all side inputs would be persisted for every DoFn they appear in.
    Tuple2<byte[], Coder<Iterable<WindowedValue<?>>>>>());

// Driver only - during evaluation stage
void putPView(PCollectionView<?> view,
Nit: indent parameters.
Throughout this class.
fixed
  }
} else if (overwrite) {
  synchronized (SparkPCollectionView.class) {
    helper.unpersist(context);
Can comment that unpersist can be blocking, if we'd want to consider this in the future.
fixed
  return helper;
}

private BroadcastHelper getBroadcastHelper(PCollectionView<?> view, JavaSparkContext context) {
This is more of a createBroadcastHelper.
 * @param value - Actual value of the view
 * @param coder - Coder of the value
 */
public void putPView(PCollectionView<?> view,
Indent the first parameter as well.
fixed
First parameter should still be aligned with the others. Also, naming should match the getter: either both with a capital V or not, i.e. getPViews/putPView or getPviews/putPviews.
@@ -192,22 +192,30 @@ public Boolean call(Tuple2<TupleTag<V>, WindowedValue<?>> input) {
 * @return a map of tagged {@link BroadcastHelper}s and their {@link WindowingStrategy}.
 */
public static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
    getSideInputs(List<PCollectionView<?>> views, EvaluationContext context) {
This one seems unused.
It is used throughout TransformTranslator; I prefer to keep it as is.
It appears that this can be package-private, which is also nicer since it has an overriding implementation.
    return new DirectBroadcastHelper<>(value);
  }
    return new CodedBroadcastHelper<>(value, coder);
public static <T> BroadcastHelper<T> create(byte[] bytes, Coder<T> coder) {
Can get rid of the abstraction; I don't see a reason to use "direct" translation since Beam heavily relies on Coders anyway.
fixed
    Coder<Iterable<WindowedValue<?>>> coder,
    JavaSparkContext context) {

  pviews.put(view, new Tuple2(CoderHelpers.toByteArray(value, coder), coder));
Unchecked warning: missing diamond, should be new Tuple2<>(...).
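The nit in isolation: constructing a generic type with a raw new produces an unchecked warning, while the diamond operator infers the type arguments from the target type. Tuple2 is Spark's Scala class, so plain HashMap is used here to show the same issue:

```java
import java.util.HashMap;
import java.util.Map;

public class DiamondDemo {
    public static void main(String[] args) {
        // Map<String, Integer> raw = new HashMap();  // raw type: unchecked warning
        Map<String, Integer> ok = new HashMap<>();    // diamond: type args inferred
        ok.put("views", 1);
        System.out.println(ok.get("views")); // 1
    }
}
```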
  this.bcast = jsc.broadcast(bytes);
}

public void unpersist(JavaSparkContext jsc) {
Unused method parameter.
pviews.put(view, new Tuple2<>(CoderHelpers.toByteArray(value, coder), coder));
// overwrite/create broadcast
getPCollectionView(view, context, true);
Thought: should broadcast be eager? Maybe this line could be removed. Should putPView simply set the view in pviews, while the actual broadcast happens on demand?
I had some issues with this approach; the Write operation is a composite transform that passes side inputs between transforms, though I can't remember exactly what the problem was. We can try it and see how it goes. There have been many changes since the lazy version. We also need to think about updating the broadcast variable; I think it should work lazily, but...
Like I said, mostly a thought. If it's not trivial, this could be considered for future work in a separate ticket, if it would benefit.
public class SparkPCollectionView implements Serializable {

  // Holds the view --> broadcast mapping. Transient so it will be null from resume
  private transient volatile Map<PCollectionView<?>, BroadcastHelper> broadcastHelperMap = null;
Won't it be simpler to instantiate this map eagerly and avoid the double-checked locking?
I think we can do it eagerly; let me try it out.
On second thought, we will need to handle it during deserialization. Do you think it would be easier to add resolve methods to the class? It may be dependent on the serialization framework (Kryo, Java, etc.).
Agree, keep it lazy as it is.
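Why lazy re-initialization is needed at all, in a self-contained sketch (hypothetical class, not the PR's code): Java deserialization does not run field initializers, so a transient map comes back null after a resume from checkpoint unless the code re-creates it on demand (or in a readObject/readResolve hook).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class TransientDemo implements Serializable {
    // Eagerly initialized here, but the initializer only runs during
    // construction, not during deserialization.
    transient Map<String, String> cache = new HashMap<>();

    static TransientDemo roundTrip(TransientDemo original) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(original);
        return (TransientDemo) new ObjectInputStream(
            new ByteArrayInputStream(bos.toByteArray())).readObject();
    }

    public static void main(String[] args) throws Exception {
        TransientDemo restored = roundTrip(new TransientDemo());
        System.out.println(restored.cache == null); // true: initializer not re-run
    }
}
```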
@@ -31,84 +30,42 @@
/**
 * Broadcast helper.
 */
-public abstract class BroadcastHelper<T> implements Serializable {
+public class BroadcastHelper<T> implements Serializable {
This class seems to suffer some alignment issues, probably from removing the abstract class and its inner implementations.
Right.
@@ -61,6 +61,7 @@ public SparkSideInputReader(Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, Broadca
//--- match the appropriate sideInput window.
// a tag will point to all matching sideInputs, that is all windows.
// now that we've obtained the appropriate sideInputWindow, all that's left is to filter by it.
@SuppressWarnings("unchecked")
Which warning do you suppress here? I don't see anything in my IDE.
Unchecked cast: 'java.lang.Object' to 'java.lang.Iterable<org.apache.beam.sdk.util.WindowedValue<?>>'
Signals places where an unchecked warning is issued by the compiler, for example:
void f(HashMap map) {
  map.put("key", "value");
}
Let's remove this for now.
Mostly nitpicking, almost there.
@@ -201,6 +193,26 @@ public void computeOutputs() {
    return Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction());
  }

  /**
   * Retruns the current views creates in the pipepline.
Typo: should be "Returns the current views created by the pipeline." Also add a newline before @return.
fixed
}

/**
 * Adds/Replaces a view to the current views creates in the pipepline.
Same here, a newline between "Adds/Replaces..." and @param.
fixed
    PCollectionView<?> view,
    Iterable<WindowedValue<?>> value,
    Coder<Iterable<WindowedValue<?>>> coder,
    JavaSparkContext context) {
context is unused.
fixed
public static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
-    getSideInputs(List<PCollectionView<?>> views, EvaluationContext context) {
+    getSideInputs(List<PCollectionView<?>> views, JavaSparkContext context,
Indent parameters.
@@ -31,97 +30,53 @@
/**
 * Broadcast helper.
 */
-public abstract class BroadcastHelper<T> implements Serializable {
+public class BroadcastHelper<T> implements Serializable {
Maybe this should not be called BroadcastHelper anymore... SideInputBroadcast?
The purpose of this class has not changed; only the constructor now takes bytes instead of T. I think there is no reason to rename it, since it is not tightly coupled to side inputs.
.apply(ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
  @ProcessElement
  public void process(ProcessContext c) {

Remove empty line.
fixed
@ProcessElement
public void process(ProcessContext c) {

  // Check side input is passed correctly
"... also after resuming from checkpoint."
fixed
@kobisalant would you mind checking the proper checkboxes in the PR template? Thanks!
LGTM. Thanks @kobisalant!
Be sure to do all of the following to help us incorporate your contribution quickly and easily:
- Make sure the PR title is formatted like: [BEAM-<Jira issue #>] Description of pull request
- Make sure tests pass via mvn clean verify. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes.)
- Replace <Jira issue #> in the title with the actual Jira issue number, if there is one.
- If this contribution is large, please file an Apache Individual Contributor License Agreement.