Skip to content

Commit

Permalink
[FLINK-23454][runtime] Notifying the subpartitions about the new rece…
Browse files Browse the repository at this point in the history
…ived buffer size.
  • Loading branch information
akalash authored and pnowojski committed Aug 11, 2021
1 parent 703662a commit ebbc860
Show file tree
Hide file tree
Showing 13 changed files with 120 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,6 @@ void requestSubpartitionView(
Throwable getFailureCause();

InputChannelID getReceiverId();

void notifyNewBufferSize(int newBufferSize);
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ public InputChannelID getReceiverId() {
return receiverId;
}

@Override
public void notifyNewBufferSize(int newBufferSize) {
subpartitionView.notifyNewBufferSize(newBufferSize);
}

@VisibleForTesting
int getNumCreditsAvailable() {
return numCreditsAvailable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,14 @@ void acknowledgeAllRecordsProcessed(InputChannelID receiverId) {
obtainReader(receiverId).acknowledgeAllRecordsProcessed();
}

void notifyNewBufferSize(InputChannelID receiverId, int newBufferSize) {
if (fatalError) {
return;
}

obtainReader(receiverId).notifyNewBufferSize(newBufferSize);
}

NetworkSequenceViewReader obtainReader(InputChannelID receiverId) {
NetworkSequenceViewReader reader = allReaders.get(receiverId);
if (reader == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ else if (msgClazz == TaskEventRequest.class) {
} else if (msgClazz == NewBufferSize.class) {
NewBufferSize request = (NewBufferSize) msg;

outboundQueue.notifyNewBufferSize(request.receiverId, request.bufferSize);
} else {
LOG.warn("Received unexpected client request: {}", msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ public int getNumberOfQueuedBuffers() {
return parent.getNumberOfQueuedBuffers();
}

@Override
public void notifyNewBufferSize(int newBufferSize) {
parent.bufferSize(newBufferSize);
}

@Override
public void notifyDataAvailable() {
throw new UnsupportedOperationException("Method should never be called.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ public int getNumberOfQueuedBuffers() {
return parent.getNumberOfQueuedBuffers();
}

@Override
public void notifyNewBufferSize(int newBufferSize) {
parent.bufferSize(newBufferSize);
}

@Override
public String toString() {
return String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,7 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
public int getNumberOfQueuedBuffers() {
return 0;
}

@Override
public void notifyNewBufferSize(int newBufferSize) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ public int getNumberOfQueuedBuffers() {
return parent.getNumberOfQueuedBuffers();
}

@Override
public void notifyNewBufferSize(int newBufferSize) {
parent.bufferSize(newBufferSize);
}

@Override
public String toString() {
return String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ default void notifyPriorityEvent(int priorityBufferNumber) {}

int getNumberOfQueuedBuffers();

void notifyNewBufferSize(int newBufferSize);

/**
* Availability of the {@link ResultSubpartitionView} and the backlog in the corresponding
* {@link ResultSubpartition}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,7 @@ public int getNumberOfQueuedBuffers() {
return buffersRead.size();
}
}

@Override
public void notifyNewBufferSize(int newBufferSize) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ public int getNumberOfQueuedBuffers() {
return 0;
}

@Override
public void notifyNewBufferSize(int newBufferSize) {}

@Override
public Throwable getFailureCause() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
Expand Down Expand Up @@ -528,6 +529,55 @@ private void testCancelPartitionRequest(boolean isAvailableView) throws Exceptio
channel.close();
}

@Test
public void testNotifyNewBufferSize() throws Exception {
// given: Result partition and the reader for subpartition 0.
ResultPartition parent = createResultPartition();

BufferAvailabilityListener bufferAvailabilityListener = new NoOpBufferAvailablityListener();
ResultSubpartitionView view = parent.createSubpartitionView(0, bufferAvailabilityListener);
ResultPartitionProvider partitionProvider =
(partitionId, index, availabilityListener) -> view;

InputChannelID receiverId = new InputChannelID();
PartitionRequestQueue queue = new PartitionRequestQueue();
CreditBasedSequenceNumberingViewReader reader =
new CreditBasedSequenceNumberingViewReader(receiverId, 2, queue);
EmbeddedChannel channel = new EmbeddedChannel(queue);

reader.requestSubpartitionView(partitionProvider, new ResultPartitionID(), 0);
queue.notifyReaderCreated(reader);

// when: New buffer size received.
queue.notifyNewBufferSize(receiverId, 65);

// and: New records emit.
parent.emitRecord(ByteBuffer.allocate(128), 0);
parent.emitRecord(ByteBuffer.allocate(10), 0);

reader.notifyDataAvailable();
channel.runPendingTasks();

// then: Buffers of received size will be in outbound channel.
Object data1 = channel.readOutbound();
assertEquals(65, ((NettyMessage.BufferResponse) data1).buffer.getSize());
Object data2 = channel.readOutbound();
assertEquals(65, ((NettyMessage.BufferResponse) data2).buffer.getSize());
}

private static ResultPartition createResultPartition() throws IOException {
NettyShuffleEnvironment network =
new NettyShuffleEnvironmentBuilder()
.setNumNetworkBuffers(10)
.setBufferSize(BUFFER_SIZE)
.build();
ResultPartition resultPartition =
createPartition(
network, NoOpFileChannelManager.INSTANCE, ResultPartitionType.PIPELINED, 2);
resultPartition.setup();
return resultPartition;
}

private static ResultPartition createFinishedPartitionWithFilledData(
ResultPartitionManager partitionManager) throws Exception {
NettyShuffleEnvironment environment =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -138,8 +139,30 @@ public void testAcknowledgeAllRecordsProcessed() throws IOException {
assertFalse(allRecordsProcessedFuture.isCompletedExceptionally());
}

@Test
public void testNewBufferSize() {
final InputChannelID inputChannelID = new InputChannelID();
final PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
final TestViewReader testViewReader =
new TestViewReader(inputChannelID, 2, partitionRequestQueue);
final PartitionRequestServerHandler serverHandler =
new PartitionRequestServerHandler(
new ResultPartitionManager(),
new TaskEventDispatcher(),
partitionRequestQueue);
final EmbeddedChannel channel = new EmbeddedChannel(serverHandler);
partitionRequestQueue.notifyReaderCreated(testViewReader);

// Write the message of new buffer size to server
channel.writeInbound(new NettyMessage.NewBufferSize(666, inputChannelID));
channel.runPendingTasks();

assertEquals(666, testViewReader.bufferSize);
}

private static class TestViewReader extends CreditBasedSequenceNumberingViewReader {
private boolean consumptionResumed = false;
private int bufferSize;

TestViewReader(
InputChannelID receiverId, int initialCredit, PartitionRequestQueue requestQueue) {
Expand All @@ -150,5 +173,10 @@ private static class TestViewReader extends CreditBasedSequenceNumberingViewRead
public void resumeConsumption() {
consumptionResumed = true;
}

@Override
public void notifyNewBufferSize(int newBufferSize) {
bufferSize = newBufferSize;
}
}
}

0 comments on commit ebbc860

Please sign in to comment.