Skip to content

Commit

Permalink
Improve errors handling (panjf2000#148)
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Nov 2, 2020
1 parent 83d304c commit 053b78c
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 86 deletions.
3 changes: 2 additions & 1 deletion acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package gnet
import (
"os"

"github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/internal/netpoll"
"golang.org/x/sys/unix"
)
Expand All @@ -35,7 +36,7 @@ func (svr *server) acceptNewConnection(fd int) error {
if err == unix.EAGAIN {
return nil
}
return os.NewSyscallError("accept", err)
return errors.ErrAcceptSocket
}
if err = os.NewSyscallError("fcntl nonblock", unix.SetNonblock(nfd, true)); err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ func (c *conn) write(buf []byte) (err error) {
err = c.loop.poller.ModReadWrite(c.fd)
return
}
_ = c.loop.loopCloseConn(c, os.NewSyscallError("write", err))
return
return c.loop.loopCloseConn(c, os.NewSyscallError("write", err))
}
if n < len(outFrame) {
_, _ = c.outboundBuffer.Write(outFrame[n:])
Expand Down
9 changes: 7 additions & 2 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ package errors
import "errors"

var (
// ErrServerShutdown occurs when server is closing.
ErrServerShutdown = errors.New("server is going to be shutdown")
// ErrAcceptSocket occurs when acceptor does not accept the new connection properly.
ErrAcceptSocket = errors.New("accept a new connection error")
// ErrTooManyEventLoopThreads occurs when attempting to set up more than 10,000 event-loop goroutines under LockOSThread mode.
ErrTooManyEventLoopThreads = errors.New("too many event-loops under LockOSThread mode")
// ErrUnsupportedProtocol occurs when trying to use protocol that is not supported.
Expand All @@ -35,8 +39,9 @@ var (
ErrUnsupportedUDSProtocol = errors.New("only unix is supported")
// ErrUnsupportedPlatform occurs when running gnet on an unsupported platform.
ErrUnsupportedPlatform = errors.New("unsupported platform in gnet")
// ErrServerShutdown occurs when server is closing.
ErrServerShutdown = errors.New("server is going to be shutdown")

// ================================================= codec errors =================================================

// ErrInvalidFixedLength occurs when the output data have invalid fixed length.
ErrInvalidFixedLength = errors.New("invalid fixed length of bytes")
// ErrUnexpectedEOF occurs when no enough data to read by codec.
Expand Down
50 changes: 25 additions & 25 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
package gnet

import (
"errors"
"fmt"
"os"
"runtime"
"time"

"github.com/panjf2000/gnet/errors"
gerrors "github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/internal/netpoll"
"golang.org/x/sys/unix"
)
Expand Down Expand Up @@ -71,13 +73,8 @@ func (el *eventloop) loopRun(lockOSThread bool) {
go el.loopTicker()
}

switch err := el.poller.Polling(el.handleEvent); err {
case errors.ErrServerShutdown:
el.svr.logger.Infof("Event-loop(%d) is exiting normally on the signal error: %v", el.idx, err)
default:
el.svr.logger.Errorf("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err)

}
err := el.poller.Polling(el.handleEvent)
el.svr.logger.Infof("Event-loop(%d) is exiting due to error: %v", el.idx, err)
}

func (el *eventloop) loopAccept(fd int) error {
Expand Down Expand Up @@ -148,7 +145,7 @@ func (el *eventloop) loopRead(c *conn) error {
case Close:
return el.loopCloseConn(c, nil)
case Shutdown:
return errors.ErrServerShutdown
return gerrors.ErrServerShutdown
}

// Check the status of connection every loop since it might be closed during writing data back to client due to
Expand Down Expand Up @@ -193,10 +190,9 @@ func (el *eventloop) loopWrite(c *conn) error {
return nil
}

func (el *eventloop) loopCloseConn(c *conn, err error) error {
func (el *eventloop) loopCloseConn(c *conn, err error) (rerr error) {
if !c.opened {
el.svr.logger.Debugf("The fd=%d in event-loop(%d) is already closed, skipping it", c.fd, el.idx)
return nil
return fmt.Errorf("the fd=%d in event-loop(%d) is already closed, skipping it", c.fd, el.idx)
}

// Send residual data in buffer back to client before actually closing the connection.
Expand All @@ -215,20 +211,24 @@ func (el *eventloop) loopCloseConn(c *conn, err error) error {
delete(el.connections, c.fd)
el.calibrateCallback(el, -1)
if el.eventHandler.OnClosed(c, err) == Shutdown {
return errors.ErrServerShutdown
return gerrors.ErrServerShutdown
}
c.releaseTCP()
} else {
if err0 != nil {
el.svr.logger.Warnf("Failed to delete fd=%d from poller in event-loop(%d), %v", c.fd, el.idx, err0)
rerr = fmt.Errorf("failed to delete fd=%d from poller in event-loop(%d): %v", c.fd, el.idx, err0)
}
if err1 != nil {
el.svr.logger.Warnf("Failed to close fd=%d in event-loop(%d), %v",
c.fd, el.idx, os.NewSyscallError("close", err1))
err1 = fmt.Errorf("failed to close fd=%d in event-loop(%d): %v", c.fd, el.idx, os.NewSyscallError("close", err1))
if rerr != nil {
rerr = errors.New(rerr.Error() + " & " + err1.Error())
} else {
rerr = err1
}
}
}

return nil
return
}

func (el *eventloop) loopWake(c *conn) error {
Expand Down Expand Up @@ -258,7 +258,7 @@ func (el *eventloop) loopTicker() {
switch action {
case None:
case Shutdown:
err = errors.ErrServerShutdown
err = gerrors.ErrServerShutdown
}
return
})
Expand All @@ -281,20 +281,20 @@ func (el *eventloop) handleAction(c *conn, action Action) error {
case Close:
return el.loopCloseConn(c, nil)
case Shutdown:
return errors.ErrServerShutdown
return gerrors.ErrServerShutdown
default:
return nil
}
}

func (el *eventloop) loopReadUDP(fd int) error {
n, sa, err := unix.Recvfrom(fd, el.packet, 0)
if err != nil || n == 0 {
if err != nil && err != unix.EAGAIN {
el.svr.logger.Warnf("Failed to read UDP packet from fd=%d in event-loop(%d), %v",
fd, el.idx, os.NewSyscallError("recvfrom", err))
if err != nil {
if err == unix.EAGAIN || err == unix.EWOULDBLOCK {
return nil
}
return nil
return fmt.Errorf("failed to read UDP packet from fd=%d in event-loop(%d), %v",
fd, el.idx, os.NewSyscallError("recvfrom", err))
}

c := newUDPConn(fd, el, sa)
Expand All @@ -304,7 +304,7 @@ func (el *eventloop) loopReadUDP(fd int) error {
_ = c.sendTo(out)
}
if action == Shutdown {
return errors.ErrServerShutdown
return gerrors.ErrServerShutdown
}
c.releaseUDP()

Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
module github.com/panjf2000/gnet

go 1.13
go 1.15

require (
github.com/panjf2000/ants/v2 v2.4.1
github.com/panjf2000/ants/v2 v2.4.3
github.com/valyala/bytebufferpool v1.0.0
go.uber.org/zap v1.15.0
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.16.0
golang.org/x/sys v0.0.0-20201101102859-da207088b7d1
)
16 changes: 10 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/panjf2000/ants/v2 v2.4.1 h1:7RtUqj5lGOw0WnZhSKDZ2zzJhaX5490ZW1sUolRXCxY=
github.com/panjf2000/ants/v2 v2.4.1/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
github.com/panjf2000/ants/v2 v2.4.3 h1:wHghL17YKFanB62QjPQ9o+DuM4q7WrQ7zAhoX8+eBXU=
github.com/panjf2000/ants/v2 v2.4.3/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -26,12 +26,16 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.15.0 h1:ZZCA22JRF2gQE5FoNmhmrf7jeJJ2uhqDUNRYKm8dvmM=
go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=
go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
Expand All @@ -43,8 +47,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae h1:Ih9Yo4hSPImZOpfGuA4bR/ORKTAbhZo2AbWNRCnevdo=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201101102859-da207088b7d1 h1:a/mKvvZr9Jcc8oKfcmgzyp7OwF73JPWsQLvH1z2Kxck=
golang.org/x/sys v0.0.0-20201101102859-da207088b7d1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
Expand Down
29 changes: 21 additions & 8 deletions internal/netpoll/epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"os"
"unsafe"

"github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/internal"
"github.com/panjf2000/gnet/internal/logging"
"golang.org/x/sys/unix"
Expand Down Expand Up @@ -88,31 +89,43 @@ func (p *Poller) Trigger(job internal.Job) (err error) {
}

// Polling blocks the current goroutine, waiting for network-events.
func (p *Poller) Polling(callback func(fd int, ev uint32) error) (err error) {
func (p *Poller) Polling(callback func(fd int, ev uint32) error) error {
el := newEventList(InitEvents)
var wakenUp bool

for {
n, err0 := unix.EpollWait(p.fd, el.events, -1)
if err0 != nil && err0 != unix.EINTR {
logging.DefaultLogger.Warnf("Error occurs in epoll, %v", os.NewSyscallError("epoll_wait", err0))
n, err := unix.EpollWait(p.fd, el.events, -1)
if err != nil && err != unix.EINTR {
logging.DefaultLogger.Warnf("Error occurs in epoll: %v", os.NewSyscallError("epoll_wait", err))
continue
}

for i := 0; i < n; i++ {
if fd := int(el.events[i].Fd); fd != p.wfd {
if err = callback(fd, el.events[i].Events); err != nil {
return
switch err = callback(fd, el.events[i].Events); err {
case nil:
case errors.ErrAcceptSocket, errors.ErrServerShutdown:
return err
default:
logging.DefaultLogger.Warnf("Error occurs in event-loop: %v", err)
}
} else {
wakenUp = true
_, _ = unix.Read(p.wfd, p.wfdBuf)
}
}

if wakenUp {
wakenUp = false
if err = p.asyncJobQueue.ForEach(); err != nil {
return
switch err = p.asyncJobQueue.ForEach(); err {
case nil:
case errors.ErrServerShutdown:
return err
default:
logging.DefaultLogger.Warnf("Error occurs in user-defined function, %v", err)
}
}

if n == el.size {
el.increase()
}
Expand Down
29 changes: 21 additions & 8 deletions internal/netpoll/kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package netpoll
import (
"os"

"github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/internal"
"github.com/panjf2000/gnet/internal/logging"
"golang.org/x/sys/unix"
Expand Down Expand Up @@ -79,35 +80,47 @@ func (p *Poller) Trigger(job internal.Job) (err error) {
}

// Polling blocks the current goroutine, waiting for network-events.
func (p *Poller) Polling(callback func(fd int, filter int16) error) (err error) {
func (p *Poller) Polling(callback func(fd int, filter int16) error) error {
el := newEventList(InitEvents)
var wakenUp bool

for {
n, err0 := unix.Kevent(p.fd, nil, el.events, nil)
if err0 != nil && err0 != unix.EINTR {
logging.DefaultLogger.Warnf("Error occurs in kqueue, %v", os.NewSyscallError("kevent wait", err0))
n, err := unix.Kevent(p.fd, nil, el.events, nil)
if err != nil && err != unix.EINTR {
logging.DefaultLogger.Warnf("Error occurs in kqueue: %v", os.NewSyscallError("kevent wait", err))
continue
}

var evFilter int16
for i := 0; i < n; i++ {
if fd := int(el.events[i].Ident); fd != 0 {
evFilter = el.events[i].Filter
if (el.events[i].Flags&unix.EV_EOF != 0) || (el.events[i].Flags&unix.EV_ERROR != 0) {
evFilter = EVFilterSock
}
if err = callback(fd, evFilter); err != nil {
return
switch err = callback(fd, evFilter); err {
case nil:
case errors.ErrAcceptSocket, errors.ErrServerShutdown:
return err
default:
logging.DefaultLogger.Warnf("Error occurs in event-loop: %v", err)
}
} else {
wakenUp = true
}
}

if wakenUp {
wakenUp = false
if err = p.asyncJobQueue.ForEach(); err != nil {
return
switch err = p.asyncJobQueue.ForEach(); err {
case nil:
case errors.ErrServerShutdown:
return err
default:
logging.DefaultLogger.Warnf("Error occurs in user-defined function, %v", err)
}
}

if n == el.size {
el.increase()
}
Expand Down
Loading

0 comments on commit 053b78c

Please sign in to comment.