Skip to content

Commit

Permalink
Disallow sliding windows with combiner fanout to prevent data loss (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed Dec 10, 2022
1 parent 61e2ec1 commit 359a641
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 0 deletions.
21 changes: 21 additions & 0 deletions sdks/python/apache_beam/transforms/combiners_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,27 @@ def has_expected_values(actual):

assert_that(result, has_expected_values)

def test_combining_with_sliding_windows_and_fanout_raises_error(self):
  """Checks that a fanned-out global combine over SlidingWindows is rejected.

  Builds a streaming pipeline that windows timestamped integers into
  SlidingWindows(10, 5) and applies CombineGlobally(...).with_fanout(7);
  a ValueError is expected to surface while the pipeline is built or run.
  """
  streaming_options = PipelineOptions()
  streaming_options.view_as(StandardOptions).streaming = True

  with self.assertRaises(ValueError):
    with TestPipeline(options=streaming_options) as p:
      # Each value v is stamped at (1666707510 + v) seconds; the value 4 is
      # not part of the input data.
      base_seconds = 1666707510
      timestamped_values = [
          window.TimestampedValue(v, Timestamp(seconds=base_seconds + v))
          for v in (0, 1, 2, 3, 5, 6, 7, 8)
      ]

      source = p | beam.Create(timestamped_values)
      windowed = source | beam.WindowInto(window.SlidingWindows(10, 5))
      _ = windowed | (
          beam.CombineGlobally(beam.combiners.ToListCombineFn())
          .without_defaults()
          .with_fanout(7))

def test_MeanCombineFn_combine(self):
with TestPipeline() as p:
input = (
Expand Down
6 changes: 6 additions & 0 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from apache_beam.transforms.userstate import StateSpec
from apache_beam.transforms.userstate import TimerSpec
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import SlidingWindows
from apache_beam.transforms.window import TimestampCombiner
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms.window import WindowedValue
Expand Down Expand Up @@ -2746,6 +2747,11 @@ def expand(self, pcoll):
combine_fn = self._combine_fn
fanout_fn = self._fanout_fn

if isinstance(pcoll.windowing.windowfn, SlidingWindows):
raise ValueError(
'CombinePerKey.with_hot_key_fanout does not yet work properly with '
'SlidingWindows. See: https://github.com/apache/beam/issues/20528')

class SplitHotCold(DoFn):
def start_bundle(self):
# Spreading a hot key across all possible sub-keys for all bundles
Expand Down

0 comments on commit 359a641

Please sign in to comment.