草庐IT

Rpc-实现Zookeeper注册中心

zko0 2023-04-16 原文

1.前言

本文章是笔主在声哥的手写RPC框架的学习下,对注册中心的一个拓展。因为声哥某些部分没有保留拓展性,所以本文章的项目与声哥的工程有部分区别,核心内容在Curator的注册发现与注销,思想看准即可。

本文章Git仓库:zko0/zko0-rpc

声哥的RPC项目写的确实很详细,跟学一遍受益匪浅:

何人听我楚狂声的博客

在声哥的项目里使用Nacos作为了服务注册中心。本人拓展添加了ZooKeeper实现服务注册。

Nacos的服务注册和发现,设计的不是非常好,每次服务的发现都需要去注册中心拉取。本人实现ZooKeeper注册中心时,参考了Dubbo的设计原理,结合本人自身想法,添加了本地缓存:

  • Client发现服务后缓存在本地,维护一个服务——实例列表
  • 当监听到注册中心的服务列表发生了变化,Client更新本地列表
  • 当注册中心宕机,Client能够依靠本地的服务列表继续提供服务

问题:

  1. 实现服务注册的本地缓存,还需要实现注册中心的监听,当注册中心的服务发生更改时能够实现动态更新。或者用轮训的方式,定时更新,不过这种方式的服务实时性较差
  2. 当Server宕机,非临时节点注册容易出现服务残留无法清除的问题。所以我建议全部使用临时节点去注册。

2.内容

zookeeper需要简单学一下,知识内容非常简单,搭建也很简单,在此跳过。

如果你感兴趣,可以参考我的ZooKeeper的文章:Zookeeper学习笔记 - zko0

①添加依赖

Curator:(简化ZooKeeper客户端使用)(Netfix研发,捐给Apache,是Apache顶级项目)

这里排除slf4j依赖,因为笔主使用的slf4j存在冲突

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>

②代码编写

1.首先创建一个连接类:

@Slf4j
public class ZookeeperUtil {

    //内部化构造方法
    private ZookeeperUtil(){
    }

    private static final String SERVER_HOSTNAME= RegisterCenterConfig.getHostName();

    private static final Integer SERVER_PORT=RegisterCenterConfig.getServerPort();

    private static CuratorFramework zookeeperClient;

    public static CuratorFramework getZookeeperClient(){
        if (zookeeperClient==null){
            synchronized (ZookeeperUtil.class){
                if (zookeeperClient==null){
                    RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
                    zookeeperClient = CuratorFrameworkFactory.builder()
                            .connectString(SERVER_HOSTNAME+":"+SERVER_PORT)
                            .retryPolicy(retryPolic)
                            //  zookeeper根目录为/serviceRegister,不为/
                            .namespace("serviceRegister")
                            .build();
                    zookeeperClient.start();
                }
            }
        }
        return zookeeperClient;
    }

    public static String getServerHostname(){
        return SERVER_HOSTNAME;
    }

    public static Integer getServerPort(){
        return SERVER_PORT;
    }

}

其中HOST,PORT信息我保存在regiserCenter.properties配置文件夹中,使用类读取:

public class RpcConfig {


    //注册中心类型
    private static String registerCenterType;

    //序列化类型
    private static String serializerType;

    //负载均衡类型
    private static String loadBalanceType;

    //配置Nacos地址
    private static String registerCenterHost;

    private static Integer registerCenterPort;


    private static boolean zookeeperDestoryIsEphemeral;

    private static String serverHostName;

    private static Integer serverPort;

    static {
        ResourceBundle bundle = ResourceBundle.getBundle("rpc");
        registerCenterType=bundle.getString("registerCenter.type");
        loadBalanceType=bundle.getString("loadBalance.type");
        registerCenterHost=bundle.getString("registerCenter.host");
        registerCenterPort = Integer.parseInt(bundle.getString("registerCenter.port"));
        try {
            zookeeperDestoryIsEphemeral="true".equals(bundle.getString("registerCenter.destory.isEphemeral"));
        } catch (Exception e) {
            zookeeperDestoryIsEphemeral=false;
        }
        serializerType=bundle.getString("serializer.type");
        serverHostName=bundle.getString("server.hostName");
        serverPort=Integer.parseInt(bundle.getString("server.port"));
    }

    public static String getRegisterCenterType() {
        return registerCenterType;
    }

    public static String getSerializerType() {
        return serializerType;
    }

    public static String getLoadBalanceType() {
        return loadBalanceType;
    }

    public static String getRegisterCenterHost() {
        return registerCenterHost;
    }

    public static Integer getRegisterCenterPort() {
        return registerCenterPort;
    }


    public static String getServerHostName() {
        return serverHostName;
    }

    public static Integer getServerPort() {
        return serverPort;
    }

    public static boolean isZookeeperDestoryIsEphemeral() {
        return zookeeperDestoryIsEphemeral;
    }

}

下面的代码我和声哥有些不同,我将服务注册,注销方法放在ServerUtils中,服务发现方法放在ClientUtils中:

服务的高一致性存在两种做法:

  • 因为ZooKeeper存在临时节点,注册中心可以实现Client(RPC的Server)断开,注册服务信息的自动丢失
  • 不设置为临时节点,手动的服务注册清除

我这里两种都实现了,虽然做两种方式不同但是功能相同的代码放在一起看起来很奇怪,这里只是做演示。选择其中一种即可。(我建议使用临时节点,当Server宕机,残留的服务信息也能及时清除)

注册实现原理图:

接口:

public interface ServiceDiscovery {
    InetSocketAddress searchService(String serviceName);

    void cleanLoaclCache(String serviceName);
}
public interface ServiceRegistry {
    //服务注册
    void register(String serviceName, InetSocketAddress inetAddress);

    void cleanRegistry();
}

ZooKeeper接口实现:

public class ZookeeperServiceDiscovery implements ServiceDiscovery{

    private final LoadBalancer loadBalancer;

    public ZookeeperServiceDiscovery(LoadBalancer loadBalancer) {
        this.loadBalancer = loadBalancer;
    }
    
    @Override
    public InetSocketAddress searchService(String serviceName) {
        return ZookeeperClientUtils.searchService(serviceName,loadBalancer);
    }

    @Override
    public void cleanLoaclCache(String serviceName) {
        ZookeeperClientUtils.cleanLocalCache(serviceName);
    }
}
public class ZookeeperServiceRegistry implements ServiceRegistry{
    @Override
    public void register(String serviceName, InetSocketAddress inetAddress) {
        ZookeeperServerUitls.register(serviceName,inetAddress);
    }

    @Override
    public void cleanRegistry() {
        ZookeeperServerUitls.cleanRegistry();
    }
}

Factory工厂:

public class ServiceFactory {

    private static String center = RpcConfig.getRegisterCenterType();
    private static String lb= RpcConfig.getLoadBalanceType();

    private static  ServiceRegistry registry;

    private static  ServiceDiscovery discovery;

    private static Object registerLock=new Object();

    private static Object discoveryLock=new Object();

    public static ServiceDiscovery getServiceDiscovery(){
        if (discovery==null){
            synchronized (discoveryLock){
                if (discovery==null){
                    if ("nacos".equalsIgnoreCase(center)){
                        discovery= new NacosServiceDiscovery(LoadBalancerFactory.getLoadBalancer(lb));
                    }else if ("zookeeper".equalsIgnoreCase(center)){
                        discovery= new ZookeeperServiceDiscovery(LoadBalancerFactory.getLoadBalancer(lb));
                    }
                }
            }
        }
        return discovery;
    }

    public static ServiceRegistry getServiceRegistry(){
        if (registry==null){
            synchronized (registerLock){
                if (registry==null){
                    if ("nacos".equalsIgnoreCase(center)){
                        registry=  new NacosServiceRegistry();
                    }else if ("zookeeper".equalsIgnoreCase(center)){
                        registry= new ZookeeperServiceRegistry();
                    }
                }
            }
        }
        return registry;
    }

}

使用Gson序列化InetSocketAddress存在问题,编写Util:

public class InetSocketAddressSerializerUtil {
    public static String getJsonByInetSockerAddress(InetSocketAddress address){
        HashMap<String, String> map = new HashMap<>();
        map.put("host",address.getHostName());
        map.put("port",address.getPort()+"");
        return new Gson().toJson(map);
    }

    public static InetSocketAddress getInetSocketAddressByJson(String json){
        HashMap<String,String> hashMap = new Gson().fromJson(json, HashMap.class);
        String host = hashMap.get("host");
        Integer port=Integer.parseInt(hashMap.get("port"));
        return new InetSocketAddress(host,port);
    }

}

上面主要是注册,发现的逻辑,我把主要方法写在了Utils中:

@Slf4j
public class ZookeeperServerUitls {

    private static CuratorFramework client = ZookeeperUtil.getZookeeperClient();

    private static final Set<String> instances=new ConcurrentHashSet<>();

    public static void register(String serviceName, InetSocketAddress inetSocketAddress){

        serviceName=ZookeeperUtil.serviceName2Path(serviceName);;
        String uuid = UUID.randomUUID().toString();
        serviceName=serviceName+"/"+uuid;
        String json = InetSocketAddressSerializerUtil.getJsonByInetSockerAddress(inetSocketAddress);
        try {
            if (RpcConfig.isZookeeperDestoryIsEphemeral()){
                //会话结束节点,创建消失
                client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .forPath(serviceName,json.getBytes());
            } else {
                client.create()
                        .creatingParentsIfNeeded()
                        .forPath(serviceName,json.getBytes());
            }
        }
            catch (Exception e) {
            log.error("服务注册失败");
            throw  new RpcException(RpcError.REGISTER_SERVICE_FAILED);
        }
        //放入map
        instances.add(serviceName);
    }

    public static void cleanRegistry(){
        log.info("注销所有注册的服务");
        //如果自动销毁,不需要清除
        if (RpcConfig.isZookeeperDestoryIsEphemeral()) return;
        if (ZookeeperUtil.getServerHostname()!=null&&ZookeeperUtil.getServerPort()!=null&&!instances.isEmpty()){
            for (String path:instances) {
                try {
                    client.delete().forPath(path);
                } catch (Exception e) {
                    log.error("服务注销失败");
                    throw new RpcException(RpcError.DESTORY_REGISTER_FALL);
                }
            }
        }
    }
}
@Slf4j
public class ZookeeperClientUtils {

    private static CuratorFramework client = ZookeeperUtil.getZookeeperClient();

    private static final Map<String,  List<InetSocketAddress>> instances=new ConcurrentHashMap<>();

    public static InetSocketAddress searchService(String serviceName, LoadBalancer loadBalancer) {
        InetSocketAddress address;
        //本地缓存查询
        if (instances.containsKey(serviceName)){
            List<InetSocketAddress> addressList = instances.get(serviceName);
            if (!addressList.isEmpty()){
                //使用lb进行负载均衡
                return loadBalancer.select(addressList);
            }
        }
        try {
            String path = ZookeeperUtil.serviceName2Path(serviceName);
            //获取路径下所有的实现
            List<String> instancePaths = client.getChildren().forPath(path);
            List<InetSocketAddress> addressList = new ArrayList<>();
            for (String instancePath : instancePaths) {
                byte[] bytes = client.getData().forPath(path+"/"+instancePath);
                String json = new String(bytes);
                InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json);
                addressList.add(instance);
            }
            addLocalCache(serviceName,addressList);
            return loadBalancer.select(addressList);
        } catch (Exception e) {
            log.error("服务获取失败====>{}",e);
            throw new RpcException(RpcError.SERVICE_NONE_INSTANCE);
        }
    }

    public static void cleanLocalCache(String serviceName){
        log.info("服务调用失败,清除本地缓存,重新获取实例===>{}",serviceName);
        instances.remove(serviceName);
    }

    public static void addLocalCache(String serviceName,List<InetSocketAddress> addressList){
        //直接替换原本的缓存
        instances.put(serviceName,addressList);
    }
}

③配置文件

rpc.properties放在resources下

#nacos    zookeeper
#registerCenter.type=nacos
registerCenter.type=zookeeper

#registerCenter.host=127.0.0.1
registerCenter.host=101.43.244.40

#zookeeper port default 2181
#registerCenter.port=9000
registerCenter.port=2181

registerCenter.destory.isEphemeral=false

#??random?roundRobin
loadBalance.type=random

#kryo json jdk
serializer.type=kryo

server.hostName=127.0.0.1
server.port=9999

④更多

声哥的代码我做了很多修改,如果上述代码和你参考的项目代码出入比较大,可以查看本文章的工程阅读。

有关Rpc-实现Zookeeper注册中心的更多相关文章

  1. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  2. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  3. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  4. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

  5. 阿里云国际版免费试用:如何注册以及注意事项 - 2

    作为新的阿里云用户,您可以50免费试用多种优惠,价值高达1,700美元(或8,500美元)。这将让您了解和体验阿里云平台上提供的一系列产品和服务。如果您以个人身份注册免费试用,您将获得价值1,700美元的优惠。但是,如果您是注册公司,您可以选择企业免费试用,提交基本信息通过企业实名注册验证,即可开始价值$8,500的免费试用!本教程介绍了如何设置您的帐户并使用您的免费试用版。​关于免费试用在我们开始此试用之前,您还必须遵守以下条款和条件才能访问您的免费试用:只有在一年内创建的账户才有资格获得阿里云免费试用。通过此免费试用优惠,用户可以免费试用免费试用活动页面上列出的每种产品一次。如果您有多个帐

  6. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

  7. ruby - Arrays Sets 和 SortedSets 在 Ruby 中是如何实现的 - 2

    通常,数组被实现为内存块,集合被实现为HashMap,有序集合被实现为跳跃列表。在Ruby中也是如此吗?我正在尝试从性能和内存占用方面评估Ruby中不同容器的使用情况 最佳答案 数组是Ruby核心库的一部分。每个Ruby实现都有自己的数组实现。Ruby语言规范只规定了Ruby数组的行为,并没有规定任何特定的实现策略。它甚至没有指定任何会强制或至少建议特定实现策略的性能约束。然而,大多数Rubyist对数组的性能特征有一些期望,这会迫使不符合它们的实现变得默默无闻,因为实际上没有人会使用它:插入、前置或追加以及删除元素的最坏情况步骤复

  8. ruby-on-rails - 设计注册确认 - 2

    我在我的项目中有一个用户和一个管理员角色。我使用Devise创建了身份验证。在我的管理员角色中,我没有任何确认。在我的用户模型中,我有以下内容:devise:database_authenticatable,:confirmable,:recoverable,:rememberable,:trackable,:validatable,:timeoutable,:registerable#Setupaccessible(orprotected)attributesforyourmodelattr_accessible:email,:username,:prename,:surname,:

  9. ruby - "public/protected/private"方法是如何实现的,我该如何模拟它? - 2

    在ruby中,你可以这样做:classThingpublicdeff1puts"f1"endprivatedeff2puts"f2"endpublicdeff3puts"f3"endprivatedeff4puts"f4"endend现在f1和f3是公共(public)的,f2和f4是私有(private)的。内部发生了什么,允许您调用一个类方法,然后更改方法定义?我怎样才能实现相同的功能(表面上是创建我自己的java之类的注释)例如...classThingfundeff1puts"hey"endnotfundeff2puts"hey"endendfun和notfun将更改以下函数定

  10. ruby - 实现k最近邻需要哪些数据? - 2

    我目前有一个reddit克隆类型的网站。我正在尝试根据我的用户之前喜欢的帖子推荐帖子。看起来K最近邻或k均值是执行此操作的最佳方法。我似乎无法理解如何实际实现它。我看过一些数学公式(例如k表示维基百科页面),但它们对我来说并没有真正意义。有人可以推荐一些伪代码,或者可以查看的地方,以便我更好地了解如何执行此操作吗? 最佳答案 K最近邻(又名KNN)是一种分类算法。基本上,您采用包含N个项目的训练组并对它们进行分类。如何对它们进行分类完全取决于您的数据,以及您认为该数据的重要分类特征是什么。在您的示例中,这可能是帖子类别、谁发布了该项

随机推荐