Skip to content

Commit

Permalink
Set up a connections pool
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Oct 25, 2019
1 parent 10da254 commit 4cc2f96
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 16 deletions.
19 changes: 11 additions & 8 deletions acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"net"
"os"

"github.com/panjf2000/gnet/ringbuffer"
"golang.org/x/sys/unix"
)

Expand All @@ -26,13 +25,17 @@ func (svr *server) acceptNewConnection(fd int) error {
return err
}
lp := svr.subLoopGroup.next()
c := &conn{
fd: nfd,
sa: sa,
loop: lp,
inboundBuffer: ringbuffer.New(socketRingBufferSize),
outboundBuffer: ringbuffer.New(socketRingBufferSize),
}
c := svr.connPool.Get().(*conn)
c.fd = nfd
c.loop = lp
c.sa = sa
//c := &conn{
// fd: nfd,
// sa: sa,
// loop: lp,
// inboundBuffer: ringbuffer.New(socketRingBufferSize),
// outboundBuffer: ringbuffer.New(socketRingBufferSize),
//}
_ = lp.loopOpen(c)
_ = lp.poller.Trigger(func() (err error) {
if err = lp.poller.AddRead(nfd); err == nil {
Expand Down
8 changes: 8 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ func (c *conn) write(buf []byte) {
}
}

func (c *conn) reset() {
c.opened = false
c.ctx = nil
c.cache = nil
c.inboundBuffer.Reset()
c.outboundBuffer.Reset()
}

func (c *conn) SendTo(buf []byte, sa unix.Sockaddr) {
_ = unix.Sendto(c.fd, buf, 0, sa)
}
Expand Down
22 changes: 14 additions & 8 deletions eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,16 @@ func (lp *loop) loopAccept(fd int) error {
if err := unix.SetNonblock(nfd, true); err != nil {
return err
}
c := &conn{fd: nfd,
sa: sa,
inboundBuffer: ringbuffer.New(socketRingBufferSize),
outboundBuffer: ringbuffer.New(socketRingBufferSize),
loop: lp,
}
c := lp.svr.connPool.Get().(*conn)
c.fd = nfd
c.loop = lp
c.sa = sa
//c := &conn{fd: nfd,
// sa: sa,
// inboundBuffer: ringbuffer.New(socketRingBufferSize),
// outboundBuffer: ringbuffer.New(socketRingBufferSize),
// loop: lp,
//}
if err = lp.poller.AddReadWrite(c.fd); err == nil {
lp.connections[c.fd] = c
} else {
Expand Down Expand Up @@ -140,8 +144,10 @@ func (lp *loop) loopCloseConn(c *conn, err error) error {
delete(lp.connections, c.fd)
_ = unix.Close(c.fd)
}

switch lp.svr.eventHandler.OnClosed(c, err) {
action := lp.svr.eventHandler.OnClosed(c, err)
c.reset()
lp.svr.connPool.Put(c)
switch action {
case None:
case Shutdown:
return errShutdown
Expand Down
9 changes: 9 additions & 0 deletions gnet_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/panjf2000/gnet/netpoll"
"github.com/panjf2000/gnet/ringbuffer"
)

type server struct {
Expand All @@ -23,6 +24,7 @@ type server struct {
opts *Options // options with server
once sync.Once // make sure only signalShutdown once
cond *sync.Cond // shutdown signaler
connPool sync.Pool // pool for caching connections
mainLoop *loop // main loop for accepting connections
eventHandler EventHandler // user eventHandler
subLoopGroup IEventLoopGroup // loops for handling events
Expand Down Expand Up @@ -203,6 +205,13 @@ func serve(eventHandler EventHandler, listener *listener, options *Options) erro
return nil
}

svr.connPool.New = func() interface{} {
return &conn{
inboundBuffer: ringbuffer.New(socketRingBufferSize),
outboundBuffer: ringbuffer.New(socketRingBufferSize),
}
}

if err := svr.start(numCPU); err != nil {
svr.closeLoops()
log.Printf("gnet server is stoping with error: %v\n", err)
Expand Down

0 comments on commit 4cc2f96

Please sign in to comment.