Skip to content

Commit

Permalink
Support Windows with net stdlib, fix panjf2000#19
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Nov 14, 2019
1 parent 68e6dae commit 9be6b06
Show file tree
Hide file tree
Showing 23 changed files with 754 additions and 104 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ env:
os:
- linux
- osx
- windows
go:
- "1.11.x"
- "1.12.x"
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ The goal of this project is to create a server framework for Go that performs on
- [x] Concise APIs
- [x] Efficient memory usage: Ring-Buffer
- [x] Supporting multiple protocols: TCP, UDP, and Unix Sockets
- [x] Supporting two event-notification mechanisms: epoll on Linux and kqueue on FreeBSD
- [x] Supporting two event-driven mechanisms: epoll on Linux and kqueue on FreeBSD
- [x] Supporting asynchronous write operation
- [x] Flexible ticker event
- [x] SO_REUSEPORT socket option
- [x] Built-in multiple codecs to encode/decode network frames into/from TCP stream: LineBasedFrameCodec, DelimiterBasedFrameCodec, FixedLengthFrameCodec and LengthFieldBasedFrameCodec, referencing [netty codec](https://github.com/netty/netty/tree/4.1/codec/src/main/java/io/netty/handler/codec), also supporting customized codecs
- [x] Supporting Windows platform with ~~event-driven mechanism of IOCP~~ Go stdlib: net
- [ ] Additional load-balancing algorithms: Random, Least-Connections, Consistent-hashing and so on
- [ ] New event-notification mechanism: IOCP on Windows platform
- [ ] TLS support
- [ ] Implementation of `gnet` Client

Expand Down
2 changes: 1 addition & 1 deletion README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
- [x] 灵活的事件定时器
- [x] SO_REUSEPORT 端口重用
- [x] 内置多种编解码器,支持对 TCP 数据流分包:LineBasedFrameCodec, DelimiterBasedFrameCodec, FixedLengthFrameCodec 和 LengthFieldBasedFrameCodec,参考自 [netty codec](https://github.com/netty/netty/tree/4.1/codec/src/main/java/io/netty/handler/codec),而且支持自定制编解码器
- [x] 支持 Windows 平台,基于 ~~IOCP 事件驱动机制~~ Go 标准网络库
- [ ] 加入更多的负载均衡算法:随机、最少连接、一致性哈希等等
- [ ] 支持 Windows 平台的 IOCP 事件驱动机制
- [ ] 支持 TLS
- [ ] 实现 `gnet` 客户端

Expand Down
35 changes: 35 additions & 0 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2019 Andy Pan. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

// +build linux darwin netbsd freebsd openbsd dragonfly

package gnet

import (
"golang.org/x/sys/unix"
)

func (svr *server) acceptNewConnection(fd int) error {
nfd, sa, err := unix.Accept(fd)
if err != nil {
if err == unix.EAGAIN {
return nil
}
return err
}
if err := unix.SetNonblock(nfd, true); err != nil {
return err
}
lp := svr.subLoopGroup.next()
c := newConn(nfd, lp, sa)
_ = lp.poller.Trigger(func() (err error) {
if err = lp.poller.AddRead(nfd); err != nil {
return
}
lp.connections[nfd] = c
err = lp.loopOpen(c)
return
})
return nil
}
69 changes: 69 additions & 0 deletions acceptor_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2019 Andy Pan. All rights reserved.
// Copyright 2018 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

// +build windows

package gnet

import (
"time"

"github.com/panjf2000/gnet/pool"
"github.com/panjf2000/gnet/ringbuffer"
)

func (svr *server) listenerRun() {
var err error
defer svr.signalShutdown(err)
var packet [0xFFFF]byte
inBuf := svr.bytesPool.Get().(*ringbuffer.RingBuffer)
bytesPool := pool.NewBytesPool()
for {
if svr.ln.pconn != nil {
// Read data from UDP socket.
n, addr, e := svr.ln.pconn.ReadFrom(packet[:])
if e != nil {
err = e
return
}
buf := bytesPool.GetLen(n)
copy(buf, packet[:n])

lp := svr.subLoopGroup.next()
c := &stdConn{
loop: lp,
localAddr: svr.ln.lnaddr,
remoteAddr: addr,
inboundBuffer: inBuf,
cache: buf,
}
lp.ch <- &udpIn{c}
} else {
// Accept TCP socket.
conn, e := svr.ln.ln.Accept()
if e != nil {
err = e
return
}
lp := svr.subLoopGroup.next()
c := newConn(conn, lp)
lp.ch <- c
go func() {
var packet [0xFFFF]byte
for {
n, err := c.conn.Read(packet[:])
if err != nil {
_ = c.conn.SetReadDeadline(time.Time{})
lp.ch <- &stderr{c, err}
return
}
buf := bytesPool.GetLen(n)
copy(buf, packet[:n])
lp.ch <- &tcpIn{c, buf}
}
}()
}
}
}
48 changes: 26 additions & 22 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"bytes"
"encoding/binary"
"fmt"

"github.com/smallnest/goframe"
)

// CRLFByte represents a byte of CRLF.
Expand Down Expand Up @@ -133,30 +135,32 @@ func NewLengthFieldBasedFrameCodec(encoderConfig EncoderConfig, decoderConfig De
}

// EncoderConfig config for encoder.
type EncoderConfig struct {
// ByteOrder is the ByteOrder of the length field.
ByteOrder binary.ByteOrder
// LengthFieldLength is the length of the length field.
LengthFieldLength int
// LengthAdjustment is the compensation value to add to the value of the length field
LengthAdjustment int
// LengthIncludesLengthFieldLength is true, the length of the prepended length field is added to the value of the prepended length field
LengthIncludesLengthFieldLength bool
}
type EncoderConfig = goframe.EncoderConfig
//type EncoderConfig struct {
// // ByteOrder is the ByteOrder of the length field.
// ByteOrder binary.ByteOrder
// // LengthFieldLength is the length of the length field.
// LengthFieldLength int
// // LengthAdjustment is the compensation value to add to the value of the length field
// LengthAdjustment int
// // LengthIncludesLengthFieldLength is true, the length of the prepended length field is added to the value of the prepended length field
// LengthIncludesLengthFieldLength bool
//}

// DecoderConfig config for decoder.
type DecoderConfig struct {
// ByteOrder is the ByteOrder of the length field.
ByteOrder binary.ByteOrder
// LengthFieldOffset is the offset of the length field
LengthFieldOffset int
// LengthFieldLength is the length of the length field
LengthFieldLength int
// LengthAdjustment is the compensation value to add to the value of the length field
LengthAdjustment int
// InitialBytesToStrip is the number of first bytes to strip out from the decoded frame
InitialBytesToStrip int
}
type DecoderConfig = goframe.DecoderConfig
//type DecoderConfig struct {
// // ByteOrder is the ByteOrder of the length field.
// ByteOrder binary.ByteOrder
// // LengthFieldOffset is the offset of the length field
// LengthFieldOffset int
// // LengthFieldLength is the length of the length field
// LengthFieldLength int
// // LengthAdjustment is the compensation value to add to the value of the length field
// LengthAdjustment int
// // InitialBytesToStrip is the number of first bytes to strip out from the decoded frame
// InitialBytesToStrip int
//}

// Encode ...
func (cc *LengthFieldBasedFrameCodec) Encode(buf []byte) (out []byte, err error) {
Expand Down
2 changes: 1 addition & 1 deletion connection.go → connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

// +build darwin netbsd freebsd openbsd dragonfly linux
// +build linux darwin netbsd freebsd openbsd dragonfly

package gnet

Expand Down
148 changes: 148 additions & 0 deletions connection_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright 2019 Andy Pan. All rights reserved.
// Copyright 2018 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

// +build windows

package gnet

import (
"github.com/panjf2000/gnet/pool"
"net"

"github.com/panjf2000/gnet/ringbuffer"
)

type stderr struct {
c *stdConn
err error
}

type wakeReq struct {
c *stdConn
}

type tcpIn struct {
c *stdConn
in []byte
}

type udpIn struct {
c *stdConn
}

type stdConn struct {
ctx interface{} // user-defined context
conn net.Conn // original connection
loop *loop // owner loop
done int32 // 0: attached, 1: closed
cache []byte // reuse memory of inbound data
localAddr net.Addr // local server addr
remoteAddr net.Addr // remote peer addr
inboundBuffer *ringbuffer.RingBuffer // buffer for data from client
}

func newConn(conn net.Conn, lp *loop) *stdConn {
return &stdConn{
conn: conn,
loop: lp,
inboundBuffer: lp.svr.bytesPool.Get().(*ringbuffer.RingBuffer),
}
}

func (c *stdConn) release() {
//c.conn = nil
c.ctx = nil
c.localAddr = nil
c.remoteAddr = nil
c.inboundBuffer.Reset()
c.loop.svr.bytesPool.Put(c.inboundBuffer)
c.inboundBuffer = nil
pool.PutBytes(c.cache)
c.cache = nil
}

// ================================= Public APIs of gnet.Conn =================================

func (c *stdConn) ReadFrame() []byte {
buf, _ := c.loop.svr.codec.Decode(c)
return buf
}

func (c *stdConn) Read() []byte {
if c.inboundBuffer.IsEmpty() {
return c.cache
}
head, _ := c.inboundBuffer.LazyReadAll()
return append(head, c.cache...)
}

func (c *stdConn) ResetBuffer() {
c.cache = c.cache[:0]
c.inboundBuffer.Reset()
}

func (c *stdConn) ReadN(n int) (size int, buf []byte) {
oneOffBufferLen := len(c.cache)
inBufferLen := c.inboundBuffer.Length()
if inBufferLen+oneOffBufferLen < n {
return
}
if c.inboundBuffer.IsEmpty() {
size = n
buf = c.cache[:n]
if n == oneOffBufferLen {
c.cache = c.cache[:0]
} else {
c.cache = c.cache[n:]
}
return
}
size = n
buf, tail := c.inboundBuffer.LazyRead(n)
if tail != nil {
buf = append(buf, tail...)
}
if inBufferLen >= n {
c.inboundBuffer.Shift(n)
return
}
c.inboundBuffer.Reset()

restSize := n - inBufferLen
buf = append(buf, c.cache[:restSize]...)
if restSize == oneOffBufferLen {
c.cache = c.cache[:0]
} else {
c.cache = c.cache[restSize:]
}
return
}

func (c *stdConn) InboundBuffer() *ringbuffer.RingBuffer {
return c.inboundBuffer
}

func (c *stdConn) OutboundBuffer() *ringbuffer.RingBuffer {
return nil
}

func (c *stdConn) BufferLength() int {
return c.inboundBuffer.Length() + len(c.cache)
}

func (c *stdConn) AsyncWrite(buf []byte) {
if encodedBuf, err := c.loop.svr.codec.Encode(buf); err == nil {
c.loop.ch <- func() error {
_, _ = c.conn.Write(encodedBuf)
return nil
}
}
}

func (c *stdConn) Context() interface{} { return c.ctx }
func (c *stdConn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *stdConn) LocalAddr() net.Addr { return c.localAddr }
func (c *stdConn) RemoteAddr() net.Addr { return c.remoteAddr }
func (c *stdConn) Wake() { c.loop.ch <- wakeReq{c} }
2 changes: 1 addition & 1 deletion eventloop.go → eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

// +build darwin netbsd freebsd openbsd dragonfly linux
// +build linux darwin netbsd freebsd openbsd dragonfly

package gnet

Expand Down
Loading

0 comments on commit 9be6b06

Please sign in to comment.