Java实现一个简易版RPC

it2022-05-08  8

准备知识:

1 java 网络编程(这里使用的bio)

2 java动态代理

3 反射

=================================

通俗来说rpc就是:

1. 客户端持有的是接口(但是没有持有实现); 2.服务端放的是接口的具体实现以及接口; 3.客户端把方法和方法的参数 以及其他参数 通过socket发送给服务端; 4.然后服务端执行相对应的方法,最后再把执行结果返回给客户端。

RPC的架构一般分为三部分:

1.服务提供者,运行在服务器端,提供服务接口定义与服务实现类,在本项目主要是(ITestService 以及 TestServiceImpl)

2.服务中心,运行在服务器端,负责将本地服务注册成远程服务,管理远程服务,提供给服务消费者使用,在本项目主要是(IServerCenter以及BioServiceCenter)

3.服务消费者,运行在客户端,通过远程代理对象调用远程服务,在本项目主要是(RpcProxy 返回代理对象)

核心代码如下: 服务提供者的接口定义:

public interface ITestService { String sayHi(String name); }

服务提供者的接口实现:

public class TestServiceImpl implements ITestService { public String sayHi(String name) { return "Hi, " + name; } }

服务中心接口抽象:

public interface IServerCenter { //结束服务 public void stop(); /** * 开启服务 * * @throws IOException */ public void start() throws IOException; /** * 服务提供者注册到服务中心上 * * @param servInterfaceName * 服务接口名字 * @param servImplClazz * 具体服务实现class * @throws IOException */ public void register(String servInterfaceName, Class<?> servImplClazz); }

服务中心接口实现:

public class BioServiceCenter implements IServerCenter { // 端口 private int port; /* * string 接口名 Class<?> 接口实现类 */ private static HashMap<String, Class<?>> serRegMap = new HashMap<>(); private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); public BioServiceCenter(int port) { super(); this.port = port; } public void stop() { // TODO Auto-generated method stub } public void start() throws IOException { ServerSocket service = new ServerSocket(port); // 使用线程池来处理接收到的请求 while (true) { ServerTask task = new ServerTask(service.accept()); executor.execute(task); } } public void register(String servInterfaceName, Class<?> servImplClazz) { serRegMap.put(servInterfaceName, servImplClazz); } private class ServerTask implements Runnable { Socket accept = null; public ServerTask(Socket accept) { this.accept = accept; } @Override public void run() { ObjectInputStream input = null; ObjectOutputStream output = null; try { input = new ObjectInputStream(accept.getInputStream()); /* * 获取传递过来的数据 注意:这里的顺序必须和发送的时候一致 */ // 接口名 String serviceName = input.readUTF(); // 请求的方法名字 String methodName = input.readUTF(); // 参数类型列表 Class<?>[] parameterTypes = (Class<?>[]) input.readObject(); // 参数具体的值 Object[] arguments = (Object[]) input.readObject(); Class serverImlClazz = serRegMap.get(serviceName); if (serverImlClazz == null) { throw new Exception("该类没有实现类!"); } Method invokeMethod = serverImlClazz.getMethod(methodName, parameterTypes); Object result=invokeMethod.invoke(serverImlClazz.newInstance(), arguments); output=new ObjectOutputStream(accept.getOutputStream()); output.writeObject(result); } catch (Exception e) { e.printStackTrace(); }finally { if(output!=null) { try { output.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if(input!=null) { try { input.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if(accept!=null) { try { accept.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } } }

客户端的远程代理类(主要是生成代理对象,发送请求给服务器 并获取返回结果):

public class RpcProxy { public static Object getRpcProxyObject(final Class<?> serviceInterface, final InetSocketAddress inetAddress) { return Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[] { serviceInterface }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = null; ObjectInputStream input = null; ObjectOutputStream output = null; Object result=null; try { socket = new Socket(); socket.connect(inetAddress); output = new ObjectOutputStream(socket.getOutputStream()); output.writeUTF(serviceInterface.getName()); output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(args); // 阻塞等待数据回来 input = new ObjectInputStream(socket.getInputStream()); result=input.readObject(); } catch (Exception e) { e.printStackTrace(); } finally { if (output != null) { try { output.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (input != null) { try { input.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (socket != null) { try { socket.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } return result; } }); } }

测试类:

public class Test { public static void main(String[] args) { final int port = 6666; new Thread(new Runnable() { @Override public void run() { BioServiceCenter bsc = new BioServiceCenter(port); bsc.register(ITestService.class.getName(), TestServiceImpl.class); try { bsc.start(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); ITestService iTestService = (ITestService) RpcProxy.getRpcProxyObject(ITestService.class, new InetSocketAddress("localhost", port)); System.out.println(iTestService.sayHi("nihao ")); } }

运行结果如下: 注意:这个例子只是模拟了核心处理流程,还有以下几个点可以优化

1 bio相对来说性能不如nio ,可以考虑使用netty等高性能网络通信框架

2 使用更加高效的序列化方式,比如Google protobu ,kryo等

3 服务注册与发现 可以使用zookeeper实现,更加稳定


最新回复(0)