2023-09-15  阅读(110)
原文作者:王伟王胖胖 原文地址: https://blog.csdn.net/wangwei19871103/article/details/105791536

服务发现任务图

202309152317279421.png

NamingProxy的refreshSrvIfNeed刷新服务地址

这个主要是刷新服务器地址用的,但是是根据域名去请求的,一般不设置域名这里基本都是返回了。

      private void refreshSrvIfNeed() {
            try {
    			
    			//有了服务了直接返回
                if (!CollectionUtils.isEmpty(serverList)) {
                    NAMING_LOGGER.debug("server list provided by user: " + serverList);
                    return;
                }
    			//间隔太短不行,30秒
                if (System.currentTimeMillis() - lastSrvRefTime < vipSrvRefInterMillis) {
                    return;
                }
    
                List<String> list = getServerListFromEndpoint();
    
                if (CollectionUtils.isEmpty(list)) {
                    throw new Exception("Can not acquire Nacos list");
                }
    
                if (!CollectionUtils.isEqualCollection(list, serversFromEndpoint)) {
                    NAMING_LOGGER.info("[SERVER-LIST] server list is updated: " + list);
                }
    
                serversFromEndpoint = list;
                lastSrvRefTime = System.currentTimeMillis();
            } catch (Throwable e) {
                NAMING_LOGGER.warn("failed to update server list", e);
            }
        }

EventDispatcher的Notifier

其实就是监听服务改变,然后进行通知,changedServices是个BlockingQueue阻塞队列,有改变的才会获取。

      @Override
            public void run() {
                while (true) {
                    ServiceInfo serviceInfo = null;
                    try {
                        serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
                    } catch (Exception ignore) {
                    }
    
                    if (serviceInfo == null) {
                        continue;
                    }
    
                    try {
                        List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
    
                        if (!CollectionUtils.isEmpty(listeners)) {
                            for (EventListener listener : listeners) {
                                List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                                listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts));
                            }
                        }
    
                    } catch (Exception e) {
                        NAMING_LOGGER.error("[NA] notify error for service: "
                            + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);
                    }
                }
            }

HostReactor的UpdateTask

这个是给指定的服务做更新,根据服务端发来的缓存事件,一般是10秒,请求更新一次。

      @Override
            public void run() {
                try {
                //获取服务名相关的服务信息
                    ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
    				//不存在就直接获取
                    if (serviceObj == null) {
                    	//直接更新
                        updateServiceNow(serviceName, clusters);
                        executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                        return;
                    }
    				//还没更新过,要更新后比较
                    if (serviceObj.getLastRefTime() <= lastRefTime) {
                        updateServiceNow(serviceName, clusters);
                        serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                    } else {//已经更新过了,就直接把本地的推送上去,不比较差异
                        // if serviceName already updated by push, we should not override it
                        // since the push data may be different from pull through force push
                        refreshOnly(serviceName, clusters);
                    }
    
                    lastRefTime = serviceObj.getLastRefTime();
    
                    if (!eventDispatcher.isSubscribed(serviceName, clusters) &&
                        !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                        // abort the update task:
                        NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                        return;
                    }
    
                    executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
    
    
                } catch (Throwable e) {
                    NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
                }
    
            }

updateServiceNow更新并比较差异

获取老的,然后请求新的,然后解析json后比较,有差异就要通知。

    public void updateServiceNow(String serviceName, String clusters) {
            ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
            try {
    
                String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
    
                if (StringUtils.isNotEmpty(result)) {
                    processServiceJSON(result);
                }
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
            } finally {
                if (oldService != null) {
                    synchronized (oldService) {
                        oldService.notifyAll();
                    }
                }
            }
        }

NamingProxy的queryList

设置参数,调用请求服务实例列表uri/nacos/v1/ns/instance/list,返回json结果。

    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
            throws NacosException {
    
            final Map<String, String> params = new HashMap<String, String>(8);
            params.put(CommonParams.NAMESPACE_ID, namespaceId);
            params.put(CommonParams.SERVICE_NAME, serviceName);
            params.put("clusters", clusters);
            params.put("udpPort", String.valueOf(udpPort));
            params.put("clientIP", NetUtils.localIP());
            params.put("healthyOnly", String.valueOf(healthyOnly));
    
            return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
        }

HostReactor的processServiceJSON

解析json为服务信息对象,然后比较新老服务信息,有改变的话就进行通知和保存到本地。

202309152317287362.png

202309152317294803.png

这样基本的任务都讲完了,接下去开始讲点细节,什么时候会进行服务获取和更新,其实一开始只是初始化,只有注册中心地址和服务名,没有任何服务实例的,只有当有请求来的时候,底层用都了ribbon做负载均衡,在创建负载均衡器的时候会进行服务列表请求更新,具体下篇说吧。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文
  • 点赞