Skip to content

Commit

Permalink
🌈🌈🌈Start to enhance this repo.🌈🌈🌈
Browse files Browse the repository at this point in the history
  • Loading branch information
anduo committed Jan 27, 2018
1 parent eca864a commit 41769cd
Show file tree
Hide file tree
Showing 25 changed files with 1,073 additions and 0 deletions.
71 changes: 71 additions & 0 deletions rpc-client/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rpc</artifactId>
<groupId>me.anduo</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rpc-client</artifactId>

<dependencies>
<dependency>
<groupId>me.anduo</groupId>
<artifactId>rpc-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!-- SLF4J -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>

<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<!-- Netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>

<!-- Protostuff -->
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
</dependency>

<!-- ZooKeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>

<!-- Apache Commons Collections -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>

<!-- Objenesis -->
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
</dependency>

<!-- CGLib -->
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib</artifactId>
</dependency>
</dependencies>

</project>
78 changes: 78 additions & 0 deletions rpc-client/src/main/java/me/anduo/rpc/client/RpcClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package me.anduo.rpc.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import me.anduo.rpc.common.RpcDecoder;
import me.anduo.rpc.common.RpcEncoder;


public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {

private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);

private String host;
private int port;

private RpcResponse response;

private final Object obj = new Object();

public RpcClient(String host, int port) {
this.host = host;
this.port = port;
}

@Override
public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
this.response = response;
synchronized (obj) {
obj.notifyAll(); // 收到响应,唤醒线程
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOGGER.error("client caught exception", cause);
ctx.close();
}

public RpcResponse send(RpcRequest request) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new RpcEncoder(RpcRequest.class)) // 将 RPC 请求进行编码(为了发送请求)
.addLast(new RpcDecoder(RpcResponse.class)) // 将 RPC 响应进行解码(为了处理响应)
.addLast(RpcClient.this); // 使用 RpcClient 发送 RPC 请求
}
}).option(ChannelOption.SO_KEEPALIVE, true);

ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().writeAndFlush(request).sync();

synchronized (obj) {
obj.wait(); // 未收到响应,使线程等待
}

if (response != null) {
future.channel().closeFuture().sync();
}
return response;
} finally {
group.shutdownGracefully();
}
}
}
51 changes: 51 additions & 0 deletions rpc-client/src/main/java/me/anduo/rpc/client/RpcProxy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package me.anduo.rpc.client;

import java.lang.reflect.Method;
import java.util.UUID;

import net.sf.cglib.proxy.InvocationHandler;
import net.sf.cglib.proxy.Proxy;

public class RpcProxy {
private String serverAddress;
private ServiceDiscovery serviceDiscovery;

public RpcProxy(String serverAddress) {
this.serverAddress = serverAddress;
}

public RpcProxy(ServiceDiscovery serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
}

@SuppressWarnings("unchecked")
public <T> T create(Class<?> interfaceClass) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest request = new RpcRequest(); // 创建并初始化 RPC 请求
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);

if (serviceDiscovery != null) {
serverAddress = serviceDiscovery.discover(); // 发现服务
}
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);

RpcClient client = new RpcClient(host, port); // 初始化 RPC 客户端
RpcResponse response = client.send(request); // 通过 RPC客户端发送RPC请求并获取RPC响应
if (response.isError()) {
throw response.getError();
} else {
return response.getResult();
}
}
});
}
}
54 changes: 54 additions & 0 deletions rpc-client/src/main/java/me/anduo/rpc/client/RpcRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package me.anduo.rpc.client;

/**
* 系统内部RPC请求封装,用来将本地普通的java函数调用包装为远程的服务接口调用。
* @author duoan
*/
public class RpcRequest {
private String requestId;
private String className;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] parameters;

public String getRequestId() {
return requestId;
}

public void setRequestId(String requestId) {
this.requestId = requestId;
}

public String getClassName() {
return className;
}

public void setClassName(String className) {
this.className = className;
}

public String getMethodName() {
return methodName;
}

public void setMethodName(String methodName) {
this.methodName = methodName;
}

public Class<?>[] getParameterTypes() {
return parameterTypes;
}

public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}

public Object[] getParameters() {
return parameters;
}

public void setParameters(Object[] parameters) {
this.parameters = parameters;
}

}
36 changes: 36 additions & 0 deletions rpc-client/src/main/java/me/anduo/rpc/client/RpcResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package me.anduo.rpc.client;

public class RpcResponse {
private String requestId;
private Throwable error;
private Object result;

public String getRequestId() {
return requestId;
}

public void setRequestId(String requestId) {
this.requestId = requestId;
}

public boolean isError(){
return error == null;
}

public Throwable getError() {
return error;
}

public void setError(Throwable error) {
this.error = error;
}

public Object getResult() {
return result;
}

public void setResult(Object result) {
this.result = result;
}

}
91 changes: 91 additions & 0 deletions rpc-client/src/main/java/me/anduo/rpc/client/ServiceDiscovery.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package me.anduo.rpc.client;

import io.netty.util.internal.ThreadLocalRandom;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import me.anduo.rpc.common.Constant;


public class ServiceDiscovery {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);

private CountDownLatch latch = new CountDownLatch(1);

private volatile List<String> dataList = new ArrayList<String>();

private String registryAddress;

public ServiceDiscovery(String registryAddress) {
this.registryAddress = registryAddress;

ZooKeeper zk = connectServer();
if (zk != null) {
watchNode(zk);
}
}

public String discover() {
String data = null;
int size = dataList.size();
if (size > 0) {
if (size == 1) {
data = dataList.get(0);
LOGGER.debug("using only data: {}", data);
} else {
data = dataList.get(ThreadLocalRandom.current().nextInt(size));
LOGGER.debug("using random data: {}", data);
}
}
return data;
}

private ZooKeeper connectServer() {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
latch.await();
} catch (IOException | InterruptedException e) {
LOGGER.error("", e);
}
return zk;
}

private void watchNode(final ZooKeeper zk) {
try {
List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
watchNode(zk);
}
}
});
List<String> dataList = new ArrayList<>();
for (String node : nodeList) {
byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
dataList.add(new String(bytes));
}
LOGGER.debug("node data: {}", dataList);
this.dataList = dataList;
} catch (KeeperException | InterruptedException e) {
LOGGER.error("", e);
}
}
}
2 changes: 2 additions & 0 deletions rpc-client/src/main/resources/client-config.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# ZooKeeper \u670d\u52a1\u5668
registry.address=127.0.0.1:2181
Loading

0 comments on commit 41769cd

Please sign in to comment.