Skip to content

Commit

Permalink
Refactoring to events in pollers
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Oct 5, 2019
1 parent f160e17 commit 85caf3f
Show file tree
Hide file tree
Showing 20 changed files with 400 additions and 241 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

# [[中文](README_ZH.md)]

`gnet` is an Event-Loop networking framework that is fast and small. It makes direct [epoll](https://en.wikipedia.org/wiki/Epoll) and [kqueue](https://en.wikipedia.org/wiki/Kqueue) syscalls rather than using the standard Go [net](https://golang.org/pkg/net/) package, and works in a similar manner as [libuv](https://github.com/libuv/libuv) and [libevent](https://github.com/libevent/libevent).
`gnet` is an Event-Loop networking framework that is fast and small. It makes direct [epoll](https://en.wikipedia.org/wiki/Epoll) and [kqueue](https://en.wikipedia.org/wiki/Kqueue) syscalls rather than using the standard Go [net](https://golang.org/pkg/net/) package, and works in a similar manner as [netty](https://github.com/netty/netty) and [libuv](https://github.com/libuv/libuv).

The goal of this project is to create a server framework for Go that performs on par with [Redis](http://redis.io) and [Haproxy](http://www.haproxy.org) for packet handling.

Expand Down Expand Up @@ -219,7 +219,7 @@ Servers can utilize the [SO_REUSEPORT](https://lwn.net/Articles/542629/) option
Just use functional options to set up `SO_REUSEPORT` and you can enjoy this feature:

```go
gnet.Serve(events, "tcp://:9000", gnet.WithMulticore(true)))
gnet.Serve(events, "tcp://:9000", gnet.WithMulticore(true), gnet.WithReusePort(true)))
```

# Performance
Expand Down
4 changes: 2 additions & 2 deletions README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

# [[英文](README.md)]

`gnet` 是一个基于 Event-Loop 事件驱动的高性能和轻量级网络库。这个库直接使用 [epoll](https://en.wikipedia.org/wiki/Epoll)[kqueue](https://en.wikipedia.org/wiki/Kqueue) 系统调用而非标准 Golang 网络包:[net](https://golang.org/pkg/net/) 来构建网络应用,它的工作原理类似两个开源的网络库:[libuv](https://github.com/libuv/libuv)[libevent](https://github.com/libevent/libevent)
`gnet` 是一个基于 Event-Loop 事件驱动的高性能和轻量级网络库。这个库直接使用 [epoll](https://en.wikipedia.org/wiki/Epoll)[kqueue](https://en.wikipedia.org/wiki/Kqueue) 系统调用而非标准 Golang 网络包:[net](https://golang.org/pkg/net/) 来构建网络应用,它的工作原理类似两个开源的网络库:[netty](https://github.com/netty/netty)[libuv](https://github.com/libuv/libuv)

这个项目存在的价值是提供一个在网络包处理方面能和 [Redis](http://redis.io)[Haproxy](http://www.haproxy.org) 这两个项目具有相近性能的 Go 语言网络服务器框架。

Expand Down Expand Up @@ -216,7 +216,7 @@ events.Tick = func() (delay time.Duration, action Action){
开启这个功能也很简单,使用 functional options 设置一下即可:

```go
gnet.Serve(events, "tcp://:9000", gnet.WithMulticore(true)))
gnet.Serve(events, "tcp://:9000", gnet.WithMulticore(true), gnet.WithReusePort(true)))
```

# 性能测试
Expand Down
83 changes: 83 additions & 0 deletions acceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2019 Andy Pan. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

// +build darwin netbsd freebsd openbsd dragonfly linux

package gnet

import (
"net"
"os"

"github.com/panjf2000/gnet/ringbuffer"
"golang.org/x/sys/unix"
)

func (svr *server) acceptNewConnection(fd int) error {
nfd, sa, err := unix.Accept(fd)
if err != nil {
if err == unix.EAGAIN {
return nil
}
return err
}
if err := unix.SetNonblock(nfd, true); err != nil {
return err
}
lp := svr.subLoopGroup.next()
c := &conn{
fd: nfd,
sa: sa,
loop: lp,
inboundBuffer: ringbuffer.New(socketRingBufferSize),
outboundBuffer: ringbuffer.New(socketRingBufferSize),
}
_ = lp.loopOpen(c)
_ = lp.poller.Trigger(func() (err error) {
if err = lp.poller.AddRead(nfd); err == nil {
lp.connections[nfd] = c
return
}
return
})
return nil
}

func (ln *listener) close() {
if ln.f != nil {
sniffError(ln.f.Close())
}
if ln.ln != nil {
sniffError(ln.ln.Close())
}
if ln.pconn != nil {
sniffError(ln.pconn.Close())
}
if ln.network == "unix" {
sniffError(os.RemoveAll(ln.addr))
}
}

// system takes the net listener and detaches it from it's parent
// event loop, grabs the file descriptor, and makes it non-blocking.
func (ln *listener) system() error {
var err error
switch netln := ln.ln.(type) {
case nil:
switch pconn := ln.pconn.(type) {
case *net.UDPConn:
ln.f, err = pconn.File()
}
case *net.TCPListener:
ln.f, err = netln.File()
case *net.UnixListener:
ln.f, err = netln.File()
}
if err != nil {
ln.close()
return err
}
ln.fd = int(ln.f.Fd())
return unix.SetNonblock(ln.fd, true)
}
45 changes: 24 additions & 21 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,35 @@ import (
)

type conn struct {
fd int // file descriptor
inBuf *ringbuffer.RingBuffer
outBuf *ringbuffer.RingBuffer
sa unix.Sockaddr // remote socket address
opened bool // connection opened event fired
action Action // next user action
ctx interface{} // user-defined context
localAddr net.Addr // local addre
remoteAddr net.Addr // remote addr
loop *loop // connected loop
extra []byte
fd int // file descriptor
sa unix.Sockaddr // remote socket address
ctx interface{} // user-defined context
loop *loop // connected loop
extra []byte // reuse memory of inbound data
opened bool // connection opened event fired
action Action // next user action
localAddr net.Addr // local addre
remoteAddr net.Addr // remote addr
inboundBuffer *ringbuffer.RingBuffer // buffer for data from client
outboundBuffer *ringbuffer.RingBuffer // buffer for data that is ready to write to client
}

func (c *conn) ReadPair() (top, tail []byte) {
if c.inBuf.IsEmpty() {
if c.inboundBuffer.IsEmpty() {
top = c.extra
return
}
top, _ = c.inBuf.PreReadAll()
top, _ = c.inboundBuffer.PreReadAll()
tail = c.extra
return
}

func (c *conn) ReadBytes() []byte {
return c.inBuf.WithBytes(c.extra)
return c.inboundBuffer.WithBytes(c.extra)
}

func (c *conn) ResetBuffer() {
c.inBuf.Reset()
c.inboundBuffer.Reset()
}

func (c *conn) AsyncWrite(buf []byte) {
Expand All @@ -57,32 +57,32 @@ func (c *conn) AsyncWrite(buf []byte) {
func (c *conn) open(buf []byte) {
n, err := unix.Write(c.fd, buf)
if err != nil {
_, _ = c.outBuf.Write(buf)
_, _ = c.outboundBuffer.Write(buf)
return
}

if n < len(buf) {
_, _ = c.outBuf.Write(buf[n:])
_, _ = c.outboundBuffer.Write(buf[n:])
}
}

func (c *conn) write(buf []byte) {
if !c.outBuf.IsEmpty() {
_, _ = c.outBuf.Write(buf)
if !c.outboundBuffer.IsEmpty() {
_, _ = c.outboundBuffer.Write(buf)
return
}
n, err := unix.Write(c.fd, buf)
if err != nil {
if err == unix.EAGAIN {
_, _ = c.outBuf.Write(buf)
_, _ = c.outboundBuffer.Write(buf)
_ = c.loop.poller.ModReadWrite(c.fd)
return
}
_ = c.loop.loopCloseConn(c, err)
return
}
if n < len(buf) {
_, _ = c.outBuf.Write(buf[n:])
_, _ = c.outboundBuffer.Write(buf[n:])
_ = c.loop.poller.ModReadWrite(c.fd)
}
}
Expand All @@ -91,6 +91,9 @@ func (c *conn) Context() interface{} { return c.ctx }
func (c *conn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr }
func (c *conn) SendTo(buf []byte, sa unix.Sockaddr) {
_ = unix.Sendto(c.fd, buf, 0, sa)
}

//func (c *conn) Wake() {
// if c.loop != nil {
Expand Down
64 changes: 24 additions & 40 deletions eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,17 @@ import (
"net"
"time"

"github.com/panjf2000/gnet/internal"
"github.com/panjf2000/gnet/netpoll"
"github.com/panjf2000/gnet/ringbuffer"
"golang.org/x/sys/unix"
)

type loop struct {
idx int // loop index in the server loops list
poller *netpoll.Poller // epoll or kqueue
svr *server // server in loop
packet []byte // read packet buffer
poller *netpoll.Poller // epoll or kqueue
connections map[int]*conn // loop connections fd -> conn
svr *server
}

func (lp *loop) loopRun() {
Expand All @@ -32,29 +31,13 @@ func (lp *loop) loopRun() {
go lp.loopTicker()
}

_ = lp.poller.Polling(func(fd int, job internal.Job) error {
if fd == 0 {
return job()
}
if c, ok := lp.connections[fd]; ok {
switch {
case !c.opened:
return lp.loopOpened(c)
case c.outBuf.Length() > 0:
return lp.loopWrite(c)
default:
return lp.loopRead(c)
}
} else {
return lp.loopAccept(fd)
}
})
_ = lp.poller.Polling(lp.handleEvent)
}

func (lp *loop) loopAccept(fd int) error {
if fd == lp.svr.ln.fd {
if lp.svr.ln.pconn != nil {
return lp.loopUDPRead(fd)
return lp.loopUDPIn(fd)
}
nfd, sa, err := unix.Accept(fd)
if err != nil {
Expand All @@ -67,10 +50,10 @@ func (lp *loop) loopAccept(fd int) error {
return err
}
c := &conn{fd: nfd,
sa: sa,
inBuf: ringbuffer.New(connRingBufferSize),
outBuf: ringbuffer.New(connRingBufferSize),
loop: lp,
sa: sa,
inboundBuffer: ringbuffer.New(socketRingBufferSize),
outboundBuffer: ringbuffer.New(socketRingBufferSize),
loop: lp,
}
if err = lp.poller.AddReadWrite(c.fd); err == nil {
lp.connections[c.fd] = c
Expand All @@ -81,7 +64,7 @@ func (lp *loop) loopAccept(fd int) error {
return nil
}

func (lp *loop) loopOpened(c *conn) error {
func (lp *loop) loopOpen(c *conn) error {
c.opened = true
c.localAddr = lp.svr.ln.lnaddr
c.remoteAddr = netpoll.SockaddrToTCPOrUnixAddr(c.sa)
Expand All @@ -96,13 +79,13 @@ func (lp *loop) loopOpened(c *conn) error {
if len(out) > 0 {
c.open(out)
}
if c.outBuf.Length() != 0 {
if !c.outboundBuffer.IsEmpty() {
_ = lp.poller.AddWrite(c.fd)
}
return lp.handleAction(c)
}

func (lp *loop) loopRead(c *conn) error {
func (lp *loop) loopIn(c *conn) error {
n, err := unix.Read(c.fd, lp.packet)
if n == 0 || err != nil {
if err == unix.EAGAIN {
Expand All @@ -116,23 +99,23 @@ func (lp *loop) loopRead(c *conn) error {
if len(out) > 0 {
c.write(out)
} else if action != DataRead {
_, _ = c.inBuf.Write(c.extra)
_, _ = c.inboundBuffer.Write(c.extra)
}
return lp.handleAction(c)
}

func (lp *loop) loopWrite(c *conn) error {
func (lp *loop) loopOut(c *conn) error {
lp.svr.eventHandler.PreWrite()

top, tail := c.outBuf.PreReadAll()
top, tail := c.outboundBuffer.PreReadAll()
n, err := unix.Write(c.fd, top)
if err != nil {
if err == unix.EAGAIN {
return nil
}
return lp.loopCloseConn(c, err)
}
c.outBuf.Advance(n)
c.outboundBuffer.Advance(n)
if len(top) == n && tail != nil {
n, err = unix.Write(c.fd, tail)
if err != nil {
Expand All @@ -141,10 +124,10 @@ func (lp *loop) loopWrite(c *conn) error {
}
return lp.loopCloseConn(c, err)
}
c.outBuf.Advance(n)
c.outboundBuffer.Advance(n)
}

if c.outBuf.Length() == 0 {
if c.outboundBuffer.IsEmpty() {
_ = lp.poller.ModRead(c.fd)
}
return nil
Expand Down Expand Up @@ -234,7 +217,7 @@ func (lp *loop) handleAction(c *conn) error {
}
}

func (lp *loop) loopUDPRead(fd int) error {
func (lp *loop) loopUDPIn(fd int) error {
n, sa, err := unix.Recvfrom(fd, lp.packet, 0)
if err != nil || n == 0 {
return nil
Expand All @@ -255,15 +238,16 @@ func (lp *loop) loopUDPRead(fd int) error {
sa6 = *sa
}
c := &conn{
localAddr: lp.svr.ln.lnaddr,
remoteAddr: netpoll.SockaddrToUDPAddr(&sa6),
inBuf: ringbuffer.New(connRingBufferSize),
fd: fd,
localAddr: lp.svr.ln.lnaddr,
remoteAddr: netpoll.SockaddrToUDPAddr(&sa6),
inboundBuffer: ringbuffer.New(socketRingBufferSize),
}
_, _ = c.inBuf.Write(lp.packet[:n])
_, _ = c.inboundBuffer.Write(lp.packet[:n])
out, action := lp.svr.eventHandler.React(c)
if len(out) > 0 {
lp.svr.eventHandler.PreWrite()
sniffError(unix.Sendto(fd, out, 0, sa))
c.SendTo(out, sa)
}
switch action {
case Shutdown:
Expand Down
Loading

0 comments on commit 85caf3f

Please sign in to comment.