Skip to content

Commit

Permalink
Update service registry and discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
luxiaoxun committed Aug 13, 2020
1 parent 9c73c84 commit 34f4b1a
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class ConnectionManager {
600L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1000));

private Map<RpcProtocol, RpcClientHandler> connectedServerNodes = new ConcurrentHashMap<>();
private CopyOnWriteArraySet<RpcProtocol> rpcProtocolSet = new CopyOnWriteArraySet<>();
private ReentrantLock lock = new ReentrantLock();
private Condition connected = lock.newCondition();
private long waitTimeout = 5000;
Expand All @@ -50,53 +51,54 @@ public static ConnectionManager getInstance() {
}

public void updateConnectedServer(List<RpcProtocol> serviceList) {
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
if (serviceList != null) {
if (serviceList.size() > 0) {
// Update local serverNodes cache
HashSet<RpcProtocol> serviceSet = new HashSet<>(serviceList.size());
for (int i = 0; i < serviceList.size(); ++i) {
RpcProtocol rpcProtocol = serviceList.get(i);
serviceSet.add(rpcProtocol);
}
// Now using 2 collections to manage the service info and TCP connections because making the connection is async
// Once service info is updated on ZK, will trigger this function
// Actually client should only care about the service it is using
if (serviceList != null && serviceList.size() > 0) {
// Update local serverNodes cache
HashSet<RpcProtocol> serviceSet = new HashSet<>(serviceList.size());
for (int i = 0; i < serviceList.size(); ++i) {
RpcProtocol rpcProtocol = serviceList.get(i);
serviceSet.add(rpcProtocol);
}

// Add new server info
for (final RpcProtocol rpcProtocol : serviceSet) {
if (!connectedServerNodes.keySet().contains(rpcProtocol)) {
connectServerNode(rpcProtocol);
}
}
// Add new server info
for (final RpcProtocol rpcProtocol : serviceSet) {
if (!rpcProtocolSet.contains(rpcProtocol)) {
rpcProtocolSet.add(rpcProtocol);
connectServerNode(rpcProtocol);
}
}

// Close and remove invalid server nodes
for (RpcProtocol rpcProtocol : connectedServerNodes.keySet()) {
if (!serviceSet.contains(rpcProtocol)) {
logger.info("Remove invalid service: " + rpcProtocol.toJson());
RpcClientHandler handler = connectedServerNodes.get(rpcProtocol);
if (handler != null) {
handler.close();
}
connectedServerNodes.remove(rpcProtocol);
}
}
} else {
// No available service
logger.error("No available service!");
for (RpcProtocol rpcProtocol : connectedServerNodes.keySet()) {
RpcClientHandler handler = connectedServerNodes.get(rpcProtocol);
handler.close();
connectedServerNodes.remove(rpcProtocol);
}
// Close and remove invalid server nodes
for (RpcProtocol rpcProtocol : rpcProtocolSet) {
if (!serviceSet.contains(rpcProtocol)) {
logger.info("Remove invalid service: " + rpcProtocol.toJson());
RpcClientHandler handler = connectedServerNodes.get(rpcProtocol);
if (handler != null) {
handler.close();
}
connectedServerNodes.remove(rpcProtocol);
rpcProtocolSet.remove(rpcProtocol);
}
}
});
} else {
// No available service
logger.error("No available service!");
for (RpcProtocol rpcProtocol : rpcProtocolSet) {
RpcClientHandler handler = connectedServerNodes.get(rpcProtocol);
if (handler != null) {
handler.close();
}
connectedServerNodes.remove(rpcProtocol);
rpcProtocolSet.remove(rpcProtocol);
}
}
}

private void connectServerNode(RpcProtocol rpcProtocol) {
logger.info("New service: {}, version:{}, uuid: {}, host: {}, port:{}", rpcProtocol.getServiceName(),
rpcProtocol.getVersion(), rpcProtocol.getUuid(), rpcProtocol.getHost(), rpcProtocol.getPort());
logger.info("New service: {}, version:{}, host: {}, port:{}", rpcProtocol.getServiceName(),
rpcProtocol.getVersion(), rpcProtocol.getHost(), rpcProtocol.getPort());
final InetSocketAddress remotePeer = new InetSocketAddress(rpcProtocol.getHost(), rpcProtocol.getPort());
threadPoolExecutor.submit(new Runnable() {
@Override
Expand All @@ -111,10 +113,13 @@ public void run() {
@Override
public void operationComplete(final ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
logger.info("Successfully connect to remote server. remote peer = " + remotePeer);
logger.info("Successfully connect to remote server, remote peer = " + remotePeer);
RpcClientHandler handler = channelFuture.channel().pipeline().get(RpcClientHandler.class);
connectedServerNodes.put(rpcProtocol, handler);
handler.setRpcProtocol(rpcProtocol);
signalAvailableHandler();
} else {
logger.error("Can not connect to remote server, remote peer = " + remotePeer);
}
}
});
Expand Down Expand Up @@ -152,15 +157,28 @@ public RpcClientHandler chooseHandler(String serviceKey) throws Exception {
}
}
RpcProtocol rpcProtocol = loadBalance.route(serviceKey, connectedServerNodes);
return connectedServerNodes.get(rpcProtocol);
RpcClientHandler handler = connectedServerNodes.get(rpcProtocol);
if (handler != null) {
return handler;
} else {
throw new Exception("Can not get available connection");
}
}

public void removeHandler(RpcProtocol rpcProtocol) {
rpcProtocolSet.remove(rpcProtocol);
connectedServerNodes.remove(rpcProtocol);
}

public void stop() {
isRunning = false;
for (RpcProtocol rpcProtocol : connectedServerNodes.keySet()) {
for (RpcProtocol rpcProtocol : rpcProtocolSet) {
RpcClientHandler handler = connectedServerNodes.get(rpcProtocol);
handler.close();
if (handler != null) {
handler.close();
}
connectedServerNodes.remove(rpcProtocol);
rpcProtocolSet.remove(rpcProtocol);
}
signalAvailableHandler();
threadPoolExecutor.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,23 @@ public ServiceDiscovery(String registryAddress) {

private void discoveryService() {
try {
// Get init service info
logger.info("Get init service info");
// Get initial service info
logger.info("Get initial service info");
getServiceAndUpdateServer();
// Add watch listener
curatorClient.watchPathChildrenNode(Constant.ZK_REGISTRY_PATH, new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
switch (type) {
case CONNECTION_RECONNECTED:
logger.info("Reconnected to zk, try to get latest service list");
getServiceAndUpdateServer();
break;
case CHILD_ADDED:
case CHILD_UPDATED:
case CHILD_REMOVED:
logger.info("Service info updated, try to get latest service list");
logger.info("Service info changed, try to get latest service list");
getServiceAndUpdateServer();
break;
}
Expand All @@ -63,8 +67,7 @@ private void getServiceAndUpdateServer() {
RpcProtocol rpcProtocol = RpcProtocol.fromJson(json);
dataList.add(rpcProtocol);
}
logger.debug("Node data: {}", dataList);
logger.debug("Service discovery triggered updating connected server node.");
logger.debug("Service node data: {}", dataList);
//Update the service info based on the latest data
UpdateConnectedServer(dataList);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.netty.rpc.client.handler;

import com.netty.rpc.client.connect.ConnectionManager;
import com.netty.rpc.codec.Beat;
import com.netty.rpc.codec.RpcRequest;
import com.netty.rpc.codec.RpcResponse;
import com.netty.rpc.protocol.RpcProtocol;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleStateEvent;
Expand All @@ -21,6 +23,7 @@ public class RpcClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
private ConcurrentHashMap<String, RpcFuture> pendingRPC = new ConcurrentHashMap<>();
private volatile Channel channel;
private SocketAddress remotePeer;
private RpcProtocol rpcProtocol;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Expand Down Expand Up @@ -82,4 +85,14 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
super.userEventTriggered(ctx, evt);
}
}

public void setRpcProtocol(RpcProtocol rpcProtocol) {
this.rpcProtocol = rpcProtocol;
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
ConnectionManager.getInstance().removeHandler(rpcProtocol);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

public class RpcProtocol implements Serializable {
private static final long serialVersionUID = -1102180003395190700L;
private String uuid;
// service host
private String host;
// service port
private int port;
// interface name
private String serviceName;
Expand All @@ -30,30 +31,21 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
RpcProtocol that = (RpcProtocol) o;
return port == that.port &&
uuid.equals(that.uuid) &&
host.equals(that.host) &&
serviceName.equals(that.serviceName) &&
version.equals(this.version);
version.equals(that.version);
}

@Override
public int hashCode() {
return Objects.hash(uuid, host, port, serviceName, version);
return Objects.hash(host, port, serviceName, version);
}

@Override
public String toString() {
return toJson();
}

public String getUuid() {
return uuid;
}

public void setUuid(String uuid) {
this.uuid = uuid;
}

public String getHost() {
return host;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
Expand All @@ -19,7 +20,8 @@ public class CuratorClient {
public CuratorClient(String connectString, String namespace, int sessionTimeout, int connectionTimeout) {
client = CuratorFrameworkFactory.builder().namespace(namespace).connectString(connectString)
.sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout)
.retryPolicy(new ExponentialBackoffRetry(2000, 10)).build();
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
client.start();
}

Expand All @@ -35,6 +37,10 @@ public CuratorFramework getClient() {
return client;
}

public void addConnectionStateListener(ConnectionStateListener connectionStateListener) {
client.getConnectionStateListenable().addListener(connectionStateListener);
}

public void createPathData(String path, byte[] data) throws Exception {
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ public void run() {
future.channel().closeFuture().sync();
} catch (Exception e) {
if (e instanceof InterruptedException) {
logger.info("Rpc server remoting server stop.");
logger.info("Rpc server remoting server stop");
} else {
logger.error("Rpc server remoting server error.", e);
logger.error("Rpc server remoting server error", e);
}
} finally {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public RpcServerHandler(Map<String, Object> handlerMap, final ThreadPoolExecutor
public void channelRead0(final ChannelHandlerContext ctx, final RpcRequest request) {
// filter beat ping
if (Beat.BEAT_ID.equalsIgnoreCase(request.getRequestId())) {
logger.info("Server read beat-ping.");
logger.info("Server read heartbeat ping");
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.netty.rpc.server.registry;

import cn.hutool.core.util.IdUtil;
import com.netty.rpc.config.Constant;
import com.netty.rpc.protocol.RpcProtocol;
import com.netty.rpc.util.ServiceUtil;
import com.netty.rpc.zookeeper.CuratorClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -28,37 +30,42 @@ public ServiceRegistry(String registryAddress) {
}

public void registerService(String host, int port, Map<String, Object> serviceMap) {
//register service info, format uuid:ip:port
if (serviceMap.size() > 0) {
for (String key : serviceMap.keySet()) {
try {
RpcProtocol rpcProtocol = new RpcProtocol();
//Add an uuid when register the service so we can distinguish the same ip:port service
String uuid = IdUtil.objectId();
rpcProtocol.setUuid(uuid);
rpcProtocol.setHost(host);
rpcProtocol.setPort(port);
String[] serviceInfo = key.split(ServiceUtil.SERVICE_CONCAT_TOKEN);
if (serviceInfo.length > 0) {
rpcProtocol.setServiceName(serviceInfo[0]);
if (serviceInfo.length == 2) {
rpcProtocol.setVersion(serviceInfo[1]);
} else {
rpcProtocol.setVersion("");
}
String serviceData = rpcProtocol.toJson();
byte[] bytes = serviceData.getBytes();
String path = Constant.ZK_DATA_PATH + "-" + uuid;
this.curatorClient.createPathData(path, bytes);
pathList.add(path);
logger.info("Registry new service:{}, host:{}, port:{}", key, host, port);
// Register service info
for (String key : serviceMap.keySet()) {
try {
RpcProtocol rpcProtocol = new RpcProtocol();
rpcProtocol.setHost(host);
rpcProtocol.setPort(port);
String[] serviceInfo = key.split(ServiceUtil.SERVICE_CONCAT_TOKEN);
if (serviceInfo.length > 0) {
rpcProtocol.setServiceName(serviceInfo[0]);
if (serviceInfo.length == 2) {
rpcProtocol.setVersion(serviceInfo[1]);
} else {
logger.warn("Can not get service name and version");
rpcProtocol.setVersion("");
}
} catch (Exception e) {
logger.error("Register service {} fail, exception:{}", key, e.getMessage());
String serviceData = rpcProtocol.toJson();
byte[] bytes = serviceData.getBytes();
String path = Constant.ZK_DATA_PATH + "-" + rpcProtocol.hashCode();
this.curatorClient.createPathData(path, bytes);
pathList.add(path);
logger.info("Register new service: {}, host: {}, port: {}", key, host, port);
} else {
logger.warn("Can not get service name and version: {}" + key);
}
} catch (Exception e) {
logger.error("Register service {} fail, exception: {}", key, e.getMessage());
}

curatorClient.addConnectionStateListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
if (connectionState == ConnectionState.RECONNECTED) {
logger.info("Connection state: {}, register service after reconnected", connectionState);
registerService(host, port, serviceMap);
}
}
});
}
}

Expand Down
Loading

0 comments on commit 34f4b1a

Please sign in to comment.