大家好 我是寸铁👊
总结了一篇【Consul】基于Go实现Consul服务的注册、注销、修改、监控注册的服务变化、实时同步服务信息机制✨
这应该是目前全网最全的使用golang手搓Consul服务信息机制✨
喜欢的小伙伴可以点点关注 💝
consul常常被用来作服务注册与服务发现,而它的watch机制则可被用来监控一些数据的更新,包括:nodes, KV pairs, health checks等。另外,在监控到数据变化后,还可以调用外部处理程序,此处理程序可以是任何可执行文件或HTTP调用,具体说明可见官网。
consul中的watch可以监听service,k-v,check,event等事件的变化,实时获取最新的数据。
consul支持以下watch类型:
key 监听一个consul kv中的key
keyprefix 监听consul kv中的key的前缀
services 监听有效服务的变化
nodes 监听节点的变化
service 监听服务的变化
checks 监听check的变化
event 监听自定义事件的变化
从以上可以看出consul提供非常丰富的监听类型,通过这些类型我们可以实时观测到consul整个集群中的变化,从而实现一些特别的需求,比如:服务告警,配置实时更新等功能。
Windows启动命令如下:
consul agent -dev
后台运行结果展示如下:
1.Kind:服务类型,是一个自定义类型 ServiceKind。 2.ID:服务的唯一标识符。(一个ID只能对应一个服务) 3.Service:服务的名称(一个服务可以创建多个ID) 4.Tags:服务的标签,以字符串数组形式存储。 5.Meta:服务的元数据,以键值对的形式存储。 6.Port:服务的端口号。 7.Address:服务的地址。 8.SocketPath:服务的套接字路径,可选的,以字符串形式存储。 9.TaggedAddresses:带标签的地址,以键值对的形式存储,键是标签,值是 ServiceAddress 类型。 10.Weights:服务的权重,类型为 AgentWeights。 11.EnableTagOverride:是否启用标签覆盖。 12.CreateIndex:创建索引,无符号整数类型,用于 JSON 序列化时忽略。 13.ModifyIndex:修改索引,无符号整数类型,用于 JSON 序列化时忽略。 14.ContentHash:内容哈希,字符串类型,用于 JSON 序列化时忽略。 15.Proxy:代理配置,类型为 AgentServiceConnectProxyConfig,可选的。 16.Connect:连接信息,类型为 AgentServiceConnect,可选的。 17.PeerName:对等名称,字符串类型,可选的。 18.Namespace:命名空间,字符串类型,用于 JSON 序列化时忽略。 19.Partition:分区,字符串类型,用于 JSON 序列化时忽略。 20.Datacenter:数据中心,字符串类型,用于 JSON 序列化时忽略。 21.Locality:地点信息,类型为 Locality,用于 JSON 序列化时忽略。
打印的结果如下:
服务的类型Kind: 服务的ID: him-service-1 服务的名字Service: him-service 服务的标签Tags: [tag1 tag2] 服务的元数据Meta: map[] 服务的端口Port: 8082 服务的地址Address: 127.0.0.4 服务的套接字路径SocketPath: 服务的带标签的地址TaggedAddresses: map[lan_ipv4:{127.0.0.4 8082} wan_ipv4:{127.0.0.4 8082}] 服务的权重Weights: {1 1} 服务是否启用标签覆盖EnableTagOverride: false 服务创建的索引CreateIndex: 5235 服务修改的索引ModifyIndex: 5235 服务内容哈希ContentHash: 服务代理配置Proxy: &{[] 0map[] [] {} {false []} } 服务连接信息Connect: &{false } 服务对等名称PeerName: 服务的命名空间Namespace: 服务的数据中心Datacenter: 服务的分区Partition: 服务的地点信息Locality:
同下:
package main import ( "fmt" "github.com/hashicorp/consul/api" ) func main() { //写api的配置信息 config := api.DefaultConfig() //注册到consul上的地址 config.Address = "127.0.0.1:8500" // Consul 服务器地址 //将config注册到客户端,由客户端实现 client, err := api.NewClient(config) if err != nil { panic(err) } // 创建一个新的服务条目 registration := new(api.AgentServiceRegistration) registration.ID = "my-service-3" registration.Name = "my-service" registration.Port = 8083 registration.Address = "127.0.0.1" registration.Tags = []string{"tag1", "tag2"} reg := &api.AgentServiceRegistration{ Name: registration.Name, // 服务名称 ID: registration.ID, // 服务 ID,必须唯一 Address: registration.Address, //服务的地址 Port: registration.Port, // 服务端口 服务所在的监听端口 Tags: registration.Tags, // 可选:服务标签 } // 将服务注册到 Consul err = client.Agent().ServiceRegister(reg) if err != nil { panic(err) } fmt.Println("Service registered successfully") }
package main import ( "fmt" "log" "github.com/hashicorp/consul/api" ) func main() { // 创建Consul客户端 config := api.DefaultConfig() client, err := api.NewClient(config) if err != nil { log.Fatal(err) } // 创建注销的服务ID serviceID := "my-service-2" // 注销服务 agent := client.Agent() if err := agent.ServiceDeregister(serviceID); err != nil { log.Fatal(err) } fmt.Println("Service deregistered successfully") }
package main import ( "fmt" "github.com/hashicorp/consul/api" "time" ) type Service struct { Name string ID string Address string Port int Tags []string } var serviceMap = map[string][]map[string]Service{} func main() { serviceMap = make(map[string][]map[string]Service) // 创建Consul客户端 config := api.DefaultConfig() client, err := api.NewClient(config) if err != nil { panic(err) } for { // 查询Consul客户端的服务目录,得到所有的服务名称。 catalog := client.Catalog() //得到consul上的所有服务services allServices, _, err := catalog.Services(nil) if err != nil { panic(err) } fmt.Println("所有服务services的名称:", allServices) //创建当前的ServiceMap记录当前的Service 出现则标记为true var currentServiceMap map[string]bool currentServiceMap = make(map[string]bool, 0) //遍历所有的Consul服务,将所有的Consul服务的信息存入map中。 //如服务: my-service first-service for serviceName, _ := range allServices { currentServiceMap[serviceName] = true // 通过服务名称查询服务实例 如my-service下有两个服务实例: my-service1 my-service2 // services是每个服务下的服务实例集合 services, _, err := client.Health().Service(serviceName, "", true, nil) if err != nil { panic(err) } var servcieSlice []map[string]Service servcieSlice = make([]map[string]Service, 0) // 遍历服务实例集合 for _, service := range services { fmt.Printf("Service %s:%d\n", service.Service.Service, service.Service.Port) //map先定义 var instanceMap map[string]Service //定义后用make进行创建 instanceMap = make(map[string]Service) //每一个服务的实例ID对应该服务的信息 instanceMap[service.Service.ID] = Service{ Name: service.Service.Service, ID: service.Service.ID, Address: service.Service.Address, Port: service.Service.Port, Tags: service.Service.Tags, } fmt.Println(instanceMap) servcieSlice = append(servcieSlice, instanceMap) 将serviceMap全局map之前出现过的service.Service.Service不存在则赋值为nil //if _, ok := serviceMap[service.Service.Service]; !ok { // serviceMap[service.Service.Service] = nil //} //service.Service.Service作为serviceMap的键,map[string]Service{}作为值存储入map中。 //这里最后一个值会覆盖掉同一个键前面多个值,采用一个值对应一个map的数组 serviceMap[service.Service.Service] = servcieSlice } fmt.Println(serviceMap) } //serviceMap为全局的Map,最后遍历一遍serviceMap,看一下里面的serviceName在当前的currentServiceMap中能否找到。 //如果说找不到,则把serviceMap中这个serviceName服务给删除掉。 for serviceName := range serviceMap { //遍历一遍全局的serviceMap,把不存在的服务删除掉。 //如果说currentServiceMap不存在serviceName,则把serviceName从serviceMap中移除。 if _, ok := currentServiceMap[serviceName]; !ok { delete(serviceMap, serviceName) } } time.Sleep(10 * time.Second) } }
package main import ( "fmt" "github.com/hashicorp/consul/api" ) func main() { // 创建Consul客户端配置 config := api.DefaultConfig() client, err := api.NewClient(config) if err != nil { panic(err) } // 创建WatchParams params := &api.QueryOptions{ WaitIndex: 0, // 初始的索引,设置为0表示从最新的变更开始监听 WaitTime: 1000, // 设置长轮询的等待时间,单位为秒 } // 循环监听服务变化 for { // 查询Consul客户端的服务目录,得到所有的服务名称 catalog := client.Catalog() allServices, _, err := catalog.Services(params) if err != nil { panic(err) } fmt.Println("所有服务的名称:", allServices) // 查询服务健康状态 //for serviceName := range allServices { services, _, err := client.Health().Service("my-service", "", true, params) if err != nil { panic(err) } for _, service := range services { fmt.Printf("服务: %s, 端口号: %v\n", service.Service.Service, service.Service.Port) } //} // 更新WaitIndex,以便下次长轮询从更新后的索引开始 params.WaitIndex = 0 // 使用长轮询时,将WaitIndex设置为0,以获取最新的变更 } }
consul官方提供了Golang版的watch包。其实际上也是对watch机制进行了一层封装,最终代码实现的还是对consul HTTP API 的 endpoints的使用。 文章开始说过,“在监控到数据变化后,还可以调用外部处理程序”。是了,数据变化后调用外部处理程序才是有意义的,Golang的watch包中对应的外部处理程序是一个函数handler。因为业务的关系,这里只实现了watch对service的变化的监控,其主要创建了一个plan 来对整个服务的变化做一个监控,以及再为每个服务创建一个 plan,对单个服务变化作监控。话不多说,上代码:
//Watch机制同步 package main import ( "fmt" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api/watch" "log" ) var serviceMap map[string][]map[string]Service type Service struct { Name string ID string Address string Port int Tags []string } func main() { serviceMap = make(map[string][]map[string]Service) // 创建Consul客户端 config := api.DefaultConfig() client, err := api.NewClient(config) if err != nil { panic(err) } // 初始化监视器计划 params := map[string]interface{}{"type": "services"} plan, err := watch.Parse(params) if err != nil { log.Fatal(err) } // 设置监视器的处理函数 plan.Handler = func(idx uint64, data interface{}) { services, ok := data.(map[string][]string) if !ok { log.Println("Error: Data format unexpected") return } // 重置服务映射 serviceMap = make(map[string][]map[string]Service) // 在这里阻塞住了 // 遍历服务列表 for serviceName := range services { // 查询服务实例 instances, _, err := client.Health().Service(serviceName, "", true, nil) if err != nil { log.Printf("Error retrieving instances for service %s: %v\n", serviceName, err) continue } // 创建服务实例切片 var serviceInstances []map[string]Service // 遍历服务实例 for _, instance := range instances { service := Service{ Name: instance.Service.Service, ID: instance.Service.ID, Address: instance.Service.Address, Port: instance.Service.Port, Tags: instance.Service.Tags, } instanceMap := map[string]Service{instance.Service.ID: service} serviceInstances = append(serviceInstances, instanceMap) } // 更新服务映射 serviceMap[serviceName] = serviceInstances } // 输出服务映射 fmt.Println("Updated Service Map:") for serviceName, instances := range serviceMap { fmt.Println("Service:", serviceName) for _, instance := range instances { for id, service := range instance { fmt.Printf("Instance ID: %s, Address: %s, Port: %d, Tags: %v\n", id, service.Address, service.Port, service.Tags) } } } } // 启动监视器 plan.Run("http://localhost:8500") // 保持程序运行,直到手动中断 select {} }
长时间没有响应,则进程结束。
package main import ( "context" "fmt" "github.com/hashicorp/consul/api" ) // 定义服务信息 type Service struct { Name string ID string Address string Port int Tags []string } // 全部服务信息的字典 var servicesMap = map[string]map[string]Service{} // 记录本次目录存在的服务的字典,布尔型,用于和lastServiceMap进行判断 var currentServicesMap = map[string]bool{} // 记录上次目录存在的服务的字典,布尔型,用于和currentServicesMap进行判断 var lastServiceMap = map[string]bool{} // 取消服务的协程的字典,存的是服务名和上下文cancel方法的映射,用于删除指定的goroutine var withCancelMap = map[string]context.CancelFunc{} func main() { //初始化map currentServicesMap = make(map[string]bool) lastServiceMap = make(map[string]bool) servicesMap = make(map[string]map[string]Service) withCancelMap = make(map[string]context.CancelFunc) // 创建Consul客户端 config := api.DefaultConfig() client, err := api.NewClient(config) if err != nil { panic(err) } // 创建WatchParams //params := &api.QueryOptions{ // WaitIndex: 0, // 初始的索引,设置为0表示从最新的变更开始监听 //} //配置参数放在外面,则达到获取目录信息阻塞的效果 queryOptions := &api.QueryOptions{ WaitIndex: 0, // 初始索引 } for { catalog := client.Catalog() // 查询Consul客户端的服务目录,得到所有的服务名称。 //得到consul上的所有服务services //目录发生变化再更新,不发生变化则不更新,需要在这里阻塞。 allServices, meta, err := catalog.Services(queryOptions) fmt.Println(err) if err != nil { panic(err) } //currentServicesMap记录allServices出现过的服务 for name := range allServices { currentServicesMap[name] = true } //遍历一遍上次服务的哈希表lastServiceMap for lastServiceName := range lastServiceMap { //如果说上次服务的哈希表中存在这个服务,现在遍历目录没有这个服务 if allServices[lastServiceName] == nil { //将lastServiceName从当前的currentServicesMap移除 delete(currentServicesMap, lastServiceName) } } //如果没有一个检查机制的话,这里相当于一直去读取目录,读完目录后再不断启协程 //这里就会造成不断的for死循环,所以需要一个检查机制,控制目录的更新。 //当目录没发生更新的时候则阻塞,目录发生更新了则进行检查。 //这样就确保了你目录没更新时,我启动的go func就一直在监听服务的变化即可 fmt.Println("所有服务services的名称:", allServices) //如果说协程中不启动go routine则相当于每次遍历服务时都去创建go routine 导致启动的go routine数量比较多 //每次目录发生变化后,这里就会创建新的协程,就会导致多了几个协程。 //正确的话,这里应该是先阻塞,然后如果说哪个协程监听到变化,则这个协程发消息即可 //其他的协程不用动,需要编写一个服务的检查机制,让之前启动的goroutine去监控对应的服务即可,其他不变化的go routine不用动。 fmt.Println("lastServiceMap:", lastServiceMap) fmt.Println("currentServicesMap:", currentServicesMap) for serviceName := range allServices { if currentServicesMap[serviceName] && lastServiceMap[serviceName] { //如果说上次和这次都存在该服务,说明之前已经创建过了,则跳过 fmt.Println("跳过……", serviceName) continue } else if currentServicesMap[serviceName] && !lastServiceMap[serviceName] { //如果说上次不存在,这次存在则说明要进行创建 //如果说目录多增加了一个服务,则启动多一个服务。 fmt.Println("启动协程……", serviceName) //使用上下文进行协程的注销 ctx, cancel := context.WithCancel(context.Background()) //存储一个serviceName为键、cancel()方法为值的withCancelMap withCancelMap[serviceName] = cancel go syncInfo(ctx, serviceName, client, queryOptions) //range上次的lastServiceMap } } //比较上次目录的服务信息和这次目录的服务信息 //执行删除指定go routine的操作 for lastName := range lastServiceMap { if !currentServicesMap[lastName] && lastServiceMap[lastName] { //如果说上次该服务存在,这次该服务不存在则说明要进行销毁 fmt.Println("进入删除协程函数……", lastName) //当走到这里时,执行serviceName对应的cancel方法 cacnelService := withCancelMap[lastName] cacnelService() //取消掉该服务的协程 fmt.Println("删除协程……", lastName) //由于该服务lastName已经取消了,则从serviceMap中移除掉 delete(servicesMap, lastName) //由于该服务lastName已经取消了,则从lastServiceMap中移除掉 delete(lastServiceMap, lastName) //由于该服务lastName已经取消了,则从withCancelMap中移除掉 delete(withCancelMap, lastName) } } // 赋值 currentServicesMap 的值给 lastServiceMap,用于下一次的目录信息检查。 for key, value := range currentServicesMap { lastServiceMap[key] = value } //fmt.Println(serviceMap) // 更新长轮询参数中的索引 queryOptions.WaitIndex = meta.LastIndex } } /* 将服务的各个实例的信息同步更新到全部服务信息的字典中,对开启服务的协程持续监控服务的信息变化。 当服务不存在时,执行取消服务所在的协程的操作。 */ func syncInfo(ctx context.Context, serviceName string, client *api.Client, params *api.QueryOptions) { for { select { //执行ctx绑定cancel //执行cancel时,绑定对应的ctx执行Done()方法,取消掉协程。 case <-ctx.Done(): //会有一点延迟 fmt.Println("协程取消了……") return default: //params要作为参数传入函数,这样 params := &api.QueryOptions{ WaitIndex: 0, // 初始的索引,设置为0表示从最新的变更开始监听 } fmt.Println("1111111111", serviceName) services, meta, err := client.Health().Service(serviceName, "", true, params) fmt.Println("2222222222", serviceName) if err != nil { panic(err) } //var servcieSlice []map[string]Service //servcieSlice = make([]map[string]Service, 0) // 遍历服务实例集合 //map先定义 var instanceMap map[string]Service //定义后用make进行创建 instanceMap = make(map[string]Service) for _, service := range services { //fmt.Printf("Service %s:%d\n", service.Service.Service, service.Service.Port) //每一个服务的实例ID对应该服务的信息 instanceMap[service.Service.ID] = Service{ Name: service.Service.Service, ID: service.Service.ID, Address: service.Service.Address, Port: service.Service.Port, Tags: service.Service.Tags, } //fmt.Println(instanceMap)打印出服务中每个实例的信息 servicesMap[service.Service.Service] = instanceMap } fmt.Println(instanceMap) params.WaitIndex = meta.LastIndex // 更新版本,根据和上次的不同进行变化。 } } }
https://vearne.cc/archives/13983
https://juejin.cn/post/6984378158347157512
https://juejin.cn/post/6883095345623597064
https://zhuanlan.zhihu.com/p/111673886
https://developer.hashicorp.com/consul/api-docs/catalog
看到这里的小伙伴,恭喜你又掌握了一个技能👊
希望大家能取得胜利,坚持就是胜利💪
我是寸铁!我们下期再见💕
【保姆级教程】Windows11下go-zero的etcd安装与初步使用
【保姆级教程】Windows11安装go-zero代码生成工具goctl、protoc、go-zero
【Go-Zero】手把手带你在goland中创建api文件并设置高亮
【Go-Zero】Error: user.api 27:9 syntax error: expected ‘:‘ | ‘IDENT‘ | ‘INT‘, got ‘(‘ 报错解决方案及api路由注意事项
【Go-Zero】Error: only one service expected goctl一键转换生成rpc服务错误解决方案
【Go-Zero】【error】 failed to initialize database, got error Error 1045 (28000):报错解决方案
【Go-Zero】Error 1045 (28000): Access denied for user ‘root‘@‘localhost‘ (using password: YES)报错解决方案
【Go-Zero】type mismatch for field “Auth.AccessSecret“, expect “string“, actual “number“报错解决方案
【Go-Zero】Error: user.api 30:2 syntax error: expected ‘)‘ | ‘KEY‘, got ‘IDENT‘报错解决方案
【Go-Zero】Windows启动rpc服务报错panic:context deadline exceeded解决方案
【Go面试向】defer与time.sleep初探
【Go面试向】defer与return的执行顺序初探
【Go面试向】Go程序的执行顺序
【Go面试向】rune和byte类型的认识与使用
【Go面试向】实现map稳定的有序遍历的方式