Skip to content

Commit

Permalink
Update RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
luxiaoxun committed Dec 9, 2017
1 parent efd4e17 commit d4ff0f5
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 48 deletions.
27 changes: 14 additions & 13 deletions src/main/java/com/nettyrpc/client/ConnectManage.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@
* Created by luxiaoxun on 2016-03-16.
*/
public class ConnectManage {

private static final Logger LOGGER = LoggerFactory.getLogger(ConnectManage.class);
private static final Logger logger = LoggerFactory.getLogger(ConnectManage.class);
private volatile static ConnectManage connectManage;

EventLoopGroup eventLoopGroup = new NioEventLoopGroup(4);
private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(4);
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(16, 16, 600L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(65536));

private CopyOnWriteArrayList<RpcClientHandler> connectedHandlers = new CopyOnWriteArrayList<>();
Expand All @@ -37,7 +36,7 @@ public class ConnectManage {

private ReentrantLock lock = new ReentrantLock();
private Condition connected = lock.newCondition();
protected long connectTimeoutMillis = 6000;
private long connectTimeoutMillis = 6000;
private AtomicInteger roundRobin = new AtomicInteger(0);
private volatile boolean isRuning = true;

Expand Down Expand Up @@ -82,16 +81,18 @@ public void updateConnectedServer(List<String> allServerAddress) {
RpcClientHandler connectedServerHandler = connectedHandlers.get(i);
SocketAddress remotePeer = connectedServerHandler.getRemotePeer();
if (!newAllServerNodeSet.contains(remotePeer)) {
LOGGER.info("Remove invalid server node " + remotePeer);
logger.info("Remove invalid server node " + remotePeer);
RpcClientHandler handler = connectedServerNodes.get(remotePeer);
handler.close();
if (handler != null) {
handler.close();
}
connectedServerNodes.remove(remotePeer);
connectedHandlers.remove(connectedServerHandler);
}
}

} else { // No available server node ( All server nodes are down )
LOGGER.error("No available server node. All server nodes are down !!!");
logger.error("No available server node. All server nodes are down !!!");
for (final RpcClientHandler connectedServerHandler : connectedHandlers) {
SocketAddress remotePeer = connectedServerHandler.getRemotePeer();
RpcClientHandler handler = connectedServerNodes.get(remotePeer);
Expand All @@ -103,12 +104,12 @@ public void updateConnectedServer(List<String> allServerAddress) {
}
}

public void reconnect(final RpcClientHandler handler, final SocketAddress remotePeer){
if(handler!=null){
public void reconnect(final RpcClientHandler handler, final SocketAddress remotePeer) {
if (handler != null) {
connectedHandlers.remove(handler);
connectedServerNodes.remove(handler.getRemotePeer());
}
connectServerNode((InetSocketAddress)remotePeer);
connectServerNode((InetSocketAddress) remotePeer);
}

private void connectServerNode(final InetSocketAddress remotePeer) {
Expand All @@ -125,7 +126,7 @@ public void run() {
@Override
public void operationComplete(final ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
LOGGER.debug("Successfully connect to remote server. remote peer = " + remotePeer);
logger.debug("Successfully connect to remote server. remote peer = " + remotePeer);
RpcClientHandler handler = channelFuture.channel().pipeline().get(RpcClientHandler.class);
addHandler(handler);
}
Expand Down Expand Up @@ -171,15 +172,15 @@ public RpcClientHandler chooseHandler() {
size = handlers.size();
}
} catch (InterruptedException e) {
LOGGER.error("Waiting for available node is interrupted! ", e);
logger.error("Waiting for available node is interrupted! ", e);
throw new RuntimeException("Can't connect any servers!", e);
}
}
int index = (roundRobin.getAndAdd(1) + size) % size;
return handlers.get(index);
}

public void stop(){
public void stop() {
isRuning = false;
for (int i = 0; i < connectedHandlers.size(); ++i) {
RpcClientHandler connectedServerHandler = connectedHandlers.get(i);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/nettyrpc/client/RPCFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* Created by luxiaoxun on 2016-03-15.
*/
public class RPCFuture implements Future<Object> {
private static final Logger LOGGER = LoggerFactory.getLogger(RPCFuture.class);
private static final Logger logger = LoggerFactory.getLogger(RPCFuture.class);

private Sync sync;
private RpcRequest request;
Expand Down Expand Up @@ -85,7 +85,7 @@ public void done(RpcResponse reponse) {
// Threshold
long responseTime = System.currentTimeMillis() - startTime;
if (responseTime > this.responseTimeThreshold) {
LOGGER.warn("Service response time is too slow. Request id = " + reponse.getRequestId() + ". Response Time = " + responseTime + "ms");
logger.warn("Service response time is too slow. Request id = " + reponse.getRequestId() + ". Response Time = " + responseTime + "ms");
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/nettyrpc/client/RpcClientHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* Created by luxiaoxun on 2016-03-14.
*/
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcClientHandler.class);
private static final Logger logger = LoggerFactory.getLogger(RpcClientHandler.class);

private ConcurrentHashMap<String, RPCFuture> pendingRPC = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -54,7 +54,7 @@ public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOGGER.error("client caught exception", cause);
logger.error("client caught exception", cause);
ctx.close();
}

Expand All @@ -75,7 +75,7 @@ public void operationComplete(ChannelFuture future) {
try {
latch.await();
} catch (InterruptedException e) {
LOGGER.error(e.getMessage());
logger.error(e.getMessage());
}

return rpcFuture;
Expand Down
1 change: 0 additions & 1 deletion src/main/java/com/nettyrpc/protocol/RpcRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
* @author huangyong
*/
public class RpcRequest {

private String requestId;
private String className;
private String methodName;
Expand Down
1 change: 0 additions & 1 deletion src/main/java/com/nettyrpc/protocol/RpcResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
* @author huangyong
*/
public class RpcResponse {

private String requestId;
private String error;
private Object result;
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/com/nettyrpc/registry/ServiceDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
*/
public class ServiceDiscovery {

private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);
private static final Logger logger = LoggerFactory.getLogger(ServiceDiscovery.class);

private CountDownLatch latch = new CountDownLatch(1);

Expand All @@ -45,10 +45,10 @@ public String discover() {
if (size > 0) {
if (size == 1) {
data = dataList.get(0);
LOGGER.debug("using only data: {}", data);
logger.debug("using only data: {}", data);
} else {
data = dataList.get(ThreadLocalRandom.current().nextInt(size));
LOGGER.debug("using random data: {}", data);
logger.debug("using random data: {}", data);
}
}
return data;
Expand All @@ -67,7 +67,7 @@ public void process(WatchedEvent event) {
});
latch.await();
} catch (IOException | InterruptedException e) {
LOGGER.error("", e);
logger.error("", e);
}
return zk;
}
Expand All @@ -87,13 +87,13 @@ public void process(WatchedEvent event) {
byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
dataList.add(new String(bytes));
}
LOGGER.debug("node data: {}", dataList);
logger.debug("node data: {}", dataList);
this.dataList = dataList;

LOGGER.debug("Service discovery triggered updating connected server node.");
logger.debug("Service discovery triggered updating connected server node.");
UpdateConnectedServer();
} catch (KeeperException | InterruptedException e) {
LOGGER.error("", e);
logger.error("", e);
}
}

Expand All @@ -106,7 +106,7 @@ public void stop(){
try {
zookeeper.close();
} catch (InterruptedException e) {
LOGGER.error("", e);
logger.error("", e);
}
}
}
Expand Down
17 changes: 8 additions & 9 deletions src/main/java/com/nettyrpc/registry/ServiceRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.rmi.runtime.Log;

/**
* 服务注册
Expand All @@ -21,7 +20,7 @@
*/
public class ServiceRegistry {

private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);
private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class);

private CountDownLatch latch = new CountDownLatch(1);

Expand Down Expand Up @@ -54,10 +53,10 @@ public void process(WatchedEvent event) {
});
latch.await();
} catch (IOException e) {
LOGGER.error("", e);
logger.error("", e);
}
catch (InterruptedException ex){
LOGGER.error("", ex);
logger.error("", ex);
}
return zk;
}
Expand All @@ -69,22 +68,22 @@ private void AddRootNode(ZooKeeper zk){
zk.create(Constant.ZK_REGISTRY_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
LOGGER.error(e.toString());
logger.error(e.toString());
} catch (InterruptedException e) {
LOGGER.error(e.toString());
logger.error(e.toString());
}
}

private void createNode(ZooKeeper zk, String data) {
try {
byte[] bytes = data.getBytes();
String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
LOGGER.debug("create zookeeper node ({} => {})", path, data);
logger.debug("create zookeeper node ({} => {})", path, data);
} catch (KeeperException e) {
LOGGER.error("", e);
logger.error("", e);
}
catch (InterruptedException ex){
LOGGER.error("", ex);
logger.error("", ex);
}
}
}
18 changes: 9 additions & 9 deletions src/main/java/com/nettyrpc/server/RpcHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {

private static final Logger LOGGER = LoggerFactory.getLogger(RpcHandler.class);
private static final Logger logger = LoggerFactory.getLogger(RpcHandler.class);

private final Map<String, Object> handlerMap;

Expand All @@ -31,20 +31,20 @@ public void channelRead0(final ChannelHandlerContext ctx,final RpcRequest reques
RpcServer.submit(new Runnable() {
@Override
public void run() {
LOGGER.debug("Receive request " + request.getRequestId());
logger.debug("Receive request " + request.getRequestId());
RpcResponse response = new RpcResponse();
response.setRequestId(request.getRequestId());
try {
Object result = handle(request);
response.setResult(result);
} catch (Throwable t) {
response.setError(t.toString());
LOGGER.error("RPC Server handle request error",t);
logger.error("RPC Server handle request error",t);
}
ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
LOGGER.debug("Send response for request " + request.getRequestId());
logger.debug("Send response for request " + request.getRequestId());
}
});
}
Expand All @@ -60,13 +60,13 @@ private Object handle(RpcRequest request) throws Throwable {
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();

LOGGER.debug(serviceClass.getName());
LOGGER.debug(methodName);
logger.debug(serviceClass.getName());
logger.debug(methodName);
for (int i = 0; i < parameterTypes.length; ++i) {
LOGGER.debug(parameterTypes[i].getName());
logger.debug(parameterTypes[i].getName());
}
for (int i = 0; i < parameters.length; ++i) {
LOGGER.debug(parameters[i].toString());
logger.debug(parameters[i].toString());
}

// JDK reflect
Expand All @@ -82,7 +82,7 @@ private Object handle(RpcRequest request) throws Throwable {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOGGER.error("server caught exception", cause);
logger.error("server caught exception", cause);
ctx.close();
}
}
4 changes: 2 additions & 2 deletions src/main/java/com/nettyrpc/server/RpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
*/
public class RpcServer implements ApplicationContextAware, InitializingBean {

private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer.class);
private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);

private String serverAddress;
private ServiceRegistry serviceRegistry;
Expand Down Expand Up @@ -86,7 +86,7 @@ public void initChannel(SocketChannel channel) throws Exception {
int port = Integer.parseInt(array[1]);

ChannelFuture future = bootstrap.bind(host, port).sync();
LOGGER.debug("Server started on port {}", port);
logger.debug("Server started on port {}", port);

if (serviceRegistry != null) {
serviceRegistry.register(serverAddress);
Expand Down

0 comments on commit d4ff0f5

Please sign in to comment.