Skip to content

Commit

Permalink
Set up a bytes pool
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Oct 28, 2019
1 parent 0d26f22 commit 998b920
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 43 deletions.
2 changes: 1 addition & 1 deletion acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (svr *server) acceptNewConnection(fd int) error {
return err
}
lp := svr.subLoopGroup.next()
c := initConn(nfd, lp, sa)
c := newConn(nfd, lp, sa)
_ = lp.poller.Trigger(func() (err error) {
if err = lp.poller.AddRead(nfd); err != nil {
return
Expand Down
90 changes: 50 additions & 40 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,63 @@ type conn struct {
outboundBuffer *ringbuffer.RingBuffer // buffer for data that is ready to write to client
}

func initConn(fd int, lp *loop, sa unix.Sockaddr) *conn {
func newConn(fd int, lp *loop, sa unix.Sockaddr) *conn {
return &conn{
fd: fd,
loop: lp,
sa: sa,
inboundBuffer: ringbuffer.New(socketRingBufferSize),
outboundBuffer: ringbuffer.New(socketRingBufferSize),
inboundBuffer: lp.svr.bytesPool.Get().(*ringbuffer.RingBuffer),
outboundBuffer: lp.svr.bytesPool.Get().(*ringbuffer.RingBuffer),
}
}

func (c *conn) reset() {
c.opened = false
c.inboundBuffer.Reset()
c.outboundBuffer.Reset()
c.loop.svr.bytesPool.Put(c.inboundBuffer)
c.loop.svr.bytesPool.Put(c.outboundBuffer)
}

func (c *conn) open(buf []byte) {
n, err := unix.Write(c.fd, buf)
if err != nil {
_, _ = c.outboundBuffer.Write(buf)
return
}

if n < len(buf) {
_, _ = c.outboundBuffer.Write(buf[n:])
}
}

func (c *conn) write(buf []byte) {
if !c.outboundBuffer.IsEmpty() {
_, _ = c.outboundBuffer.Write(buf)
return
}
n, err := unix.Write(c.fd, buf)
if err != nil {
if err == unix.EAGAIN {
_, _ = c.outboundBuffer.Write(buf)
_ = c.loop.poller.ModReadWrite(c.fd)
return
}
_ = c.loop.loopCloseConn(c, err)
return
}
if n < len(buf) {
_, _ = c.outboundBuffer.Write(buf[n:])
_ = c.loop.poller.ModReadWrite(c.fd)
}
}

func (c *conn) sendTo(buf []byte, sa unix.Sockaddr) {
_ = unix.Sendto(c.fd, buf, 0, sa)
}

// ================================= Public APIs of gnet.Conn =================================

func (c *conn) Read() []byte {
if c.inboundBuffer.IsEmpty() {
return c.cache
Expand Down Expand Up @@ -117,40 +164,3 @@ func (c *conn) Context() interface{} { return c.ctx }
func (c *conn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr }

func (c *conn) open(buf []byte) {
n, err := unix.Write(c.fd, buf)
if err != nil {
_, _ = c.outboundBuffer.Write(buf)
return
}

if n < len(buf) {
_, _ = c.outboundBuffer.Write(buf[n:])
}
}

func (c *conn) write(buf []byte) {
if !c.outboundBuffer.IsEmpty() {
_, _ = c.outboundBuffer.Write(buf)
return
}
n, err := unix.Write(c.fd, buf)
if err != nil {
if err == unix.EAGAIN {
_, _ = c.outboundBuffer.Write(buf)
_ = c.loop.poller.ModReadWrite(c.fd)
return
}
_ = c.loop.loopCloseConn(c, err)
return
}
if n < len(buf) {
_, _ = c.outboundBuffer.Write(buf[n:])
_ = c.loop.poller.ModReadWrite(c.fd)
}
}

func (c *conn) sendTo(buf []byte, sa unix.Sockaddr) {
_ = unix.Sendto(c.fd, buf, 0, sa)
}
4 changes: 2 additions & 2 deletions eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (lp *loop) loopAccept(fd int) error {
if err := unix.SetNonblock(nfd, true); err != nil {
return err
}
c := initConn(nfd, lp, sa)
c := newConn(nfd, lp, sa)
if err = lp.poller.AddReadWrite(c.fd); err == nil {
lp.connections[c.fd] = c
} else {
Expand Down Expand Up @@ -137,7 +137,7 @@ func (lp *loop) loopCloseConn(c *conn, err error) error {
case Shutdown:
return errShutdown
}
c.opened = false
c.reset()
}
return nil
}
Expand Down
6 changes: 6 additions & 0 deletions gnet_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/panjf2000/gnet/netpoll"
"github.com/panjf2000/gnet/ringbuffer"
)

type server struct {
Expand All @@ -24,6 +25,7 @@ type server struct {
once sync.Once // make sure only signalShutdown once
cond *sync.Cond // shutdown signaler
mainLoop *loop // main loop for accepting connections
bytesPool sync.Pool // pool for storing bytes
eventHandler EventHandler // user eventHandler
subLoopGroup IEventLoopGroup // loops for handling events
subLoopGroupSize int // number of loops
Expand Down Expand Up @@ -208,6 +210,10 @@ func serve(eventHandler EventHandler, listener *listener, options *Options) erro
return nil
}

svr.bytesPool.New = func() interface{} {
return ringbuffer.New(socketRingBufferSize)
}

if err := svr.start(numCPU); err != nil {
svr.closeLoops()
log.Printf("gnet server is stoping with error: %v\n", err)
Expand Down

0 comments on commit 998b920

Please sign in to comment.