Skip to content

Commit

Permalink
[hotfix][runtime] setThroughputMeter renamed to setThroughputCalculat…
Browse files Browse the repository at this point in the history
…or according to name of the set object.
  • Loading branch information
akalash authored and pnowojski committed Aug 11, 2021
1 parent f22b6dd commit e6996cd
Show file tree
Hide file tree
Showing 10 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public TaskEventDispatcher getTaskEventDispatcher() {
}

@Override
public ThroughputCalculator getThroughputMeter() {
public ThroughputCalculator getThroughputCalculator() {
// The throughput calculator doesn't make sense for savepoint but the not null value is
// preferable when StreamTask is instantiated.
return new ThroughputCalculator(SystemClock.getInstance(), 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ void acknowledgeCheckpoint(
*
* @return the throughput calculation service.
*/
ThroughputCalculator getThroughputMeter();
ThroughputCalculator getThroughputCalculator();

// --------------------------------------------------------------------------------------------
// Fields set in the StreamTask to provide access to mailbox and other runtime resources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public ExecutorService getAsyncOperationsThreadPool() {
}

@Override
public ThroughputCalculator getThroughputMeter() {
public ThroughputCalculator getThroughputCalculator() {
return throughputCalculator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public TaskEventDispatcher getTaskEventDispatcher() {
}

@Override
public ThroughputCalculator getThroughputMeter() {
public ThroughputCalculator getThroughputCalculator() {
return new ThroughputCalculator(SystemClock.getInstance(), 10);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public TaskEventDispatcher getTaskEventDispatcher() {
}

@Override
public ThroughputCalculator getThroughputMeter() {
public ThroughputCalculator getThroughputCalculator() {
return throughputCalculator;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ public MockEnvironmentBuilder setExternalResourceInfoProvider(
return this;
}

public MockEnvironmentBuilder setThroughputMeter(ThroughputCalculator throughputCalculator) {
public MockEnvironmentBuilder setThroughputCalculator(
ThroughputCalculator throughputCalculator) {
this.throughputCalculator = throughputCalculator;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ protected StreamTask(
injectChannelStateWriterIntoChannels();

environment.getMetricGroup().getIOMetricGroup().setEnableBusyTime(true);
this.throughputCalculator = environment.getThroughputMeter();
this.throughputCalculator = environment.getThroughputCalculator();
this.bufferDebloatPeriod = getTaskConfiguration().get(BUFFER_DEBLOAT_PERIOD).toMillis();

if (getTaskConfiguration().get(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ public TaskEventDispatcher getTaskEventDispatcher() {
}

@Override
public ThroughputCalculator getThroughputMeter() {
public ThroughputCalculator getThroughputCalculator() {
return throughputCalculator;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public <T> StreamTaskMailboxTestHarnessBuilder<OUT> setCheckpointResponder(
return this;
}

public <T> StreamTaskMailboxTestHarnessBuilder<OUT> setThroughputMeter(
public <T> StreamTaskMailboxTestHarnessBuilder<OUT> setThroughputCalculator(
ThroughputCalculator throughputCalculator) {
this.throughputCalculator = throughputCalculator;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1510,7 +1510,7 @@ public void testProcessWithUnAvailableInput() throws Exception {
.build();
TaskIOMetricGroup ioMetricGroup =
task.getEnvironment().getMetricGroup().getIOMetricGroup();
ThroughputCalculator throughputCalculator = environment.getThroughputMeter();
ThroughputCalculator throughputCalculator = environment.getThroughputCalculator();

final MailboxExecutor executor = task.mailboxProcessor.getMainMailboxExecutor();
final RunnableWithException completeFutureTask =
Expand Down Expand Up @@ -1791,7 +1791,7 @@ public void testThroughputSchedulerStartsOnInvoke() throws Exception {
.addInput(STRING_TYPE_INFO)
.setupOutputForSingletonOperatorChain(
new TestBoundedOneInputStreamOperator())
.setThroughputMeter(
.setThroughputCalculator(
new ThroughputCalculator(SystemClock.getInstance(), 10) {
@Override
public long calculateThroughput() {
Expand Down Expand Up @@ -1864,7 +1864,7 @@ public void testBufferSizeRecalculationStartSuccessfully() throws Exception {
.addInput(STRING_TYPE_INFO, inputChannels)
.setupOutputForSingletonOperatorChain(
new TestBoundedOneInputStreamOperator())
.setThroughputMeter(
.setThroughputCalculator(
new ThroughputCalculator(SystemClock.getInstance(), 10) {
@Override
public long calculateThroughput() {
Expand Down

0 comments on commit e6996cd

Please sign in to comment.