Skip to content

Commit

Permalink
Migrate thrift server stats to Fast Stats lib to avoid lock contention
Browse files Browse the repository at this point in the history
Summary: Airlift stats use synchronization instead of lock-free or wait-free semantics, which causes lock contention on machines with high core counts. This change moves the distributions and EWMA counters to lock-free implementations to avoid locking under write-heavy, high-load scenarios.

Reviewed By: AGFeldman

Differential Revision: D62396291

fbshipit-source-id: 8f57e7f529ecd1af14accad8de3eb1b10a3065a9
  • Loading branch information
j-bahr authored and facebook-github-bot committed Sep 26, 2024
1 parent f3d34c9 commit 580889b
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 218 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public MultiWindowDistribution() {
Arrays.asList(P50, P75, P90, P95, P99, AVG, MIN, MAX, SUM));
}

/**
 * Creates a distribution for the given quantiles, delegating to the three-argument constructor
 * with the executor service and clock obtained from {@link Utils}.
 *
 * @param quantiles the quantiles passed through to the three-argument constructor
 */
public MultiWindowDistribution(List<Quantile> quantiles) {
this(Utils.getExecutorService(), Utils.getClock(), quantiles);
}

/**
 * Creates a distribution with the supplied executor service and clock, delegating to the
 * three-argument constructor with the fixed quantile list P50, P75, P90, P95, P99, AVG, MIN,
 * MAX and SUM.
 *
 * @param executorService the executor service passed through to the three-argument constructor
 * @param clock the clock passed through to the three-argument constructor
 */
public MultiWindowDistribution(ScheduledExecutorService executorService, Clock clock) {
this(executorService, clock, Arrays.asList(P50, P75, P90, P95, P99, AVG, MIN, MAX, SUM));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.facebook.thrift.metrics.common.Clock;
import com.facebook.thrift.metrics.common.NanoClock;
import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -43,6 +44,11 @@ public static ScheduledExecutorService getExecutorService() {
return executorService;
}

/**
 * Replaces the static executor service held by {@code Utils} with the given instance.
 *
 * <p>Annotated {@code @VisibleForTesting}: the method is public only so tests can substitute
 * their own executor. Because the field is static, the replacement stays in effect until it is
 * set again.
 *
 * @param executorService the executor service to store in the static field
 */
@VisibleForTesting
public static void setExecutorService(ScheduledExecutorService executorService) {
Utils.executorService = executorService;
}

/** Allow clean shutdown for testing purposes */
public static void shutdownExecutorService() {
if (executorService != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public class ExpMovingAverageRate implements Rate {
private final AtomicLong lastTick;
private final Clock clock;

/** Creates a rate that uses {@code NANO_CLOCK}, delegating to the {@code Clock} constructor. */
public ExpMovingAverageRate() {
this(NANO_CLOCK);
}

public ExpMovingAverageRate(Clock clock) {
this.m1 = EWMA.oneMinuteEWMA();
this.m10 = EWMA.tenMinuteEWMA();
Expand All @@ -47,11 +51,7 @@ public ExpMovingAverageRate(Clock clock) {
this.lastTick = new AtomicLong(now);
}

public ExpMovingAverageRate() {
this(NANO_CLOCK);
}

public void update(long count) {
public void add(long count) {
if (lastTick == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
package com.facebook.thrift.metrics.rate;

public interface Rate {
default void update() {
update(1);
default void add() {
add(1);
}

void update(long count);
void add(long count);

long oneMinuteRate();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class SlidingTimeWindowMovingAverages implements Rate {
public class SlidingTimeWindowMovingCounter implements Rate {
private static final long TIME_WINDOW_DURATION_MINUTES = 60L;
private static final long TICK_INTERVAL = TimeUnit.SECONDS.toNanos(1L);
private static final Duration TIME_WINDOW_DURATION =
Expand All @@ -41,11 +41,11 @@ public class SlidingTimeWindowMovingAverages implements Rate {
private final Instant bucketBaseTime;
Instant oldestBucketTime;

public SlidingTimeWindowMovingAverages() {
public SlidingTimeWindowMovingCounter() {
this(new NanoClock());
}

public SlidingTimeWindowMovingAverages(Clock clock) {
public SlidingTimeWindowMovingCounter(Clock clock) {
this.clock = clock;
long startTime = clock.tickNanos();
this.lastTick = new AtomicLong(startTime);
Expand All @@ -61,7 +61,7 @@ public SlidingTimeWindowMovingAverages(Clock clock) {
this.currentBucketIndex = 0;
}

public void update(long n) {
public void add(long n) {
tickIfNecessary();
(buckets.get(this.currentBucketIndex)).add(n);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.swift.service;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.verify;
import static org.mockito.MockitoAnnotations.initMocks;

import com.facebook.swift.service.stats.ServerStats;
import com.facebook.thrift.metrics.distribution.Utils;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.*;

/**
 * Tests the counters exported by {@link ServerStats#getCounters()} after individual stat events
 * (request received, reply sent, error, write time) are recorded.
 *
 * <p>A mock {@link ScheduledExecutorService} is installed into {@link Utils} for each test. The
 * tasks that get scheduled on it are captured and run by hand (see {@link
 * #performIntervalSampleOnDistributions()}) rather than waiting for a real timer.
 *
 * <p>The executor service in {@link Utils} is static, so the value found before each test is
 * saved and put back afterwards; otherwise the mock would leak into any test that runs later in
 * the same JVM.
 */
public class TestThriftServerStats {

  // Built in setup(), after the mock executor is installed, so that anything ServerStats
  // schedules while being constructed is scheduled on this test's mock.
  private ServerStats thriftServerStats;

  // Executor service found in Utils before this test replaced it; restored in tearDown().
  private ScheduledExecutorService originalExecutorService;

  @Mock ScheduledExecutorService executorService;

  @Captor ArgumentCaptor<Runnable> runnableCaptor;

  /** Installs a fresh mock executor and then builds the stats object under test. */
  @Before
  public void setup() {
    initMocks(this);
    originalExecutorService = Utils.getExecutorService();
    Utils.setExecutorService(executorService);
    thriftServerStats = new ServerStats();
  }

  /** Puts back the executor service that was in place before {@link #setup()} ran. */
  @After
  public void tearDown() {
    Utils.setExecutorService(originalExecutorService);
  }

  /**
   * Ensures at least one interval sample is done on each histogram so that data is collected.
   *
   * <p>Verifies that the mock executor was asked to schedule at least one fixed-rate task,
   * captures every scheduled {@link Runnable}, and runs each one synchronously on the test
   * thread.
   */
  private void performIntervalSampleOnDistributions() {
    verify(executorService, atLeastOnce())
        .scheduleAtFixedRate(runnableCaptor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
    runnableCaptor.getAllValues().forEach(Runnable::run);
  }

  /** One received request is counted in the global and per-method counters for every window. */
  @Test
  public void testRequestReceived() throws Exception {
    thriftServerStats.requestReceived(10L, "foo");
    Map<String, Long> actual = thriftServerStats.getCounters();
    Assert.assertEquals(1L, (long) actual.get("thrift.received_requests.count"));
    Assert.assertEquals(1L, (long) actual.get("thrift.received_requests.count.60"));
    Assert.assertEquals(1L, (long) actual.get("thrift.received_requests.count.3600"));

    Assert.assertEquals(1L, (long) actual.get("thrift.foo.num_calls.sum"));
    Assert.assertEquals(1L, (long) actual.get("thrift.foo.num_calls.sum.60"));
    Assert.assertEquals(1L, (long) actual.get("thrift.foo.num_calls.sum.3600"));
  }

  /** One sent reply is counted in the sent-replies counters for every window. */
  @Test
  public void testReplySent() throws Exception {
    thriftServerStats.replySent(20L, "foo");
    Map<String, Long> actual = thriftServerStats.getCounters();
    Assert.assertEquals(1L, (long) actual.get("thrift.sent_replies.count"));
    Assert.assertEquals(1L, (long) actual.get("thrift.sent_replies.count.60"));
    Assert.assertEquals(1L, (long) actual.get("thrift.sent_replies.count.3600"));
  }

  /** A reply recorded with value 10 shows up in the global and per-method process-time stats. */
  @Test
  public void testProcessTime() throws Exception {
    thriftServerStats.replySent(10L, "foo");

    // Distribution values are only visible after a sampling task has run.
    performIntervalSampleOnDistributions();

    Map<String, Long> actual = thriftServerStats.getCounters();

    Assert.assertEquals(10L, (long) actual.get("thrift.process_time.avg"));
    Assert.assertEquals(10L, (long) actual.get("thrift.process_time.avg.60"));
    Assert.assertEquals(10L, (long) actual.get("thrift.process_time.avg.3600"));

    Assert.assertEquals(10L, (long) actual.get("thrift.process_time.p99"));
    Assert.assertEquals(10L, (long) actual.get("thrift.process_time.p99.60"));
    Assert.assertEquals(10L, (long) actual.get("thrift.process_time.p99.3600"));

    Assert.assertEquals(1L, (long) actual.get("thrift.foo.num_processed.sum"));
    Assert.assertEquals(1L, (long) actual.get("thrift.foo.num_processed.sum.60"));
    Assert.assertEquals(1L, (long) actual.get("thrift.foo.num_processed.sum.3600"));

    Assert.assertEquals(10L, (long) actual.get("thrift.foo.time_process_us.avg"));
    Assert.assertEquals(10L, (long) actual.get("thrift.foo.time_process_us.avg.60"));
    Assert.assertEquals(10L, (long) actual.get("thrift.foo.time_process_us.avg.3600"));
  }

  /** One error for a method is counted in that method's exception counters for every window. */
  @Test
  public void testError() throws Exception {
    thriftServerStats.error("foo");
    Map<String, Long> actual = thriftServerStats.getCounters();
    Assert.assertEquals(1L, (long) actual.get("thrift.foo.num_exceptions.sum"));
    Assert.assertEquals(1L, (long) actual.get("thrift.foo.num_exceptions.sum.60"));
    Assert.assertEquals(1L, (long) actual.get("thrift.foo.num_exceptions.sum.3600"));
  }

  /** A published write time of 10 is reported as the p99 write time for every window. */
  @Test
  public void testWriteTime() throws Exception {
    thriftServerStats.publishWriteTime(10L);

    // Distribution values are only visible after a sampling task has run.
    performIntervalSampleOnDistributions();

    Map<String, Long> actual = thriftServerStats.getCounters();

    Assert.assertEquals(10L, (long) actual.get("thrift.write_time.p99"));
    Assert.assertEquals(10L, (long) actual.get("thrift.write_time.p99.60"));
    Assert.assertEquals(10L, (long) actual.get("thrift.write_time.p99.3600"));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void setUp() {

private void advanceSeconds(long seconds) {
testClock.incrementSec(seconds);
rate.update(0);
rate.add(0);
}

private void advanceMinutes(long minutes) {
Expand All @@ -45,7 +45,7 @@ private void advanceMinutes(long minutes) {
public void testNormalQuantilesOverOneCycle() {
for (int i = 1; i <= 60; i++) {
advanceSeconds(1);
rate.update(1);
rate.add(1);
}

// One Minute, one per second
Expand All @@ -57,7 +57,7 @@ public void testNormalQuantilesOverOneCycle() {
public void testDataDecaysToZeroOver5Minutes() {
for (int i = 1; i <= 60; i++) {
advanceSeconds(1);
rate.update(1);
rate.add(1);
}

assertThat(rate.oneMinuteRate()).isEqualTo(60);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,23 @@

import static org.assertj.core.api.Assertions.assertThat;

import com.facebook.thrift.metrics.rate.SlidingTimeWindowMovingAverages;
import com.facebook.thrift.metrics.rate.SlidingTimeWindowMovingCounter;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;

public class SlidingTimeWindowMovingAverageTest {
private SlidingTimeWindowMovingAverages rate;
private SlidingTimeWindowMovingCounter rate;
private final TestClock testClock = new TestClock();

@Before
public void setUp() {
rate = new SlidingTimeWindowMovingAverages(testClock);
rate = new SlidingTimeWindowMovingCounter(testClock);
}

private void advanceSeconds(long seconds) {
testClock.incrementSec(seconds);
rate.update(0);
rate.add(0);
}

private void advanceMinutes(long minutes) {
Expand All @@ -45,7 +45,7 @@ private void advanceMinutes(long minutes) {
public void testNormalQuantilesOverOneCycle() {
for (int i = 1; i <= 60; i++) {
advanceSeconds(1);
rate.update(1);
rate.add(1);
}

// One Minute, one per second
Expand All @@ -57,7 +57,7 @@ public void testNormalQuantilesOverOneCycle() {
public void testOneMinuteDataDecaysToZeroOver1Minute() {
for (int i = 1; i <= 60; i++) {
advanceSeconds(1);
rate.update(1);
rate.add(1);
}

assertThat(rate.oneMinuteRate()).isEqualTo(60);
Expand All @@ -72,7 +72,7 @@ public void testOneMinuteDataDecaysToZeroOver1Minute() {
public void testTenMinuteDataDecaysToZeroOver10Minutes() {
for (int i = 1; i <= 600; i++) {
advanceSeconds(1);
rate.update(1);
rate.add(1);
}

assertThat(rate.tenMinuteRate()).isEqualTo(600);
Expand All @@ -87,7 +87,7 @@ public void testTenMinuteDataDecaysToZeroOver10Minutes() {
public void testOneHourDataDecaysToZeroOver10Minutes() {
for (int i = 1; i <= 60; i++) {
advanceMinutes(1);
rate.update(1);
rate.add(1);
}

assertThat(rate.oneHourRate()).isEqualTo(60);
Expand Down

0 comments on commit 580889b

Please sign in to comment.