草庐IT

实现etcd服务注册与发现

liuyuede123 2023-03-28 原文

转载自:实现etcd服务注册与发现

0.1、目录结构

.
├── api
│   └── main.go
├── common
│   └── common.go
├── docker-compose.yml
├── etcd
│   └── Dockerfile
├── go.mod
├── go.sum
├── rpc
│   ├── courseware
│   │   ├── courseware.pb.go
│   │   └── courseware_grpc.pb.go
│   ├── courseware.proto
│   └── main.go
└── server
    ├── service_discovery.go
    └── service_registration.go

1、docker-compose部署一个3节点的集群

项目根目录下创建etcd目录,并在目录下新增Dockerfile文件

FROM bitnami/etcd:latest

LABEL maintainer="liuyuede123 <liufutianoppo@163.com>"

项目根目录下新增docker-compose.yml

version: '3.5'
# 网络配置
networks:
  backend:
    driver: bridge

# 服务容器配置
services:
  etcd1:                                  # 自定义容器名称
    build:
      context: etcd                    # 指定构建使用的 Dockerfile 文件
    environment:
      - TZ=Asia/Shanghai
      - ALLOW_NONE_AUTHENTICATION=yes
      - ETCD_NAME=etcd1
      - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd1:2380
      - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd1:2379
      - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster
      - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      - ETCD_INITIAL_CLUSTER_STATE=new
    ports:                               # 设置端口映射
      - "12379:2379"
      - "12380:2380"
    networks:
      - backend
    restart: always

  etcd2: # 自定义容器名称
    build:
      context: etcd                    # 指定构建使用的 Dockerfile 文件
    environment:
      - TZ=Asia/Shanghai
      - ALLOW_NONE_AUTHENTICATION=yes
      - ETCD_NAME=etcd2
      - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd2:2380
      - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd2:2379
      - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster
      - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      - ETCD_INITIAL_CLUSTER_STATE=new
    ports: # 设置端口映射
      - "22379:2379"
      - "22380:2380"
    networks:
      - backend
    restart: always

  etcd3: # 自定义容器名称
    build:
      context: etcd                    # 指定构建使用的 Dockerfile 文件
    environment:
      - TZ=Asia/Shanghai
      - ALLOW_NONE_AUTHENTICATION=yes
      - ETCD_NAME=etcd3
      - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd3:2380
      - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd3:2379
      - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster
      - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      - ETCD_INITIAL_CLUSTER_STATE=new
    ports: # 设置端口映射
      - "32379:2379"
      - "32380:2380"
    networks:
      - backend
    restart: always

相关参数概念:

  1. ETCD_INITIAL_ADVERTISE_PEER_URLS:该成员节点在整个集群中的通信地址列表,这个地址用来传输集群数据的地址。因此这个地址必须是可以连接集群中所有的成员的。
  2. ETCD_LISTEN_PEER_URLS:该节点与其他节点通信时所监听的地址列表,多个地址使用逗号隔开,其格式可以划分为scheme://IP:PORT,这里的scheme可以是http、https
  3. ETCD_LISTEN_CLIENT_URLS:该节点与客户端通信时监听的地址列表
  4. ETCD_ADVERTISE_CLIENT_URLS:广播给集群中其他成员自己的客户端地址列表
  5. ETCD_INITIAL_CLUSTER_TOKEN:初始化集群token
  6. ETCD_INITIAL_CLUSTER:配置集群内部所有成员地址,其格式为:ETCD_NAME=ETCD_INITIAL_ADVERTISE_PEER_URLS,如果有多个使用逗号隔开
  7. ETCD_INITIAL_CLUSTER_STATE:初始化集群状态,new表示新建

启动集群

docker-compose up -d
Creating network "etcd_backend" with driver "bridge"
Creating etcd_etcd1_1 ... done
Creating etcd_etcd2_1 ... done
Creating etcd_etcd3_1 ... done

测试集群可用性

# 登录其中一个节点
docker exec -it 5f97bf0b446f6e6514576fc1eb46c2f60d2c2b3e3f3ee3b1ad6219414fa915c8 /bin/sh
# 写入一个键值
etcdctl put name "liuyuede"
OK
# 查看
etcdctl get name
name
liuyuede

# 登录另外俩个节点
docker exec -it a6ccc9b6e5cc81ee7c779e2b9e7235cd6d814e92fbc66b7e4846798acff8ee2a /bin/sh
etcdctl get name
name
liuyuede

docker exec -it 6817fa89e3e9e422628e0049910b672df389c62d41bf2349a0f77e22c99e5270 /bin/sh
etcdctl get name
name
liuyuede

etcd集群采用的是raft协议,一般至少为俩个集群,只有一个master,如果删除到只剩一个节点当前节点也不能提供服务

查看集群情况

etcdctl --endpoints=http://0.0.0.0:12379,0.0.0.0:22379,0.0.0.0:32379 endpoint status --write-out=table

+----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
|       ENDPOINT       |        ID        | VERSION | DB SIZE | IS LEADER | IS LEARNER | RAFT TERM | RAFT INDEX | RAFT APPLIED INDEX | ERRORS |
+----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
| http://0.0.0.0:12379 | ade526d28b1f92f7 |   3.5.4 |   20 kB |      true |      false |         3 |         13 |                 13 |        |
|        0.0.0.0:22379 | d282ac2ce600c1ce |   3.5.4 |   20 kB |     false |      false |         3 |         13 |                 13 |        |
|        0.0.0.0:32379 | bd388e7810915853 |   3.5.4 |   20 kB |     false |      false |         3 |         13 |                 13 |        |
+----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+

2、增加服务注册功能

服务注册的流程

  1. 向etcd新增一个包含rpc服务信息的键值对,并设置租约(比如5秒过期)
  2. 利用保活函数KeepAlive不断续约
package server

import (
	"context"
	"encoding/json"
	"errors"
	clientv3 "go.etcd.io/etcd/client/v3"
	"time"
)

type ServiceInfo struct {
	Name string
	Ip   string
}

type Service struct {
	ServiceInfo ServiceInfo
	stop        chan error
	leaseId     clientv3.LeaseID
	client      *clientv3.Client
}

func NewService(serviceInfo ServiceInfo, endpoints []string) (service *Service, err error) {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: time.Second * 10,
	})
	if err != nil {
		return nil, err
	}

	service = &Service{
		ServiceInfo: serviceInfo,
		client:      client,
	}
	return
}

func (s *Service) Start(ctx context.Context) (err error) {
	alive, err := s.KeepAlive(ctx)
	if err != nil {
		return
	}

	for {
		select {
		case err = <-s.stop: // 服务端关闭返回错误
			return err
		case <-s.client.Ctx().Done(): // etcd关闭
			return errors.New("server closed")
		case _, ok := <-alive:
			if !ok { // 保活通道关闭
				return s.revoke(ctx)
			}
		}
	}
}

func (s *Service) KeepAlive(ctx context.Context) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
	info := s.ServiceInfo
	key := s.getKey()
	val, _ := json.Marshal(info)

	// 创建租约
	leaseResp, err := s.client.Grant(ctx, 5)
	if err != nil {
		return nil, err
	}

	// 写入etcd
	_, err = s.client.Put(ctx, key, string(val), clientv3.WithLease(leaseResp.ID))
	if err != nil {
		return nil, err
	}

	s.leaseId = leaseResp.ID
	return s.client.KeepAlive(ctx, leaseResp.ID)
}

// 取消租约
func (s *Service) revoke(ctx context.Context) error {
	_, err := s.client.Revoke(ctx, s.leaseId)
	return err
}

func (s *Service) getKey() string {
	return s.ServiceInfo.Name + "/" + s.ServiceInfo.Ip
}

3、增加服务发现

服务发现流程

  1. 实现grpc中resolver.Builder接口的Build方法
  2. 通过etcdclient获取并监听grpc服务(是否有新增或者删除)
  3. 更新到resolver.State,State 包含与 ClientConn 相关的当前 Resolver 状态,包括grpc的地址resolver.Address
package server

import (
	"context"
	"encoding/json"
	"fmt"
	"go.etcd.io/etcd/api/v3/mvccpb"
	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc/resolver"
)

type Discovery struct {
	endpoints  []string
	service    string
	client     *clientv3.Client
	clientConn resolver.ClientConn
}

func NewDiscovery(endpoints []string, service string) resolver.Builder {
	return &Discovery{
		endpoints: endpoints,
		service:   service,
	}
}

func (d *Discovery) ResolveNow(rn resolver.ResolveNowOptions) {

}

func (d *Discovery) Close() {

}

func (d *Discovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	var err error
	d.client, err = clientv3.New(clientv3.Config{
		Endpoints: d.endpoints,
	})
	if err != nil {
		return nil, err
	}

	d.clientConn = cc

	go d.watch(d.service)

	return d, nil
}

func (d *Discovery) Scheme() string {
	return "etcd"
}

func (d *Discovery) watch(service string) {
	addrM := make(map[string]resolver.Address)
	state := resolver.State{}

	update := func() {
		addrList := make([]resolver.Address, 0, len(addrM))
		for _, address := range addrM {
			addrList = append(addrList, address)
		}
		state.Addresses = addrList
		err := d.clientConn.UpdateState(state)
		if err != nil {
			fmt.Println("更新地址出错:", err)
		}
	}
	resp, err := d.client.Get(context.Background(), service, clientv3.WithPrefix())
	if err != nil {
		fmt.Println("获取地址出错:", err)
	} else {
		for i, kv := range resp.Kvs {
			info := &ServiceInfo{}
			err = json.Unmarshal(kv.Value, info)
			if err != nil {
				fmt.Println("解析value失败:", err)
			}
			addrM[string(resp.Kvs[i].Key)] = resolver.Address{
				Addr:       info.Ip,
				ServerName: info.Name,
			}
		}
	}

	update()

	dch := d.client.Watch(context.Background(), service, clientv3.WithPrefix(), clientv3.WithPrevKV())
	for response := range dch {
		for _, event := range response.Events {
			switch event.Type {
			case mvccpb.PUT:
				info := &ServiceInfo{}
				err = json.Unmarshal(event.Kv.Value, info)
				if err != nil {
					fmt.Println("监听时解析value报错:", err)
				} else {
					addrM[string(event.Kv.Key)] = resolver.Address{Addr: info.Ip}
				}
				fmt.Println(string(event.Kv.Key))
			case mvccpb.DELETE:
				delete(addrM, string(event.Kv.Key))
				fmt.Println(string(event.Kv.Key))
			}
		}
		update()
	}
}

4、grpc课件服务

common参数

package common

const CoursewareRpc = "rpc.courseware"

var Endpoints = []string{"127.0.0.1:12379", "127.0.0.1:22379", "127.0.0.1:32379"}

生成课件服务grpc

syntax = "proto3";

package rpc;
option go_package = "./courseware";

message GetRequest {
  uint64 Id = 1;
}
message GetResponse {
  uint64 Id = 1;
  string Code = 2;
  string Name = 3;
  uint64 Type = 4;
}


service Courseware {
  rpc Get(GetRequest) returns(GetResponse);
}
protoc --go_out=./ --go-grpc_out=./ courseware.proto

课件服务入口

package main

import (
	"context"
	"fmt"
	"go-demo/etcd/common"
	"go-demo/etcd/rpc/courseware"
	"go-demo/etcd/server"
	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"
	"net"
	"os"
	"strings"
	"time"
)

var Port string

type service struct {
	courseware.UnsafeCoursewareServer
}

func (s *service) Get(ctx context.Context, req *courseware.GetRequest) (res *courseware.GetResponse, err error) {
	fmt.Println("获取课件详情 port:", Port, " time:", time.Now())
	return &courseware.GetResponse{
		Id:   1,
		Code: "HD4544",
		Name: "多媒体课件",
		Type: 4,
	}, nil
}
func main() {
	args := os.Args[1:]
	if len(args) == 0 {
		panic("缺少port参数:port=8400")
	}
	for _, arg := range args {
		ports := strings.Split(arg, "=")
		if len(ports) < 2 || ports[0] != "port" {
			panic("port参数格式错误:port=8400")
		}
		Port = ports[1]
	}
	listen, err := net.Listen("tcp", ":"+Port)
	if err != nil {
		fmt.Println("failed to listen", err)
		return
	}
	s := grpc.NewServer()
	courseware.RegisterCoursewareServer(s, &service{})

	reflection.Register(s)

  // 注册到etcd
	newService, err := server.NewService(server.ServiceInfo{
		Name: common.CoursewareRpc,
		Ip:   "127.0.0.1:" + Port,
	}, common.Endpoints)
	if err != nil {
		fmt.Println("添加到etcd失败:", err)
		return
	}

	go func() {
		err = newService.Start(context.Background())
		if err != nil {
			fmt.Println("开启服务注册失败:", err)
		}
	}()

	if err = s.Serve(listen); err != nil {
		fmt.Println("开启rpc服务失败:", err)
	}
}

5、api服务

package main

import (
	"context"
	"fmt"
	"go-demo/etcd/common"
	"go-demo/etcd/rpc/courseware"
	"go-demo/etcd/server"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/resolver"
	"time"
)

func main() {
	d := server.NewDiscovery(common.Endpoints, common.CoursewareRpc)
	resolver.Register(d)

	for {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    // 通过etcd注册中心和grpc服务建立连接
		conn, err := grpc.DialContext(ctx,
			fmt.Sprintf(d.Scheme()+":///"+common.CoursewareRpc),
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
			grpc.WithBlock(),
		)
		if err != nil {
			fmt.Println("和rpc建立连接失败:", err)
			return
		}

		client := courseware.NewCoursewareClient(conn)
		get, err := client.Get(ctx, &courseware.GetRequest{Id: 1})
		if err != nil {
			fmt.Println("获取课件失败:", err)
			return
		}

		fmt.Println(get)

		time.Sleep(3 * time.Second)
		cancel()
	}

}

6、测试

开启3个服务,可以看到客户端通过负载均衡随机到一个服务请求

go run main.go port=8400
获取课件详情 port: 8400  time: 2022-08-25 18:47:43.784942 +0800 CST m=+78.228450885
获取课件详情 port: 8400  time: 2022-08-25 18:47:52.925858 +0800 CST m=+87.369721731
获取课件详情 port: 8400  time: 2022-08-25 18:48:02.001177 +0800 CST m=+96.445393312
获取课件详情 port: 8400  time: 2022-08-25 18:48:05.060066 +0800 CST m=+99.504401028
获取课件详情 port: 8400  time: 2022-08-25 18:48:14.154148 +0800 CST m=+108.598836458
go run main.go port=8500
获取课件详情 port: 8500  time: 2022-08-25 18:47:46.832479 +0800 CST m=+62.822399701
获取课件详情 port: 8500  time: 2022-08-25 18:47:49.844536 +0800 CST m=+65.834573960
获取课件详情 port: 8500  time: 2022-08-25 18:47:55.955638 +0800 CST m=+71.945912584
获取课件详情 port: 8500  time: 2022-08-25 18:48:17.168293 +0800 CST m=+93.159391485
获取课件详情 port: 8500  time: 2022-08-25 18:48:20.182787 +0800 CST m=+96.174002796
go run main.go port=8600
获取课件详情 port: 8600  time: 2022-08-25 18:47:58.968283 +0800 CST m=+1.317052360
获取课件详情 port: 8600  time: 2022-08-25 18:48:08.106493 +0800 CST m=+10.455617422
获取课件详情 port: 8600  time: 2022-08-25 18:48:11.125212 +0800 CST m=+13.474453269

有关实现etcd服务注册与发现的更多相关文章

  1. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

    我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

  2. ruby - 具有身份验证的私有(private) Ruby Gem 服务器 - 2

    我想安装一个带有一些身份验证的私有(private)Rubygem服务器。我希望能够使用公共(public)Ubuntu服务器托管内部gem。我读到了http://docs.rubygems.org/read/chapter/18.但是那个没有身份验证-如我所见。然后我读到了https://github.com/cwninja/geminabox.但是当我使用基本身份验证(他们在他们的Wiki中有)时,它会提示从我的服务器获取源。所以。如何制作带有身份验证的私有(private)Rubygem服务器?这是不可能的吗?谢谢。编辑:Geminabox问题。我尝试“捆绑”以安装新的gem..

  3. ruby-on-rails - 启动 Rails 服务器时 ImageMagick 的警告 - 2

    最近,当我启动我的Rails服务器时,我收到了一长串警告。虽然它不影响我的应用程序,但我想知道如何解决这些警告。我的估计是imagemagick以某种方式被调用了两次?当我在警告前后检查我的git日志时。我想知道如何解决这个问题。-bcrypt-ruby(3.1.2)-better_errors(1.0.1)+bcrypt(3.1.7)+bcrypt-ruby(3.1.5)-bcrypt(>=3.1.3)+better_errors(1.1.0)bcrypt和imagemagick有关系吗?/Users/rbchris/.rbenv/versions/2.0.0-p247/lib/ru

  4. ruby-on-rails - s3_direct_upload 在生产服务器中不工作 - 2

    在Rails4.0.2中,我使用s3_direct_upload和aws-sdkgems直接为s3存储桶上传文件。在开发环境中它工作正常,但在生产环境中它会抛出如下错误,ActionView::Template::Error(noimplicitconversionofnilintoString)在View中,create_cv_url,:id=>"s3_uploader",:key=>"cv_uploads/{unique_id}/${filename}",:key_starts_with=>"cv_uploads/",:callback_param=>"cv[direct_uplo

  5. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  6. ruby - 用 Ruby 编写一个简单的网络服务器 - 2

    我想在Ruby中创建一个用于开发目的的极其简单的Web服务器(不,不想使用现成的解决方案)。代码如下:#!/usr/bin/rubyrequire'socket'server=TCPServer.new('127.0.0.1',8080)whileconnection=server.acceptheaders=[]length=0whileline=connection.getsheaders想法是从命令行运行这个脚本,提供另一个脚本,它将在其标准输入上获取请求,并在其标准输出上返回完整的响应。到目前为止一切顺利,但事实证明这真的很脆弱,因为它在第二个请求上中断并出现错误:/usr/b

  7. ruby-on-rails - 在 Rails 中调试生产服务器 - 2

    您如何在Rails中的实时服务器上进行有效调试,无论是在测试版/生产服务器上?我试过直接在服务器上修改文件,然后重启应用,但是修改好像没有生效,或者需要很长时间(缓存?)我也试过在本地做“脚本/服务器生产”,但是那很慢另一种选择是编码和部署,但效率很低。有人对他们如何有效地做到这一点有任何见解吗? 最佳答案 我会回答你的问题,即使我不同意这种热修补服务器代码的方式:)首先,你真的确定你已经重启了服务器吗?您可以通过跟踪日志文件来检查它。您更改的代码显示的View可能会被缓存。缓存页面位于tmp/cache文件夹下。您可以尝试手动删除

  8. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  9. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  10. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

随机推荐