Skip to content

Commit

Permalink
opt: improve the load-balancer of Least-Connections
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed May 23, 2021
1 parent 0a773ba commit 8282bbf
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 57 deletions.
35 changes: 24 additions & 11 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"fmt"
"os"
"runtime"
"sync/atomic"
"time"
"unsafe"

Expand All @@ -47,15 +48,22 @@ type eventloop struct {

//nolint:structcheck
type internalEventloop struct {
ln *listener // listener
idx int // loop index in the server loops list
svr *server // server in loop
poller *netpoll.Poller // epoll or kqueue
packet []byte // read packet buffer whose capacity is 64KB
connCount int32 // number of active connections in event-loop
connections map[int]*conn // loop connections fd -> conn
eventHandler EventHandler // user eventHandler
calibrateCallback func(*eventloop, int32) // callback func for re-adjusting connCount
ln *listener // listener
idx int // loop index in the server loops list
svr *server // server in loop
poller *netpoll.Poller // epoll or kqueue
packet []byte // read packet buffer whose capacity is 64KB
connCount int32 // number of active connections in event-loop
connections map[int]*conn // loop connections fd -> conn
eventHandler EventHandler // user eventHandler
}

func (el *eventloop) addConn(delta int32) {
atomic.AddInt32(&el.connCount, delta)
}

func (el *eventloop) loadConn() int32 {
return atomic.LoadInt32(&el.connCount)
}

func (el *eventloop) closeAllConns() {
Expand Down Expand Up @@ -112,7 +120,9 @@ func (el *eventloop) loopAccept(fd int) error {

func (el *eventloop) loopOpen(c *conn) error {
c.opened = true
el.calibrateCallback(el, 1)
el.addConn(1)

el.svr.lb.calibrate()

out, action := el.eventHandler.OnOpened(c)
if out != nil {
Expand Down Expand Up @@ -218,7 +228,10 @@ func (el *eventloop) loopCloseConn(c *conn, err error) (rerr error) {

if err0, err1 := el.poller.Delete(c.fd), unix.Close(c.fd); err0 == nil && err1 == nil {
delete(el.connections, c.fd)
el.calibrateCallback(el, -1)
el.addConn(-1)

el.svr.lb.calibrate()

if el.eventHandler.OnClosed(c, err) == Shutdown {
return gerrors.ErrServerShutdown
}
Expand Down
31 changes: 22 additions & 9 deletions eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package gnet

import (
"runtime"
"sync/atomic"
"time"
"unsafe"

Expand All @@ -41,13 +42,20 @@ type eventloop struct {

//nolint:structcheck
type internalEventloop struct {
ch chan interface{} // command channel
idx int // loop index
svr *server // server in loop
connCount int32 // number of active connections in event-loop
connections map[*stdConn]struct{} // track all the sockets bound to this loop
eventHandler EventHandler // user eventHandler
calibrateCallback func(*eventloop, int32) // callback func for re-adjusting connCount
ch chan interface{} // command channel
idx int // loop index
svr *server // server in loop
connCount int32 // number of active connections in event-loop
connections map[*stdConn]struct{} // track all the sockets bound to this loop
eventHandler EventHandler // user eventHandler
}

func (el *eventloop) addConn(delta int32) {
atomic.AddInt32(&el.connCount, delta)
}

func (el *eventloop) loadConn() int32 {
return atomic.LoadInt32(&el.connCount)
}

func (el *eventloop) loopRun(lockOSThread bool) {
Expand Down Expand Up @@ -93,7 +101,9 @@ func (el *eventloop) loopRun(lockOSThread bool) {

func (el *eventloop) loopAccept(c *stdConn) error {
el.connections[c] = struct{}{}
el.calibrateCallback(el, 1)
el.addConn(1)

el.svr.lb.calibrate()

out, action := el.eventHandler.OnOpened(c)
if out != nil {
Expand Down Expand Up @@ -191,7 +201,10 @@ func (el *eventloop) loopError(c *stdConn, err error) (e error) {
}
}
delete(el.connections, c)
el.calibrateCallback(el, -1)
el.addConn(-1)

el.svr.lb.calibrate()

c.releaseTCP()
}()

Expand Down
3 changes: 1 addition & 2 deletions gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/panjf2000/gnet/errors"
Expand Down Expand Up @@ -77,7 +76,7 @@ type Server struct {
// CountConnections counts the number of currently active connections and returns it.
func (s Server) CountConnections() (count int) {
s.svr.lb.iterate(func(i int, el *eventloop) bool {
count += int(atomic.LoadInt32(&el.connCount))
count += int(el.loadConn())
return true
})
return
Expand Down
50 changes: 18 additions & 32 deletions load_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"container/heap"
"hash/crc32"
"net"
"sync"
"sync/atomic"

"github.com/panjf2000/gnet/internal"
Expand Down Expand Up @@ -52,7 +51,7 @@ type (
next(net.Addr) *eventloop
iterate(func(int, *eventloop) bool)
len() int
calibrate(*eventloop, int32)
calibrate()
}

// roundRobinLoadBalancer with Round-Robin algorithm.
Expand All @@ -64,11 +63,11 @@ type (

// leastConnectionsLoadBalancer with Least-Connections algorithm.
leastConnectionsLoadBalancer struct {
sync.RWMutex
minHeap minEventLoopHeap
cachedRoot *eventloop
threshold int32
calibrateConnsThreshold int32
cachedRoot *eventloop
minHeap minEventLoopHeap
eventLoopsCopy []*eventloop
threshold int32
size int32
}

// sourceAddrHashLoadBalancer with Hash algorithm.
Expand Down Expand Up @@ -107,9 +106,7 @@ func (lb *roundRobinLoadBalancer) len() int {
return lb.size
}

func (lb *roundRobinLoadBalancer) calibrate(el *eventloop, delta int32) {
atomic.AddInt32(&el.connCount, delta)
}
func (lb *roundRobinLoadBalancer) calibrate() {}

// ================================= Implementation of Least-Connections load-balancer =================================

Expand All @@ -122,7 +119,7 @@ func (h minEventLoopHeap) Len() int {
}

func (h minEventLoopHeap) Less(i, j int) bool {
return atomic.LoadInt32(&h[i].connCount) < atomic.LoadInt32(&h[j].connCount)
return h[i].loadConn() < h[j].loadConn()
}

func (h minEventLoopHeap) Swap(i, j int) {
Expand All @@ -147,48 +144,39 @@ func (h *minEventLoopHeap) Pop() interface{} {
}

func (lb *leastConnectionsLoadBalancer) register(el *eventloop) {
lb.Lock()
heap.Push(&lb.minHeap, el)
if el.idx == 0 {
lb.cachedRoot = el
}
lb.calibrateConnsThreshold++
lb.Unlock()
lb.eventLoopsCopy = append(lb.eventLoopsCopy, el)
lb.size++
}

// next returns the eligible event-loop by taking the root node from minimum heap based on Least-Connections algorithm.
func (lb *leastConnectionsLoadBalancer) next(_ net.Addr) (el *eventloop) {
// In most cases, `next` method returns the cached event-loop immediately and it only reconstructs the minimum heap
// every `calibrateConnsThreshold` times for reducing locks to global mutex.
if atomic.LoadInt32(&lb.threshold) >= lb.calibrateConnsThreshold {
lb.Lock()
// In most cases, `next` method returns the cached event-loop immediately,
// it only reconstructs the minimum heap every `size` times to avoid introducing a global lock.
if atomic.LoadInt32(&lb.threshold) >= lb.size {
heap.Init(&lb.minHeap)
lb.cachedRoot = lb.minHeap[0]
atomic.StoreInt32(&lb.threshold, 0)
lb.Unlock()
}
return lb.cachedRoot
}

func (lb *leastConnectionsLoadBalancer) iterate(f func(int, *eventloop) bool) {
lb.RLock()
for i, el := range lb.minHeap {
for i, el := range lb.eventLoopsCopy {
if !f(i, el) {
break
}
}
lb.RUnlock()
}

func (lb *leastConnectionsLoadBalancer) len() (size int) {
lb.RLock()
size = lb.minHeap.Len()
lb.RUnlock()
return
func (lb *leastConnectionsLoadBalancer) len() int {
return int(lb.size)
}

func (lb *leastConnectionsLoadBalancer) calibrate(el *eventloop, delta int32) {
atomic.AddInt32(&el.connCount, delta)
func (lb *leastConnectionsLoadBalancer) calibrate() {
atomic.AddInt32(&lb.threshold, 1)
}

Expand Down Expand Up @@ -227,6 +215,4 @@ func (lb *sourceAddrHashLoadBalancer) len() int {
return lb.size
}

func (lb *sourceAddrHashLoadBalancer) calibrate(el *eventloop, delta int32) {
atomic.AddInt32(&el.connCount, delta)
}
func (lb *sourceAddrHashLoadBalancer) calibrate() {}
2 changes: 0 additions & 2 deletions server_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ func (svr *server) activateEventLoops(numEventLoop int) (err error) {
el.packet = make([]byte, svr.opts.ReadBufferCap)
el.connections = make(map[int]*conn)
el.eventHandler = svr.eventHandler
el.calibrateCallback = svr.lb.calibrate
_ = el.poller.AddRead(el.ln.fd)
svr.lb.register(el)

Expand Down Expand Up @@ -146,7 +145,6 @@ func (svr *server) activateReactors(numEventLoop int) error {
el.packet = make([]byte, svr.opts.ReadBufferCap)
el.connections = make(map[int]*conn)
el.eventHandler = svr.eventHandler
el.calibrateCallback = svr.lb.calibrate
svr.lb.register(el)

// Start the ticker.
Expand Down
1 change: 0 additions & 1 deletion server_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func (svr *server) startEventLoops(numEventLoop int) {
el.svr = svr
el.connections = make(map[*stdConn]struct{})
el.eventHandler = svr.eventHandler
el.calibrateCallback = svr.lb.calibrate
svr.lb.register(el)

// Start the ticker.
Expand Down

0 comments on commit 8282bbf

Please sign in to comment.