Skip to content

Commit

Permalink
feat: add multicast UDP listener support (panjf2000#412)
Browse files Browse the repository at this point in the history
  • Loading branch information
leki75 committed Nov 7, 2022
1 parent 7fbdc53 commit ad9986e
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 0 deletions.
167 changes: 167 additions & 0 deletions gnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"math/rand"
"net"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -411,6 +413,171 @@ func startClient(t *testing.T, network, addr string, multicore, async bool) {
}
}

// NOTE: TestServeMulticast can fail with "write: no buffer space available" on wifi interface.
func TestServeMulticast(t *testing.T) {
t.Run("IPv4", func(t *testing.T) {
// 224.0.0.169 is an unassigned address from the Local Network Control Block
// https://www.iana.org/assignments/multicast-addresses/multicast-addresses.xhtml#multicast-addresses-1
t.Run("udp-multicast", func(t *testing.T) {
testMulticast(t, "224.0.0.169:9991", false, false, -1, 10)
})
t.Run("udp-multicast-reuseport", func(t *testing.T) {
testMulticast(t, "224.0.0.169:9991", true, false, -1, 10)
})
t.Run("udp-multicast-reuseaddr", func(t *testing.T) {
testMulticast(t, "224.0.0.169:9991", false, true, -1, 10)
})
})
t.Run("IPv6", func(t *testing.T) {
iface, err := findLoopbackInterface()
require.NoError(t, err)
if iface.Flags&net.FlagMulticast != net.FlagMulticast {
t.Skip("multicast is not supported on loopback interface")
}
// ff02::3 is an unassigned address from Link-Local Scope Multicast Addresses
// https://www.iana.org/assignments/ipv6-multicast-addresses/ipv6-multicast-addresses.xhtml#link-local
t.Run("udp-multicast", func(t *testing.T) {
testMulticast(t, fmt.Sprintf("[ff02::3%%%s]:9991", iface.Name), false, false, iface.Index, 10)
})
t.Run("udp-multicast-reuseport", func(t *testing.T) {
testMulticast(t, fmt.Sprintf("[ff02::3%%%s]:9991", iface.Name), true, false, iface.Index, 10)
})
t.Run("udp-multicast-reuseaddr", func(t *testing.T) {
testMulticast(t, fmt.Sprintf("[ff02::3%%%s]:9991", iface.Name), false, true, iface.Index, 10)
})
})
}

func findLoopbackInterface() (*net.Interface, error) {
ifaces, err := net.Interfaces()
if err != nil {
return nil, err
}
for _, iface := range ifaces {
if iface.Flags&net.FlagLoopback == net.FlagLoopback {
return &iface, nil
}
}
return nil, errors.New("no loopback interface")
}

func testMulticast(t *testing.T, addr string, reuseport, reuseaddr bool, index, nclients int) {
ts := &testMcastServer{
t: t,
addr: addr,
nclients: nclients,
}
options := []Option{
WithReuseAddr(reuseaddr),
WithReusePort(reuseport),
WithSocketRecvBuffer(2 * nclients * 1024), // enough space to receive messages from nclients to eliminate dropped packets
WithTicker(true),
}
if index != -1 {
options = append(options, WithMulticastInterfaceIndex(index))
}
err := Run(ts, "udp://"+addr, options...)
assert.NoError(t, err)
}

type testMcastServer struct {
*BuiltinEventEngine
t *testing.T
mcast sync.Map
addr string
nclients int
started int32
active int32
}

func (s *testMcastServer) startMcastClient() {
rand.Seed(time.Now().UnixNano())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
c, err := net.Dial("udp", s.addr)
require.NoError(s.t, err)
defer c.Close()
ch := make(chan []byte, 10000)
s.mcast.Store(c.LocalAddr().String(), ch)
duration := time.Duration((rand.Float64()*2+1)*float64(time.Second)) / 2
s.t.Logf("test duration: %dms", duration/time.Millisecond)
start := time.Now()
for time.Since(start) < duration {
reqData := make([]byte, 1024)
_, err = rand.Read(reqData)
require.NoError(s.t, err)
_, err = c.Write(reqData)
require.NoError(s.t, err)
// Workaround for MacOS "write: no buffer space available" error messages
// https://developer.apple.com/forums/thread/42334
time.Sleep(time.Millisecond)
select {
case respData := <-ch:
require.Equalf(s.t, reqData, respData, "response mismatch, length of bytes: %d vs %d", len(reqData), len(respData))
case <-ctx.Done():
require.Fail(s.t, "timeout receiving message")
return
}
}
}

func (s *testMcastServer) OnTraffic(c Conn) (action Action) {
buf, _ := c.Next(-1)
b := make([]byte, len(buf))
copy(b, buf)
ch, ok := s.mcast.Load(c.RemoteAddr().String())
require.True(s.t, ok)
ch.(chan []byte) <- b
return
}

func (s *testMcastServer) OnTick() (delay time.Duration, action Action) {
if atomic.CompareAndSwapInt32(&s.started, 0, 1) {
for i := 0; i < s.nclients; i++ {
atomic.AddInt32(&s.active, 1)
go func() {
s.startMcastClient()
atomic.AddInt32(&s.active, -1)
}()
}
}
if atomic.LoadInt32(&s.active) == 0 {
action = Shutdown
return
}
delay = time.Second / 5
return
}

type testMulticastBindServer struct {
*BuiltinEventEngine
}

func (t *testMulticastBindServer) OnTick() (delay time.Duration, action Action) {
action = Shutdown
return
}

func TestMulticastBindIPv4(t *testing.T) {
ts := &testMulticastBindServer{}
iface, err := findLoopbackInterface()
require.NoError(t, err)
err = Run(ts, "udp://224.0.0.169:9991",
WithMulticastInterfaceIndex(iface.Index),
WithTicker(true))
assert.NoError(t, err)
}

func TestMulticastBindIPv6(t *testing.T) {
ts := &testMulticastBindServer{}
iface, err := findLoopbackInterface()
require.NoError(t, err)
err = Run(ts, fmt.Sprintf("udp://[ff02::3%%%s]:9991", iface.Name),
WithMulticastInterfaceIndex(iface.Index),
WithTicker(true))
assert.NoError(t, err)
}

func TestDefaultGnetServer(t *testing.T) {
svr := BuiltinEventEngine{}
svr.OnBoot(Engine{})
Expand Down
97 changes: 97 additions & 0 deletions internal/socket/sockopts_posix.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package socket

import (
"net"
"os"
"syscall"

"github.com/panjf2000/gnet/v2/pkg/errors"
"golang.org/x/sys/unix"
)

Expand Down Expand Up @@ -83,3 +85,98 @@ func SetLinger(fd, sec int) error {
}
return unix.SetsockoptLinger(fd, syscall.SOL_SOCKET, syscall.SO_LINGER, &l)
}

// SetMulticastMembership returns with a socket option function based on the IP
// version. Returns nil when multicast membership cannot be applied.
func SetMulticastMembership(proto string, udpAddr *net.UDPAddr) func(int, int) error {
udpVersion, err := determineUDPProto(proto, udpAddr)
if err != nil {
return nil
}

switch udpVersion {
case "udp4":
return func(fd int, ifIndex int) error {
return SetIPv4MulticastMembership(fd, udpAddr.IP, ifIndex)
}
case "udp6":
return func(fd int, ifIndex int) error {
return SetIPv6MulticastMembership(fd, udpAddr.IP, ifIndex)
}
default:
return nil
}
}

// SetIPv4MulticastMemership joins fd to the specified multicast IPv4 address.
// ifIndex is the index of the interface where the multicast datagrams will be
// received. If ifIndex is 0 then the operating system will choose the default,
// it is usually needed when the host has multiple network interfaces configured.
func SetIPv4MulticastMembership(fd int, mcast net.IP, ifIndex int) error {
// Multicast interfaces are selected by IP address on IPv4 (and by index on IPv6)
ip, err := interfaceFirstIPv4Addr(ifIndex)
if err != nil {
return err
}

mreq := &unix.IPMreq{}
copy(mreq.Multiaddr[:], mcast.To4())
copy(mreq.Interface[:], ip.To4())

if ifIndex > 0 {
if err := os.NewSyscallError("setsockopt", unix.SetsockoptInet4Addr(fd, syscall.IPPROTO_IP, syscall.IP_MULTICAST_IF, mreq.Interface)); err != nil {
return err
}
}

if err := os.NewSyscallError("setsockopt", unix.SetsockoptByte(fd, syscall.IPPROTO_IP, syscall.IP_MULTICAST_LOOP, 0)); err != nil {
return err
}
return os.NewSyscallError("setsockopt", unix.SetsockoptIPMreq(fd, syscall.IPPROTO_IP, syscall.IP_ADD_MEMBERSHIP, mreq))
}

// SetIPv6MulticastMemership joins fd to the specified multicast IPv6 address.
// ifIndex is the index of the interface where the multicast datagrams will be
// received. If ifIndex is 0 then the operating system will choose the default,
// it is usually needed when the host has multiple network interfaces configured.
func SetIPv6MulticastMembership(fd int, mcast net.IP, ifIndex int) error {
mreq := &unix.IPv6Mreq{}
mreq.Interface = uint32(ifIndex)
copy(mreq.Multiaddr[:], mcast.To16())

if ifIndex > 0 {
if err := os.NewSyscallError("setsockopt", unix.SetsockoptInt(fd, syscall.IPPROTO_IPV6, syscall.IPV6_MULTICAST_IF, ifIndex)); err != nil {
return err
}
}

if err := os.NewSyscallError("setsockopt", unix.SetsockoptInt(fd, syscall.IPPROTO_IPV6, syscall.IPV6_MULTICAST_LOOP, 0)); err != nil {
return err
}
return os.NewSyscallError("setsockopt", unix.SetsockoptIPv6Mreq(fd, syscall.IPPROTO_IPV6, syscall.IPV6_JOIN_GROUP, mreq))
}

// interfaceFirstIPv4Addr returns the first IPv4 address of the interface.
func interfaceFirstIPv4Addr(ifIndex int) (net.IP, error) {
if ifIndex == 0 {
return net.IP([]byte{0, 0, 0, 0}), nil
}
iface, err := net.InterfaceByIndex(ifIndex)
if err != nil {
return nil, err
}
addrs, err := iface.Addrs()
if err != nil {
return nil, err
}
for _, addr := range addrs {
ip, _, err := net.ParseCIDR(addr.String())
if err != nil {
return nil, err
}
if ip.To4() != nil {
return ip, nil
}
}
return nil, errors.ErrNoIPv4AddressOnInterface
}
9 changes: 9 additions & 0 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ func initListener(network, addr string, options *Options) (l *listener, err erro
sockOpt := socket.Option{SetSockOpt: socket.SetSendBuffer, Opt: options.SocketSendBuffer}
sockOpts = append(sockOpts, sockOpt)
}
if strings.HasPrefix(network, "udp") {
udpAddr, err := net.ResolveUDPAddr(network, addr)
if err == nil && udpAddr.IP.IsMulticast() {
if sockoptFn := socket.SetMulticastMembership(network, udpAddr); sockoptFn != nil {
sockOpt := socket.Option{SetSockOpt: sockoptFn, Opt: options.MulticastInterfaceIndex}
sockOpts = append(sockOpts, sockOpt)
}
}
}
l = &listener{network: network, address: addr, sockOpts: sockOpts}
err = l.normalize()
return
Expand Down
10 changes: 10 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type Options struct {
// ReusePort indicates whether to set up the SO_REUSEPORT socket option.
ReusePort bool

// MulticastInterfaceIndex is the index of the interface name where the multicast UDP addresses will be bound to.
MulticastInterfaceIndex int

// ============================= Options for both server-side and client-side =============================

// ReadBufferCap is the maximum number of bytes that can be read from the peer when the readable event comes.
Expand Down Expand Up @@ -239,3 +242,10 @@ func WithLogger(logger logging.Logger) Option {
opts.Logger = logger
}
}

// WithMulticastInterfaceIndex sets the interface name where UDP multicast sockets will be bound to.
func WithMulticastInterfaceIndex(idx int) Option {
return func(opts *Options) {
opts.MulticastInterfaceIndex = idx
}
}
2 changes: 2 additions & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@ var (
ErrUnsupportedOp = errors.New("unsupported operation")
// ErrNegativeSize occurs when trying to pass a negative size to a buffer.
ErrNegativeSize = errors.New("negative size is invalid")
// ErrNoIPv4AddressOnInterface occurs when an IPv4 multicast address is set on an interface but IPv4 is not configured.
ErrNoIPv4AddressOnInterface = errors.New("no IPv4 address on interface")
)

0 comments on commit ad9986e

Please sign in to comment.