diff --git a/src/main/java/com/nettyrpc/client/ConnectManage.java b/src/main/java/com/nettyrpc/client/ConnectManage.java index dee3903..003b42c 100644 --- a/src/main/java/com/nettyrpc/client/ConnectManage.java +++ b/src/main/java/com/nettyrpc/client/ConnectManage.java @@ -24,11 +24,10 @@ * Created by luxiaoxun on 2016-03-16. */ public class ConnectManage { - - private static final Logger LOGGER = LoggerFactory.getLogger(ConnectManage.class); + private static final Logger logger = LoggerFactory.getLogger(ConnectManage.class); private volatile static ConnectManage connectManage; - EventLoopGroup eventLoopGroup = new NioEventLoopGroup(4); + private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(4); private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(16, 16, 600L, TimeUnit.SECONDS, new ArrayBlockingQueue(65536)); private CopyOnWriteArrayList connectedHandlers = new CopyOnWriteArrayList<>(); @@ -37,7 +36,7 @@ public class ConnectManage { private ReentrantLock lock = new ReentrantLock(); private Condition connected = lock.newCondition(); - protected long connectTimeoutMillis = 6000; + private long connectTimeoutMillis = 6000; private AtomicInteger roundRobin = new AtomicInteger(0); private volatile boolean isRuning = true; @@ -82,16 +81,18 @@ public void updateConnectedServer(List allServerAddress) { RpcClientHandler connectedServerHandler = connectedHandlers.get(i); SocketAddress remotePeer = connectedServerHandler.getRemotePeer(); if (!newAllServerNodeSet.contains(remotePeer)) { - LOGGER.info("Remove invalid server node " + remotePeer); + logger.info("Remove invalid server node " + remotePeer); RpcClientHandler handler = connectedServerNodes.get(remotePeer); - handler.close(); + if (handler != null) { + handler.close(); + } connectedServerNodes.remove(remotePeer); connectedHandlers.remove(connectedServerHandler); } } } else { // No available server node ( All server nodes are down ) - LOGGER.error("No available server node. All server nodes are down !!!"); + logger.error("No available server node. All server nodes are down !!!"); for (final RpcClientHandler connectedServerHandler : connectedHandlers) { SocketAddress remotePeer = connectedServerHandler.getRemotePeer(); RpcClientHandler handler = connectedServerNodes.get(remotePeer); @@ -103,12 +104,12 @@ public void updateConnectedServer(List allServerAddress) { } } - public void reconnect(final RpcClientHandler handler, final SocketAddress remotePeer){ - if(handler!=null){ + public void reconnect(final RpcClientHandler handler, final SocketAddress remotePeer) { + if (handler != null) { connectedHandlers.remove(handler); connectedServerNodes.remove(handler.getRemotePeer()); } - connectServerNode((InetSocketAddress)remotePeer); + connectServerNode((InetSocketAddress) remotePeer); } private void connectServerNode(final InetSocketAddress remotePeer) { @@ -125,7 +126,7 @@ public void run() { @Override public void operationComplete(final ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { - LOGGER.debug("Successfully connect to remote server. remote peer = " + remotePeer); + logger.debug("Successfully connect to remote server. remote peer = " + remotePeer); RpcClientHandler handler = channelFuture.channel().pipeline().get(RpcClientHandler.class); addHandler(handler); } @@ -171,7 +172,7 @@ public RpcClientHandler chooseHandler() { size = handlers.size(); } } catch (InterruptedException e) { - LOGGER.error("Waiting for available node is interrupted! ", e); + logger.error("Waiting for available node is interrupted! ", e); throw new RuntimeException("Can't connect any servers!", e); } } @@ -179,7 +180,7 @@ public RpcClientHandler chooseHandler() { return handlers.get(index); } - public void stop(){ + public void stop() { isRuning = false; for (int i = 0; i < connectedHandlers.size(); ++i) { RpcClientHandler connectedServerHandler = connectedHandlers.get(i); diff --git a/src/main/java/com/nettyrpc/client/RPCFuture.java b/src/main/java/com/nettyrpc/client/RPCFuture.java index 181f913..285696b 100644 --- a/src/main/java/com/nettyrpc/client/RPCFuture.java +++ b/src/main/java/com/nettyrpc/client/RPCFuture.java @@ -19,7 +19,7 @@ * Created by luxiaoxun on 2016-03-15. */ public class RPCFuture implements Future { - private static final Logger LOGGER = LoggerFactory.getLogger(RPCFuture.class); + private static final Logger logger = LoggerFactory.getLogger(RPCFuture.class); private Sync sync; private RpcRequest request; @@ -85,7 +85,7 @@ public void done(RpcResponse reponse) { // Threshold long responseTime = System.currentTimeMillis() - startTime; if (responseTime > this.responseTimeThreshold) { - LOGGER.warn("Service response time is too slow. Request id = " + reponse.getRequestId() + ". Response Time = " + responseTime + "ms"); + logger.warn("Service response time is too slow. Request id = " + reponse.getRequestId() + ". Response Time = " + responseTime + "ms"); } } diff --git a/src/main/java/com/nettyrpc/client/RpcClientHandler.java b/src/main/java/com/nettyrpc/client/RpcClientHandler.java index 49e4f93..431a68f 100644 --- a/src/main/java/com/nettyrpc/client/RpcClientHandler.java +++ b/src/main/java/com/nettyrpc/client/RpcClientHandler.java @@ -15,7 +15,7 @@ * Created by luxiaoxun on 2016-03-14. */ public class RpcClientHandler extends SimpleChannelInboundHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(RpcClientHandler.class); + private static final Logger logger = LoggerFactory.getLogger(RpcClientHandler.class); private ConcurrentHashMap pendingRPC = new ConcurrentHashMap<>(); @@ -54,7 +54,7 @@ public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - LOGGER.error("client caught exception", cause); + logger.error("client caught exception", cause); ctx.close(); } @@ -75,7 +75,7 @@ public void operationComplete(ChannelFuture future) { try { latch.await(); } catch (InterruptedException e) { - LOGGER.error(e.getMessage()); + logger.error(e.getMessage()); } return rpcFuture; diff --git a/src/main/java/com/nettyrpc/protocol/RpcRequest.java b/src/main/java/com/nettyrpc/protocol/RpcRequest.java index 5281fda..8491874 100644 --- a/src/main/java/com/nettyrpc/protocol/RpcRequest.java +++ b/src/main/java/com/nettyrpc/protocol/RpcRequest.java @@ -5,7 +5,6 @@ * @author huangyong */ public class RpcRequest { - private String requestId; private String className; private String methodName; diff --git a/src/main/java/com/nettyrpc/protocol/RpcResponse.java b/src/main/java/com/nettyrpc/protocol/RpcResponse.java index 4789921..e8fdd6b 100644 --- a/src/main/java/com/nettyrpc/protocol/RpcResponse.java +++ b/src/main/java/com/nettyrpc/protocol/RpcResponse.java @@ -5,7 +5,6 @@ * @author huangyong */ public class RpcResponse { - private String requestId; private String error; private Object result; diff --git a/src/main/java/com/nettyrpc/registry/ServiceDiscovery.java b/src/main/java/com/nettyrpc/registry/ServiceDiscovery.java index 0ed1cae..f841226 100644 --- a/src/main/java/com/nettyrpc/registry/ServiceDiscovery.java +++ b/src/main/java/com/nettyrpc/registry/ServiceDiscovery.java @@ -22,7 +22,7 @@ */ public class ServiceDiscovery { - private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class); + private static final Logger logger = LoggerFactory.getLogger(ServiceDiscovery.class); private CountDownLatch latch = new CountDownLatch(1); @@ -45,10 +45,10 @@ public String discover() { if (size > 0) { if (size == 1) { data = dataList.get(0); - LOGGER.debug("using only data: {}", data); + logger.debug("using only data: {}", data); } else { data = dataList.get(ThreadLocalRandom.current().nextInt(size)); - LOGGER.debug("using random data: {}", data); + logger.debug("using random data: {}", data); } } return data; @@ -67,7 +67,7 @@ public void process(WatchedEvent event) { }); latch.await(); } catch (IOException | InterruptedException e) { - LOGGER.error("", e); + logger.error("", e); } return zk; } @@ -87,13 +87,13 @@ public void process(WatchedEvent event) { byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null); dataList.add(new String(bytes)); } - LOGGER.debug("node data: {}", dataList); + logger.debug("node data: {}", dataList); this.dataList = dataList; - LOGGER.debug("Service discovery triggered updating connected server node."); + logger.debug("Service discovery triggered updating connected server node."); UpdateConnectedServer(); } catch (KeeperException | InterruptedException e) { - LOGGER.error("", e); + logger.error("", e); } } @@ -106,7 +106,7 @@ public void stop(){ try { zookeeper.close(); } catch (InterruptedException e) { - LOGGER.error("", e); + logger.error("", e); } } } diff --git a/src/main/java/com/nettyrpc/registry/ServiceRegistry.java b/src/main/java/com/nettyrpc/registry/ServiceRegistry.java index 8d186a4..0560246 100644 --- a/src/main/java/com/nettyrpc/registry/ServiceRegistry.java +++ b/src/main/java/com/nettyrpc/registry/ServiceRegistry.java @@ -11,7 +11,6 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import sun.rmi.runtime.Log; /** * 服务注册 @@ -21,7 +20,7 @@ */ public class ServiceRegistry { - private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class); + private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class); private CountDownLatch latch = new CountDownLatch(1); @@ -54,10 +53,10 @@ public void process(WatchedEvent event) { }); latch.await(); } catch (IOException e) { - LOGGER.error("", e); + logger.error("", e); } catch (InterruptedException ex){ - LOGGER.error("", ex); + logger.error("", ex); } return zk; } @@ -69,9 +68,9 @@ private void AddRootNode(ZooKeeper zk){ zk.create(Constant.ZK_REGISTRY_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { - LOGGER.error(e.toString()); + logger.error(e.toString()); } catch (InterruptedException e) { - LOGGER.error(e.toString()); + logger.error(e.toString()); } } @@ -79,12 +78,12 @@ private void createNode(ZooKeeper zk, String data) { try { byte[] bytes = data.getBytes(); String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); - LOGGER.debug("create zookeeper node ({} => {})", path, data); + logger.debug("create zookeeper node ({} => {})", path, data); } catch (KeeperException e) { - LOGGER.error("", e); + logger.error("", e); } catch (InterruptedException ex){ - LOGGER.error("", ex); + logger.error("", ex); } } } \ No newline at end of file diff --git a/src/main/java/com/nettyrpc/server/RpcHandler.java b/src/main/java/com/nettyrpc/server/RpcHandler.java index a909858..68b881f 100644 --- a/src/main/java/com/nettyrpc/server/RpcHandler.java +++ b/src/main/java/com/nettyrpc/server/RpcHandler.java @@ -18,7 +18,7 @@ */ public class RpcHandler extends SimpleChannelInboundHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(RpcHandler.class); + private static final Logger logger = LoggerFactory.getLogger(RpcHandler.class); private final Map handlerMap; @@ -31,7 +31,7 @@ public void channelRead0(final ChannelHandlerContext ctx,final RpcRequest reques RpcServer.submit(new Runnable() { @Override public void run() { - LOGGER.debug("Receive request " + request.getRequestId()); + logger.debug("Receive request " + request.getRequestId()); RpcResponse response = new RpcResponse(); response.setRequestId(request.getRequestId()); try { @@ -39,12 +39,12 @@ public void run() { response.setResult(result); } catch (Throwable t) { response.setError(t.toString()); - LOGGER.error("RPC Server handle request error",t); + logger.error("RPC Server handle request error",t); } ctx.writeAndFlush(response).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { - LOGGER.debug("Send response for request " + request.getRequestId()); + logger.debug("Send response for request " + request.getRequestId()); } }); } @@ -60,13 +60,13 @@ private Object handle(RpcRequest request) throws Throwable { Class[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); - LOGGER.debug(serviceClass.getName()); - LOGGER.debug(methodName); + logger.debug(serviceClass.getName()); + logger.debug(methodName); for (int i = 0; i < parameterTypes.length; ++i) { - LOGGER.debug(parameterTypes[i].getName()); + logger.debug(parameterTypes[i].getName()); } for (int i = 0; i < parameters.length; ++i) { - LOGGER.debug(parameters[i].toString()); + logger.debug(parameters[i].toString()); } // JDK reflect @@ -82,7 +82,7 @@ private Object handle(RpcRequest request) throws Throwable { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - LOGGER.error("server caught exception", cause); + logger.error("server caught exception", cause); ctx.close(); } } diff --git a/src/main/java/com/nettyrpc/server/RpcServer.java b/src/main/java/com/nettyrpc/server/RpcServer.java index 7061c8e..01aa9b2 100644 --- a/src/main/java/com/nettyrpc/server/RpcServer.java +++ b/src/main/java/com/nettyrpc/server/RpcServer.java @@ -32,7 +32,7 @@ */ public class RpcServer implements ApplicationContextAware, InitializingBean { - private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer.class); + private static final Logger logger = LoggerFactory.getLogger(RpcServer.class); private String serverAddress; private ServiceRegistry serviceRegistry; @@ -86,7 +86,7 @@ public void initChannel(SocketChannel channel) throws Exception { int port = Integer.parseInt(array[1]); ChannelFuture future = bootstrap.bind(host, port).sync(); - LOGGER.debug("Server started on port {}", port); + logger.debug("Server started on port {}", port); if (serviceRegistry != null) { serviceRegistry.register(serverAddress);