
Commit

[FLINK-22348][python] Fix Python operators of the Python DataStream API not using managed memory in execute_and_collect
HuangXingBo committed Apr 21, 2021
1 parent f511680 commit a0ea4ca
Showing 3 changed files with 149 additions and 134 deletions.
2 changes: 2 additions & 0 deletions flink-python/pyflink/datastream/data_stream.py
@@ -641,6 +641,8 @@ def execute_and_collect(self, job_execution_name: str = None, limit: int = None)
         :param job_execution_name: The name of the job execution.
         :param limit: The limit for the collected elements.
         """
+        JPythonConfigUtil = get_gateway().jvm.org.apache.flink.python.util.PythonConfigUtil
+        JPythonConfigUtil.configPythonOperator(self._j_data_stream.getExecutionEnvironment())
         if job_execution_name is None and limit is None:
             return CloseableIterator(self._j_data_stream.executeAndCollect(), self.get_type())
         elif job_execution_name is not None and limit is None:
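The two added lines route every execute_and_collect call through PythonConfigUtil.configPythonOperator before the job is submitted, so Python operators in the pipeline get their configuration (including managed memory) applied, as already happens on the execute path. A minimal sketch of a pipeline that exercises this path; the sample data and the doubling map are illustrative, not from the commit:

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()

    # The Python map operator below is what configPythonOperator now
    # configures before executeAndCollect submits the job.
    ds = env.from_collection([1, 2, 3], type_info=Types.INT()).map(lambda x: x * 2)

    with ds.execute_and_collect() as results:
        print(sorted(results))  # [2, 4, 6]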
10 changes: 5 additions & 5 deletions flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -387,11 +387,11 @@ def test_execute_and_collect(self):
                  decimal.Decimal('2000000000000000000.061111111111111'
                                  '11111111111111'))]
         expected = test_data
-        ds = self.env.from_collection(test_data)
+        ds = self.env.from_collection(test_data).map(lambda a: a)
         with ds.execute_and_collect() as results:
-            actual = []
-            for result in results:
-                actual.append(result)
+            actual = [result for result in results]
             actual.sort()
             expected.sort()
             self.assertEqual(expected, actual)

     def test_key_by_map(self):
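A note on the test change above, inferred from the fix rather than stated in the commit: from_collection alone builds a purely Java source, so appending the identity map is what puts a Python operator into the job graph for the new configPythonOperator call to configure. The loop-to-comprehension change is a pure simplification with identical behavior.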
@@ -942,7 +942,7 @@ def test_partition_custom(self):
         expected_num_partitions = 5

         def my_partitioner(key, num_partitions):
-            assert expected_num_partitions, num_partitions
+            assert expected_num_partitions == num_partitions
             return key % num_partitions

         partitioned_stream = ds.map(lambda x: x, output_type=Types.ROW([Types.STRING(),
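The assert fix here is substantive: the old form, assert expected_num_partitions, num_partitions, only checks that expected_num_partitions is truthy and uses num_partitions as the failure message, so it could never fail for a positive partition count; the new form actually verifies the two are equal. As a hedged sketch of how a partitioner like my_partitioner is wired up outside the test harness; the environment setup and sample data are assumptions:

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(5)  # mirrors expected_num_partitions in the test

    ds = env.from_collection(
        [(0, 'a'), (1, 'b'), (7, 'c')],
        type_info=Types.TUPLE([Types.INT(), Types.STRING()]))

    # Send each record to partition key % num_partitions, keyed on field 0.
    partitioned = ds.partition_custom(
        lambda key, num_partitions: key % num_partitions,
        lambda value: value[0])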