手写RPC框架 您所在的位置:网站首页 手写rpc框架遇到的问题 手写RPC框架

手写RPC框架

2024-07-17 01:36| 来源: 网络整理| 查看: 265

RPC框架-Gitee代码(麻烦点个Starred, 支持一下吧) RPC框架-GitHub代码(麻烦点个Starred, 支持一下吧)

服务注册 服务注册a.添加服务节点和主机节点b.抽象注册中心c.本地服务列表

服务注册 a.添加服务节点和主机节点

主要完成服务注册和发现的功能,其具体流程如下:

1.服务提供方将服务注册到注册中心中

2.消费端拉取服务列表。

3.消费端简单的选取一个可以服务(后续会进行改造,实现负载均衡)

在这里插入图片描述

1.修改framework/common的Constants类:定义注册中心的路径常量

public class Constant { // 略........ // 服务提供方的在注册中心的基础路径 public static final String BASE_PROVIDERS_PATH = "/dcyrpc-metadata/providers"; // 服务调用方的在注册中心的基础路径 public static final String BASE_CONSUMERS_PATH = "/dcyrpc-metadata/consumers"; }

2.在core中引入common的依赖项

3.修改framework/core的DcyRpcBootstrap类:定义一些相关的基础配置

定义相关变量:应用名称, Config, 默认端口定义Zookeeper实例完善方法代码:application() / registry() / protocol() / publish()publish()发布服务:将接口与匹配的实现注册到服务中心 // 略...... private static final DcyRpcBootstrap dcyRpcBootstrap = new DcyRpcBootstrap(); // 定义一些相关的基础配置 private String applicationName = "default"; private RegistryConfig registryConfig; private ProtocolConfig protocolConfig; private int port = 8088; // 维护一个Zookeeper实例 private ZooKeeper zooKeeper; // 略...... /** * 定义当前应用的名字 * @param applicationName 应用名称 * @return */ public DcyRpcBootstrap application(String applicationName) { this.applicationName = applicationName; return this; } /** * 配置一个注册中心 * @param registryConfig 注册中心 * @return this */ public DcyRpcBootstrap registry(RegistryConfig registryConfig) { // 维护一个zookeeper实例,但是,如果这样写就会将zookeeper和当前的工程耦合 zooKeeper = ZookeeperUtils.createZookeeper(); this.registryConfig = registryConfig; return this; } /** * 配置当前暴露的服务使用的协议 * @param protocolConfig 协议的封装 * @return this */ public DcyRpcBootstrap protocol(ProtocolConfig protocolConfig) { this.protocolConfig = protocolConfig; if (log.isDebugEnabled()) { log.debug("当前工程使用了:{}协议进行序列化", protocolConfig.toString()); } return this; } /** * --------------------------------服务提供方的相关api-------------------------------- */ /** * 发布服务:将接口与匹配的实现注册到服务中心 * @param service 封装需要发布的服务 * @return */ public DcyRpcBootstrap publish(ServiceConfig service) { // 服务名称的节点 String parentNode = Constant.BASE_PROVIDERS_PATH + "/" + service.getInterface().getName(); // 判断节点是否存在,不存在则创建节点(持久) if (!ZookeeperUtils.existNode(zooKeeper, parentNode, null)) { ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode, null); ZookeeperUtils.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT); } // 创建本机的临时节点,ip:port // 服务提供方的端口(一般自己设定),还需要获取ip的方法 // /dcyrpc-metadata/providers/com.dcyrpc.DcyRpc/192.168.195.1:8088 String node = parentNode + "/" + NetUtils.getIp() + ":" + port; if (!ZookeeperUtils.existNode(zooKeeper, node, null)) { ZookeeperNode zookeeperNode = new ZookeeperNode(node, null); ZookeeperUtils.createNode(zooKeeper, zookeeperNode, null, CreateMode.EPHEMERAL); } if (log.isDebugEnabled()) { log.debug("服务{},已经被注册", service.getInterface().getName()); } return this; } // 略...... /** * 启动netty服务 */ public void start() { try { Thread.sleep(10000); } catch (InterruptedException e) { throw new RuntimeException(e); } } // 略...... }

4.修改framework/common的utils.zookeeper.ZookeeperUtils类:添加方法:判断节点是否存在

/** * 判断节点是否存在 * @param zooKeeper * @param node * @param watcher * @return true:存在 false:不存在 */ public static boolean existNode(ZooKeeper zooKeeper, String node, Watcher watcher) { try { return zooKeeper.exists(node, watcher) != null; } catch (KeeperException | InterruptedException e) { log.error("判断节点:{} 是否存在时发生异常:", node, e); throw new ZookeeperException(e); } }

5.修改framework/common的exceptions.ZookeeperException类:完善内容

public class ZookeeperException extends RuntimeException{ public ZookeeperException() { super(); } public ZookeeperException(Throwable cause) { super(cause); } }

6.在framework/common的utils包下,创建NetUtils类:Network工具类

/** * Network utils */ @Slf4j public class NetUtils { public static String getIp() { try { // 获取所有的网卡信息 Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); while (interfaces.hasMoreElements()) { NetworkInterface iface = interfaces.nextElement(); // 过滤非回环接口和虚拟接口 if (iface.isLoopback() || iface.isVirtual() || !iface.isUp()) { continue; } Enumeration addresses = iface.getInetAddresses(); while (addresses.hasMoreElements()) { InetAddress addr = addresses.nextElement(); // 过滤IPv6地址和回环地址 if (addr instanceof Inet6Address || addr.isLoopbackAddress()) { continue; } String ipAddress = addr.getHostAddress(); System.out.println("局域网IP地址:" + ipAddress); return ipAddress; } } throw new NetworkException(); } catch (SocketException e) { log.error("获取局域网IP时发送异常", e); throw new NetworkException(e); } } }

7.在framework/common的exceptions包下创建NetworkException类:编写自定义异常

public class NetworkException extends RuntimeException{ public NetworkException() { super(); } public NetworkException(String message) { super(message); } public NetworkException(Throwable cause) { super(cause); } } b.抽象注册中心

在当前项目中我们的确使用的是zookeeper作为我们项目的注册中心。但是,我们希望在我们的项目是可以扩展使用其他类型的注册中心的,如nacos,redis,甚至是自己独立开发注册中心。为后来的扩展提供可能性,所以在整个工程中我们再也不能单独的面向具体的对象编程,而是面向抽象,我们将抽象出**【注册中心】**整个抽象的概念

1.在core下com.dcyrpc下创建discovery包,创建Registry接口:抽象注册中心接口:注册服务,发现服务,下线服务

/** * 抽象注册中心:注册服务,发现服务,下线服务 */ public interface Registry { /** * 注册服务 * @param serviceConfig 服务的配置内容 */ public void register(ServiceConfig serviceConfig); }

2.在core下com.dcyrpc.discovery下创建AbstractRegistry抽象类:提炼共享内容,还可以做模板方法 (待开发)

/** * 提炼共享内容,还可以做模板方法 * 所有注册中心都有的公共方法 */ public abstract class AbstractRegistry implements Registry{ }

3.在core下com.dcyrpc.discovery下创建impl包,创建ZookeeperRegistry类继承AbstractRegistry

把DcyRpcBootstrap类里的publish()方法,提炼到该类中 @Slf4j public class ZookeeperRegistry extends AbstractRegistry { private ZooKeeper zooKeeper = ZookeeperUtils.createZookeeper(); @Override public void register(ServiceConfig service) { // 服务名称的节点 String parentNode = Constant.BASE_PROVIDERS_PATH + "/" + service.getInterface().getName(); // 判断节点是否存在,不存在则创建节点(持久) if (!ZookeeperUtils.existNode(zooKeeper, parentNode, null)) { ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode, null); ZookeeperUtils.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT); } // 创建本机的临时节点,ip:port // 服务提供方的端口(一般自己设定),还需要获取ip的方法 // /dcyrpc-metadata/providers/com.dcyrpc.DcyRpc/192.168.195.1:8088 // TODO:后续处理端口问题 String node = parentNode + "/" + NetUtils.getIp() + ":" + 8088; if (!ZookeeperUtils.existNode(zooKeeper, node, null)) { ZookeeperNode zookeeperNode = new ZookeeperNode(node, null); ZookeeperUtils.createNode(zooKeeper, zookeeperNode, null, CreateMode.EPHEMERAL); } if (log.isDebugEnabled()) { log.debug("服务{},已经被注册", service.getInterface().getName()); } } }

4.修改DcyRpcBootstrap

// 略..... private static final DcyRpcBootstrap dcyRpcBootstrap = new DcyRpcBootstrap(); // 定义一些相关的基础配置 private String applicationName = "default"; private RegistryConfig registryConfig; private ProtocolConfig protocolConfig; private int port = 8088; // 注册中心 private Registry zookeeperRegistry; // 略..... /** * 配置一个注册中心 * @param registryConfig 注册中心 * @return this */ public DcyRpcBootstrap registry(RegistryConfig registryConfig) { // 维护一个zookeeper实例,但是,如果这样写就会将zookeeper和当前的工程耦合 // 使用 registryConfig 获取一个注册中心 this.zookeeperRegistry = registryConfig.getRegistry(); return this; } // 略..... /** * 发布服务:将接口与匹配的实现注册到服务中心 * @param service 封装需要发布的服务 * @return */ public DcyRpcBootstrap publish(ServiceConfig service) { // 抽象了注册中心的概念,使用注册中心的一个实现完成注册 zookeeperRegistry.register(service); return this; } /** * 批量发布服务 * @param services 封装需要发布的服务集合 * @return this */ public DcyRpcBootstrap publish(List services) { for (ServiceConfig service : services) { this.publish(service); } return this; } // 略..... }

5.修改RegistryConfig类,将类放入discovery包下

public class RegistryConfig { // 定义连接的 url private final String connectString; public RegistryConfig(String connectString) { this.connectString = connectString; } /** * 可以使用简单工厂来完成 * @return 具体的注册中心实例 */ public Registry getRegistry() { // 1.获取注册中心 // 1.获取类型 // 2.获取主机地址 String registryType = getRegistryType(connectString, true).toLowerCase().trim(); if (registryType.equals("zookeeper")) { String host = getRegistryType(connectString, false); return new ZookeeperRegistry(host, Constant.TIME_OUT); } throw new DiscoveryException("未发现合适的注册中心"); } private String getRegistryType(String connectString, boolean ifType) { String[] typeAndHost = connectString.split("://"); if (typeAndHost.length != 2) { throw new RuntimeException("给定的注册中心连接url不合法"); } if (ifType){ return typeAndHost[0]; }else { return typeAndHost[1]; } } }

6.在framework/common的exceptions中创建DiscoveryException类:服务注册与发现异常处理

/** * 服务注册与发现异常处理 */ public class DiscoveryException extends RuntimeException{ public DiscoveryException() { super(); } public DiscoveryException(String message) { super(message); } public DiscoveryException(Throwable cause) { super(cause); } } c.本地服务列表

服务调用方需要通过服务中心发现服务列表

1.使用Map进行服务列表的存储2.使用动态代理生成代理对象3.从注册中心,寻找一个可用的服务

1.修改DcyRpcBootstrap部分代码:使用Map进行服务列表的存储

// 维护已经发布且暴露的服务列表 key:interface的全限定名 value:ServiceConfig private static final Map SERVERS_LIST = new HashMap(16); /** * 发布服务:将接口与匹配的实现注册到服务中心 * @param service 封装需要发布的服务 * @return */ public DcyRpcBootstrap publish(ServiceConfig service) { // 抽象了注册中心的概念,使用注册中心的一个实现完成注册 zookeeperRegistry.register(service); // 1.当服务调用方,通过接口、方法名、具体的方法参数列表 发起调用,提供方怎么知道使用哪一个实现 // (1) new 1 个 // (2) spring beanFactory.getBean(Class) // (3) 自己维护映射关系 SERVERS_LIST.put(service.getInterface().getName(), service); return this; }

2.修改ReferenceConfig部分代码

// 略..... private Registry registry; /** * 代理设计模式,生成一个API接口的代理对象 * @return 代理对象 */ public T get() { // 使用动态代理完成工作 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); Class[] classes = new Class[]{interfaceRef}; // 使用动态代理生成代理对象 Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 1.发现服务,从注册中心,寻找一个可用的服务 // 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name) InetSocketAddress address = registry.lookup(interfaceRef.getName()); if (log.isInfoEnabled()){ log.debug("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address); } // 2.使用netty连接服务器,发送 调用的 服务名字+方法名字+参数列表,得到结果 return null; } }); return (T) helloProxy; } public Registry getRegistry() { return registry; } public void setRegistry(Registry registry) { this.registry = registry; }

3.修改Registry接口,添加发现服务的接口

/** * 从注册中心拉取一个可用的服务 * @param serviceName 服务名称 * @return 服务的ip+端口 */ InetSocketAddress lookup(String serviceName);

4.修改ZookeeperRegistry部分代码,实现发现服务的业务逻

@Override public InetSocketAddress lookup(String serviceName) { // 1.找到对应服务的节点 String serviceNode = Constant.BASE_PROVIDERS_PATH + "/" + serviceName; // 2.从zk中获取它的子节点, List children = ZookeeperUtils.getChildren(zooKeeper, serviceNode, null); // 获取所有的可用的服务列表 List inetSocketAddressList = children.stream().map(ipString -> { String[] ipAndPort = ipString.split(":"); String ip = ipAndPort[0]; int port = Integer.valueOf(ipAndPort[1]); return new InetSocketAddress(ip, port); }).toList(); if (inetSocketAddressList.size() == 0){ throw new DiscoveryException("未发现任何可用的服务主机"); } return inetSocketAddressList.get(0); }

5.修改ZookeeperUtils部分代码,添加与实现获取子节点的方法

/** * 查询一个节点的子元素 * @param zooKeeper * @param serviceNode 服务节点 * @return 子元素列表 */ public static List getChildren(ZooKeeper zooKeeper, String serviceNode, Watcher watcher) { try { return zooKeeper.getChildren(serviceNode, watcher); } catch (KeeperException | InterruptedException e) { log.error("获取节点{}的子元素时发生异常:{}", serviceNode, e); throw new ZookeeperException(e); } }


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有