RPC是什么
所谓的RPC其实是为了不同主机的两个进程间通信而产生的,通常不同的主机之间的进程通信,程序编写需要考虑到网络通信的功能,这样程序的编写将会变得复杂。RPC就来解决这一问题的,一台主机上的进程对另外一台主机的进程发起请求时,内核会将请求转交给RPC client,RPC client经过报文的封装转交给目标主机的RPC server,RPC server就将报文进行解析,还原成正常的请求,转交给目标主机上的目标进程。在我们看来在就像是在同一台主机上的两个进程通信一样,完全没有意识到是在不同的主机上。因此RPC其实也可以看做是一种协议或者是编程框架,目的是为了简化分布式程序的编写。
RPC基本流程
大致过程如下:
Rpc Client通过传入的IP、端口号、调用类以及方法的参数,通过动态代理找到具体的调用类的方法,将请求的类、方法序列化,传输到服务端;
当Rpc Service收到请求以后,将传入类和方法反序列化,通过反射找到对应的类的方法进行调用,最后将返回结果进行序列化,返回给客户端;
Rpc Client收到返回值以后,进行反序列化,最后将结果展示;
DEMO
假如我现在有一个HelloService服务,有一个say的方法,如下:
package com.cjian.rpc.provider;/*** @Author: cjian* @Date: 2023/6/21 10:24* @Des:*/
public interface HelloService {String say(String name);
}
实现类如下:
package com.cjian.rpc.provider;/*** @Author: cjian* @Date: 2023/6/21 10:25* @Des:*/
public class HelloServiceImpl implements HelloService {@Overridepublic String say(String name) {return "您好, " + name;}
}
现在另一个服务需要调用say方法,如何使用rpc思想实现呢?
模拟注册中心:
package com.cjian.rpc.registercenter;import java.io.IOException;/*** @Author: cjian* @Date: 2023/6/21 10:25* @Des:*/
public interface Server {void stop();void start() throws IOException;void register(Class serviceInterface, Class impl);boolean isRunning();int getPort();
}
package com.cjian.rpc.registercenter;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @Author: cjian* @Date: 2023/6/21 10:26* @Des:*/
public class ServiceCenter implements Server {private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());private static final HashMap<String, Class> serviceRegistryCenter = new HashMap<String, Class>();private static boolean isRunning = false;private static int port;public ServiceCenter(int port) {this.port = port;}public void stop() {isRunning = false;executor.shutdown();}public void start() throws IOException {ServerSocket server = new ServerSocket();server.bind(new InetSocketAddress(port));System.out.println("start server");try {while (true) {// 1.监听客户端的TCP连接,接到TCP连接后将其封装成task,由线程池执行executor.execute(new ServiceTask(server.accept()));}} finally {server.close();}}public void register(Class serviceInterface, Class impl) {serviceRegistryCenter.put(serviceInterface.getName(), impl);}public boolean isRunning() {return isRunning;}public int getPort() {return port;}private static class ServiceTask implements Runnable {Socket client = null;public ServiceTask(Socket client) {this.client = client;}public void run() {ObjectInputStream input = null;ObjectOutputStream output = null;try {// 2.将客户端发送的码流反序列化成对象,反射调用服务实现者,获取执行结果input = new ObjectInputStream(client.getInputStream());String serviceName = input.readUTF();String methodName = input.readUTF();Class<?>[] parameterTypes = (Class<?>[]) input.readObject();Object[] arguments = (Object[]) input.readObject();Class serviceClass = serviceRegistryCenter.get(serviceName);if (serviceClass == null) {throw new ClassNotFoundException(serviceName + " not found");}Method method = serviceClass.getMethod(methodName, parameterTypes);Object result = method.invoke(serviceClass.newInstance(), arguments);// 3.将执行结果反序列化,通过socket发送给客户端output = new ObjectOutputStream(client.getOutputStream());output.writeObject(result);} catch (Exception e) {e.printStackTrace();} finally {if (output != null) {try {output.close();} catch (IOException e) {e.printStackTrace();}}if (input != null) {try {input.close();} catch (IOException e) {e.printStackTrace();}}if (client != null) {try {client.close();} catch (IOException e) {e.printStackTrace();}}}}}
}
客户端:
package com.cjian.rpc.client;import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;/*** @Author: cjian* @Date: 2023/6/21 10:32* @Des:*/
public class RPCClient<T> {public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {// 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface},new InvocationHandler() {public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {Socket socket = null;ObjectOutputStream output = null;ObjectInputStream input = null;try {// 2.创建Socket客户端,根据指定地址连接远程服务提供者socket = new Socket();socket.connect(addr);// 3.将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者output = new ObjectOutputStream(socket.getOutputStream());output.writeUTF(serviceInterface.getName());output.writeUTF(method.getName());output.writeObject(method.getParameterTypes());output.writeObject(args);// 4.同步阻塞等待服务器返回应答,获取应答后返回input = new ObjectInputStream(socket.getInputStream());return input.readObject();} finally {if (socket != null) socket.close();if (output != null) output.close();if (input != null) input.close();}}});}
}
测试:
package com.cjian.rpc;import com.cjian.rpc.client.RPCClient;
import com.cjian.rpc.provider.HelloService;
import com.cjian.rpc.provider.HelloServiceImpl;
import com.cjian.rpc.registercenter.Server;
import com.cjian.rpc.registercenter.ServiceCenter;import java.io.IOException;
import java.net.InetSocketAddress;/*** @Author: cjian* @Date: 2023/6/21 10:33* @Des:*/
public class Test {public static void main(String[] args) {// 模拟注册中心new Thread(() -> {try {Server serviceServer = new ServiceCenter(8088);// 将服务注册到注册中心serviceServer.register(HelloService.class, HelloServiceImpl.class);// 开启注册中心serviceServer.start();} catch (IOException e) {e.printStackTrace();}}).start();// 客户端远程调用HelloService service = RPCClient.getRemoteProxyObj(HelloService.class, new InetSocketAddress("localhost", 8088));System.out.println(service.say("test"));}
}