From b5a5c715ca07f8b4e3c371a8460274e77953bbdf Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sat, 4 Jul 2020 21:31:10 +0800 Subject: [PATCH] fix: resolve the issue of closing one fd twice --- eventloop_unix.go | 19 +++++++++++++++++-- eventloop_windows.go | 21 ++++++++++++++------- reactor_bsd.go | 4 ++-- reactor_linux.go | 4 ++-- 4 files changed, 35 insertions(+), 13 deletions(-) diff --git a/eventloop_unix.go b/eventloop_unix.go index c7931e0e7..e7d2f4a68 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -65,11 +65,12 @@ func (el *eventloop) loopRun() { if el.idx == 0 && el.svr.opts.Ticker { go el.loopTicker() } + switch err := el.poller.Polling(el.handleEvent); err { case errors.ErrServerShutdown: el.svr.logger.Infof("Event-loop(%d) is exiting normally on the signal error: %v", el.idx, err) default: - el.svr.logger.Fatalf("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err) + el.svr.logger.Errorf("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err) } } @@ -79,6 +80,7 @@ func (el *eventloop) loopAccept(fd int) error { if el.ln.network == "udp" { return el.loopReadUDP(fd) } + nfd, sa, err := unix.Accept(fd) if err != nil { if err == unix.EAGAIN { @@ -97,6 +99,7 @@ func (el *eventloop) loopAccept(fd int) error { } return err } + return nil } @@ -104,12 +107,13 @@ func (el *eventloop) loopOpen(c *conn) error { c.opened = true c.localAddr = el.ln.lnaddr c.remoteAddr = netpoll.SockaddrToTCPOrUnixAddr(c.sa) - out, action := el.eventHandler.OnOpened(c) if el.svr.opts.TCPKeepAlive > 0 { if proto := el.ln.network; proto == "tcp" || proto == "unix" { _ = netpoll.SetKeepAlive(c.fd, int(el.svr.opts.TCPKeepAlive/time.Second)) } } + + out, action := el.eventHandler.OnOpened(c) if out != nil { c.open(out) } @@ -181,6 +185,7 @@ func (el *eventloop) loopWrite(c *conn) error { if c.outboundBuffer.IsEmpty() { _ = el.poller.ModRead(c.fd) } + return nil } @@ -188,6 +193,12 @@ func (el *eventloop) loopCloseConn(c *conn, err error) error { if !c.outboundBuffer.IsEmpty() && err == nil { _ = el.loopWrite(c) } + + if !c.opened { + el.svr.logger.Debugf("The fd=%d in event-loop(%d) is already closed, skipping it", c.fd, el.idx) + return nil + } + err0, err1 := el.poller.Delete(c.fd), unix.Close(c.fd) if err0 == nil && err1 == nil { delete(el.connections, c.fd) @@ -205,6 +216,7 @@ func (el *eventloop) loopCloseConn(c *conn, err error) error { c.fd, el.idx, os.NewSyscallError("close", err1)) } } + return nil } @@ -217,6 +229,7 @@ func (el *eventloop) loopWake(c *conn) error { frame, _ := el.codec.Encode(c, out) c.write(frame) } + return el.handleAction(c, action) } @@ -271,6 +284,7 @@ func (el *eventloop) loopReadUDP(fd int) error { } return nil } + c := newUDPConn(fd, el, sa) out, action := el.eventHandler.React(el.packet[:n], c) if out != nil { @@ -281,5 +295,6 @@ func (el *eventloop) loopReadUDP(fd int) error { return errors.ErrServerShutdown } c.releaseUDP() + return nil } diff --git a/eventloop_windows.go b/eventloop_windows.go index 1cb61123f..1d3298cae 100644 --- a/eventloop_windows.go +++ b/eventloop_windows.go @@ -51,9 +51,11 @@ func (el *eventloop) loopRun() { el.loopEgress() el.svr.loopWG.Done() }() + if el.idx == 0 && el.svr.opts.Ticker { go el.loopTicker() } + for v := range el.ch { switch v := v.(type) { case error: @@ -72,7 +74,7 @@ func (el *eventloop) loopRun() { err = v() } if err != nil { - el.svr.logger.Fatalf("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err) + el.svr.logger.Infof("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err) break } } @@ -83,18 +85,19 @@ func (el *eventloop) loopAccept(c *stdConn) error { c.localAddr = el.svr.ln.lnaddr c.remoteAddr = c.conn.RemoteAddr() el.calibrateCallback(el, 1) - - out, action := el.eventHandler.OnOpened(c) - if out != nil { - el.eventHandler.PreWrite() - _, _ = c.conn.Write(out) - } if el.svr.opts.TCPKeepAlive > 0 { if c, ok := c.conn.(*net.TCPConn); ok { _ = c.SetKeepAlive(true) _ = c.SetKeepAlivePeriod(el.svr.opts.TCPKeepAlive) } } + + out, action := el.eventHandler.OnOpened(c) + if out != nil { + el.eventHandler.PreWrite() + _, _ = c.conn.Write(out) + } + return el.handleAction(c, action) } @@ -123,6 +126,7 @@ func (el *eventloop) loopRead(ti *tcpIn) (err error) { _, _ = c.inboundBuffer.Write(c.buffer.Bytes()) bytebuffer.Put(c.buffer) c.buffer = nil + return } @@ -185,6 +189,7 @@ func (el *eventloop) loopError(c *stdConn, err error) (e error) { } else { el.svr.logger.Warnf("Failed to close connection(%s), error: %v", c.remoteAddr.String(), e) } + return } @@ -197,6 +202,7 @@ func (el *eventloop) loopWake(c *stdConn) error { frame, _ := el.codec.Encode(c, out) _, _ = c.conn.Write(frame) } + return el.handleAction(c, action) } @@ -224,5 +230,6 @@ func (el *eventloop) loopReadUDP(c *stdConn) error { return errors.ErrServerShutdown } c.releaseUDP() + return nil } diff --git a/reactor_bsd.go b/reactor_bsd.go index f48a5a020..ae2f2e030 100644 --- a/reactor_bsd.go +++ b/reactor_bsd.go @@ -33,7 +33,7 @@ func (svr *server) activateMainReactor() { case errors.ErrServerShutdown: svr.logger.Infof("Main reactor is exiting normally on the signal error: %v", err) default: - svr.logger.Fatalf("Main reactor is exiting due to an unexpected error: %v", err) + svr.logger.Errorf("Main reactor is exiting due to an unexpected error: %v", err) } } @@ -76,6 +76,6 @@ func (svr *server) activateSubReactor(el *eventloop) { case errors.ErrServerShutdown: svr.logger.Infof("Event-loop(%d) is exiting normally on the signal error: %v", el.idx, err) default: - svr.logger.Fatalf("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err) + svr.logger.Errorf("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err) } } diff --git a/reactor_linux.go b/reactor_linux.go index 2a9192352..5c08426d2 100644 --- a/reactor_linux.go +++ b/reactor_linux.go @@ -31,7 +31,7 @@ func (svr *server) activateMainReactor() { case errors.ErrServerShutdown: svr.logger.Infof("Main reactor is exiting normally on the signal error: %v", err) default: - svr.logger.Fatalf("Main reactor is exiting due to an unexpected error: %v", err) + svr.logger.Errorf("Main reactor is exiting due to an unexpected error: %v", err) } } @@ -70,6 +70,6 @@ func (svr *server) activateSubReactor(el *eventloop) { case errors.ErrServerShutdown: svr.logger.Infof("Event-loop(%d) is exiting normally on the signal error: %v", el.idx, err) default: - svr.logger.Fatalf("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err) + svr.logger.Errorf("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err) } }