go-kit 服务发现(4) Eureka
基本使用
注册
logger := log.NewLogfmtLogger(os.Stdout)
var fargoConfig fargo.Config
fargoConfig.Eureka.ServiceUrls = []string{"http://localhost:8761/eureka"}
// 订阅服务器应轮询更新的频率。
fargoConfig.Eureka.PollIntervalSeconds = 1
instance := &fargo.Instance{
InstanceId : "实例ID",
//HostName: "127.0.0.1",
Port: 8080,
App: "hello",
IPAddr: "http://127.0.0.1",
//HealthCheckUrl: "http://localhost:8080/hello",
//StatusPageUrl: "http://localhost:8080/hello",
//HomePageUrl: "http://localhost:8080/hello",
Status: fargo.UP,
DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn},
LeaseInfo: fargo.LeaseInfo{RenewalIntervalInSecs: 1},
}
fargoConnection := fargo.NewConnFromConfig(fargoConfig)
register := eureka.NewRegistrar(&fargoConnection, instance, logger)
register.Register()
defer register.Deregister()
发现
logger := log.NewLogfmtLogger(os.Stdout)
var fargoConfig fargo.Config
fargoConfig.Eureka.ServiceUrls = []string{"http://localhost:8761/eureka"}
fargoConfig.Eureka.PollIntervalSeconds = 1
fargoConnection := fargo.NewConnFromConfig(fargoConfig)
instancer := eureka.NewInstancer(&fargoConnection,"hello",logger)
底层原理
目录结构
.
├── doc.go
├── instancer.go 服务实例
├── instancer_test.go
├── integration_test.go
├── registrar.go 注册器
├── registrar_test.go
└── util_test.go
目录中主要的是这三个文件,instancer.go registrar.go
与sd目录下的其他注册发现组件不同,作者删除了client.go文件,改为使用fargo.EurekaConnection
registrar.go
// Registrar maintains service instance liveness information in Eureka.
type Registrar struct {
conn fargoConnection
instance *fargo.Instance
logger log.Logger
quitc chan chan struct{}
sync.Mutex
}
func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar
func (r *Registrar) Register()
func (r *Registrar) Deregister()
func (r *Registrar) loop()
func (r *Registrar) heartbeat() error
包含以下3个函数
- NewRegistrar 创建 Registrar
- Register 通过 fargoConnection 注册服务
- Deregister 通过 fargoConnection 注销服务
loop
fargoConfig.Eureka.PollIntervalSeconds
设置的定时器,每隔设定的秒数(默认30秒),调用heartbeat函数- 监听
Registrar.quitc
,退出loop
heartbeat
- 调用
fargo.EurekaConnection.HeartBeatInstance
,向Eureka中注册的服务实例发送心跳请求,具体的发送方式为向指定的节点发送Http PUT请求,如PUT http://Eureka Server(s):8761/eureka/apps/APP名称/实例ID
,请求返回HTTP状态码为200即为成功
- 调用
instancer.go
type Instancer struct {
cache *instance.Cache
conn fargoConnection
app string
logger log.Logger
quitc chan chan struct{}
}
go-kit 服务发现(1) consul
服务发现
一、什么是服务?
OASIS将服务定义为“一种允许访问一个或多个功能的机制,其中使用指定的接口提供访问,并按照服务描述指定的约束和策略执行访问”。😰😰😰
- 业务模块(user/mission/vip)
- 基础组件(ipdb/uuid)
- 缓存服务(redis/memcached)
- 持久化服务(Mysql/ELS/MNS)
- 网络服务(nginx/lb)
- ...
二、什么是服务发现?
调用方无需知道服务提供者的网络位置(ip:端口等),只需通过服务名称(如user/item/mission),即可调用服务
三、为什么需要服务发现?
在现代的基于云计算的微服务应用中,服务实例会被动态地分配网络地址。并且,因为自动伸缩、故障和升级,服务实例会动态地改变。故而,你的客户端代码需要用一种更加精密的服务发现机制。而不是偶尔更新的配置文件中读取到网络地址。
场景1: 需要新上线一个服务:
- 提供者:配置域名、nginx、负载均衡、部署代码
- 调用方:配置服务域名,调用具体业务
场景2: 某个热点事件的出现,导致流量爆增,需要扩容:
- 提供者:配置nginx、负载均衡、部署代码
服务发现场景1: 需要新上线一个服务:
- 提供者:部署代码(包含注册服务)
- 调用方:部署代码(包含查询服务)
服务发现场景2: 某个热点事件的出现,导致流量爆增,需要扩容:
- 提供者:部署代码(可配置自动扩容)
四、服务发现的流程
Title: 客户端发现
提供者->注册中心: 注册服务
提供者-->注册中心: 健康检查
消费者->注册中心: 查询服务提供者网络信息
注册中心->消费者: Ip:192.168.*.* Domain:8787
消费者->>提供者: 访问服务
Title: 服务端发现
提供者->注册中心: 注册服务
提供者-->注册中心: 健康检查
负载均衡器->注册中心: 查询服务提供者网络信息
消费者->负载均衡器: 查询服务提供者网络信息
负载均衡器->消费者: 转发 Ip:192.168.*.* Domain:8787
注册中心->负载均衡器: Ip:192.168.*.* Domain:8787
消费者->>提供者: 访问服务
客户端 | 服务端 | |
---|---|---|
请求数 | 少一次 | 多一次 |
消费者逻辑 | 内置服务发现逻辑 | 无需客户端服务发现逻辑 |
业界使用 | 多一些 | 少一些 |
五、服务发现的现有解决方案
ZooKeeper | Etcd | Eureka | Consul | DNSSrv | |
---|---|---|---|---|---|
多数据中心 | ✅ | ||||
自带服务发现 | ✅ | ✅ | |||
自带健康检查 | ✅ | ✅ | |||
自带WebUi | ✅ | ✅ | |||
分布式Key/Value存储 | ✅ | ✅ | ✅ | ||
开源 | ✅ | ✅ | 2.0闭源 | ✅ | |
一致性 | paxos | raft | raft | ||
监控 | metrics | metrics | metrics | ||
使用接口(多语言能力) | 客户端 | http/grpc | http | http/dns | |
CAP | cp | cp | ap | cp | |
开发语言 | JAVA | GO | JAVA | GO |
源码原理
├── endpoint_cache_test.go
├── endpoint_cache.go
├── endpointer.go
├── endpointer_test.go
├── instancer.go
├── factory.go
├── benchmark_test.go
├── registrar.go
├── doc.go
├── etcd
│ ├── client_test.go
│ ├── client.go
│ ├── integration_test.go
│ ├── registrar.go
│ ├── registrar_test.go
│ ├── example_test.go
│ ├── instancer.go
│ ├── instancer_test.go
│ └── doc.go
├── zk
│ ├── client.go
│ ├── integration_test.go
│ ├── client_test.go
│ ├── instancer_test.go
│ ├── util_test.go
│ ├── instancer.go
│ ├── registrar.go
│ ├── logwrapper.go
│ └── doc.go
├── consul
│ ├── instancer_test.go
│ ├── instancer.go
│ ├── client_test.go
│ ├── integration_test.go
│ ├── client.go
│ ├── registrar.go
│ ├── registrar_test.go
│ └── doc.go
├── etcdv3
│ ├── integration_test.go
│ ├── client.go
│ ├── registrar_test.go
│ ├── registrar.go
│ ├── example_test.go
│ ├── instancer.go
│ ├── instancer_test.go
│ └── doc.go
├── lb(负载均衡)
│ ├── retry_test.go
│ ├── retry.go (多次尝试请求Endpoint)
│ ├── round_robin_test.go
│ ├── random_test.go
│ ├── round_robin.go (轮询调度Endpoint)
│ ├── random.go (随机选择Endpoint)
│ ├── balancer.go (包含一个Endpoint的接口)
│ └── doc.go
├── eureka
│ ├── util_test.go
│ ├── registrar.go
│ ├── integration_test.go
│ ├── registrar_test.go
│ ├── instancer.go
│ ├── instancer_test.go
│ └── doc.go
├── dnssrv(通过net包的dns客户端,通过SRV记录实现服务发现 [DNS SRV介绍](https://www.lijiaocn.com/%E6%8A%80%E5%B7%A7/2017/03/06/dns-srv.html))
│ ├── instancer.go
│ ├── instancer_test.go
│ ├── lookup.go
│ └── doc.go
└── internal(内部通过管道实现的应用内服务发现)
└── instance
consul
基本使用
注册
var client consulsd.Client
{
consulConfig := api.DefaultConfig()
consulConfig.Address = "localhost:8500"
consulClient, err := api.NewClient(consulConfig)
if err != nil {
logger.Log("err", err)
os.Exit(1)
}
client = consulsd.NewClient(consulClient)
}
check := api.AgentServiceCheck{
HTTP: "http://127.0.0.1:8080/health",
Interval: "10s",
Timeout: "1s",
Notes: "基础监控检查",
}
num := rand.Intn(100) // to make service ID unique
register := consulsd.NewRegistrar(client, &api.AgentServiceRegistration{
ID: "hello" + strconv.Itoa(num),
Name: "hello",
Tags: []string{"hello", "hi"},
Port: 8080,
Address: "http://127.0.0.1",
Check: &check,
}, logger)
register.Register()
发现
var client consulsd.Client
{
consulConfig := api.DefaultConfig()
consulConfig.Address = "http://localhost:8500"
consulClient, err := api.NewClient(consulConfig)
if err != nil {
logger.Log("err", err)
os.Exit(1)
}
client = consulsd.NewClient(consulClient)
}
tags := []string{}
passingOnly := true
duration := 500 * time.Millisecond
ctx := context.Background()
factory := helloFactory(ctx, "GET", "hello")
instancer := consulsd.NewInstancer(client, logger, "hello", tags, passingOnly)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(1, duration, balancer)
res, _ := retry(ctx, struct{}{})
底层原理
目录结构
.
├── client.go
├── client_test.go
├── doc.go
├── instancer.go
├── instancer_test.go
├── integration_test.go
├── registrar.go
└── registrar_test.go
目录中主要的是这三个文件,client.go instancer.go registrar.go
client.go
// Client is a wrapper around the Consul API.
type Client interface {
// Register a service with the local agent.
Register(r *consul.AgentServiceRegistration) error
// Deregister a service with the local agent.
Deregister(r *consul.AgentServiceRegistration) error
// Service
Service(service, tag string, passingOnly bool, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error)
}
type client struct {
consul *consul.Client
}
// NewClient returns an implementation of the Client interface, wrapping a
// concrete Consul client.
func NewClient(c *consul.Client) Client {
return &client{consul: c}
}
func (c *client) Register(r *consul.AgentServiceRegistration) error {
return c.consul.Agent().ServiceRegister(r)
}
func (c *client) Deregister(r *consul.AgentServiceRegistration) error {
return c.consul.Agent().ServiceDeregister(r.ID)
}
func (c *client) Service(service, tag string, passingOnly bool, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
return c.consul.Health().Service(service, tag, passingOnly, queryOpts)
}
主要包含以下4个函数
- NewClient 创建consul客户端
- Register 注册服务
- Deregister 注销服务
- Service 获取服务/发现服务
registrar.go
// Registrar registers service instance liveness information to Consul.
type Registrar struct {
client Client
registration *stdconsul.AgentServiceRegistration
logger log.Logger
}
// NewRegistrar returns a Consul Registrar acting on the provided catalog
// registration.
func NewRegistrar(client Client, r *stdconsul.AgentServiceRegistration, logger log.Logger) *Registrar {
return &Registrar{
client: client,
registration: r,
logger: log.With(logger, "service", r.Name, "tags", fmt.Sprint(r.Tags), "address", r.Address),
}
}
// Register implements sd.Registrar interface.
func (p *Registrar) Register() {
if err := p.client.Register(p.registration); err != nil {
p.logger.Log("err", err)
} else {
p.logger.Log("action", "register")
}
}
// Deregister implements sd.Registrar interface.
func (p *Registrar) Deregister() {
if err := p.client.Deregister(p.registration); err != nil {
p.logger.Log("err", err)
} else {
p.logger.Log("action", "deregister")
}
}
包含以下3个函数
- NewRegistrar 创建 Registrar
- Register 调用 client.go中 Register 方法
- Deregister 调用 client.go中 Deregister 方法
instancer.go
// Instancer yields instances for a service in Consul.
type Instancer struct {
cache *instance.Cache
client Client
logger log.Logger
service string
tags []string
passingOnly bool
quitc chan struct{}
}
// NewInstancer returns a Consul instancer that publishes instances for the
// requested service. It only returns instances for which all of the passed tags
// are present.
func NewInstancer(client Client, logger log.Logger, service string, tags []string, passingOnly bool) *Instancer
// Stop terminates the instancer.
func (s *Instancer) Stop()
func (s *Instancer) loop(lastIndex uint64)
func (s *Instancer) getInstances(lastIndex uint64, interruptc chan struct{}) ([]string, uint64, error)
// Register implements Instancer.
func (s *Instancer) Register(ch chan<- sd.Event)
// Deregister implements Instancer.
func (s *Instancer) Deregister(ch chan<- sd.Event)
func makeInstances(entries []*consul.ServiceEntry) []string
主要包含以下5个函数
- NewInstancer
- 调用 getInstances 函数,获取对应的一组服务地址,构造 Instancer 结构体
- loop
- 循环查询调用 getInstances 函数,默认10毫秒调用一次
- Stop
- 关闭服务监听, getInstances 函数获得一个 errStopped,停止loop
- Register
- Deregister
UUID那些事儿(中)
不遵守UUID协议规范的唯一识别码生成
twitter/snowflake
【此节转载】
Snowflake算法核心
把时间戳,工作机器id,序列号组合在一起。
除了最高位bit标记为不可用以外,其余三组bit占位均可浮动,看具体的业务需求而定。默认情况下41bit的时间戳可以支持该算法使用到2082年,10bit的工作机器id可以支持1023台机器,序列号支持1毫秒产生4095个自增序列id。下文会具体分析。
Snowflake – 时间戳
这里时间戳的细度是毫秒级,具体代码如下,建议使用64位linux系统机器,因为有vdso,gettimeofday()在用户态就可以完成操作,减少了进入内核态的损耗。
uint64_t generateStamp()
{
timeval tv;
gettimeofday(&tv, 0);
return (uint64_t)tv.tv_sec * 1000 + (uint64_t)tv.tv_usec / 1000;
}
默认情况下有41个bit可以供使用,那么一共有T(1llu << 41)毫秒供你使用分配,年份 = T / (3600 * 24 * 365 * 1000) = 69.7年。如果你只给时间戳分配39个bit使用,那么根据同样的算法最后年份 = 17.4年。
Snowflake – 工作机器id
严格意义上来说这个bit段的使用可以是进程级,机器级的话你可以使用MAC地址来唯一标示工作机器,工作进程级可以使用IP+Path来区分工作进程。如果工作机器比较少,可以使用配置文件来设置这个id是一个不错的选择,如果机器过多配置文件的维护是一个灾难性的事情。
这里的解决方案是需要一个工作id分配的进程,可以使用自己编写一个简单进程来记录分配id,或者利用Mysql auto_increment机制也可以达到效果。
工作进程与工作id分配器只是在工作进程启动的时候交互一次,然后工作进程可以自行将分配的id数据落文件,下一次启动直接读取文件里的id使用。
PS:这个工作机器id的bit段也可以进一步拆分,比如用前5个bit标记进程id,后5个bit标记线程id之类
Snowflake – 序列号
序列号就是一系列的自增id(多线程建议使用atomic),为了处理在同一毫秒内需要给多条消息分配id,若同一毫秒把序列号用完了,则“等待至下一毫秒”。
uint64_t waitNextMs(uint64_t lastStamp)
{
uint64_t cur = 0;
do {
cur = generateStamp();
} while (cur <= lastStamp);
return cur;
}
总体来说,是一个很高效很方便的GUID产生算法,一个int64_t字段就可以胜任,不像现在主流128bit的GUID算法,即使无法保证严格的id序列性,但是对于特定的业务,比如用做游戏服务器端的GUID产生会很方便。另外,在多线程的环境下,序列号使用atomic可以在代码实现上有效减少锁的密度。
语言实现
- (官方实现-不维护)scala:https://github.com/twitter/snowflake/releases/tag/snowflake-2010
- 不同语言github均有各自实现,下面主要介绍PHP实现
PHP实现
PHP的劣势
snowflake算法需求在同一毫秒生成的code的序列号自增,由于PHP在语言级别上没有办法让某个对象常驻内存,所以需要借助的其他的方法实现。
扩展
php_snowflake,通过扩展实现了让PHP支持全局变量,原理和php的生命周期有关
PHP的生命周期
多进程SAPI生命周期
通常PHP是编译为apache的一个模块来处理PHP请求。Apache一般会采用多进程模式, Apache启动后会fork出多个子进程,每个进程的内存空间独立,每个子进程都会经过开始和结束环节, 不过每个进程的开始阶段只在进程fork出来以来后进行,在整个进程的生命周期内可能会处理多个请求。 只有在Apache关闭或者进程被结束之后才会进行关闭阶段,在这两个阶段之间会随着每个请求重复请求开始-请求关闭的环节。 如图2.2所示:
多线程的SAPI生命周期
多线程模式和多进程中的某个进程类似,不同的是在整个进程的生命周期
实现
PHP扩展开发提供了注册module全局变量的功能
http://php.net/manual/zh/internals2.structure.globals.php
小细节:
由于进程关闭后序列会重置,所以算法使用的进程号+序列号保证唯一
提供了service_no可以分布式部署
此扩展的缺点
- 使用的的32位的字符串,占用空间更大
- 机器码使用的是进程号,分布式系统下有概率重复(使用不同的service_no则不会)
- 和snowflake的官方算法有差异,时间戳直接使用当前毫秒数,而非当前时间戳减去固定值(其实使用字符串形式这样更加易读,snowflake官方算法是在为了用更小的位数支持更长的时间)
参考
UUID那些事儿(下)
其他的方法
业务属性
- 滴滴:时间+起点编号+车牌号
- 淘宝订单:时间戳+用户ID
- 其他电商:时间戳+下单渠道+用户ID,有的会加上订单第一个商品的ID。
举例roll业务场景--用户参与roll房间记录
时间+roll房间id+用户id
数据库方案-Flicker的解决方案
自增id | 调用方ip |
---|---|
5 | 192.168.2.13 |
3 | 192.168.2.15 |
6 | 192.168.3.15 |
Redis分布式方案
推荐
美团leaf
是现有大部分方案的优化方案
其他
参考
UUID那些事儿(上)
注意
UUID生成算法不保证返回值绝对唯一
170亿分之1的重复概率(数据来源维基百科)
UUID
UUID 是 通用唯一识别码(Universally Unique Identifier),UUID的目的,是让分散式系统中的所有元素,都能有唯一的辨识资讯,而不需要透过中央控制端来做辨识资讯的指定。
目前最广泛应用的UUID,是微软公司的全局唯一标识符(GUID)
GUID
全局唯一标识符,简称GUID(Globally Unique Identifier),通常表示成32个16进制数字(0-9,A-F)组成的字符串,如:{21EC2020-3AEA-1069-A2DD-08002B30309D},它实质上是一个128位长的二进制整数。
UUID的格式
xxxxxxxx-xxxx-Mxxx-Nxxx-xxxxxxxxxxxx
8位 4位 4位 4位 12位
M那个位置,代表版本号,由于UUID的标准实现有5个版本,所以只会是1,2,3,4,5
N那个位置,代表变种(Variant),一般只会是8,9,a,b
rfc4122协议10XX四位只能表示成8,9,A(16进制的10),B(16进制的11)
VariantNCS(已经过时的老古董,NCS=Apollo Network Computing System) 0xxx
VariantRFC4122(本文档) 10XX
VariantMicrosoft(微软) 110x
VariantFuture(未来) 111X
各个版本简介
http://www.uuid.online/ 可以演示不同版本的UUID
UUID目前协议规定的版本有5个,其中1、3、4版本较为常用
版本1
基于时间戳+MAC地址
通过当前时间戳、机器MAC地址生成;
由于在算法中使用了MAC地址,这个版本的UUID可以保证在全球范围的唯一性。
但与此同时,因为它暴露了电脑的MAC地址和生成这个UUID的时间,这就是这个版本UUID被诟病的地方。
版本2(大部分不使用)
基于时间戳+MAC地址
DCE安全的UUID和基于时间的UUID算法相同,但会把时间戳的前4位置换为POSIX的UID或GID。
不过,在UUID的规范里面没有明确地指定,所以基本上所有的UUID实现都不会实现这个版本。
版本3
基于命名空间的UUID(使用MD5散列化命名空间)
由用户指定1个namespace和1个具体的字符串,通过MD5散列,来生成1个UUID;
其中namespace根据协议规定,一般
domain name system, URLs, ISO Object IDs (OIDs), X.500 Distinguished
Names (DNs), and reserved words in a programming language.
版本4
基于随机数的UUID
最简单常用的一种
版本5
基于命名空间的UUID(使用SHA1散列化命名空间)
代码示例
import uuid
print "v1=",uuid.uuid1()
print "v3=",uuid.uuid3(uuid.NAMESPACE_DNS, "myString")
print "v4=",uuid.uuid4()
print "v5=",uuid.uuid5(uuid.NAMESPACE_DNS, "myString")
//java只实现了v3和v4
import java.util.UUID;
public class code
{
public static void main(String[] args)
{
System.out.println("v3="+UUID.nameUUIDFromBytes("myString".getBytes()).toString());
System.out.println("v4="+UUID.randomUUID());
}
}
UUID和各个编程语言
注:PHP内置函数uniqid并不是uuid协议的实现
- PHP:https://github.com/ramsey/uuid
- Java:http://docs.oracle.com/javase/7/docs/api/java/util/UUID.html
- Golang:https://godoc.org/github.com/satori/go.uuid
- Android:http://developer.android.com/reference/java/util/UUID.html
- IOS:https://developer.apple.com/documentation/foundation/uuid
- nodejs - https://www.npmjs.com/package/uuid
- 微软:http://msdn.microsoft.com/en-us/library/system.guid(v=vs.110).aspx
- Linux:http://en.wikipedia.org/wiki/Util-linux
- MySQL:http://dev.mysql.com/doc/refman/5.1/en/miscellaneous-functions.html#function_uuid
PHP uniqid
//prefix 自定义字符串,如服务器的mac地址,ip等,保证多台机器同一微秒生成的值不重复
//more_entropy 会在返回的字符串结尾增加额外的熵,增加随机性
string uniqid ([ string $prefix = "" [, bool $more_entropy = false ]] )
获取一个带前缀、基于当前时间微秒数的唯一ID。
//代码示例
echo uniqid();//5a2cf3024ee36
//部分PHP源码
//uniqid.c
gettimeofday((struct timeval *) &tv, (struct timezone *) NULL);
sec = (int) tv.tv_sec;
usec = (int) (tv.tv_usec % 0x100000);
if (more_entropy) {
spprintf(&uniqid, 0, "%s%08x%05x%.8F", prefix, sec, usec, php_combined_lcg(TSRMLS_C) * 10);
} else {
spprintf(&uniqid, 0, "%s%08x%05x", prefix, sec, usec);
}
参考
Copyright © 2015 Powered by MWeb, Theme used GitHub CSS.