Skip to content

Commit

Permalink
opt: refine the code of I/O handlers (panjf2000#586)
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Apr 21, 2024
1 parent d473f26 commit 39c175b
Show file tree
Hide file tree
Showing 15 changed files with 40 additions and 103 deletions.
28 changes: 0 additions & 28 deletions acceptor_bsd.go

This file was deleted.

28 changes: 0 additions & 28 deletions acceptor_linux.go

This file was deleted.

21 changes: 12 additions & 9 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/panjf2000/gnet/v2/pkg/errors"
)

func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
func (el *eventloop) accept0(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
for {
nfd, sa, err := socket.Accept(fd)
if err != nil {
Expand All @@ -41,33 +41,33 @@ func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
// It's a silly error, let's retry it.
continue
default:
eng.opts.Logger.Errorf("Accept() failed due to error: %v", err)
el.getLogger().Errorf("Accept() failed due to error: %v", err)
return errors.ErrAcceptSocket
}
}

remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
if eng.opts.TCPKeepAlive > 0 && eng.listeners[fd].network == "tcp" {
err = socket.SetKeepAlivePeriod(nfd, int(eng.opts.TCPKeepAlive.Seconds()))
if el.engine.opts.TCPKeepAlive > 0 && el.listeners[fd].network == "tcp" {
err = socket.SetKeepAlivePeriod(nfd, int(el.engine.opts.TCPKeepAlive.Seconds()))
if err != nil {
eng.opts.Logger.Errorf("failed to set TCP keepalive on fd=%d: %v", fd, err)
el.getLogger().Errorf("failed to set TCP keepalive on fd=%d: %v", fd, err)
}
}

el := eng.eventLoops.next(remoteAddr)
el := el.engine.eventLoops.next(remoteAddr)
c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr)
err = el.poller.Trigger(queue.HighPriority, el.register, c)
if err != nil {
eng.opts.Logger.Errorf("failed to enqueue accepted socket of high-priority: %v", err)
el.getLogger().Errorf("failed to enqueue the accepted socket fd=%d to poller: %v", c.fd, err)
_ = unix.Close(nfd)
c.release()
}
}
}

func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
func (el *eventloop) accept(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
if el.listeners[fd].network == "udp" {
return el.readUDP1(fd, ev, flags)
return el.readUDP(fd, ev, flags)
}

nfd, sa, err := socket.Accept(fd)
Expand Down Expand Up @@ -98,6 +98,9 @@ func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags)
addEvents = el.poller.AddReadWrite
}
if err = addEvents(&c.pollAttachment, el.engine.opts.EdgeTriggeredIO); err != nil {
el.getLogger().Errorf("failed to register the accepted socket fd=%d to poller: %v", c.fd, err)
_ = unix.Close(c.fd)
c.release()
return err
}
el.connections.addConn(c, el.idx)
Expand Down
6 changes: 1 addition & 5 deletions connection_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/panjf2000/gnet/v2/internal/netpoll"
)

func (c *conn) handleEvents(_ int, filter int16, flags uint16) (err error) {
func (c *conn) processIO(_ int, filter netpoll.IOEvent, flags netpoll.IOFlags) (err error) {
el := c.loop
switch filter {
case unix.EVFILT_READ:
Expand Down Expand Up @@ -56,7 +56,3 @@ func (c *conn) handleEvents(_ int, filter int16, flags uint16) (err error) {
}
return
}

func (el *eventloop) readUDP(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) error {
return el.readUDP1(fd, filter, flags)
}
6 changes: 1 addition & 5 deletions connection_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/panjf2000/gnet/v2/internal/netpoll"
)

func (c *conn) handleEvents(_ int, ev uint32) error {
func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
el := c.loop
// First check for any unexpected non-IO events.
// For these events we just close the corresponding connection directly.
Expand Down Expand Up @@ -68,7 +68,3 @@ func (c *conn) handleEvents(_ int, ev uint32) error {
}
return nil
}

func (el *eventloop) readUDP(fd int, ev netpoll.IOEvent) error {
return el.readUDP1(fd, ev, 0)
}
2 changes: 1 addition & 1 deletion connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr, localAddr, remoteAddr n
remoteAddr: remoteAddr,
pollAttachment: netpoll.PollAttachment{FD: fd},
}
c.pollAttachment.Callback = c.handleEvents
c.pollAttachment.Callback = c.processIO
c.outboundBuffer.Reset(el.engine.opts.WriteBufferCap)
return
}
Expand Down
18 changes: 9 additions & 9 deletions engine_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import (
)

type engine struct {
listeners map[int]*listener // listeners for accepting new connections
listeners map[int]*listener // listeners for accepting incoming connections
opts *Options // options with engine
acceptor *eventloop // main event-loop for accepting connections
ingress *eventloop // main event-loop that monitors all listeners
eventLoops loadBalancer // event-loops for handling events
inShutdown int32 // whether the engine is in shutdown
ticker struct {
Expand Down Expand Up @@ -76,11 +76,11 @@ func (eng *engine) closeEventLoops() {
_ = el.poller.Close()
return true
})
if eng.acceptor != nil {
if eng.ingress != nil {
for _, ln := range eng.listeners {
ln.close()
}
err := eng.acceptor.poller.Close()
err := eng.ingress.poller.Close()
if err != nil {
eng.opts.Logger.Errorf("failed to close poller when stopping engine: %v", err)
}
Expand Down Expand Up @@ -175,19 +175,19 @@ func (eng *engine) activateReactors(numEventLoop int) error {
el.poller = p
el.eventHandler = eng.eventHandler
for _, ln := range eng.listeners {
if err = el.poller.AddRead(ln.packPollAttachment(eng.accept), true); err != nil {
if err = el.poller.AddRead(ln.packPollAttachment(el.accept0), true); err != nil {
return err
}
}
eng.acceptor = el
eng.ingress = el

// Start main reactor in background.
eng.workerPool.Go(el.rotate)

// Start the ticker.
if eng.opts.Ticker {
eng.workerPool.Go(func() error {
eng.acceptor.ticker(eng.ticker.ctx)
eng.ingress.ticker(eng.ticker.ctx)
return nil
})
}
Expand Down Expand Up @@ -217,8 +217,8 @@ func (eng *engine) stop(s Engine) {
}
return true
})
if eng.acceptor != nil {
err := eng.acceptor.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
if eng.ingress != nil {
err := eng.ingress.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
if err != nil {
eng.opts.Logger.Errorf("failed to enqueue shutdown signal of high-priority for main event-loop: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (el *eventloop) handleAction(c *conn, action Action) error {
}
}

func (el *eventloop) readUDP1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
func (el *eventloop) readUDP(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
n, sa, err := unix.Recvfrom(fd, el.buffer, 0)
if err != nil {
if err == unix.EAGAIN {
Expand Down
3 changes: 3 additions & 0 deletions internal/netpoll/defs_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ package netpoll
// IOFlags represents the flags of IO events.
type IOFlags = uint16

// PollEventHandler is the callback for I/O events notified by the poller.
type PollEventHandler func(int, IOEvent, IOFlags) error

// PollAttachment is the user data which is about to be stored in "void *ptr" of epoll_data or "void *udata" of kevent.
type PollAttachment struct {
FD int
Expand Down
3 changes: 0 additions & 3 deletions internal/netpoll/defs_poller_epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ const (
ErrEvents = unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP
)

// PollEventHandler is the callback for I/O events notified by the poller.
type PollEventHandler func(int, uint32) error

type eventList struct {
size int
events []epollevent
Expand Down
3 changes: 0 additions & 3 deletions internal/netpoll/defs_poller_kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ const (
MaxAsyncTasksAtOneTime = 128
)

// PollEventHandler is the callback for I/O events notified by the poller.
type PollEventHandler func(int, int16, uint16) error

type eventList struct {
size int
events []unix.Kevent_t
Expand Down
2 changes: 1 addition & 1 deletion internal/netpoll/poller_epoll_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (p *Poller) Polling(callback PollEventHandler) error {
if fd := int(ev.Fd); fd == p.efd { // poller is awakened to run tasks in queues.
doChores = true
} else {
switch err = callback(fd, ev.Events); err {
switch err = callback(fd, ev.Events, 0); err {
case nil:
case errors.ErrAcceptSocket, errors.ErrEngineShutdown:
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/netpoll/poller_epoll_ultimate.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (p *Poller) Polling() error {
if pollAttachment.FD == p.epa.FD { // poller is awakened to run tasks in queues.
doChores = true
} else {
switch err = pollAttachment.Callback(pollAttachment.FD, ev.events); err {
switch err = pollAttachment.Callback(pollAttachment.FD, ev.events, 0); err {
case nil:
case errors.ErrAcceptSocket, errors.ErrEngineShutdown:
return err
Expand Down
12 changes: 6 additions & 6 deletions reactor_epoll_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (el *eventloop) rotate() error {
defer runtime.UnlockOSThread()
}

err := el.poller.Polling(el.engine.accept)
err := el.poller.Polling(el.accept0)
if err == errors.ErrEngineShutdown {
el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err)
err = nil
Expand All @@ -52,10 +52,10 @@ func (el *eventloop) orbit() error {
defer runtime.UnlockOSThread()
}

err := el.poller.Polling(func(fd int, ev uint32) error {
err := el.poller.Polling(func(fd int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
c := el.connections.getConn(fd)
if c == nil {
// Somehow epoll notify with an event for a stale fd that is not in our connection set.
// Somehow epoll notified with an event for a stale fd that is not in our connection set.
// We need to delete it from the epoll set.
return el.poller.Delete(fd)
}
Expand Down Expand Up @@ -121,13 +121,13 @@ func (el *eventloop) run() error {
defer runtime.UnlockOSThread()
}

err := el.poller.Polling(func(fd int, ev uint32) error {
err := el.poller.Polling(func(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
c := el.connections.getConn(fd)
if c == nil {
if _, ok := el.listeners[fd]; ok {
return el.accept(fd, ev)
return el.accept(fd, ev, flags)
}
// Somehow epoll notify with an event for a stale fd that is not in our connection set.
// Somehow epoll notified with an event for a stale fd that is not in our connection set.
// We need to delete it from the epoll set.
return el.poller.Delete(fd)

Expand Down
7 changes: 4 additions & 3 deletions reactor_kqueue_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"golang.org/x/sys/unix"

"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/pkg/errors"
)

Expand All @@ -33,7 +34,7 @@ func (el *eventloop) rotate() error {
defer runtime.UnlockOSThread()
}

err := el.poller.Polling(el.engine.accept)
err := el.poller.Polling(el.accept0)
if err == errors.ErrEngineShutdown {
el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err)
err = nil
Expand All @@ -52,7 +53,7 @@ func (el *eventloop) orbit() error {
defer runtime.UnlockOSThread()
}

err := el.poller.Polling(func(fd int, filter int16, flags uint16) (err error) {
err := el.poller.Polling(func(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) (err error) {
c := el.connections.getConn(fd)
if c == nil {
// This might happen when the connection has already been closed,
Expand Down Expand Up @@ -110,7 +111,7 @@ func (el *eventloop) run() error {
defer runtime.UnlockOSThread()
}

err := el.poller.Polling(func(fd int, filter int16, flags uint16) (err error) {
err := el.poller.Polling(func(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) (err error) {
c := el.connections.getConn(fd)
if c == nil {
if _, ok := el.listeners[fd]; ok {
Expand Down

0 comments on commit 39c175b

Please sign in to comment.