准备知识:
1 java 网络编程(这里使用的bio)
2 java动态代理
3 反射
=================================
通俗来说rpc就是:
1. 客户端持有的是接口(但是没有持有实现); 2.服务端放的是接口的具体实现以及接口; 3.客户端把方法和方法的参数 以及其他参数 通过socket发送给服务端; 4.然后服务端执行相对应的方法,最后再把执行结果返回给客户端。RPC的架构一般分为三部分:
1.服务提供者,运行在服务器端,提供服务接口定义与服务实现类,在本项目主要是(ITestService 以及 TestServiceImpl)
2.服务中心,运行在服务器端,负责将本地服务注册成远程服务,管理远程服务,提供给服务消费者使用,在本项目主要是(IServerCenter以及BioServiceCenter)
3.服务消费者,运行在客户端,通过远程代理对象调用远程服务,在本项目主要是(RpcProxy 返回代理对象)
核心代码如下: 服务提供者的接口定义:
public interface ITestService { String sayHi(String name); }服务提供者的接口实现:
public class TestServiceImpl implements ITestService { public String sayHi(String name) { return "Hi, " + name; } }服务中心接口抽象:
public interface IServerCenter { //结束服务 public void stop(); /** * 开启服务 * * @throws IOException */ public void start() throws IOException; /** * 服务提供者注册到服务中心上 * * @param servInterfaceName * 服务接口名字 * @param servImplClazz * 具体服务实现class * @throws IOException */ public void register(String servInterfaceName, Class<?> servImplClazz); }服务中心接口实现:
public class BioServiceCenter implements IServerCenter { // 端口 private int port; /* * string 接口名 Class<?> 接口实现类 */ private static HashMap<String, Class<?>> serRegMap = new HashMap<>(); private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); public BioServiceCenter(int port) { super(); this.port = port; } public void stop() { // TODO Auto-generated method stub } public void start() throws IOException { ServerSocket service = new ServerSocket(port); // 使用线程池来处理接收到的请求 while (true) { ServerTask task = new ServerTask(service.accept()); executor.execute(task); } } public void register(String servInterfaceName, Class<?> servImplClazz) { serRegMap.put(servInterfaceName, servImplClazz); } private class ServerTask implements Runnable { Socket accept = null; public ServerTask(Socket accept) { this.accept = accept; } @Override public void run() { ObjectInputStream input = null; ObjectOutputStream output = null; try { input = new ObjectInputStream(accept.getInputStream()); /* * 获取传递过来的数据 注意:这里的顺序必须和发送的时候一致 */ // 接口名 String serviceName = input.readUTF(); // 请求的方法名字 String methodName = input.readUTF(); // 参数类型列表 Class<?>[] parameterTypes = (Class<?>[]) input.readObject(); // 参数具体的值 Object[] arguments = (Object[]) input.readObject(); Class serverImlClazz = serRegMap.get(serviceName); if (serverImlClazz == null) { throw new Exception("该类没有实现类!"); } Method invokeMethod = serverImlClazz.getMethod(methodName, parameterTypes); Object result=invokeMethod.invoke(serverImlClazz.newInstance(), arguments); output=new ObjectOutputStream(accept.getOutputStream()); output.writeObject(result); } catch (Exception e) { e.printStackTrace(); }finally { if(output!=null) { try { output.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if(input!=null) { try { input.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if(accept!=null) { try { accept.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } } }客户端的远程代理类(主要是生成代理对象,发送请求给服务器 并获取返回结果):
public class RpcProxy { public static Object getRpcProxyObject(final Class<?> serviceInterface, final InetSocketAddress inetAddress) { return Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[] { serviceInterface }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = null; ObjectInputStream input = null; ObjectOutputStream output = null; Object result=null; try { socket = new Socket(); socket.connect(inetAddress); output = new ObjectOutputStream(socket.getOutputStream()); output.writeUTF(serviceInterface.getName()); output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(args); // 阻塞等待数据回来 input = new ObjectInputStream(socket.getInputStream()); result=input.readObject(); } catch (Exception e) { e.printStackTrace(); } finally { if (output != null) { try { output.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (input != null) { try { input.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (socket != null) { try { socket.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } return result; } }); } }测试类:
public class Test { public static void main(String[] args) { final int port = 6666; new Thread(new Runnable() { @Override public void run() { BioServiceCenter bsc = new BioServiceCenter(port); bsc.register(ITestService.class.getName(), TestServiceImpl.class); try { bsc.start(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); ITestService iTestService = (ITestService) RpcProxy.getRpcProxyObject(ITestService.class, new InetSocketAddress("localhost", port)); System.out.println(iTestService.sayHi("nihao ")); } }运行结果如下: 注意:这个例子只是模拟了核心处理流程,还有以下几个点可以优化
1 bio相对来说性能不如nio ,可以考虑使用netty等高性能网络通信框架
2 使用更加高效的序列化方式,比如Google protobu ,kryo等
3 服务注册与发现 可以使用zookeeper实现,更加稳定
