Skip to content

Commit

Permalink
feature: support Closing connection from individual goroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Feb 13, 2020
1 parent 8fecd38 commit 52c5f31
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 24 deletions.
10 changes: 8 additions & 2 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,18 @@ func (c *conn) SendTo(buf []byte) {
c.sendTo(buf)
}

func (c *conn) Wake() {
_ = c.loop.poller.Trigger(func() error {
func (c *conn) Wake() error {
return c.loop.poller.Trigger(func() error {
return c.loop.loopWake(c)
})
}

func (c *conn) Close() error {
return c.loop.poller.Trigger(func() error {
return c.loop.loopCloseConn(c, nil)
})
}

func (c *conn) Context() interface{} { return c.ctx }
func (c *conn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
Expand Down
11 changes: 10 additions & 1 deletion connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,17 @@ func (c *stdConn) SendTo(buf []byte) {
_, _ = c.loop.svr.ln.pconn.WriteTo(buf, c.remoteAddr)
}

func (c *stdConn) Wake() error {
c.loop.ch <- wakeReq{c}
return nil
}

func (c *stdConn) Close() error {
c.loop.ch <- stderr{c, nil}
return nil
}

func (c *stdConn) Context() interface{} { return c.ctx }
func (c *stdConn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *stdConn) LocalAddr() net.Addr { return c.localAddr }
func (c *stdConn) RemoteAddr() net.Addr { return c.remoteAddr }
func (c *stdConn) Wake() { c.loop.ch <- wakeReq{c} }
9 changes: 4 additions & 5 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ func (el *eventloop) loopWake(c *conn) error {
}
out, action := el.eventHandler.React(nil, c)
if out != nil {
c.write(out)
frame, _ := el.codec.Encode(c, out)
c.write(frame)
}
return el.handleAction(c, action)
}
Expand All @@ -179,7 +180,7 @@ func (el *eventloop) loopTicker() {
open bool
)
for {
if err := el.poller.Trigger(func() (err error) {
sniffError(el.poller.Trigger(func() (err error) {
delay, action := el.eventHandler.Tick()
el.svr.ticktock <- delay
switch action {
Expand All @@ -188,9 +189,7 @@ func (el *eventloop) loopTicker() {
err = errServerShutdown
}
return
}); err != nil {
break
}
}))
if delay, open = <-el.svr.ticktock; open {
time.Sleep(delay)
} else {
Expand Down
5 changes: 3 additions & 2 deletions eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (el *eventloop) loopRead(ti *tcpIn) (err error) {
return errServerShutdown
}
if err != nil {
return el.loopClose(c)
return el.loopError(c, err)
}
}
_, _ = c.inboundBuffer.Write(c.buffer.Bytes())
Expand Down Expand Up @@ -185,7 +185,8 @@ func (el *eventloop) loopWake(c *stdConn) error {
}
out, action := el.eventHandler.React(nil, c)
if out != nil {
_, _ = c.conn.Write(out)
frame, _ := el.codec.Encode(c, out)
_, _ = c.conn.Write(frame)
}
return el.handleAction(c, action)
}
Expand Down
5 changes: 4 additions & 1 deletion gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ type Conn interface {
AsyncWrite(buf []byte)

// Wake triggers a React event for this connection.
Wake()
Wake() error

// Close closes the current connection.
Close() error
}

type (
Expand Down
115 changes: 104 additions & 11 deletions gnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,11 +593,13 @@ func (t *testWakeConnServer) OnOpened(c Conn) (out []byte, action Action) {
return
}
func (t *testWakeConnServer) OnClosed(c Conn, err error) (action Action) {
_ = t.conn.Wake()
action = Shutdown
return
}
func (t *testWakeConnServer) React(frame []byte, c Conn) (out []byte, action Action) {
out = []byte("Waking up.")
action = -1
return
}
func (t *testWakeConnServer) Tick() (delay time.Duration, action Action) {
Expand All @@ -617,7 +619,7 @@ func (t *testWakeConnServer) Tick() (delay time.Duration, action Action) {
}()
return
}
t.conn.Wake()
_ = t.conn.Wake()
delay = time.Millisecond * 100
return
}
Expand Down Expand Up @@ -728,14 +730,13 @@ func (t *testCloseActionErrorServer) Tick() (delay time.Duration, action Action)
conn, err := net.Dial(t.network, t.addr)
must(err)
defer conn.Close()
r := make([]byte, 10)
rand.Read(r)
_, _ = conn.Write(r)
_, err = conn.Read(r)
data := []byte("Hello World!")
_, _ = conn.Write(data)
_, err = conn.Read(data)
if err != nil {
panic(err)
}
fmt.Println(string(r))
fmt.Println(string(data))
}()
return
}
Expand Down Expand Up @@ -771,14 +772,13 @@ func (t *testShutdownActionErrorServer) Tick() (delay time.Duration, action Acti
conn, err := net.Dial(t.network, t.addr)
must(err)
defer conn.Close()
r := make([]byte, 10)
rand.Read(r)
_, _ = conn.Write(r)
_, err = conn.Read(r)
data := []byte("Hello World!")
_, _ = conn.Write(data)
_, err = conn.Read(data)
if err != nil {
panic(err)
}
fmt.Println(string(r))
fmt.Println(string(data))
}()
return
}
Expand Down Expand Up @@ -862,3 +862,96 @@ func testShutdownActionOnOpen(network, addr string) {
events := &testShutdownActionOnOpenServer{network: network, addr: addr}
must(Serve(events, network+"://"+addr, WithTicker(true)))
}

func TestUDPShutdown(t *testing.T) {
testUDPShutdown("udp", ":9000")
}

type testUDPShutdownServer struct {
*EventServer
network string
addr string
tick bool
}

func (t *testUDPShutdownServer) React(frame []byte, c Conn) (out []byte, action Action) {
out = frame
action = Shutdown
return
}
func (t *testUDPShutdownServer) Tick() (delay time.Duration, action Action) {
if !t.tick {
t.tick = true
delay = time.Millisecond * 100
go func() {
conn, err := net.Dial(t.network, t.addr)
must(err)
defer conn.Close()
data := []byte("Hello World!")
if _, err = conn.Write(data); err != nil {
panic(err)
}
if _, err = conn.Read(data); err != nil {
panic(err)
}
fmt.Println(string(data))
}()
return
}
delay = time.Millisecond * 100
return
}

func testUDPShutdown(network, addr string) {
svr := &testUDPShutdownServer{network: network, addr: addr}
must(Serve(svr, network+"://"+addr, WithTicker(true)))
}

func TestCloseConnection(t *testing.T) {
testCloseConnection("tcp", ":9991")
}

type testCloseConnectionServer struct {
*EventServer
network, addr string
action bool
}

func (t *testCloseConnectionServer) OnClosed(c Conn, err error) (action Action) {
action = Shutdown
return
}
func (t *testCloseConnectionServer) React(frame []byte, c Conn) (out []byte, action Action) {
out = frame
go func() {
time.Sleep(time.Second)
_ = c.Close()
}()
return
}
func (t *testCloseConnectionServer) Tick() (delay time.Duration, action Action) {
if !t.action {
t.action = true
delay = time.Millisecond * 100
go func() {
conn, err := net.Dial(t.network, t.addr)
must(err)
defer conn.Close()
data := []byte("Hello World!")
_, _ = conn.Write(data)
_, err = conn.Read(data)
if err != nil {
panic(err)
}
fmt.Println(string(data))
}()
return
}
delay = time.Millisecond * 100
return
}

func testCloseConnection(network, addr string) {
events := &testCloseConnectionServer{network: network, addr: addr}
must(Serve(events, network+"://"+addr, WithTicker(true)))
}
2 changes: 0 additions & 2 deletions loop_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ func (el *eventloop) handleEvent(fd int, ev uint32) error {
return el.loopRead(c)
}
return nil
default:
return nil
}
}
return el.loopAccept(fd)
Expand Down

0 comments on commit 52c5f31

Please sign in to comment.