go-kit 服务发现(4) Eureka

基本使用

注册

logger := log.NewLogfmtLogger(os.Stdout)
var fargoConfig fargo.Config
fargoConfig.Eureka.ServiceUrls = []string{"http://localhost:8761/eureka"}
// 订阅服务器应轮询更新的频率。
fargoConfig.Eureka.PollIntervalSeconds = 1

instance := &fargo.Instance{
    InstanceId : "实例ID",
    //HostName:         "127.0.0.1",
    Port:   8080,
    App:    "hello",
    IPAddr: "http://127.0.0.1",
    //HealthCheckUrl: "http://localhost:8080/hello",
    //StatusPageUrl:  "http://localhost:8080/hello",
    //HomePageUrl:    "http://localhost:8080/hello",
    Status:         fargo.UP,
    DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn},
    LeaseInfo:      fargo.LeaseInfo{RenewalIntervalInSecs: 1},
}
fargoConnection := fargo.NewConnFromConfig(fargoConfig)
register := eureka.NewRegistrar(&fargoConnection, instance, logger)

register.Register()
defer register.Deregister()

发现

logger := log.NewLogfmtLogger(os.Stdout)
var fargoConfig fargo.Config
fargoConfig.Eureka.ServiceUrls = []string{"http://localhost:8761/eureka"}
fargoConfig.Eureka.PollIntervalSeconds = 1

fargoConnection := fargo.NewConnFromConfig(fargoConfig)
instancer := eureka.NewInstancer(&fargoConnection,"hello",logger)

底层原理

目录结构

.
├── doc.go
├── instancer.go 服务实例 
├── instancer_test.go
├── integration_test.go
├── registrar.go 注册器
├── registrar_test.go
└── util_test.go

目录中主要的是这三个文件,instancer.go registrar.go
与sd目录下的其他注册发现组件不同,
作者删除了client.go文件,改为使用fargo.EurekaConnection

registrar.go

// Registrar maintains service instance liveness information in Eureka.
type Registrar struct {
    conn     fargoConnection
    instance *fargo.Instance
    logger   log.Logger
    quitc    chan chan struct{}
    sync.Mutex
}

func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar
func (r *Registrar) Register()
func (r *Registrar) Deregister()
func (r *Registrar) loop()
func (r *Registrar) heartbeat() error 

包含以下3个函数

  • NewRegistrar 创建 Registrar
  • Register 通过 fargoConnection 注册服务
  • Deregister 通过 fargoConnection 注销服务
  • loop

    • fargoConfig.Eureka.PollIntervalSeconds 设置的定时器,每隔设定的秒数(默认30秒),调用heartbeat函数
    • 监听Registrar.quitc,退出loop
  • heartbeat

    • 调用fargo.EurekaConnection.HeartBeatInstance,向Eureka中注册的服务实例发送心跳请求,具体的发送方式为向指定的节点发送Http PUT请求,如PUT http://Eureka Server(s):8761/eureka/apps/APP名称/实例ID,请求返回HTTP状态码为200即为成功

instancer.go

type Instancer struct {
    cache  *instance.Cache
    conn   fargoConnection
    app    string
    logger log.Logger
    quitc  chan chan struct{}
}

2019/6/22 posted in  gokit

go-kit 服务发现(3) etcd

基本使用

注册

//Etcd客户端
client, _ := etcdv3.NewClient(context.Background(),[]string{"http://localhost:2379"},etcdv3.ClientOptions{})

//注册器
register := etcdv3.NewRegistrar(client,etcdv3.Service{
    Key: "/services/hello/",
    Value: "http://127.0.0.1:8080",
},logger)
register.Register()
defer register.Deregister()

发现

//Etcd客户端
client,_ := etcdv3.NewClient(context.Background(),[]string{"http://localhost:2379"},etcdv3.ClientOptions{})

//服务实例
instancer , _ := etcdv3.NewInstancer(client,"/services/hello/",logger)

底层原理

目录结构

.
├── client.go 客户端
├── doc.go
├── example_test.go
├── instancer.go 服务实例
├── instancer_test.go
├── integration_test.go
├── registrar.go 注册器
└── registrar_test.go

目录中主要的是这三个文件,client.go instancer.go registrar.go

client.go

type Client interface {
    //获取一组value通过key前缀
    GetEntries(prefix string) ([]string, error)
    //watch指定前缀的key
    WatchPrefix(prefix string, ch chan struct{})
    //注册服务
    Register(s Service) error
    //注销服务
    Deregister(s Service) error
    //etcd 
    LeaseID() int64
}

type client struct {
    //etcd客户端使用v3版本api
    cli *clientv3.Client
    ctx context.Context
    //etcd key/value 操作实例
    kv clientv3.KV
    // etcd watcher 操作实例
    watcher clientv3.Watcher
    // watcher context
    wctx context.Context
    // watcher cancel func
    wcf context.CancelFunc
    // leaseID will be 0 (clientv3.NoLease) if a lease was not created
    leaseID clientv3.LeaseID

    //etcdKeepAlive实现心跳检测
    hbch <-chan *clientv3.LeaseKeepAliveResponse
    // etcd Lease 操作实例
    leaser clientv3.Lease
}
func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error)

主要包含以下6个函数

  • NewClient 创建etcd客户端,赋值给 client.cli
  • GetEntries 通过 client.kv 获取value
  • WatchPrefix 通过 client.watcher 监听key
  • Deregister 通过 client.cli 服务绑定的key
  • LeaseID return client.leaseID
  • Register
    • 初始化 client.leaser
    • 初始化 client.watcher
    • 初始化 client.kv
    • 通过 client.kv 操作写入etcd,服务注册的key和value
    • 创建 client.leaseID,默认心跳3秒,lease TTL9秒
    • client.leaser调用KeepAlive

registrar.go

type Registrar struct {
    //etcd客户端
    client  Client
    //注册的服务
    service Service
    logger  log.Logger

    //服务Deregister并发锁
    quitmtx sync.Mutex
    //服务退出通道
    quit    chan struct{}
}

//服务的key和地址
type Service struct {
    Key   string // unique key, e.g. "/service/foobar/1.2.3.4:8080"
    Value string // returned to subscribers, e.g. "http://1.2.3.4:8080"
    TTL   *TTLOption
}

//服务心跳检测
type TTLOption struct {
    heartbeat time.Duration // e.g. time.Second * 3
    ttl       time.Duration // e.g. time.Second * 10
}

func NewTTLOption(heartbeat, ttl time.Duration) *TTLOption 
func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar
func (r *Registrar) Register()
func (r *Registrar) Deregister()

主要包含以下4个函数

  • NewTTLOption 心跳检测参数
  • NewRegistrar 创建 Registrar
  • Register 调用 client.go中 Register 方法
  • Deregister 调用 client.go中 Deregister 方法

instancer.go

type Instancer struct {
  //实例缓存
    cache  *instance.Cache
    //etcd客户端
    client Client
    //实例前缀
    prefix string
    logger log.Logger
    //Instancer 主动退出 通道
    quitc  chan struct{}
}

func NewInstancer(c Client, prefix string, logger log.Logger) (*Instancer, error) 
func (s *Instancer) loop() 
func (s *Instancer) Stop()
func (s *Instancer) Register(ch chan<- sd.Event) 
func (s *Instancer) Deregister(ch chan<- sd.Event)

主要包含以下5个函数

  • NewInstancer
    • 调用 client.go GetEntries函数,获取对应的一组服务地址
    • 查询到的服务地址写入缓存 Instancer.cache
  • loop
    • 监听服务对应的key
  • Stop
    • 关闭服务监听
  • Register
  • Deregister
2019/6/22 posted in  gokit

go-kit 服务发现(2) zookeeper

基本使用

注册

client, err := zk.NewClient([]string{"localhost:2181"},logger)
if err != nil{
    panic(err)
}

register := zk.NewRegistrar(client,zk.Service{
    Path: "/services/hello",
    Name: "abc",
    Data: []byte("http://127.0.0.1:8080"),
},logger)

register.Register()

发现

client, err := zk.NewClient([]string{"localhost:2181"},logger)
if err != nil{
    panic(err)
}

instancer , err := zk.NewInstancer(client,"/services/hello/abc",logger)
if err != nil {
    panic(err)
}
duration := 500 * time.Millisecond
ctx := context.Background()
factory := helloFactory(ctx, "GET", "hello")
endpointer := sd.NewEndpointer(instancer, factory, logger)

endpointers,_ := endpointer.Endpoints()

底层原理

目录结构

.
├── client.go 客户端
├── client_test.go
├── doc.go
├── instancer.go 服务实例
├── instancer_test.go
├── integration_test.go
├── logwrapper.go
├── registrar.go 注册器
└── util_test.go

目录中主要的是这三个文件,client.go instancer.go registrar.go

client.go

type Client interface {
    //获取一组value通过key前缀
    GetEntries(path string) ([]string, <-chan zk.Event, error)
    //watch指定前缀的key
    CreateParentNodes(path string) error
    //注册服务
    Register(s *Service) error
    //注销服务
    Deregister(s *Service) error
    //停止zk链接
    Stop()
}


type client struct {
    *zk.Conn //组合 github.com/samuel/go-zookeeper/zk struct Conn
    clientConfig
    active bool
    quit   chan struct{}
}

func NewClient(servers []string, logger log.Logger, options ...Option) (Client, error)
func (c *client) CreateParentNodes(path string)

主要包含以下5个函数

  • NewClient 创建zk客户端,client.Conn
  • GetEntries client.Get 获取value
  • Deregister client.Delete 删除zk中指定的key
  • CreateParentNodes 保证key中的父节点都已经被创建。由于zk创建子节点时父节点必须都存在,如:/a/b/c,当/a/b节点不存在时,/a/b/c节点无法创建

  • Register

    • CreateParentNodes函数创建所有父节点,已存在则跳过
    • client.CreateProtectedEphemeralSequential函数,创建一个保护 临时 顺序 节点(ProtectedEphemeralSequential),同时将value存在此节点中。
    • 保护顺序临时节点
      • 示例:_c_c83db041ac654566228b72cbd541bcb5-abc0000000006,其中加粗字体为GUID,_c_为默认前缀,abc0000000006为后缀
      • 临时节点,当zk链接会话关闭后,该节点就会被删除。
      • 顺序节点,创建的节点的名称以GUID作为前缀。如果节点创建失败,则会发生正常的重试机制。在重试过程中,首先搜索父路径,寻找包含GUID的节点。如果找到该节点,则假定它是第一次尝试成功创建并返回给调用者的丢失节点。
      • 保护节点,key自增后缀确保节点名称的唯一性

registrar.go

type Registrar struct {
  //zk客户端
    client  Client
    //注册的服务
    service Service
    logger  log.Logger
}

//服务的key和地址
type Service struct {
    Path string // 服务发现命名空间: /service/hello/
    Name string // 服务名称, example: abc
    Data []byte // 服务实例数据存在, 如: 10.0.2.10:80
    node string // 存储 ProtectedEphemeralSequential(保护临时顺序)节点的名称,便于Deregister函数注销服务
}

func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar
func (r *Registrar) Register() 
func (r *Registrar) Deregister()

包含以下3个函数

  • NewRegistrar 创建 Registrar
  • Register 调用 client.go中 Register 方法
  • Deregister 调用 client.go中 Deregister 方法

instancer.go

type Instancer struct {
  //实例缓存
    cache  *instance.Cache
    //zk客户端
    client Client
    //服务zk节点值
    path   string
    logger log.Logger
    //Instancer 主动退出 通道
    quitc  chan struct{}
}

func NewInstancer(c Client, path string, logger log.Logger) (*Instancer, error)
func (s *Instancer) loop(eventc <-chan zk.Event)
func (s *Instancer) Stop()
func (s *Instancer) Register(ch chan<- sd.Event) 
func (s *Instancer) state() sd.Event {

主要包含以下5个函数

  • NewInstancer
    • 调用 client.go GetEntries函数,获取对应的一组服务地址
  • loop
    • 监听服务对应的key
  • Stop
    • 关闭服务监听
  • Register
  • Deregister
2019/6/22 posted in  gokit

go-kit 服务发现(1) consul

服务发现

一、什么是服务?

OASIS将服务定义为“一种允许访问一个或多个功能的机制,其中使用指定的接口提供访问,并按照服务描述指定的约束和策略执行访问”。😰😰😰

  • 业务模块(user/mission/vip)
  • 基础组件(ipdb/uuid)
  • 缓存服务(redis/memcached)
  • 持久化服务(Mysql/ELS/MNS)
  • 网络服务(nginx/lb)
  • ...

二、什么是服务发现?

调用方无需知道服务提供者的网络位置(ip:端口等),只需通过服务名称(如user/item/mission),即可调用服务

三、为什么需要服务发现?

在现代的基于云计算的微服务应用中,服务实例会被动态地分配网络地址。并且,因为自动伸缩、故障和升级,服务实例会动态地改变。故而,你的客户端代码需要用一种更加精密的服务发现机制。而不是偶尔更新的配置文件中读取到网络地址

  1. 场景1: 需要新上线一个服务:

    • 提供者:配置域名、nginx、负载均衡、部署代码
    • 调用方:配置服务域名,调用具体业务
  2. 场景2: 某个热点事件的出现,导致流量爆增,需要扩容:

    • 提供者:配置nginx、负载均衡、部署代码
  3. 服务发现场景1: 需要新上线一个服务:

    • 提供者:部署代码(包含注册服务)
    • 调用方:部署代码(包含查询服务)
  4. 服务发现场景2: 某个热点事件的出现,导致流量爆增,需要扩容:

    • 提供者:部署代码(可配置自动扩容)

四、服务发现的流程

Title: 客户端发现
提供者->注册中心: 注册服务
提供者-->注册中心: 健康检查
消费者->注册中心: 查询服务提供者网络信息
注册中心->消费者: Ip:192.168.*.* Domain:8787
消费者->>提供者: 访问服务
Title: 服务端发现
提供者->注册中心: 注册服务
提供者-->注册中心: 健康检查
负载均衡器->注册中心: 查询服务提供者网络信息
消费者->负载均衡器: 查询服务提供者网络信息
负载均衡器->消费者: 转发 Ip:192.168.*.* Domain:8787
注册中心->负载均衡器: Ip:192.168.*.* Domain:8787
消费者->>提供者: 访问服务
客户端 服务端
请求数 少一次 多一次
消费者逻辑 内置服务发现逻辑 无需客户端服务发现逻辑
业界使用 多一些 少一些

五、服务发现的现有解决方案

stackshare对比页面

ZooKeeper Etcd Eureka Consul DNSSrv
多数据中心
自带服务发现
自带健康检查
自带WebUi
分布式Key/Value存储
开源 2.0闭源
一致性 paxos raft raft
监控 metrics metrics metrics
使用接口(多语言能力) 客户端 http/grpc http http/dns
CAP cp cp ap cp
开发语言 JAVA GO JAVA GO

源码原理

├── endpoint_cache_test.go 
├── endpoint_cache.go
├── endpointer.go
├── endpointer_test.go
├── instancer.go
├── factory.go
├── benchmark_test.go
├── registrar.go
├── doc.go
├── etcd
│   ├── client_test.go
│   ├── client.go 
│   ├── integration_test.go 
│   ├── registrar.go 
│   ├── registrar_test.go 
│   ├── example_test.go
│   ├── instancer.go  
│   ├── instancer_test.go
│   └── doc.go
├── zk
│   ├── client.go
│   ├── integration_test.go
│   ├── client_test.go
│   ├── instancer_test.go
│   ├── util_test.go
│   ├── instancer.go
│   ├── registrar.go
│   ├── logwrapper.go
│   └── doc.go
├── consul
│   ├── instancer_test.go
│   ├── instancer.go
│   ├── client_test.go
│   ├── integration_test.go
│   ├── client.go 
│   ├── registrar.go 
│   ├── registrar_test.go
│   └── doc.go
├── etcdv3
│   ├── integration_test.go
│   ├── client.go
│   ├── registrar_test.go
│   ├── registrar.go
│   ├── example_test.go
│   ├── instancer.go
│   ├── instancer_test.go
│   └── doc.go
├── lb(负载均衡)
│   ├── retry_test.go
│   ├── retry.go (多次尝试请求Endpoint)
│   ├── round_robin_test.go
│   ├── random_test.go
│   ├── round_robin.go (轮询调度Endpoint)
│   ├── random.go (随机选择Endpoint)
│   ├── balancer.go (包含一个Endpoint的接口)
│   └── doc.go
├── eureka
│   ├── util_test.go
│   ├── registrar.go
│   ├── integration_test.go
│   ├── registrar_test.go
│   ├── instancer.go
│   ├── instancer_test.go
│   └── doc.go
├── dnssrv(通过net包的dns客户端,通过SRV记录实现服务发现 [DNS SRV介绍](https://www.lijiaocn.com/%E6%8A%80%E5%B7%A7/2017/03/06/dns-srv.html))
│   ├── instancer.go
│   ├── instancer_test.go
│   ├── lookup.go
│   └── doc.go
└── internal(内部通过管道实现的应用内服务发现)
    └── instance

consul

基本使用

注册

var client consulsd.Client
{
    consulConfig := api.DefaultConfig()
    consulConfig.Address = "localhost:8500"
    consulClient, err := api.NewClient(consulConfig)
    if err != nil {
        logger.Log("err", err)
        os.Exit(1)
    }
    client = consulsd.NewClient(consulClient)
}

check := api.AgentServiceCheck{
    HTTP:     "http://127.0.0.1:8080/health",
    Interval: "10s",
    Timeout:  "1s",
    Notes:    "基础监控检查",
}

num := rand.Intn(100) // to make service ID unique
register := consulsd.NewRegistrar(client, &api.AgentServiceRegistration{
    ID:      "hello" + strconv.Itoa(num),
    Name:    "hello",
    Tags:    []string{"hello", "hi"},
    Port:    8080,
    Address: "http://127.0.0.1",
    Check:   &check,
}, logger)

register.Register()

发现

var client consulsd.Client
{
    consulConfig := api.DefaultConfig()

    consulConfig.Address = "http://localhost:8500"
    consulClient, err := api.NewClient(consulConfig)
    if err != nil {
        logger.Log("err", err)
        os.Exit(1)
    }
    client = consulsd.NewClient(consulClient)
}

tags := []string{}
passingOnly := true
duration := 500 * time.Millisecond
ctx := context.Background()
factory := helloFactory(ctx, "GET", "hello")
instancer := consulsd.NewInstancer(client, logger, "hello", tags, passingOnly)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(1, duration, balancer)
res, _ := retry(ctx, struct{}{})

底层原理

目录结构

.
├── client.go 
├── client_test.go
├── doc.go
├── instancer.go
├── instancer_test.go
├── integration_test.go
├── registrar.go
└── registrar_test.go

目录中主要的是这三个文件,client.go instancer.go registrar.go

client.go

// Client is a wrapper around the Consul API.
type Client interface {
    // Register a service with the local agent.
    Register(r *consul.AgentServiceRegistration) error

    // Deregister a service with the local agent.
    Deregister(r *consul.AgentServiceRegistration) error

    // Service
    Service(service, tag string, passingOnly bool, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error)
}

type client struct {
    consul *consul.Client
}

// NewClient returns an implementation of the Client interface, wrapping a
// concrete Consul client.
func NewClient(c *consul.Client) Client {
    return &client{consul: c}
}

func (c *client) Register(r *consul.AgentServiceRegistration) error {
    return c.consul.Agent().ServiceRegister(r)
}

func (c *client) Deregister(r *consul.AgentServiceRegistration) error {
    return c.consul.Agent().ServiceDeregister(r.ID)
}

func (c *client) Service(service, tag string, passingOnly bool, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
    return c.consul.Health().Service(service, tag, passingOnly, queryOpts)
}

主要包含以下4个函数

  • NewClient 创建consul客户端
  • Register 注册服务
  • Deregister 注销服务
  • Service 获取服务/发现服务

registrar.go

// Registrar registers service instance liveness information to Consul.
type Registrar struct {
    client       Client
    registration *stdconsul.AgentServiceRegistration
    logger       log.Logger
}

// NewRegistrar returns a Consul Registrar acting on the provided catalog
// registration.
func NewRegistrar(client Client, r *stdconsul.AgentServiceRegistration, logger log.Logger) *Registrar {
    return &Registrar{
        client:       client,
        registration: r,
        logger:       log.With(logger, "service", r.Name, "tags", fmt.Sprint(r.Tags), "address", r.Address),
    }
}

// Register implements sd.Registrar interface.
func (p *Registrar) Register() {
    if err := p.client.Register(p.registration); err != nil {
        p.logger.Log("err", err)
    } else {
        p.logger.Log("action", "register")
    }
}

// Deregister implements sd.Registrar interface.
func (p *Registrar) Deregister() {
    if err := p.client.Deregister(p.registration); err != nil {
        p.logger.Log("err", err)
    } else {
        p.logger.Log("action", "deregister")
    }
}

包含以下3个函数

  • NewRegistrar 创建 Registrar
  • Register 调用 client.go中 Register 方法
  • Deregister 调用 client.go中 Deregister 方法

instancer.go

// Instancer yields instances for a service in Consul.
type Instancer struct {
    cache       *instance.Cache
    client      Client
    logger      log.Logger
    service     string
    tags        []string
    passingOnly bool
    quitc       chan struct{}
}

// NewInstancer returns a Consul instancer that publishes instances for the
// requested service. It only returns instances for which all of the passed tags
// are present.
func NewInstancer(client Client, logger log.Logger, service string, tags []string, passingOnly bool) *Instancer 

// Stop terminates the instancer.
func (s *Instancer) Stop()

func (s *Instancer) loop(lastIndex uint64)

func (s *Instancer) getInstances(lastIndex uint64, interruptc chan struct{}) ([]string, uint64, error)

// Register implements Instancer.
func (s *Instancer) Register(ch chan<- sd.Event)

// Deregister implements Instancer.
func (s *Instancer) Deregister(ch chan<- sd.Event)

func makeInstances(entries []*consul.ServiceEntry) []string

主要包含以下5个函数

  • NewInstancer
    • 调用 getInstances 函数,获取对应的一组服务地址,构造 Instancer 结构体
  • loop
    • 循环查询调用 getInstances 函数,默认10毫秒调用一次
  • Stop
    • 关闭服务监听, getInstances 函数获得一个 errStopped,停止loop
  • Register
  • Deregister
2019/6/22 posted in  gokit