Eureka 服务注册源码探秘——图解、源码级解析
作者:mmseoamin日期:2024-02-06

🍊 Java学习:社区快速通道


🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想


🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年5月2日


🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • 引言
  • Eureka 服务注册源码
    • 寻找配置类
    • 寻找服务注册的元数据
    • register方法
    • 下一个流程
    • 继续execute

      引言

      服务注册是为了解决各个微服务的“你是谁”这个问题,即获取所有服务节点的身份信息和服务名称,站在注册中心的角度来看,有以下两种比较直观的解决方案:

      1. 由注册中心主动访问网络节点中所有机器
      2. 注册中心等待服务节点主动进行注册

      Eureka 服务注册源码探秘——图解、源码级解析,在这里插入图片描述,第1张


      目前主流的注册中心(Nacos、Eureka)都选择了第二种方案,主要原因是第一种方案有很多弊端:

      • 模型复杂: 网络结点构成了一张复杂的网,结点与结点之间的关系错综复杂,轮询每个节点的做法通常是注册中心发局域网广播,客户端响应的方式。现实中对于跨局域网的分布式系统来说,响应模型会更加复杂。

      • 网络开销大: 整个网络环境里会掺杂大量非服务节点,这些节点无需对送达的广播请求做出响应,这种广播的模式无疑增加了网络通信成本。

      • 服务端压力增加: 不仅要求注册中心向网络中所有节点主动发送广播请求,还需要对客户端的应答做出响应。考虑到注册中心的节点数远远少于服务节点,所以要尽可能地减轻服务中心承载的业务。

        一一对照着看,第二种实现方案就有如下优点:

        1. 注册中心压力小: 网络中其它非服务节点不会产生任何无效请求,也就不用做额外的判断
        2. 效率高: 省去了广播环节的时间,使注册效率大大提高
        3. 节省成本: 节省了大量网络请求的开销

        下面就来探索一下经典注册中心微服务 Eureka 服务注册源码。

        Eureka 服务注册源码

        寻找配置类

        要使用Eureka,就需要在SpringBoot的启动类上添加 @EnableDiscoveryClient 注解,所以我们的源码解析,从启动类上的 @EnableDiscoveryClient 注解开始:

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第2张

        在Eureka已经启动的状态下,以debug模式启动EurekaClientApplication,会来到这里面的断点:

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第3张

        其中metadata是main函数里挂的注解:

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第4张

        attributes是会获得@EnableDiscoveryClient的EnableDiscoveryClient注解,接下来读取注解里面的autoRegister属性,如果是true的话,会发现之后导入了一个配置类:

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第5张


        寻找服务注册的元数据

        进入到该配置类:

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第6张

        继续进入到AutoServiceRegistrationProperties类里:

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第7张

        这些个属性一定会在某些配置项加载的流程中应用到,大家尝试找一下哪些类会引用它。

        其中你会找到AbstractAutoServiceRegistration,发现其在初始化的流程里使用到:

        protected AbstractAutoServiceRegistration(ServiceRegistry serviceRegistry, AutoServiceRegistrationProperties properties) {
            this.serviceRegistry = serviceRegistry;
            this.properties = properties;
        }
        

        同时还发现了一个服务注册属性serviceRegistry:

        private final ServiceRegistry serviceRegistry;
        

        进入到ServiceRegistry的实现类EurekaServiceRegistry里

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第8张

        进入到第一行的方法maybeInitializeClient里:

        private void maybeInitializeClient(EurekaRegistration reg) {
            reg.getApplicationInfoManager().getInfo();
            reg.getEurekaClient().getApplications();
        }
        

        继续进入到getInfo里

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第9张

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第10张

        发现这里面的信息其实就是我们要向服务中心注册的东西。

        register方法

        接下来继续执行EurekaServiceRegistry的register方法:

        reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
        

        首先设置了instance的状态,这里reg.getInstanceConfig().getInitialStatus()是UP

        这里的register并没有发起服务调用请求,所以还要通过调用栈来继续寻找。来到上一层EurekaAutoServiceRegistration的start方法里:

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第11张

        停留在的这一行往上下文中发布了一个事件InstanceRegisteredEvent,但此时我们会发现服务其实并没有注册

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第12张

        说明在event发布前后肯定发生了什么事,让eureka服务提供者向注册中心发送了请求,既然event发布之后running的状态变为了true,那确实是运行起来了。

        下一个流程

        下一个流程在DiscoveryClient里,它封装了我们服务的client和注册中心之间的各种交互,里面有一个register方法

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第13张

        跟着断点继续往下走:

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第14张

        发现这里用的是SessionedEurekaHttpClient,接下来去找它的源码:

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第15张

        register方法在其父类EurekaHttpClientDecorator中

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第16张

        通过一个子类实现的execute方法,参数是由父类传入的一个代理delegate,execute是由SessionedEurekaHttpClient子类实现的

        protected  EurekaHttpResponse execute(RequestExecutor requestExecutor) {
            long now = System.currentTimeMillis();
            long delay = now - this.lastReconnectTimeStamp;
            if (delay >= this.currentSessionDurationMs) {
                logger.debug("Ending a session and starting anew");
                this.lastReconnectTimeStamp = now;
                this.currentSessionDurationMs = this.randomizeSessionDuration(this.sessionDurationMs);
                TransportUtils.shutdown((EurekaHttpClient)this.eurekaHttpClientRef.getAndSet((Object)null));
            }
            EurekaHttpClient eurekaHttpClient = (EurekaHttpClient)this.eurekaHttpClientRef.get();
            if (eurekaHttpClient == null) {
                eurekaHttpClient = TransportUtils.getOrSetAnotherClient(this.eurekaHttpClientRef, this.clientFactory.newClient());
            }
            return requestExecutor.execute(eurekaHttpClient);
        }
        

        这一段代码尝试从HttpClient里拿实例,如果实例为空则会调用一个工具类的方法getOrSetAnotherClient去获取一个新的实例,但这里我们会发现其实并不为空:

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第17张

        这里调用了另一个httpclient。其中SessionedEurekaHttpClient用到了装饰器模式,主要装饰的功能是delay时间过长时重新启动一个session

        进入到下一层RetryableEurekaHttpClient,这一层装饰的功能是可以重试,默认最大重试次数为3:

        protected  EurekaHttpResponse execute(RequestExecutor requestExecutor) {
            List candidateHosts = null;
            int endpointIdx = 0;
            for(int retry = 0; retry < this.numberOfRetries; ++retry) {
                EurekaHttpClient currentHttpClient = (EurekaHttpClient)this.delegate.get();
                EurekaEndpoint currentEndpoint = null;
                if (currentHttpClient == null) {
                    if (candidateHosts == null) {
                        candidateHosts = this.getHostCandidates();
                        if (candidateHosts.isEmpty()) {
                            throw new TransportException("There is no known eureka server; cluster server list is empty");
                        }
                    }
                    if (endpointIdx >= candidateHosts.size()) {
                        throw new TransportException("Cannot execute request on any known server");
                    }
                    currentEndpoint = (EurekaEndpoint)candidateHosts.get(endpointIdx++);
                    currentHttpClient = this.clientFactory.newClient(currentEndpoint);
                }
                try {
                    EurekaHttpResponse response = requestExecutor.execute(currentHttpClient);
                    if (this.serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                        this.delegate.set(currentHttpClient);
                        if (retry > 0) {
                            logger.info("Request execution succeeded on retry #{}", retry);
                        }
                        return response;
                    }
                    logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
                } catch (Exception var8) {
                    logger.warn("Request execution failed with message: {}", var8.getMessage());
                }
                this.delegate.compareAndSet(currentHttpClient, (Object)null);
                if (currentEndpoint != null) {
                    this.quarantineSet.add(currentEndpoint);
                }
            }
            throw new TransportException("Retry limit reached; giving up on completing the request");
        }
        

        其中this.getHostCandidates();获取的是注册中心:

        private List getHostCandidates() {
            List candidateHosts = this.clusterResolver.getClusterEndpoints();
            this.quarantineSet.retainAll((Collection)candidateHosts);
            int threshold = (int)((double)((List)candidateHosts).size() * this.transportConfig.getRetryableClientQuarantineRefreshPercentage());
            if (threshold > ((List)candidateHosts).size()) {
                threshold = ((List)candidateHosts).size();
            }
            if (!this.quarantineSet.isEmpty()) {
                if (this.quarantineSet.size() >= threshold) {
                    logger.debug("Clearing quarantined list of size {}", this.quarantineSet.size());
                    this.quarantineSet.clear();
                } else {
                    List remainingHosts = new ArrayList(((List)candidateHosts).size());
                    Iterator var4 = ((List)candidateHosts).iterator();
                    while(var4.hasNext()) {
                        EurekaEndpoint endpoint = (EurekaEndpoint)var4.next();
                        if (!this.quarantineSet.contains(endpoint)) {
                            remainingHosts.add(endpoint);
                        }
                    }
                    candidateHosts = remainingHosts;
                }
            }
            return (List)candidateHosts;
        }
        

        如果坏注册中心节点的数量超过了阈值(66%),则要重启。quarantineSet存储的是失败的注册中心,remainingHosts存储的是成功的注册中心。

        继续execute

        回到上面的execute方法里,如果重试的索引大于候选注册中心的size时,就表示已知的所有注册中心都不能处理注册请求,此时会抛一个异常出来:

        if (endpointIdx >= candidateHosts.size()) {
            throw new TransportException("Cannot execute request on any known server");
        }
        currentEndpoint = (EurekaEndpoint)candidateHosts.get(endpointIdx++);
        currentHttpClient = this.clientFactory.newClient(currentEndpoint);
        

        如果在某一层execute成功了,则会将deligate设置为当前的client,如果不成功则会通过CAS操作将currentHttpClient设置为空,然后放置到失效的EurekaEndpoint加入到quarantineSet,下次不用了

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第18张

        此时还有好多层装饰器,这里直接快进跳到最后一层AbstractJerseyEurekaHttpClient中的register方法:

        public EurekaHttpResponse register(InstanceInfo info) {
            String urlPath = "apps/" + info.getAppName();
            ClientResponse response = null;
            EurekaHttpResponse var5;
            try {
                Builder resourceBuilder = this.jerseyClient.resource(this.serviceUrl).path(urlPath).getRequestBuilder();
                this.addExtraHeaders(resourceBuilder);
                response = (ClientResponse)((Builder)((Builder)((Builder)resourceBuilder.header("Accept-Encoding", "gzip")).type(MediaType.APPLICATION_JSON_TYPE)).accept(new String[]{"application/json"})).post(ClientResponse.class, info);
                var5 = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", new Object[]{this.serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()});
                }
                if (response != null) {
                    response.close();
                }
            }
            return var5;
        }
        

        在这里发送了http请求,info里面存的是当前服务的所有信息

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第19张

        这一步结束之后就注册成功了

        Eureka 服务注册源码探秘——图解、源码级解析,请添加图片描述,第20张