2023-09-15  阅读(223)
原文作者:王伟王胖胖 原文地址: https://blog.csdn.net/wangwei19871103/article/details/105615896

大致流程图

202309152311102431.png

服务端接受服务注册

ApplicationResourceaddInstance用来接受客户端发来的POST请求:

202309152311110862.png
主要的是最后的注册到注册表,第二个参数是表示是否要进行集群同步复制,因为如果是其他结点同步过来的信息,就不需要再同步给其他结点了,就重复了,如果是客户端发来的信息,那就要给其他节点同步了。

202309152311119223.png

PeerAwareInstanceRegistryImpl的register

先进行注册,然后同步到其他结点。

        @Override
        public void register(final InstanceInfo info, final boolean isReplication) {
            int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
            if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
                leaseDuration = info.getLeaseInfo().getDurationInSecs();
            }
            super.register(info, leaseDuration, isReplication);
            replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);//同步给其他结点
        }

AbstractInstanceRegistry的register

首先看注册表有没有,没有的话就创建一个,然后获取是否有相同的实例在了,有的话就进行数据的更新,然后创建一个新的续约,更新事件,最后让读写缓存失效。

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
            try {
                read.lock();
                Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());//注册表获取
                REGISTER.increment(isReplication);
                if (gMap == null) {//第一次为空
                    final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                    gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);//不存在就放入gNewMap
                    if (gMap == null) {//没映射
                        gMap = gNewMap;
                    }
                }
                Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());//获取实例id对应的租约信息
                // Retain the last dirty timestamp without overwriting it, if there is already a lease
                if (existingLease != null && (existingLease.getHolder() != null)) {//存在的话
                    Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                    Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                    logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
    
                    // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                    // InstanceInfo instead of the server local copy.
                    if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                        logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                                " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                        logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                        registrant = existingLease.getHolder();//取最新的
                    }
                } else {
                    // The lease does not exist and hence it is a new registration
                    synchronized (lock) {//更新续约数量,注册一个新的实例
                        if (this.expectedNumberOfClientsSendingRenews > 0) {
                            // Since the client wants to register it, increase the number of clients sending renews
                            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                            updateRenewsPerMinThreshold();
                        }
                    }
                    logger.debug("No previous lease information found; it is new registration");
                }
                Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);//创建租约
                if (existingLease != null) {
                    lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
                }
                gMap.put(registrant.getId(), lease);//放进实例名和租约的映射集合
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));//加入到租约注册队列中
                // This is where the initial state transfer of overridden status happens
                if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {//覆盖信息不为unknow的要添加到overriddenInstanceStatusMap
                    logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                    + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                    if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                        logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                        overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                    }
                }
                InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
                if (overriddenStatusFromMap != null) {
                    logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                    registrant.setOverriddenStatus(overriddenStatusFromMap);//设置覆盖信息
                }
    
                // Set the status based on the overridden status rules
                InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
                registrant.setStatusWithoutDirty(overriddenInstanceStatus);
    
                // If the lease is registered with UP status, set lease service up timestamp
                if (InstanceStatus.UP.equals(registrant.getStatus())) {
                    lease.serviceUp();//记录启动时间
                }
                registrant.setActionType(ActionType.ADDED);//标记为添加
                recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                registrant.setLastUpdatedTimestamp();//设置最后更新时间
                invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());//失效缓存
                logger.info("Registered instance {}/{} with status {} (replication={})",
                        registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
            } finally {
                read.unlock();
            }
        }

invalidateCache失效缓存

缓存是存在在ResponseCache中的。

202309152311124704.png
失效很多中缓存,比如增量,全量,服务等。

202309152311130225.png
这些信息都存在读写锁里readWriteCacheMap,这个是一个谷歌缓存框架提供的类:

202309152311138266.png

PeerAwareInstanceRegistryImpl的replicateToPeers

会同步到除去自己的其他结点中,最后调用replicateInstanceActionsToPeers

202309152311149767.png
放在一个批处理的处理器当中发送,最后还是jersey去完成发送:

202309152311157138.png

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文
  • 点赞