2023-09-15  阅读(264)
原文作者:王伟王胖胖 原文地址: https://blog.csdn.net/wangwei19871103/article/details/105750251

自动同步配置的实现

NacosContextRefresher会监听应用程序上下文完成的事件,在事件中做配置监听器的注册:

202309152315579961.png
只注册一次:

202309152315584972.png

registerNacosListenersForApplications给所有配置设置监听器

从注册表里获取所有的NacosPropertySource属性源,也就是尝试过获取配置的配置源,有3个。如果是支持刷新的,就注册监听器。

    	private void registerNacosListenersForApplications() {
    		if (isRefreshEnabled()) {
    			for (NacosPropertySource propertySource : NacosPropertySourceRepository
    					.getAll()) {
    				if (!propertySource.isRefreshable()) {
    					continue;
    				}
    				String dataId = propertySource.getDataId();
    				registerNacosListener(propertySource.getGroup(), dataId);
    			}
    		}
    	}

比如这三个,虽然可能只有一个是有内容的,但是源码里会去尝试3个,都会封装成属性源放进来。

202309152315591503.png

registerNacosListener注册监听器到配置服务

其实就是创建一个监听器,放入注册服务,如果有刷新了,就会调用innerReceive方法,进行刷新历史的添加和刷新事件的通知,也就是你自己可以接受到这个事件后做点扩展。

    private void registerNacosListener(final String groupKey, final String dataKey) {
    		//生成一个key,用逗号连接dataid和组
    		String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
    		//放进listenerMap中,如果不存在的话就创建一个AbstractSharedListener并返回
    		Listener listener = listenerMap.computeIfAbsent(key,
    				lst -> new AbstractSharedListener() {
    					@Override
    					public void innerReceive(String dataId, String group,
    							String configInfo) {
    							//刷新个数
    						refreshCountIncrement();
    						//添加刷新历史
    						nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
    						//广播刷新事件						
    						applicationContext.publishEvent(
    								new RefreshEvent(this, null, "Refresh Nacos config"));
    						...
    					}
    				});
    		try {
    			//把监听器到添加到配置服里务
    			configService.addListener(dataKey, groupKey, listener);
    		}
    		catch (NacosException e) {
    			...
    		}
    	}

获取的key

202309152315598754.png

NacosConfigService的addListener配置服务添加监听器

其实是封装成集合添加到ClientWorker中。

     @Override
        public void addListener(String dataId, String group, Listener listener) throws NacosException {
            worker.addTenantListeners(dataId, group, Arrays.asList(listener));
        }

ClientWorker的addTenantListeners

会获取一个CacheData ,然后把监听器添加进去,其中dataId, group, tenant作为参数传进去,后面会用做key

        public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
            group = null2defaultGroup(group);
            String tenant = agent.getTenant();
            CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
            for (Listener listener : listeners) {
                cache.addListener(listener);
            }
        }

addCacheDataIfAbsent(服务器挂起的原因猜想)

里面做的就是先去缓存里获取,如果没有就创建一个CacheData,里面封装了监听器集合,过滤器,还有配置信息。最后重新创建一个HashMap,然后放进去,再把HashMap放入cacheMap里。这里有个cache.setInitializing(true);表示是第一次初始化的,所以请求不会被服务器挂起,如果不是第一次,那就会被挂起,为什么要这样设计,我猜是因为这样既可以避免无效的空轮询,也可以进行动态配置更新,因为在服务器挂起的那段时间里,大致是10秒钟,客户端设置超时是45秒,如果有配置修改的话,就可以立即响应,把修改后的配置取过来。具体服务器是怎么实现的,后面我们分析到了就知道啦。

        private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(
            new HashMap<String, CacheData>());
    
      public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
            CacheData cache = getCache(dataId, group, tenant);//从缓存中获取
            if (null != cache) {
                return cache;//存在直接返回
            }
            String key = GroupKey.getKeyTenant(dataId, group, tenant);//拼接成key
            synchronized (cacheMap) {//
                CacheData cacheFromMap = getCache(dataId, group, tenant);
              
                if (null != cacheFromMap) {//双重检测,有缓存了
                    cache = cacheFromMap;
                    // reset so that server not hang this check
                    cache.setInitializing(true);//不让服务器挂起请求,因为有正在始化呢,这里涉及到请求头Long-Pulling-Timeout-No-Hangup,如果有在初始化的就会有这个请求头,服务器看到估计就会立即处理,不会挂起,在轮询任务的处理中会看到。这里我猜是为了让CacheData快速重新获取一次,好让新的监听器起作用,否则服务器会挂起,就要等好久响应了
                } else {//没缓存就创建
                    cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
                    // fix issue # 1317
                    if (enableRemoteSyncConfig) {//是否需要同步一次
                        String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                        cache.setContent(ct[0]);
                    }
                }
                //创建一个新的
                Map<String, CacheData> copy = new HashMap<String, CacheData>(cacheMap.get());
                copy.put(key, cache);//放入cache
                cacheMap.set(copy);//设置回去
            }
            LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);
            //统计信息
            MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
    
            return cache;
        }

CacheData的addListener添加监听器

根据传入的类型,调用不同的ManagerListenerWrap构造函数,添加到监听器集合里。监听器集合是CopyOnWriteArrayList,这个可以在写的时候提高性能,写的时候是复制一份去改的,原来的数据也能读,但是是旧的值,不过没关系,一般只修改一个元素,不影响到其他元素,其他元素照样可以读,旧的更新的是一样的数据。

        public void addListener(Listener listener) {
            if (null == listener) {
                throw new IllegalArgumentException("listener is null");
            }
            ManagerListenerWrap wrap = (listener instanceof AbstractConfigChangeListener) ?//是配置改变监听器还要传入内容
                new ManagerListenerWrap(listener, md5, content) : new ManagerListenerWrap(listener, md5);
    
            if (listeners.addIfAbsent(wrap)) {
                LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group,
                    listeners.size());
            }
        }

下篇继续,看看监听器怎么被调用到的。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵

阅读全文
  • 点赞