Skip to content

Commit

Permalink
Merge pull request fatedier#968 from fatedier/health
Browse files Browse the repository at this point in the history
support health check and code refactor
  • Loading branch information
fatedier committed Nov 9, 2018
2 parents 450e0b7 + 951d33d commit 89d1a1f
Show file tree
Hide file tree
Showing 33 changed files with 1,914 additions and 996 deletions.
20 changes: 7 additions & 13 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,18 @@ frpc:
test: gotest

gotest:
go test -v ./assets/...
go test -v ./client/...
go test -v ./cmd/...
go test -v ./models/...
go test -v ./server/...
go test -v ./utils/...
go test -v --cover ./assets/...
go test -v --cover ./client/...
go test -v --cover ./cmd/...
go test -v --cover ./models/...
go test -v --cover ./server/...
go test -v --cover ./utils/...

ci:
cd ./tests && ./run_test.sh && cd -
go test -v ./tests/...
cd ./tests && ./clean_test.sh && cd -

cic:
cd ./tests && ./clean_test.sh && cd -
go test -count=1 -v ./tests/...

alltest: gotest ci

clean:
rm -f ./bin/frpc
rm -f ./bin/frps
cd ./tests && ./clean_test.sh && cd -
4 changes: 2 additions & 2 deletions client/admin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ func (svr *Service) apiReload(w http.ResponseWriter, r *http.Request) {
return
}

pxyCfgs, visitorCfgs, err := config.LoadProxyConfFromIni(g.GlbClientCfg.User, conf, newCommonCfg.Start)
pxyCfgs, visitorCfgs, err := config.LoadAllConfFromIni(g.GlbClientCfg.User, conf, newCommonCfg.Start)
if err != nil {
res.Code = 3
res.Msg = err.Error()
log.Error("reload frpc proxy config error: %v", err)
return
}

err = svr.ctl.reloadConf(pxyCfgs, visitorCfgs)
err = svr.ctl.ReloadConf(pxyCfgs, visitorCfgs)
if err != nil {
res.Code = 4
res.Msg = err.Error()
Expand Down
233 changes: 45 additions & 188 deletions client/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ package client
import (
"fmt"
"io"
"io/ioutil"
"runtime"
"runtime/debug"
"sync"
"time"
Expand All @@ -28,27 +26,22 @@ import (
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/version"

"github.com/fatedier/golib/control/shutdown"
"github.com/fatedier/golib/crypto"
fmux "github.com/hashicorp/yamux"
)

const (
connReadTimeout time.Duration = 10 * time.Second
)

type Control struct {
// frpc service
svr *Service

// login message to server, only used
loginMsg *msg.Login
// uniq id got from frps, attach it in loginMsg
runId string

// manage all proxies
pm *ProxyManager

// manage all visitors
vm *VisitorManager

// control connection
conn frpNet.Conn

Expand All @@ -61,14 +54,10 @@ type Control struct {
// read from this channel to get the next message sent by server
readCh chan (msg.Message)

// run id got from server
runId string

// if we call close() in control, do not reconnect to server
exit bool

// goroutines can block by reading from this channel, it will be closed only in reader() when control connection is closed
closedCh chan int
closedCh chan struct{}

closedDoneCh chan struct{}

// last time got the Pong message
lastPong time.Time
Expand All @@ -82,54 +71,36 @@ type Control struct {
log.Logger
}

func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf) *Control {
loginMsg := &msg.Login{
Arch: runtime.GOARCH,
Os: runtime.GOOS,
PoolCount: g.GlbClientCfg.PoolCount,
User: g.GlbClientCfg.User,
Version: version.Full(),
}
func NewControl(runId string, conn frpNet.Conn, session *fmux.Session, pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.VisitorConf) *Control {
ctl := &Control{
svr: svr,
loginMsg: loginMsg,
runId: runId,
conn: conn,
session: session,
sendCh: make(chan msg.Message, 100),
readCh: make(chan msg.Message, 100),
closedCh: make(chan int),
closedCh: make(chan struct{}),
closedDoneCh: make(chan struct{}),
readerShutdown: shutdown.New(),
writerShutdown: shutdown.New(),
msgHandlerShutdown: shutdown.New(),
Logger: log.NewPrefixLogger(""),
}
ctl.pm = NewProxyManager(ctl, ctl.sendCh, "")
ctl.pm.Reload(pxyCfgs, visitorCfgs, false)
ctl.pm = NewProxyManager(ctl.sendCh, "")
ctl.pm.Reload(pxyCfgs, false)
ctl.vm = NewVisitorManager(ctl)
ctl.vm.Reload(visitorCfgs)
return ctl
}

func (ctl *Control) Run() (err error) {
for {
err = ctl.login()
if err != nil {
ctl.Warn("login to server failed: %v", err)

// if login_fail_exit is true, just exit this program
// otherwise sleep a while and continues relogin to server
if g.GlbClientCfg.LoginFailExit {
return
} else {
time.Sleep(10 * time.Second)
}
} else {
break
}
}

func (ctl *Control) Run() {
go ctl.worker()

// start all local visitors and send NewProxy message for all configured proxies
ctl.pm.Reset(ctl.sendCh, ctl.runId)
ctl.pm.CheckAndStartProxy([]string{ProxyStatusNew})
return nil

go ctl.vm.Run()
return
}

func (ctl *Control) HandleReqWorkConn(inMsg *msg.ReqWorkConn) {
Expand Down Expand Up @@ -171,82 +142,16 @@ func (ctl *Control) HandleNewProxyResp(inMsg *msg.NewProxyResp) {
}

func (ctl *Control) Close() error {
ctl.mu.Lock()
defer ctl.mu.Unlock()
ctl.exit = true
ctl.pm.CloseProxies()
return nil
}

// login send a login message to server and wait for a loginResp message.
func (ctl *Control) login() (err error) {
if ctl.conn != nil {
ctl.conn.Close()
}
if ctl.session != nil {
ctl.session.Close()
}

conn, err := frpNet.ConnectServerByProxy(g.GlbClientCfg.HttpProxy, g.GlbClientCfg.Protocol,
fmt.Sprintf("%s:%d", g.GlbClientCfg.ServerAddr, g.GlbClientCfg.ServerPort))
if err != nil {
return err
}

defer func() {
if err != nil {
conn.Close()
}
}()

if g.GlbClientCfg.TcpMux {
fmuxCfg := fmux.DefaultConfig()
fmuxCfg.LogOutput = ioutil.Discard
session, errRet := fmux.Client(conn, fmuxCfg)
if errRet != nil {
return errRet
}
stream, errRet := session.OpenStream()
if errRet != nil {
session.Close()
return errRet
}
conn = frpNet.WrapConn(stream)
ctl.session = session
}

now := time.Now().Unix()
ctl.loginMsg.PrivilegeKey = util.GetAuthKey(g.GlbClientCfg.Token, now)
ctl.loginMsg.Timestamp = now
ctl.loginMsg.RunId = ctl.runId

if err = msg.WriteMsg(conn, ctl.loginMsg); err != nil {
return err
}

var loginRespMsg msg.LoginResp
conn.SetReadDeadline(time.Now().Add(connReadTimeout))
if err = msg.ReadMsgInto(conn, &loginRespMsg); err != nil {
return err
}
conn.SetReadDeadline(time.Time{})

if loginRespMsg.Error != "" {
err = fmt.Errorf("%s", loginRespMsg.Error)
ctl.Error("%s", loginRespMsg.Error)
return err
}

ctl.conn = conn
// update runId got from server
ctl.runId = loginRespMsg.RunId
g.GlbClientCfg.ServerUdpPort = loginRespMsg.ServerUdpPort
ctl.ClearLogPrefix()
ctl.AddLogPrefix(loginRespMsg.RunId)
ctl.Info("login to server success, get run id [%s], server udp port [%d]", loginRespMsg.RunId, loginRespMsg.ServerUdpPort)
return nil
// ClosedDoneCh returns a channel which will be closed after all resources are released
func (ctl *Control) ClosedDoneCh() <-chan struct{} {
return ctl.closedDoneCh
}

// connectServer return a new connection to frps
func (ctl *Control) connectServer() (conn frpNet.Conn, err error) {
if g.GlbClientCfg.TcpMux {
stream, errRet := ctl.session.OpenStream()
Expand Down Expand Up @@ -364,87 +269,39 @@ func (ctl *Control) msgHandler() {
}
}

// controler keep watching closedCh, start a new connection if previous control connection is closed.
// If controler is notified by closedCh, reader and writer and handler will exit, then recall these functions.
// If controler is notified by closedCh, reader and writer and handler will exit
func (ctl *Control) worker() {
go ctl.msgHandler()
go ctl.reader()
go ctl.writer()

var err error
maxDelayTime := 20 * time.Second
delayTime := time.Second

checkInterval := 60 * time.Second
checkProxyTicker := time.NewTicker(checkInterval)

for {
select {
case <-checkProxyTicker.C:
// check which proxy registered failed and reregister it to server
ctl.pm.CheckAndStartProxy([]string{ProxyStatusStartErr, ProxyStatusClosed})
case _, ok := <-ctl.closedCh:
// we won't get any variable from this channel
if !ok {
// close related channels and wait until other goroutines done
close(ctl.readCh)
ctl.readerShutdown.WaitDone()
ctl.msgHandlerShutdown.WaitDone()

close(ctl.sendCh)
ctl.writerShutdown.WaitDone()

ctl.pm.CloseProxies()
// if ctl.exit is true, just exit
ctl.mu.RLock()
exit := ctl.exit
ctl.mu.RUnlock()
if exit {
return
}

// loop util reconnecting to server success
for {
ctl.Info("try to reconnect to server...")
err = ctl.login()
if err != nil {
ctl.Warn("reconnect to server error: %v", err)
time.Sleep(delayTime)
delayTime = delayTime * 2
if delayTime > maxDelayTime {
delayTime = maxDelayTime
}
continue
}
// reconnect success, init delayTime
delayTime = time.Second
break
}

// init related channels and variables
ctl.sendCh = make(chan msg.Message, 100)
ctl.readCh = make(chan msg.Message, 100)
ctl.closedCh = make(chan int)
ctl.readerShutdown = shutdown.New()
ctl.writerShutdown = shutdown.New()
ctl.msgHandlerShutdown = shutdown.New()
ctl.pm.Reset(ctl.sendCh, ctl.runId)

// previous work goroutines should be closed and start them here
go ctl.msgHandler()
go ctl.writer()
go ctl.reader()

// start all configured proxies
ctl.pm.CheckAndStartProxy([]string{ProxyStatusNew, ProxyStatusClosed})

checkProxyTicker.Stop()
checkProxyTicker = time.NewTicker(checkInterval)
}
case <-ctl.closedCh:
// close related channels and wait until other goroutines done
close(ctl.readCh)
ctl.readerShutdown.WaitDone()
ctl.msgHandlerShutdown.WaitDone()

close(ctl.sendCh)
ctl.writerShutdown.WaitDone()

ctl.pm.CloseProxies()

close(ctl.closedDoneCh)
return
}
}
}

func (ctl *Control) reloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf) error {
err := ctl.pm.Reload(pxyCfgs, visitorCfgs, true)
func (ctl *Control) ReloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.VisitorConf) error {
ctl.vm.Reload(visitorCfgs)
err := ctl.pm.Reload(pxyCfgs, true)
return err
}
Loading

0 comments on commit 89d1a1f

Please sign in to comment.