易妖游戏网
您的当前位置:首页自己动手实现RPC框架

自己动手实现RPC框架

来源:易妖游戏网

源码:
参考视频:

1.跨进程数据交换

1.1 依赖中间件做数据交互

2.1 直接交互

http和rpc的区别:

相同点:
    都是基于socket通信,都可以实现远程调用
不同点:
    rpc: 自定义tcp协议,报文更小,调用快,处理快。
    http: 通用性更强,实现较为复杂
总结: 如果局域网内服务的调用,最好使用rpc,因为这样更快。如果需要提供对外的环境,如浏览器调用、APP调用、第三方服务调用,则最好使用http

2.RPC架构

服务中心(Registry): 运行在服务器端,负责将本地服务发布成远程服务,管理远程服务,提供给服务消费者使用。

服务消费者(RPC Client): 运行在客户端,通过远程代理对象调用远程服务。

3.现有RPC框架对比

4.自己动手实现RPC框架

核心主要有5个主要的模块组成.

4.0 共同模块(common)

提供反射工具类
ReflectionUtils.java

/**
 * 反射工具类
 */
public class ReflectionUtils {

    //根据类clazz创建对象
    public static <T> T newInstance(Class<T> clazz) {
        T instance = null;
        try {
            instance = clazz.newInstance();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return instance;
    }

    //获得该类自身声明的所有public方法
    public static Method[] getPublicMethods(Class clazz) {
        List<Method> pMethods = new ArrayList<>();
        //返回该类声明的所有方法
        Method[] methods = clazz.getDeclaredMethods();
        //过滤到public方法
        pMethods = Arrays
                .stream(methods)
                .filter(method ->
                        Modifier.isPublic(method.getModifiers())
                ).collect(Collectors.toList());
        return pMethods.toArray(new Method[0]);
    }

    //调用指定对象的指定方法
    public static Object invoke(Object obj, Method method, Object... args) {
        Object object = null;
        try {
            object = method.invoke(obj, args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return object;
    }
}
4.1 协议模块(proto)

主要是封装了RPC的请求,响应和服务描述类。

ServiceDescriptor.java

public class ServiceDescriptor {
    private String clazz;
    private String method;
    private String returnType;
    private String[] paramTypes;
 }

Request.java

public class Request {
    private ServiceDescriptor service;
    private Object[] params;
}

Response.java

public class Response {
    private Integer code = 0; // 0 success
    private String msg = "success";
    private Object data;
}
4.2 序列化模块(codec)

使用fastjson,基于json进行序列化和反序列化,将java对象转为json形式进行网络传输,接受到json后再转为java对象。
JsonEncoder.java

/**
 * 基于Json的序列化
 */
public class JsonEncoder implements Encoder {
    @Override
    public byte[] encode(Object obj) {
        return JSON.toJSONBytes(obj);
    }
}

JsonDecoder.java

/**
 * 基于Json的反序列化
 */
public class JsonDecoder implements Decoder {
    @Override
    public <T> T decode(byte[] bytes, Class<T> clazz) {
        return JSON.parseObject(bytes, clazz);
    }
}
4.3 网络传输模块(transport)

客户端: 基于java原生的HttpURLConnection,建立短连接。
HttpTransportClient.java

/**
 * 基于Http(HttpURLConnection)的客户端:短连接
 */
@Slf4j
public class HttpTransportClient implements TransportClient {
    private String url;

    @Override
    public void connect(Peer peer) {
        this.url = "http://" + peer.getHost() + ":" + peer.getPort();
    }

    @Override
    public InputStream write(InputStream data) {
        try {
            log.info("URL:" + this.url);
            HttpURLConnection connection = (HttpURLConnection) new URL(this.url).openConnection();
            //是否能够向connection中输入,如发送post请求,默认是false
            connection.setDoOutput(true);
            connection.setDoInput(true);
            connection.setUseCaches(false);
            connection.setRequestMethod("POST");
            connection.connect();
            //将data写入connection
            IOUtils.copy(data, connection.getOutputStream());

            int code = connection.getResponseCode();
            if (code == HttpURLConnection.HTTP_OK) {
                return connection.getInputStream();
            } else {
                return connection.getErrorStream();
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void close() {
    }
}

服务器端: 基于jetty,构建嵌入式web服务器。
HttpTransportServer.java

/**
 * 基于http(jetty)的服务器端
 */
@Slf4j
public class HttpTransportServer implements TransportServer {
    private Server server;
    private RequestHandler handler;

    @Override
    public void init(int port, RequestHandler handler) {
        this.server = new Server(port);
        this.handler = handler;

        //servlet接收请求
        ServletContextHandler ctx = new ServletContextHandler();
        server.setHandler(ctx);

        //ServletHolder:网络请求抽象
        ServletHolder holder = new ServletHolder(new RequestServlet());
        ctx.addServlet(holder, "/*");
    }

    @Override
    public void start() {
        try {
            server.start();
            //让server一直挂起
            server.join();
        } catch (Exception e) {
            e.printStackTrace();
            log.error(e.getMessage(), e);
        }
    }

    @Override
    public void stop() {
        try {
            server.stop();
        } catch (Exception e) {
            e.printStackTrace();
            log.error(e.getMessage(), e);
        }
    }

    class RequestServlet extends HttpServlet {
        @Override
        protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
            log.info("get request success");
            ServletInputStream in = req.getInputStream();
            ServletOutputStream out = resp.getOutputStream();

            if (handler != null) {
                handler.onRequest(in, out);
            }
            out.flush();
        }
    }
}
4.4 服务端模块(server)

ServiceManager:以ConcurentHashMap作为容器,提供服务注册和服务查找功能。

ServiceManager.java

@Slf4j
public class ServiceManager {
    private Map<ServiceDescriptor, ServiceInstance> services;

    public ServiceManager() {
        this.services = new ConcurrentHashMap<>();
    }

    //注册服务
    public <T> void register(Class<T> interfaceClass, T bean) {
        Method[] methods = ReflectionUtils.getPublicMethods(interfaceClass);
        for (Method method : methods) {
            ServiceInstance serviceInstance = new ServiceInstance(bean, method);
            ServiceDescriptor sdp = ServiceDescriptor.from(interfaceClass, method);
            services.put(sdp, serviceInstance);
            log.info("register service: {}:{}", sdp.getClazz(), sdp.getMethod());
        }
    }

    //查找服务
    public ServiceInstance lookup(Request request) {
        ServiceDescriptor sdp = request.getService();
        return services.get(sdp);
    }
}

ServiceInvoker:通过动态代理,调用具体服务
ServiceInvoker.java

/**
 * 调用具体服务
 */
public class ServiceInvoker {
    public Object invoke(ServiceInstance service, Request request) {
        return ReflectionUtils.invoke(service.getTarget(), service.getMethod(), request.getParams());
    }
}
4.5 客户端模块(server)

RemoteInvoker: 使用jdk默认的动态代理,通过代理对象,进程远程方法调用。
RemoteInvoker.java

/**
 * 调用远程服务的代理类
 */
@Slf4j
public class RemoteInvoker implements InvocationHandler {
    private Class clazz;
    private Encoder encoder;
    private Decoder decoder;
    private TransportSelector selector;

    RemoteInvoker(Class clazz, Encoder encoder, Decoder decoder, TransportSelector selector) {
        this.clazz = clazz;
        this.encoder = encoder;
        this.decoder = decoder;
        this.selector = selector;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Request request = new Request();
        request.setService(ServiceDescriptor.from(clazz, method));
        request.setParams(args);

        Response response = invokeRemote(request);
        if (response == null || response.getCode() != 0) {
            throw new IllegalStateException("fail to invoke remote: " + response);
        }
        return response.getData();
    }

    private Response invokeRemote(Request request) {
        TransportClient client = null;
        Response response = null;
        try {
            client = selector.select();
            byte[] bytes = encoder.encode(request);
            InputStream in = client.write(new ByteArrayInputStream(bytes));
            byte[] inBytes = IOUtils.readFully(in, in.available(), true);
            response = decoder.decode(inBytes, Response.class);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            response.setCode(-1);
            response.setMsg("RpcClient got error:" + e.getClass() + ":" + e.getMessage());
        } finally {
            if (client != null) {
                selector.release(client);
            }
        }
        return response;
    }
}
4.6 使用案例模块(example)

eg:使用rpc完成add方法的调用

调用接口和具体实现:

public interface CalcService {
    int add(int a,int b);
}
public class CalcServiceImpl implements CalcService {
    @Override
    public int add(int a, int b) {
        return a+b;
    }
}

客户端:

public class Client {
    public static void main(String[] args) {
        RpcClient client = new RpcClient();
        CalcService service = client.getProxy(CalcService.class);
        int result = service.add(1, 2);
        System.out.println(result);
    }
}

服务器端:

public class Server {
    public static void main(String[] args) {
        RpcServer server = new RpcServer();
        server.register(CalcService.class, new CalcServiceImpl());
        server.start();
    }
}

执行结果:

源码:

参考视频:

因篇幅问题不能全部显示,请点此查看更多更全内容