
[BEAM-647] Fault-tolerant sideInputs via Broadcast variables #1624

Closed

Conversation

@kobisalant (Contributor):

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

@asfbot

asfbot commented Dec 15, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/5952/
--none--

@@ -324,7 +327,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
LOG.info("Evaluating {}", transform);
AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform();
ctxt.setCurrentTransform(appliedTransform);
evaluator.evaluate(transform, ctxt);
evaluator.evaluate(transform, ctxt, pview);
@amitsela (Member):

Why pass the SparkPCollectionView when the PipelineVisitor traverses? This only happens once, on the first run. I'm also pretty sure it never happens again (for example, on resume from checkpoint).

// Driver only
BroadcastHelper getPCollectionView(PCollectionView<?> view, JavaSparkContext context) {
// initialize broadcastHelperMap if needed
if (broadcastHelperMap == null) {
@amitsela (Member):

If you use double-checked locking, broadcastHelperMap should be volatile.

@kobisalant (Contributor, Author):

Right, fixed
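(For context, a minimal sketch of the double-checked locking idiom being discussed, shaped after the snippets quoted in this review; the surrounding class and names are illustrative, not the PR's final code. The volatile modifier is what makes the unsynchronized first null check safe under the Java memory model.)

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.spark.api.java.JavaSparkContext;

// BroadcastHelper is the helper class from this PR, assumed to be in scope.
class LazyBroadcastMapSketch {
  // transient: comes back null after deserialization; volatile: the unsynchronized
  // read below is guaranteed to see a fully constructed map.
  private transient volatile Map<PCollectionView<?>, BroadcastHelper> broadcastHelperMap;

  BroadcastHelper getPCollectionView(PCollectionView<?> view, JavaSparkContext context) {
    if (broadcastHelperMap == null) {                 // first check, no lock taken
      synchronized (LazyBroadcastMapSketch.class) {
        if (broadcastHelperMap == null) {             // second check, under the lock
          broadcastHelperMap = new LinkedHashMap<>();
        }
      }
    }
    // ... create the broadcast for this view on demand and cache it in the map ...
    return broadcastHelperMap.get(view);
  }
}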

import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkPCollectionView implements Serializable {
@amitsela (Member):

This should probably host some kind of thread-safe, lazy singleton. That might also save the trouble of passing this through the TransformEvaluator.
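(As an aside, a sketch of the kind of thread-safe, lazy singleton being suggested - the initialization-on-demand holder idiom gets laziness and thread safety from the JVM's class-initialization guarantees, with no explicit locking. The holder class and method names are hypothetical; this is not necessarily what the PR adopts.)

public final class SparkPCollectionViewHolder {
  private SparkPCollectionViewHolder() { }

  // The nested class is not initialized until getInstance() is first called,
  // and the JVM initializes it exactly once, even under concurrent access.
  private static final class Holder {
    static final SparkPCollectionView INSTANCE = new SparkPCollectionView();
  }

  public static SparkPCollectionView getInstance() {
    return Holder.INSTANCE;
  }
}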

Fix comments by Amit + rebase from master + checkstyle
@kobisalant force-pushed the BEAM-647-Fault-tolerant-sideInputs branch from 228c613 to 0ea9f1a on December 18, 2016 15:39
@asfbot

asfbot commented Dec 18, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6061/
--none--

@kobisalant (Contributor, Author):

R. @amitsela

@asfbot

asfbot commented Dec 19, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6068/
--none--

@@ -129,10 +128,6 @@ public void putDataset(PValue pvalue, Dataset dataset) {
datasets.put((PValue) getOutput(transform), new UnboundedDataset<>(values, jssc, coder));
}

void putPView(PValue view, Iterable<? extends WindowedValue<?>> value) {
@amitsela (Member):

Why not keep put/get encapsulated here? It saves the ugly context.getPviews().putPViews().

@kobisalant (Contributor, Author):

Will be fixed
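(A rough sketch of the encapsulation being asked for - EvaluationContext delegating to its SparkPCollectionView so call sites use the context directly. The pviews field name and exact signature are assumptions based on the snippets quoted in this PR.)

// Inside EvaluationContext (sketch): callers write ctxt.putPView(...) instead of
// reaching through ctxt.getPviews().putPView(...).
void putPView(PCollectionView<?> view,
    Iterable<WindowedValue<?>> value,
    Coder<Iterable<WindowedValue<?>>> coder) {
  pviews.putPView(view, value, coder);
}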

@@ -149,10 +144,6 @@ public Dataset borrowDataset(PValue pvalue) {
return dataset;
}

<T> Iterable<? extends WindowedValue<?>> getPCollectionView(PCollectionView<T> view) {
@amitsela (Member):

Same as above, why not get the appropriate BroadcastHelper?

@kobisalant (Contributor, Author):

It remains the same as before: TranslationUtils.getSideInputs gets a new parameter, "pviews", and is called inside the lambda; everything else is the same.

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

@amitsela (Member):

Please remove empty line.

@kobisalant (Contributor, Author):

Will be fixed

@asfbot

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6361/

Build result: FAILURE

[...truncated 11169 lines...]
    at hudson.remoting.UserRequest.perform(UserRequest.java:153)
    at hudson.remoting.UserRequest.perform(UserRequest.java:50)
    at hudson.remoting.Request$2.run(Request.java:332)
    at hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:68)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.maven.plugin.MojoFailureException: You have 4 Checkstyle violations.
    at org.apache.maven.plugin.checkstyle.CheckstyleViolationCheckMojo.execute(CheckstyleViolationCheckMojo.java:588)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
    ... 31 more
2017-01-02T13:10:32.256 [ERROR]
2017-01-02T13:10:32.256 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
2017-01-02T13:10:32.256 [ERROR]
2017-01-02T13:10:32.257 [ERROR] For more information about the errors and possible solutions, please read the following articles:
2017-01-02T13:10:32.257 [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
2017-01-02T13:10:32.257 [ERROR]
2017-01-02T13:10:32.257 [ERROR] After correcting the problems, you can resume the build with the command
2017-01-02T13:10:32.257 [ERROR]   mvn -rf :beam-runners-spark
channel stopped
Setting status of d0815ea to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6361/ and message: 'Build finished.'
Using context: Jenkins: Maven clean install
--none--

@asfbot

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6364/
--none--

@asfbot

asfbot commented Jan 2, 2017

// Holds the Actual data of the views in serialize form
private Map<PCollectionView<?>,
Tuple2<byte[], Coder<Iterable<WindowedValue<?>>>>> pviews =
Collections.synchronizedMap(
@amitsela (Member):

I don't think there's a real need for a synchronized map.

@kobisalant (Contributor, Author):

fixed

private transient volatile Map<PCollectionView<?>, BroadcastHelper> broadcastHelperMap = null;

// Holds the Actual data of the views in serialize form
private Map<PCollectionView<?>,
@amitsela (Member):

Since you rely on SparkPCollectionView to be serialized as part of the DStream (checkpointing), and this Map to be persisted along, it should be noted that this is sub-optimal since all side inputs would be persisted for every DoFn they appear in.

Tuple2<byte[], Coder<Iterable<WindowedValue<?>>>>>());

// Driver only - during evaluation stage
void putPView(PCollectionView<?> view,
@amitsela (Member):

Nit: indent parameters.

@amitsela (Member):

Throughout this class.

@kobisalant (Contributor, Author):

fixed

}
} else if (overwrite) {
synchronized (SparkPCollectionView.class) {
helper.unpersist(context);
@amitsela (Member):

Can add a comment that unpersist can be blocking, in case we want to consider this in the future.

@kobisalant (Contributor, Author):

fixed
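(For reference, roughly what the requested comment could look like at the call site - Spark's Broadcast.unpersist() is asynchronous by default and has a blocking overload. The bcast field is the one shown later in this review; this is an illustration, not the PR's exact code.)

// Note: unpersist() is asynchronous by default; unpersist(true) blocks until the
// executors have dropped the broadcast blocks. Worth revisiting if re-broadcasting
// on every update ever becomes a concern.
public void unpersist() {
  bcast.unpersist();
}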

return helper;
}

private BroadcastHelper getBroadcastHelper(PCollectionView<?> view, JavaSparkContext context) {
@amitsela (Member):

This is more of a createBroadcastHelper.

* @param value - Actual value of the view
* @param coder - Coder of the value
*/
public void putPView(PCollectionView<?> view,
@amitsela (Member):

Indent the first parameter as well.

@kobisalant (Contributor, Author):

fixed

@amitsela (Member):

First parameter should still be aligned with others.
Also, naming should match the getter - either both with capital V or not: getPViews/putPView or getPviews/putPviews.

@@ -192,22 +192,30 @@ public Boolean call(Tuple2<TupleTag<V>, WindowedValue<?>> input) {
* @return a map of tagged {@link BroadcastHelper}s and their {@link WindowingStrategy}.
*/
public static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
getSideInputs(List<PCollectionView<?>> views, EvaluationContext context) {
getSideInputs(List<PCollectionView<?>> views, EvaluationContext context) {
@amitsela (Member):

This one seems unused.

@kobisalant (Contributor, Author):

It is used throughout TransformTranslator; I prefer to keep it as is.

@amitsela (Member):

It appears that this can be package-private, which is also nicer since it has an overriding implementation.

return new DirectBroadcastHelper<>(value);
}
return new CodedBroadcastHelper<>(value, coder);
public static <T> BroadcastHelper<T> create(byte[] bytes, Coder<T> coder) {
@amitsela (Member) commented on Jan 3, 2017:

Can get rid of the abstraction.
I don't see a reason to use "direct" translation since Beam heavily relies on Coders anyway.

@kobisalant (Contributor, Author):

fixed
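(Roughly what collapsing the abstraction looks like - a single, coder-backed helper that broadcasts the coder-serialized bytes and decodes them lazily on the executors. This is a sketch assembled from the snippets quoted in this PR, assuming CoderHelpers.fromByteArray as the counterpart of the toByteArray call used elsewhere; it is not the exact final code.)

import java.io.Serializable;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastHelper<T> implements Serializable {
  private final byte[] bytes;            // value already encoded with the coder on the driver
  private final Coder<T> coder;
  private transient Broadcast<byte[]> bcast;
  private transient T value;             // decoded lazily, at most once per JVM

  private BroadcastHelper(byte[] bytes, Coder<T> coder) {
    this.bytes = bytes;
    this.coder = coder;
  }

  public static <T> BroadcastHelper<T> create(byte[] bytes, Coder<T> coder) {
    return new BroadcastHelper<>(bytes, coder);
  }

  void broadcast(JavaSparkContext jsc) {
    this.bcast = jsc.broadcast(bytes);
  }

  public T getValue() {
    if (value == null) {
      value = CoderHelpers.fromByteArray(bcast.value(), coder);
    }
    return value;
  }

  public void unpersist() {
    bcast.unpersist();
  }
}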

Coder<Iterable<WindowedValue<?>>> coder,
JavaSparkContext context) {

pviews.put(view, new Tuple2(CoderHelpers.toByteArray(value, coder), coder));
@amitsela (Member):

Unchecked warning - missing the "diamond": new Tuple2<>(...).

this.bcast = jsc.broadcast(bytes);
}

public void unpersist(JavaSparkContext jsc) {
@amitsela (Member):

Unused method parameter.

@asfbot

asfbot commented Jan 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6374/
--none--

@asfbot

asfbot commented Jan 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6375/
--none--

@asfbot

asfbot commented Jan 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6394/
--none--


pviews.put(view, new Tuple2<>(CoderHelpers.toByteArray(value, coder), coder));
// overwrite/create broadcast
getPCollectionView(view, context, true);
@amitsela (Member):

Thought: should the broadcast be eager? Maybe this line could be removed? Should putPView simply set the view in pviews while the actual broadcast happens on demand?

@kobisalant (Contributor, Author):

I had some issues with this approach - the Write operation is a composite transform that passes side inputs between its transforms; I can't remember exactly what the problem was. We can try it and see how it goes. There have been many changes since the lazy version. We also need to think about updating the broadcast variable; I think it should work lazily, but...

@amitsela (Member):

Like I said, mostly a thought. If it's not trivial, this could be considered for future work in a separate ticket if it would benefit.

public class SparkPCollectionView implements Serializable {

// Holds the view --> broadcast mapping. Transient so it will be null from resume
private transient volatile Map<PCollectionView<?>, BroadcastHelper> broadcastHelperMap = null;
@amitsela (Member):

Won't it be simpler to instantiate this map eagerly and avoid the double-checked locking?

@kobisalant (Contributor, Author):

I think we can do it eagerly. Let me try it out.

@kobisalant (Contributor, Author):

On second thought, we would need to handle it on deserialization. Do you think it would be easier to add resolve methods to the class? It may be dependent on the serialization framework (Kryo, Java, etc.).

@amitsela (Member):

Agree, keep it lazy as it is.
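(For completeness, the eager alternative discussed above would need a deserialization hook along these lines, and - as noted - it is tied to the serialization mechanism: readObject only fires under plain Java serialization, while Kryo would need its own registration. That is why keeping the lazy null check is the simpler choice. Sketch only; this is not what the PR does.)

// Hypothetical eager re-initialization inside SparkPCollectionView under Java serialization:
private void readObject(java.io.ObjectInputStream in)
    throws java.io.IOException, ClassNotFoundException {
  in.defaultReadObject();
  // transient fields are null after deserialization, so re-create the empty map here
  broadcastHelperMap = new LinkedHashMap<>();
}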

@@ -31,84 +30,42 @@
/**
* Broadcast helper.
*/
public abstract class BroadcastHelper<T> implements Serializable {
public class BroadcastHelper<T> implements Serializable {
@amitsela (Member):

This class seems to suffer some alignment issues, probably from removing the abstract and inner implementations.

@kobisalant (Contributor, Author):

right

@@ -61,6 +61,7 @@ public SparkSideInputReader(Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, Broadca
//--- match the appropriate sideInput window.
// a tag will point to all matching sideInputs, that is all windows.
// now that we've obtained the appropriate sideInputWindow, all that's left is to filter by it.
@SuppressWarnings("unchecked")
@amitsela (Member):

Which warning do you suppress here? I don't see anything in my IDE.

@kobisalant (Contributor, Author):

Unchecked cast: 'java.lang.Object' to 'java.lang.Iterable<org.apache.beam.sdk.util.WindowedValue<?>>'
The IDE signals places where an unchecked warning is issued by the compiler, for example:

// a raw type makes the compiler issue an unchecked-call warning here
void f(HashMap map) {
    map.put("key", "value");
}

@amitsela (Member):

Let's remove this for now.
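(For reference, the shape of the cast that triggers the warning and the narrowest place the suppression could live if it were ever reinstated - on the local declaration rather than the whole method. Variable names here are illustrative, not the PR's code.)

// The broadcast value comes back effectively untyped; this cast is what the
// compiler reports as unchecked.
@SuppressWarnings("unchecked")  // applies only to this declaration
Iterable<WindowedValue<?>> sideInputValues =
    (Iterable<WindowedValue<?>>) broadcastHelper.getValue();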

@asfbot

asfbot commented Jan 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6395/
--none--

@asfbot

asfbot commented Jan 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6425/
--none--

@amitsela (Member) left a review:

Mostly nit-picking, almost there.

@@ -201,6 +193,26 @@ public void computeOutputs() {
return Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction());
}

/**
* Retruns the current views creates in the pipepline.
@amitsela (Member):

Typo: should be "Returns the current views created in the pipeline."
Also, a newline before @return.

@kobisalant (Contributor, Author):

fixed

}

/**
* Adds/Replaces a view to the current views creates in the pipepline.
@amitsela (Member):

Same here, newline between Adds/Replaces... and @param.

@kobisalant (Contributor, Author):

fixed


PCollectionView<?> view,
Iterable<WindowedValue<?>> value,
Coder<Iterable<WindowedValue<?>>> coder,
JavaSparkContext context) {
@amitsela (Member):

context is unused.

@kobisalant (Contributor, Author):

fixed

public static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
getSideInputs(List<PCollectionView<?>> views, EvaluationContext context) {
getSideInputs(List<PCollectionView<?>> views, JavaSparkContext context,
@amitsela (Member):

Indent parameters.

@@ -31,97 +30,53 @@
/**
* Broadcast helper.
*/
public abstract class BroadcastHelper<T> implements Serializable {
public class BroadcastHelper<T> implements Serializable {
@amitsela (Member):

Maybe this should not be called BroadcastHelper anymore... SideInputBroadcast?

@kobisalant (Contributor, Author):

The purpose of this class has not changed; only the constructor now takes bytes instead of T. I think there is no reason to change it - it is not tightly coupled to side inputs.


.apply(ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
@ProcessElement
public void process(ProcessContext c) {

@amitsela (Member):

Remove empty line.

@kobisalant (Contributor, Author):

fixed

@ProcessElement
public void process(ProcessContext c) {

// Check side input is passed correctly
@amitsela (Member):

"... also after resuming from checkpoint."

@kobisalant (Contributor, Author):

fixed

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6481/
--none--

@amitsela (Member):

@kobisalant, would you mind checking the proper checkboxes in the PR template? Thanks!

@amitsela (Member):

LGTM. Thanks @kobisalant!

@asfgit closed this in c1b7f86 on Jan 10, 2017