Skip to content

Commit

Permalink
[FLINK-19083] Remove deprecated DataStream#split
Browse files Browse the repository at this point in the history
This closes apache#13343
  • Loading branch information
dawidwys committed Sep 14, 2020
1 parent 9b0fb56 commit 1a08548
Show file tree
Hide file tree
Showing 40 changed files with 151 additions and 1,623 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
Expand All @@ -50,6 +50,9 @@ public class IterateExample {

private static final int BOUND = 100;

private static final OutputTag<Tuple5<Integer, Integer, Integer, Integer, Integer>> ITERATE_TAG =
new OutputTag<Tuple5<Integer, Integer, Integer, Integer, Integer>>("iterate") {};

// *************************************************************************
// PROGRAM
// *************************************************************************
Expand Down Expand Up @@ -84,19 +87,16 @@ public static void main(String[] args) throws Exception {
.iterate(5000L);

// apply the step function to get the next Fibonacci number
// increment the counter and split the output with the output selector
SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.map(new Step())
.split(new MySelector());
// increment the counter and split the output
SingleOutputStreamOperator<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.process(new Step());

// close the iteration by selecting the tuples that were directed to the
// 'iterate' channel in the output selector
it.closeWith(step.select("iterate"));
it.closeWith(step.getSideOutput(ITERATE_TAG));

// to produce the final output select the tuples directed to the
// 'output' channel then get the input pairs that have the greatest iteration counter
// to produce the final get the input pairs that have the greatest iteration counter
// on a 1 second sliding window
DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output")
.map(new OutputMap());
DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.map(new OutputMap());

// emit results
if (params.has("output")) {
Expand Down Expand Up @@ -176,33 +176,27 @@ public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, I
/**
* Iteration step function that calculates the next Fibonacci number.
*/
public static class Step implements
MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer,
Integer, Integer>> {
public static class Step
extends ProcessFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
private static final long serialVersionUID = 1L;

@Override
public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer,
Integer> value) throws Exception {
return new Tuple5<>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4);
}
}

/**
* OutputSelector testing which tuple needs to be iterated again.
*/
public static class MySelector implements OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> {
private static final long serialVersionUID = 1L;
public void processElement(
Tuple5<Integer, Integer, Integer, Integer, Integer> value,
Context ctx,
Collector<Tuple5<Integer, Integer, Integer, Integer, Integer>> out) throws Exception {
Tuple5<Integer, Integer, Integer, Integer, Integer> element = new Tuple5<>(
value.f0,
value.f1,
value.f3,
value.f2 + value.f3,
++value.f4);

@Override
public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Integer> value) {
List<String> output = new ArrayList<>();
if (value.f2 < BOUND && value.f3 < BOUND) {
output.add("iterate");
ctx.output(ITERATE_TAG, element);
} else {
output.add("output");
out.collect(element);
}
return output;
}
}

Expand Down
4 changes: 4 additions & 0 deletions flink-streaming-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ under the License.
<exclude>org.apache.flink.streaming.api.datastream.WindowedStream#fold(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction)</exclude>
<exclude>org.apache.flink.streaming.api.datastream.WindowedStream#apply(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction,org.apache.flink.streaming.api.functions.windowing.WindowFunction)</exclude>
<exclude>org.apache.flink.streaming.api.datastream.WindowedStream#apply(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction,org.apache.flink.streaming.api.functions.windowing.WindowFunction,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>

<!-- DataStream#split was removed in 1.12 -->
<exclude>org.apache.flink.streaming.api.datastream.DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)</exclude>
<exclude>org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)</exclude>
</excludes>
</parameter>
</configuration>
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 1a08548

Please sign in to comment.