Performance fix for allreduce on large buffers (facebookincubator#192)
Summary:
See facebookincubator#169 for the issue and the original post reporting it.
At the root of this performance problem is an optimization that tries to
keep intermediate buffers bounded, regardless of the size of the
input/output buffers.

To pipeline communication with reduction, every chunk to send to a
neighboring process is split into a minimum of 2 segments. For large
buffers, the number of segments per chunk may be larger, depending on
the maximum segment size setting.
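As a sketch of that sizing rule, here is a hypothetical helper mirroring
the fixed computation in gloo/allreduce.cc further below (`roundUp`
rounds its first argument up to the nearest multiple of the second):

```
#include <algorithm>
#include <cstddef>

// Round x up to the nearest multiple of m.
static size_t roundUp(size_t x, size_t m) {
  return ((x + m - 1) / m) * m;
}

// Number of segments for the whole buffer: at least 2 per rank so that
// sends can be pipelined with reduction, no segment larger than the
// configured maximum, and a multiple of the rank count so every rank
// owns the same number of segments.
static size_t computeNumSegments(
    size_t totalBytes,
    size_t size, // number of processes in the ring
    size_t maxSegmentBytes) {
  return roundUp(
      std::max(
          (totalBytes + maxSegmentBytes - 1) / maxSegmentBytes,
          size * 2),
      size);
}
```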

The implementation of the algorithm still assumed just 2 segments per
chunk and would run for more iterations than needed. This resulted in
garbage values for the additional segments it would process in the
reduce/scatter phase. The garbage didn't affect correctness, because it
would later be overwritten by the allgather phase. The only side effect
of this extra work was additional bytes on the wire, so it would run
slower. For large inputs (>= 10MB) and a low number of processes
(e.g. 2), this meant up to twice the projected bytes on the wire, or
roughly half the expected bandwidth.

This commit addresses the error and restores performance for large
input buffers to what we expect from the algorithm.

Additionally, it addresses a potential issue with segment size
imbalance. If the natural segment size is a single byte larger than
the maximum, the algorithm would run as if the input was padded to the
closest integer multiple of the maximum segment size. In terms of
performance, this looks like a cliff for input sizes that result in a
natural segment size just above the maximum segment size.
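A worked example of that cliff, with hypothetical numbers (2 processes,
4-byte elements, a 1024-byte maximum segment size, and an 8200-byte
input), comparing the old and new formulas from the diff below:

```
#include <algorithm>
#include <cstddef>
#include <cstdio>

int main() {
  const size_t size = 2;
  const size_t elementSize = 4;
  const size_t maxSegmentBytes = 1024;
  const size_t totalBytes = 8200;

  auto roundUp = [](size_t x, size_t m) { return ((x + m - 1) / m) * m; };

  // Old: pick the segment size first, then derive the segment count.
  // The natural segment size ceil(8200 / 4) = 2050 exceeds the maximum,
  // so segmentBytes is clamped to 1024 and the loop behaves as if the
  // input were 10 segments * 1024 bytes = 10240 bytes.
  size_t oldSegmentBytes = roundUp(
      std::min((totalBytes + size * 2 - 1) / (size * 2), maxSegmentBytes),
      elementSize);
  size_t oldNumSegments = roundUp(
      std::max((totalBytes + oldSegmentBytes - 1) / oldSegmentBytes, size * 2),
      size);

  // New: pick the segment count first, then balance the segment size.
  // 10 segments * 820 bytes = 8200 bytes, no padding.
  size_t numSegments = roundUp(
      std::max((totalBytes + maxSegmentBytes - 1) / maxSegmentBytes, size * 2),
      size);
  size_t segmentBytes =
      roundUp((totalBytes + numSegments - 1) / numSegments, elementSize);

  std::printf("old: %zu x %zu\n", oldNumSegments, oldSegmentBytes); // 10 x 1024
  std::printf("new: %zu x %zu\n", numSegments, segmentBytes);       // 10 x 820
}
```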

The benchmark tool is updated to include a run for the new-style ring
allreduce algorithm. The benchmark results included below are the
result of running the new version on 2 machines connected by 10Gb
Ethernet.

In particular, look at the average bandwidth for runs against 1000000
elements and larger. Without the fix, there is a steep drop. With the
fix, it remains at the link bandwidth.

Without the fix:

```
Devices:
  - tcp, pci=0000:01:00.0, iface=enp1s0f0, speed=10000, addr=[100.97.16.108]
  - tcp, pci=0000:01:00.0, iface=enp1s0f0, speed=10000, addr=[100.97.16.108]
Algorithm:   new_allreduce_ring
Options:     processes=2, inputs=1, threads=2

   elements   min (us)   p50 (us)   p99 (us)   max (us)   avg (GB/s)    samples
     100000        672        945       1180       1825        0.785       3664
     200000       1190       1652       2086       2229        0.920       2290
     500000       2957       3424       4101       4624        1.072        872
    1000000       5916       6820       7642       7846        1.088        568
    2000000      17172      21496      22860      23047        0.691        180
    5000000      57343      64547      75051      75051        0.575         60
```

With the fix:

```
Devices:
  - tcp, pci=0000:01:00.0, iface=enp1s0f0, speed=10000, addr=[100.97.16.108]
  - tcp, pci=0000:01:00.0, iface=enp1s0f0, speed=10000, addr=[100.97.16.108]
Algorithm:   new_allreduce_ring
Options:     processes=2, inputs=1, threads=2

   elements   min (us)   p50 (us)   p99 (us)   max (us)   avg (GB/s)    samples
     100000        696        956       1214       2156        0.777       3492
     200000       1252       1667       2225       2906        0.910       2240
     500000       2606       3424       4096       4425        1.070       1026
    1000000       4367       6825       9270      11283        1.081        494
    2000000      10980      13661      14527      14674        1.086        292
    5000000      32987      34462      35088      35854        1.082        114
```

Closes facebookincubator#169.
Pull Request resolved: facebookincubator#192

Reviewed By: mrshenli

Differential Revision: D16517254

Pulled By: pietern

fbshipit-source-id: bce3f6af30b747e89cc96de9331441267be340e0
pietern authored and facebook-github-bot committed Aug 1, 2019
1 parent 805346e commit 7d54ffc
Showing 2 changed files with 116 additions and 17 deletions.
gloo/allreduce.cc: 22 additions & 17 deletions

```
@@ -191,18 +191,8 @@ void ring(
   // rounding it up to the nearest multiple of the element size.
   // For example, if maxSegmentSize = 10, and elementSize = 4,
   // then after rounding up: segmentSize = 12;
-  const size_t maxSegmentSize =
-      opts.elementSize * (opts.maxSegmentSize / opts.elementSize);
-
-  // The number of bytes per segment must be a multiple of the bytes
-  // per element for the reduction to work; round up if necessary.
-  const size_t segmentBytes = roundUp(
-      std::min(
-          // Rounded division to have >= 2 segments per chunk.
-          (totalBytes + (context->size * 2 - 1)) / (context->size * 2),
-          // Configurable segment size limit
-          maxSegmentSize),
-      opts.elementSize);
+  const size_t maxSegmentBytes = opts.elementSize *
+      std::max((size_t)1, opts.maxSegmentSize / opts.elementSize);
 
   // Compute how many segments make up the input buffer.
   //
@@ -216,12 +206,14 @@ void ring(
   //
   const size_t numSegments = roundUp(
       std::max(
-          (totalBytes + (segmentBytes - 1)) / segmentBytes,
+          (totalBytes + (maxSegmentBytes - 1)) / maxSegmentBytes,
           (size_t)context->size * 2),
       (size_t)context->size);
   GLOO_ENFORCE_EQ(numSegments % context->size, 0);
   GLOO_ENFORCE_GE(numSegments, context->size * 2);
   const size_t numSegmentsPerRank = numSegments / context->size;
+  const size_t segmentBytes =
+      roundUp((totalBytes + numSegments - 1) / numSegments, opts.elementSize);
 
   // Allocate scratch space to hold two chunks
   std::unique_ptr<uint8_t[]> tmpAllocation(new uint8_t[segmentBytes * 2]);
@@ -271,7 +263,17 @@ void ring(
     return result;
   };
 
-  for (auto i = 0; i < numSegments; i++) {
+  // Ring reduce/scatter.
+  //
+  // Number of iterations is computed as follows:
+  // - Take `numSegments` for the total number of segments,
+  // - Subtract `numSegmentsPerRank` because the final segments hold
+  //   the partial result and must not be forwarded in this phase.
+  // - Add 2 because we pipeline send and receive operations (we issue
+  //   send/recv operations on iterations 0 and 1 and wait for them to
+  //   complete on iterations 2 and 3).
+  //
+  for (auto i = 0; i < (numSegments - numSegmentsPerRank + 2); i++) {
     if (i >= 2) {
       // Compute send and receive offsets and lengths two iterations
       // ago. Needed so we know when to wait for an operation and when
@@ -299,7 +301,7 @@ void ring(
     // iterations. At that point we have already sent all data we
     // needed to and only have to wait for the final segments to be
     // reduced into the output.
-    if (i < (numSegments - 2)) {
+    if (i < (numSegments - numSegmentsPerRank)) {
       // Compute send and receive offsets and lengths for this iteration.
       auto cur = computeReduceScatterOffsets(i);
       if (cur.recvLength > 0) {
@@ -351,7 +353,10 @@ void ring(
   // incompatible with the generic allgather algorithm where the
   // contribution is identical across processes.
   //
-  for (auto i = 0; i < numSegments; i++) {
+  // See comment prior to reduce/scatter loop on how the number of
+  // iterations for this loop is computed.
+  //
+  for (auto i = 0; i < (numSegments - numSegmentsPerRank + 2); i++) {
     if (i >= 2) {
       auto prev = computeAllgatherOffsets(i - 2);
       if (prev.recvLength > 0) {
@@ -368,7 +373,7 @@ void ring(
     // iterations. At that point we have already sent all data we
     // needed to and only have to wait for the final segments to be
     // sent to the output.
-    if (i < (numSegments - 2)) {
+    if (i < (numSegments - numSegmentsPerRank)) {
       auto cur = computeAllgatherOffsets(i);
       if (cur.recvLength > 0) {
         out[0]->recv(recvRank, slot, cur.recvOffset, cur.recvLength);
```
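To see how the new trip count plays out, here is a standalone sketch
(not part of the patch, with hypothetical numbers: 2 ranks and 10
segments, so numSegmentsPerRank = 5). The loop issues sends/recvs while
i < numSegments - numSegmentsPerRank and drains them two iterations
later:

```
#include <cstdio>

int main() {
  const int numSegments = 10;        // total segments across the ring
  const int numSegmentsPerRank = 5;  // final segments; never forwarded
  const int issueLimit = numSegments - numSegmentsPerRank;

  // Trip count: issueLimit iterations that issue work, plus 2 to
  // drain the operations still in flight from the pipelining.
  for (int i = 0; i < issueLimit + 2; i++) {
    if (i >= 2) {
      std::printf("i=%d: wait for ops issued at i=%d\n", i, i - 2);
    }
    // Before the fix, this bound was (numSegments - 2); with more than
    // 2 segments per rank, the extra iterations sent garbage segments.
    if (i < issueLimit) {
      std::printf("i=%d: issue send/recv for offsets(%d)\n", i, i);
    }
  }
}
```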
gloo/benchmark/main.cc: 94 additions & 0 deletions

```
@@ -9,6 +9,7 @@
 #include <memory>
 
 #include "gloo/allgather_ring.h"
+#include "gloo/allreduce.h"
 #include "gloo/allreduce_halving_doubling.h"
 #include "gloo/allreduce_bcube.h"
 #include "gloo/allreduce_ring.h"
@@ -195,6 +196,74 @@ class ReduceScatterBenchmark : public Benchmark<T> {
 
 } // namespace
 
+// Namespace for the new style algorithm benchmarks.
+namespace {
+
+template <typename T>
+class NewAllreduceBenchmark : public Benchmark<T> {
+  using allocation =
+      std::vector<std::vector<T, aligned_allocator<T, kBufferAlignment>>>;
+
+ public:
+  NewAllreduceBenchmark(
+      std::shared_ptr<::gloo::Context>& context,
+      struct options& options)
+      : Benchmark<T>(context, options),
+        opts_(context) {}
+
+  allocation newAllocation(int inputs, size_t elements) {
+    allocation out;
+    out.reserve(inputs);
+    for (size_t i = 0; i < inputs; i++) {
+      out.emplace_back(elements);
+    }
+    return out;
+  }
+
+  void initialize(size_t elements) override {
+    inputAllocation_ = newAllocation(this->options_.inputs, elements);
+    outputAllocation_ = newAllocation(this->options_.inputs, elements);
+
+    // Stride between successive values in any input.
+    const auto stride = this->context_->size * this->options_.inputs;
+    for (size_t i = 0; i < this->options_.inputs; i++) {
+      // Different for every input at every node. This means all
+      // values across all inputs and all nodes are different and we
+      // can accurately detect correctness errors.
+      const auto value = (this->context_->rank * this->options_.inputs) + i;
+      for (size_t j = 0; j < elements; j++) {
+        inputAllocation_[i][j] = (j * stride) + value;
+      }
+    }
+
+    // Generate vectors with pointers to populate the options struct.
+    std::vector<T*> inputPointers;
+    std::vector<T*> outputPointers;
+    for (size_t i = 0; i < this->options_.inputs; i++) {
+      inputPointers.push_back(inputAllocation_[i].data());
+      outputPointers.push_back(outputAllocation_[i].data());
+    }
+
+    // Configure AllreduceOptions struct
+    opts_.setInputs(inputPointers, elements);
+    opts_.setOutputs(outputPointers, elements);
+    opts_.setAlgorithm(AllreduceOptions::Algorithm::RING);
+    void (*fn)(void*, const void*, const void*, long unsigned int) = &sum<T>;
+    opts_.setReduceFunction(fn);
+  }
+
+  void run() override {
+    allreduce(opts_);
+  }
+
+ private:
+  AllreduceOptions opts_;
+
+  allocation inputAllocation_;
+  allocation outputAllocation_;
+};
+
+} // namespace
+
 #define RUN_BENCHMARK(T) \
   Runner::BenchmarkFn<T> fn; \
   if (x.benchmark == "allgather_ring") { \
@@ -248,8 +317,33 @@
   Runner r(x); \
   r.run(fn);
 
+template <typename T>
+void runNewBenchmark(options& options) {
+  Runner::BenchmarkFn<T> fn;
+
+  const auto name = options.benchmark.substr(4);
+  if (name == "allreduce_ring") {
+    fn = [&](std::shared_ptr<Context>& context) {
+      return gloo::make_unique<NewAllreduceBenchmark<T>>(context, options);
+    };
+  } else {
+    GLOO_ENFORCE(false, "Invalid benchmark name: ", options.benchmark);
+  }
+
+  Runner runner(options);
+  runner.run(fn);
+}
+
 int main(int argc, char** argv) {
   auto x = benchmark::parseOptions(argc, argv);
 
+  // Run new style benchmarks if the benchmark name starts with "new_".
+  // Eventually we'd like to deprecate all the old style ones...
+  if (x.benchmark.substr(0, 4) == "new_") {
+    runNewBenchmark<float>(x);
+    return 0;
+  }
+
   if (x.benchmark == "pairwise_exchange") {
     RUN_BENCHMARK(char);
   } else if (x.halfPrecision) {
```
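For reference, a minimal sketch of calling the new-style allreduce API
directly, outside the benchmark harness. This is an assumption-laden
illustration, not part of the patch: the rendezvous/context setup is
elided, `runAllreduce` is a hypothetical helper, `setOutput` with a
single buffer is assumed to run the operation in place, and `sum<float>`
is the same reduction function the benchmark passes in:

```
#include <cstddef>
#include <memory>
#include <vector>

#include "gloo/allreduce.h"
#include "gloo/math.h"

// Hypothetical helper: `context` must be a fully rendezvoused
// gloo::Context shared by all participating processes (setup elided).
void runAllreduce(std::shared_ptr<gloo::Context> context) {
  std::vector<float> data(1000000, 1.0f);

  gloo::AllreduceOptions opts(context);
  // Single buffer used as both input and output (in-place).
  opts.setOutput(data.data(), data.size());
  opts.setAlgorithm(gloo::AllreduceOptions::Algorithm::RING);
  void (*fn)(void*, const void*, const void*, size_t) = &gloo::sum<float>;
  opts.setReduceFunction(fn);

  // Runs the ring algorithm exercised by the new_allreduce_ring benchmark.
  gloo::allreduce(opts);
}
```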
