Skip to content

Commit

Permalink
feature: enable users to customize the number of event-loops
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Mar 9, 2020
1 parent 1bf49b3 commit 7f6d9fa
Show file tree
Hide file tree
Showing 13 changed files with 63 additions and 49 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ type echoServer struct {

func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("Echo server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumLoops)
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}
func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
Expand Down Expand Up @@ -334,7 +334,7 @@ type echoServer struct {

func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("UDP Echo server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumLoops)
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}
func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
Expand Down Expand Up @@ -388,7 +388,7 @@ type echoServer struct {

func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("Echo server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumLoops)
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}
func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
Expand Down Expand Up @@ -489,7 +489,7 @@ pipeline:

func (hs *httpServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("HTTP server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumLoops)
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}

Expand Down Expand Up @@ -648,7 +648,7 @@ type pushServer struct {

func (ps *pushServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("Push server is listening on %s (multi-cores: %t, loops: %d), "+
"pushing data every %s ...\n", srv.Addr.String(), srv.Multicore, srv.NumLoops, ps.tick.String())
"pushing data every %s ...\n", srv.Addr.String(), srv.Multicore, srv.NumEventLoop, ps.tick.String())
return
}
func (ps *pushServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
Expand Down Expand Up @@ -787,7 +787,7 @@ type codecServer struct {

func (cs *codecServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("Test codec server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumLoops)
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}

Expand Down
12 changes: 6 additions & 6 deletions README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ type echoServer struct {

func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("Echo server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumLoops)
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}
func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
Expand Down Expand Up @@ -332,7 +332,7 @@ type echoServer struct {

func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("UDP Echo server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumLoops)
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}
func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
Expand Down Expand Up @@ -386,7 +386,7 @@ type echoServer struct {

func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("Echo server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumLoops)
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}
func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
Expand Down Expand Up @@ -487,7 +487,7 @@ pipeline:

func (hs *httpServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("HTTP server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumLoops)
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}

Expand Down Expand Up @@ -646,7 +646,7 @@ type pushServer struct {

func (ps *pushServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("Push server is listening on %s (multi-cores: %t, loops: %d), "+
"pushing data every %s ...\n", srv.Addr.String(), srv.Multicore, srv.NumLoops, ps.tick.String())
"pushing data every %s ...\n", srv.Addr.String(), srv.Multicore, srv.NumEventLoop, ps.tick.String())
return
}
func (ps *pushServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
Expand Down Expand Up @@ -785,7 +785,7 @@ type codecServer struct {

func (cs *codecServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("Test codec server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumLoops)
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}

Expand Down
2 changes: 1 addition & 1 deletion examples/codec/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type codecServer struct {

func (cs *codecServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("Test codec server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumLoops)
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}

Expand Down
2 changes: 1 addition & 1 deletion examples/echo_tcp/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type echoServer struct {

func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("Echo server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumLoops)
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}
func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
Expand Down
2 changes: 1 addition & 1 deletion examples/echo_udp/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type echoServer struct {

func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("UDP Echo server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumLoops)
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}
func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
Expand Down
2 changes: 1 addition & 1 deletion examples/echo_uds/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type echoServer struct {

func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("Echo server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumLoops)
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}
func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
Expand Down
2 changes: 1 addition & 1 deletion examples/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pipeline:

func (hs *httpServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("HTTP server is listening on %s (multi-cores: %t, loops: %d)\n",
srv.Addr.String(), srv.Multicore, srv.NumLoops)
srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
return
}

Expand Down
2 changes: 1 addition & 1 deletion examples/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type pushServer struct {

func (ps *pushServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
log.Printf("Push server is listening on %s (multi-cores: %t, loops: %d), "+
"pushing data every %s ...\n", srv.Addr.String(), srv.Multicore, srv.NumLoops, ps.tick.String())
"pushing data every %s ...\n", srv.Addr.String(), srv.Multicore, srv.NumEventLoop, ps.tick.String())
return
}
func (ps *pushServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
Expand Down
10 changes: 5 additions & 5 deletions gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ type Server struct {
// with the addr strings passed to the Serve function.
Addr net.Addr

// NumLoops is the number of loops that the server is using.
NumLoops int
// NumEventLoop is the number of event-loops that the server is using.
NumEventLoop int

// ReUsePort indicates whether SO_REUSEPORT is enable.
ReUsePort bool
// ReusePort indicates whether SO_REUSEPORT is enable.
ReusePort bool

// TCPKeepAlive (SO_KEEPALIVE) socket option.
TCPKeepAlive time.Duration
Expand Down Expand Up @@ -204,7 +204,7 @@ func Serve(eventHandler EventHandler, addr string, opts ...Option) error {
}
}()

options := initOptions(opts...)
options := loadOptions(opts...)

ln.network, ln.addr = parseAddr(addr)
if ln.network == "unix" {
Expand Down
3 changes: 2 additions & 1 deletion gnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io"
"math/rand"
"net"
"runtime"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -626,7 +627,7 @@ func (t *testWakeConnServer) Tick() (delay time.Duration, action Action) {

func testWakeConn(network, addr string) {
svr := &testWakeConnServer{network: network, addr: addr}
must(Serve(svr, network+"://"+addr, WithTicker(true)))
must(Serve(svr, network+"://"+addr, WithTicker(true), WithNumEventLoop(2*runtime.NumCPU())))
}

func TestShutdown(t *testing.T) {
Expand Down
15 changes: 13 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import "time"
// Option is a function that will set up option.
type Option func(opts *Options)

func initOptions(options ...Option) *Options {
func loadOptions(options ...Option) *Options {
opts := new(Options)
for _, option := range options {
option(opts)
Expand All @@ -25,6 +25,10 @@ type Options struct {
// assigned to the value of runtime.NumCPU().
Multicore bool

// NumEventLoop is set up to start the given number of event-loop goroutine.
// Note: Setting up NumEventLoop will override Multicore.
NumEventLoop int

// ReusePort indicates whether to set up the SO_REUSEPORT socket option.
ReusePort bool

Expand All @@ -45,13 +49,20 @@ func WithOptions(options Options) Option {
}
}

// WithMulticore sets up multi-cores with gnet.
// WithMulticore sets up multi-cores in gnet server.
func WithMulticore(multicore bool) Option {
return func(opts *Options) {
opts.Multicore = multicore
}
}

// WithNumEventLoop sets up NumEventLoop in gnet server.
func WithNumEventLoop(numEventLoop int) Option {
return func(opts *Options) {
opts.NumEventLoop = numEventLoop
}
}

// WithReusePort sets up SO_REUSEPORT socket option.
func WithReusePort(reusePort bool) Option {
return func(opts *Options) {
Expand Down
29 changes: 15 additions & 14 deletions server_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ func (svr *server) startReactors() {
})
}

func (svr *server) activateLoops(numLoops int) error {
func (svr *server) activateLoops(numEventLoop int) error {
// Create loops locally and bind the listeners.
for i := 0; i < numLoops; i++ {
for i := 0; i < numEventLoop; i++ {
if p, err := netpoll.OpenPoller(); err == nil {
el := &eventloop{
idx: i,
Expand All @@ -100,8 +100,8 @@ func (svr *server) activateLoops(numLoops int) error {
return nil
}

func (svr *server) activateReactors(numLoops int) error {
for i := 0; i < numLoops; i++ {
func (svr *server) activateReactors(numEventLoop int) error {
for i := 0; i < numEventLoop; i++ {
if p, err := netpoll.OpenPoller(); err == nil {
el := &eventloop{
idx: i,
Expand Down Expand Up @@ -141,11 +141,11 @@ func (svr *server) activateReactors(numLoops int) error {
return nil
}

func (svr *server) start(numCPU int) error {
func (svr *server) start(numEventLoop int) error {
if svr.opts.ReusePort || svr.ln.pconn != nil {
return svr.activateLoops(numCPU)
return svr.activateLoops(numEventLoop)
}
return svr.activateReactors(numCPU)
return svr.activateReactors(numEventLoop)
}

func (svr *server) stop() {
Expand Down Expand Up @@ -186,11 +186,12 @@ func (svr *server) stop() {

func serve(eventHandler EventHandler, listener *listener, options *Options) error {
// Figure out the correct number of loops/goroutines to use.
var numCPU int
numEventLoop := 1
if options.Multicore {
numCPU = runtime.NumCPU()
} else {
numCPU = 1
numEventLoop = runtime.NumCPU()
}
if options.NumEventLoop > 0 {
numEventLoop = options.NumEventLoop
}

svr := new(server)
Expand All @@ -210,8 +211,8 @@ func serve(eventHandler EventHandler, listener *listener, options *Options) erro
server := Server{
Multicore: options.Multicore,
Addr: listener.lnaddr,
NumLoops: numCPU,
ReUsePort: options.ReusePort,
NumEventLoop: numEventLoop,
ReusePort: options.ReusePort,
TCPKeepAlive: options.TCPKeepAlive,
}
switch svr.eventHandler.OnInitComplete(server) {
Expand All @@ -220,7 +221,7 @@ func serve(eventHandler EventHandler, listener *listener, options *Options) erro
return nil
}

if err := svr.start(numCPU); err != nil {
if err := svr.start(numEventLoop); err != nil {
svr.closeLoops()
log.Printf("gnet server is stoping with error: %v\n", err)
return err
Expand Down
19 changes: 10 additions & 9 deletions server_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func (svr *server) startListener() {
}()
}

func (svr *server) startLoops(numLoops int) {
for i := 0; i < numLoops; i++ {
func (svr *server) startLoops(numEventLoop int) {
for i := 0; i < numEventLoop; i++ {
el := &eventloop{
ch: make(chan interface{}, commandBufferSize),
idx: i,
Expand Down Expand Up @@ -117,11 +117,12 @@ func (svr *server) stop() {

func serve(eventHandler EventHandler, listener *listener, options *Options) (err error) {
// Figure out the correct number of loops/goroutines to use.
var numCPU int
numEventLoop := 1
if options.Multicore {
numCPU = runtime.NumCPU()
} else {
numCPU = 1
numEventLoop = runtime.NumCPU()
}
if options.NumEventLoop > 0 {
numEventLoop = options.NumEventLoop
}

svr := new(server)
Expand All @@ -141,8 +142,8 @@ func serve(eventHandler EventHandler, listener *listener, options *Options) (err
server := Server{
Multicore: options.Multicore,
Addr: listener.lnaddr,
NumLoops: numCPU,
ReUsePort: options.ReusePort,
NumEventLoop: numEventLoop,
ReusePort: options.ReusePort,
TCPKeepAlive: options.TCPKeepAlive,
}
switch svr.eventHandler.OnInitComplete(server) {
Expand All @@ -152,7 +153,7 @@ func serve(eventHandler EventHandler, listener *listener, options *Options) (err
}

// Start all loops.
svr.startLoops(numCPU)
svr.startLoops(numEventLoop)
// Start listener.
svr.startListener()
defer svr.stop()
Expand Down

0 comments on commit 7f6d9fa

Please sign in to comment.