Skip to content

Commit

Permalink
opt: refactor the toolkit that manipulates sockets
Browse files Browse the repository at this point in the history
Also fixes panjf2000#190
  • Loading branch information
panjf2000 committed Mar 28, 2021
1 parent 058d53d commit 38a8605
Show file tree
Hide file tree
Showing 23 changed files with 162 additions and 124 deletions.
4 changes: 2 additions & 2 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"os"

"github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/internal/netpoll"
"github.com/panjf2000/gnet/internal/socket"
"golang.org/x/sys/unix"
)

Expand All @@ -42,7 +42,7 @@ func (svr *server) acceptNewConnection(fd int) error {
return err
}

netAddr := netpoll.SockaddrToTCPOrUnixAddr(sa)
netAddr := socket.SockaddrToTCPOrUnixAddr(sa)
el := svr.lb.next(netAddr)
c := newTCPConn(nfd, el, sa, netAddr)

Expand Down
27 changes: 7 additions & 20 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

//go:build linux || freebsd || dragonfly || darwin
// +build linux freebsd dragonfly darwin

package gnet
Expand All @@ -27,7 +28,7 @@ import (
"net"
"os"

"github.com/panjf2000/gnet/internal/netpoll"
"github.com/panjf2000/gnet/internal/socket"
"github.com/panjf2000/gnet/pool/bytebuffer"
prb "github.com/panjf2000/gnet/pool/ringbuffer"
"github.com/panjf2000/gnet/ringbuffer"
Expand All @@ -49,31 +50,17 @@ type conn struct {
outboundBuffer *ringbuffer.RingBuffer // buffer for data that is ready to write to client
}

func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr, remoteAddr net.Addr) *conn {
c := &conn{
func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr, remoteAddr net.Addr) (c *conn) {
return &conn{
fd: fd,
sa: sa,
loop: el,
codec: el.svr.codec,
localAddr: el.ln.lnaddr,
remoteAddr: remoteAddr,
inboundBuffer: prb.Get(),
outboundBuffer: prb.Get(),
}
c.localAddr = el.ln.lnaddr
c.remoteAddr = remoteAddr

if el.svr.ln.network != "tcp" {
return c
}

var noDelay bool
switch el.svr.opts.TCPNoDelay {
case TCPNoDelay:
noDelay = true
case TCPDelay:
}
_ = netpoll.SetNoDelay(fd, noDelay)
_ = netpoll.SetKeepAlive(fd, el.svr.opts.TCPKeepAlive)
return c
}

var emptyBuffer = ringbuffer.New(0)
Expand All @@ -98,7 +85,7 @@ func newUDPConn(fd int, el *eventloop, sa unix.Sockaddr) *conn {
fd: fd,
sa: sa,
localAddr: el.ln.lnaddr,
remoteAddr: netpoll.SockaddrToUDPAddr(sa),
remoteAddr: socket.SockaddrToUDPAddr(sa),
}
}

Expand Down
2 changes: 2 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ var (
ErrUnsupportedUDSProtocol = errors.New("only unix is supported")
// ErrUnsupportedPlatform occurs when running gnet on an unsupported platform.
ErrUnsupportedPlatform = errors.New("unsupported platform in gnet")
// ErrConnectionClosed occurs when trying to operate a closed connection.
ErrConnectionClosed = errors.New("connection is already closed")

// ================================================= codec errors =================================================

Expand Down
3 changes: 2 additions & 1 deletion eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

gerrors "github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/internal/netpoll"
"github.com/panjf2000/gnet/internal/socket"
"golang.org/x/sys/unix"
)

Expand Down Expand Up @@ -96,7 +97,7 @@ func (el *eventloop) loopAccept(fd int) error {
return err
}

netAddr := netpoll.SockaddrToTCPOrUnixAddr(sa)
netAddr := socket.SockaddrToTCPOrUnixAddr(sa)
c := newTCPConn(nfd, el, sa, netAddr)
if err = el.poller.AddRead(c.fd); err == nil {
el.connections[c.fd] = c
Expand Down
24 changes: 23 additions & 1 deletion gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/internal"
"github.com/panjf2000/gnet/internal/logging"
"github.com/panjf2000/gnet/internal/socket"
)

// Action is an action that occurs after the completion of an event.
Expand Down Expand Up @@ -272,7 +273,28 @@ func Serve(eventHandler EventHandler, protoAddr string, opts ...Option) (err err
network, addr := parseProtoAddr(protoAddr)

var ln *listener
if ln, err = initListener(network, addr, options.ReusePort); err != nil {
var sockopts []socket.Option
if options.ReusePort {
sockopt := socket.Option{SetSockopt: socket.SetReuseport, Opt: 1}
sockopts = append(sockopts, sockopt)
}
if network == "tcp" && options.TCPNoDelay == TCPNoDelay {
sockopt := socket.Option{SetSockopt: socket.SetNoDelay, Opt: 1}
sockopts = append(sockopts, sockopt)
}
if network == "tcp" && options.TCPKeepAlive > 0 {
sockopt := socket.Option{SetSockopt: socket.SetKeepAlive, Opt: int(options.TCPKeepAlive / time.Second)}
sockopts = append(sockopts, sockopt)
}
if options.SocketRecvBuffer > 0 {
sockopt := socket.Option{SetSockopt: socket.SetRecvBuffer, Opt: options.SocketRecvBuffer}
sockopts = append(sockopts, sockopt)
}
if options.SocketSendBuffer > 0 {
sockopt := socket.Option{SetSockopt: socket.SetSendBuffer, Opt: options.SocketSendBuffer}
sockopts = append(sockopts, sockopt)
}
if ln, err = initListener(network, addr, sockopts...); err != nil {
return
}
defer ln.close()
Expand Down
58 changes: 29 additions & 29 deletions gnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,8 +583,29 @@ func TestDefaultGnetServer(t *testing.T) {
svr.Tick()
}

type testBadAddrServer struct {
*EventServer
}

func (t *testBadAddrServer) OnInitComplete(srv Server) (action Action) {
return Shutdown
}

func TestBadAddresses(t *testing.T) {
events := new(testBadAddrServer)
if err := Serve(events, "tulip://howdy"); err == nil {
t.Fatalf("expected error")
}
if err := Serve(events, "howdy"); err == nil {
t.Fatalf("expected error")
}
if err := Serve(events, "tcp://"); err != nil {
t.Fatalf("expected nil, got '%v'", err)
}
}

func TestTick(t *testing.T) {
testTick("tcp4", ":9991", t)
testTick("tcp", ":9991", t)
}

type testTickServer struct {
Expand Down Expand Up @@ -614,7 +635,7 @@ func testTick(network, addr string, t *testing.T) {
}

func TestWakeConn(t *testing.T) {
testWakeConn("tcp", ":9000")
testWakeConn("tcp", ":9991")
}

type testWakeConnServer struct {
Expand Down Expand Up @@ -726,29 +747,8 @@ func testShutdown(network, addr string) {
}
}

type testBadAddrServer struct {
*EventServer
}

func (t *testBadAddrServer) OnInitComplete(srv Server) (action Action) {
return Shutdown
}

func TestBadAddresses(t *testing.T) {
events := new(testBadAddrServer)
if err := Serve(events, "tulip://howdy"); err == nil {
t.Fatalf("expected error")
}
if err := Serve(events, "howdy"); err == nil {
t.Fatalf("expected error")
}
if err := Serve(events, "tcp://"); err != nil {
t.Fatalf("expected nil, got '%v'", err)
}
}

func TestCloseActionError(t *testing.T) {
testCloseActionError("tcp", ":9991")
testCloseActionError("tcp", ":9992")
}

type testCloseActionErrorServer struct {
Expand Down Expand Up @@ -796,7 +796,7 @@ func testCloseActionError(network, addr string) {
}

func TestShutdownActionError(t *testing.T) {
testShutdownActionError("tcp", ":9991")
testShutdownActionError("tcp", ":9993")
}

type testShutdownActionErrorServer struct {
Expand Down Expand Up @@ -840,7 +840,7 @@ func testShutdownActionError(network, addr string) {
}

func TestCloseActionOnOpen(t *testing.T) {
testCloseActionOnOpen("tcp", ":9991")
testCloseActionOnOpen("tcp", ":9994")
}

type testCloseActionOnOpenServer struct {
Expand Down Expand Up @@ -880,7 +880,7 @@ func testCloseActionOnOpen(network, addr string) {
}

func TestShutdownActionOnOpen(t *testing.T) {
testShutdownActionOnOpen("tcp", ":9991")
testShutdownActionOnOpen("tcp", ":9995")
}

type testShutdownActionOnOpenServer struct {
Expand Down Expand Up @@ -965,7 +965,7 @@ func testUDPShutdown(network, addr string) {
}

func TestCloseConnection(t *testing.T) {
testCloseConnection("tcp", ":9992")
testCloseConnection("tcp", ":9996")
}

type testCloseConnectionServer struct {
Expand Down Expand Up @@ -1029,7 +1029,7 @@ func TestServerOptionsCheck(t *testing.T) {
}

func TestStop(t *testing.T) {
testStop("tcp", ":9993")
testStop("tcp", ":9997")
}

type testStopServer struct {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ require (
github.com/valyala/bytebufferpool v1.0.0
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.16.0
golang.org/x/sys v0.0.0-20201116194326-cc9327a14d48
golang.org/x/sys v0.0.0-20210326220804-49726bf1d181
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201116194326-cc9327a14d48 h1:AYCWBZhgIw6XobZ5CibNJr0Rc4ZofGGKvWa1vcx2IGk=
golang.org/x/sys v0.0.0-20201116194326-cc9327a14d48/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210326220804-49726bf1d181 h1:64ChN/hjER/taL4YJuA+gpLfIMT+/NFherRZixbxOhg=
golang.org/x/sys v0.0.0-20210326220804-49726bf1d181/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

// +build freebsd dragonfly darwin

package reuseport
package socket

import (
"runtime"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

// +build linux freebsd dragonfly

package reuseport
package socket

import "golang.org/x/sys/unix"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package reuseport
package socket

import (
"bufio"
Expand Down
26 changes: 16 additions & 10 deletions internal/reuseport/reuseport.go → internal/socket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,29 @@

// +build linux freebsd dragonfly darwin

package reuseport
package socket

import (
"net"
)

// TCPSocket calls tcpReusablePort.
func TCPSocket(proto, addr string, reusePort bool) (int, net.Addr, error) {
return tcpReusablePort(proto, addr, reusePort)
// Option is used for setting an option on socket.
type Option struct {
SetSockopt func(int, int) error
Opt int
}

// UDPSocket calls udpReusablePort.
func UDPSocket(proto, addr string, reusePort bool) (int, net.Addr, error) {
return udpReusablePort(proto, addr, reusePort)
// TCPSocket calls the internal tcpSocket.
func TCPSocket(proto, addr string, sockopts ...Option) (int, net.Addr, error) {
return tcpSocket(proto, addr, sockopts...)
}

// UnixSocket calls udsReusablePort.
func UnixSocket(proto, addr string, reusePort bool) (int, net.Addr, error) {
return udsReusablePort(proto, addr, reusePort)
// UDPSocket calls the internal udpSocket.
func UDPSocket(proto, addr string, sockopts ...Option) (int, net.Addr, error) {
return udpSocket(proto, addr, sockopts...)
}

// UnixSocket calls the internal udsSocket.
func UnixSocket(proto, addr string, sockopts ...Option) (int, net.Addr, error) {
return udsSocket(proto, addr, sockopts...)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,21 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package netpoll
package socket

import (
"errors"
"os"
"time"

"golang.org/x/sys/unix"
)

// SetKeepAlive sets whether the operating system should send
// keep-alive messages on the connection and sets period between keep-alive's.
func SetKeepAlive(fd int, d time.Duration) error {
if d <= 0 {
func SetKeepAlive(fd, secs int) error {
if secs <= 0 {
return errors.New("invalid time duration")
}
secs := int(d / time.Second)
if err := os.NewSyscallError("setsockopt", unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1)); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 38a8605

Please sign in to comment.