Skip to content

Commit

Permalink
[hotfix][python][tests] Improve the window tests by adding window sta…
Browse files Browse the repository at this point in the history
…rt and window end to make the test results more readable
  • Loading branch information
dianfu committed Apr 19, 2022
1 parent 45f11c7 commit da65027
Showing 1 changed file with 23 additions and 16 deletions.
39 changes: 23 additions & 16 deletions flink-python/pyflink/datastream/tests/test_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from pyflink.datastream.window import (TumblingEventTimeWindows,
SlidingEventTimeWindows, EventTimeSessionWindows,
CountSlidingWindowAssigner, SessionWindowTimeGapExtractor,
CountWindow, PurgingTrigger, EventTimeTrigger)
CountWindow, PurgingTrigger, EventTimeTrigger, TimeWindow)
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase

Expand Down Expand Up @@ -55,12 +55,13 @@ def test_event_time_tumbling_window(self):
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
.process(CountWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
.process(CountWindowProcessFunction(),
Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) \
.add_sink(self.test_sink)

self.env.execute('test_event_time_tumbling_window')
results = self.test_sink.get_results()
expected = ['(hi,4)', '(hi,3)', '(hi,1)']
expected = ['(hi,0,5,4)', '(hi,5,10,3)', '(hi,15,20,1)']
self.assert_equals_sorted(expected, results)

def test_count_tumbling_window(self):
Expand Down Expand Up @@ -89,12 +90,14 @@ def test_event_time_sliding_window(self):
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(SlidingEventTimeWindows.of(Time.milliseconds(5), Time.milliseconds(2))) \
.process(CountWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
.process(CountWindowProcessFunction(),
Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) \
.add_sink(self.test_sink)

self.env.execute('test_event_time_sliding_window')
results = self.test_sink.get_results()
expected = ['(hi,2)', '(hi,4)', '(hi,4)', '(hi,3)', '(hi,2)', '(hi,2)', '(hi,1)', '(hi,1)']
expected = ['(hi,-2,3,2)', '(hi,0,5,4)', '(hi,2,7,4)', '(hi,4,9,3)', '(hi,6,11,2)',
'(hi,8,13,2)', '(hi,12,17,1)', '(hi,14,19,1)']
self.assert_equals_sorted(expected, results)

def test_count_sliding_window(self):
Expand All @@ -121,12 +124,13 @@ def test_event_time_session_window(self):
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(EventTimeSessionWindows.with_gap(Time.milliseconds(5))) \
.process(CountWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
.process(CountWindowProcessFunction(),
Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) \
.add_sink(self.test_sink)

self.env.execute('test_event_time_session_window')
results = self.test_sink.get_results()
expected = ['(hi,1)', '(hi,6)']
expected = ['(hi,1,14,6)', '(hi,15,20,1)']
self.assert_equals_sorted(expected, results)

def test_event_time_dynamic_gap_session_window(self):
Expand All @@ -140,12 +144,13 @@ def test_event_time_dynamic_gap_session_window(self):
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(EventTimeSessionWindows.with_dynamic_gap(MySessionWindowTimeGapExtractor())) \
.process(CountWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
.process(CountWindowProcessFunction(),
Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) \
.add_sink(self.test_sink)

self.env.execute('test_event_time_dynamic_gap_session_window')
results = self.test_sink.get_results()
expected = ['(hi,3)', '(hi,4)']
expected = ['(hi,1,8,4)', '(hi,9,30,3)']
self.assert_equals_sorted(expected, results)

def test_window_reduce_passthrough(self):
Expand Down Expand Up @@ -338,12 +343,13 @@ def test_session_window_late_merge(self):
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(EventTimeSessionWindows.with_gap(Time.milliseconds(5))) \
.process(CountWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
.process(CountWindowProcessFunction(),
Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) \
.add_sink(self.test_sink)

self.env.execute('test_session_window_late_merge')
results = self.test_sink.get_results()
expected = ['(hi,3)']
expected = ['(hi,0,13,3)']
self.assert_equals_sorted(expected, results)

def test_event_time_session_window_with_purging_trigger(self):
Expand All @@ -357,12 +363,13 @@ def test_event_time_session_window_with_purging_trigger(self):
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(EventTimeSessionWindows.with_gap(Time.milliseconds(3))) \
.trigger(PurgingTrigger.of(EventTimeTrigger.create())) \
.process(CountWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
.process(CountWindowProcessFunction(),
Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) \
.add_sink(self.test_sink)

self.env.execute('test_event_time_session_window_with_purging_trigger')
results = self.test_sink.get_results()
expected = ['(hi,1)', '(hi,2)', '(hi,4)']
expected = ['(hi,1,7,4)', '(hi,8,12,2)', '(hi,15,18,1)']
self.assert_equals_sorted(expected, results)


Expand All @@ -387,13 +394,13 @@ def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):
return [(key, result)]


class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, CountWindow]):
class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]):

def process(self,
key: str,
context: ProcessWindowFunction.Context[CountWindow],
context: ProcessWindowFunction.Context[TimeWindow],
elements: Iterable[tuple]) -> Iterable[tuple]:
return [(key, len([e for e in elements]))]
return [(key, context.window().start, context.window().end, len([e for e in elements]))]

def clear(self, context: ProcessWindowFunction.Context) -> None:
pass

0 comments on commit da65027

Please sign in to comment.