2023-08-07  阅读(315)
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/264

了解了Feign是如何创建动态代理对象之后,我们就进入最后一个环节:Feign的请求流程。我们在使用Feign时,一般使用@Autowired注入,比如下面这样:

    @FeignClient("ServiceA")    // ServiceA就是服务A的名称
    public interface ServiceAClient extends ServiceAInterface {
    }
    
    @RestController
    @RequestMapping("/ServiceB/user")  
    public class ServiceBController {    
        @Autowired
        private ServiceAClient serviceA;
    
        @RequestMapping(value = "/sayHello/{id}", method = RequestMethod.GET)
        public String greeting(@PathVariable("id") Long id,@RequestParam("name") String name, 
                               @RequestParam("age") Integer age) {
            return serviceA.sayHello(id, name, age);
        }
    }

上面的代码,最终调用了ServiceAClient.sayHello(),那是如何触发代理对象的方法的执行的呢?

一、JDK动态代理

通过上一章的讲解,我们已经知道,Spring为我们注入的是ServiceAClient接口的代理对象,这个对象是通过JDK动态代理方式生成的:

    InvocationHandler handler = factory.create(target, methodToHandler);
    T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[] {target.type()}, handler);

1.1 InvocationHandler

这里我补充下JDK动态代理的知识,代理的横切逻辑就是InvocationHandler的实现类:

    public interface InvocationHandler {
        /**
         * @param   proxy 代理对象,也就是最终实现了例如ServiceAClient接口的某个代理对象
         * @param   method 接口的方法元数据
         * @param   args 方法参数
         */
        public Object invoke(Object proxy, Method method, Object[] args)throws Throwable;
    }

要使用JDK动态代理,我们需要实现上面这个接口,比如Feign中的实现是FeignInvocationHandler,它将接口的方法元数据和处理逻辑保存到一个dispatch属性中:

    static class FeignInvocationHandler implements InvocationHandler {
    
        private final Target target;
        private final Map<Method, MethodHandler> dispatch;
    
        FeignInvocationHandler(Target target, Map<Method, MethodHandler> dispatch) {
            this.target = checkNotNull(target, "target");
            this.dispatch = checkNotNull(dispatch, "dispatch for %s", target);
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if ("equals".equals(method.getName())) {
                try {
                    Object otherHandler =
                        args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
                    return equals(otherHandler);
                } catch (IllegalArgumentException e) {
                    return false;
                }
            } else if ("hashCode".equals(method.getName())) {
                return hashCode();
            } else if ("toString".equals(method.getName())) {
                return toString();
            }
            // 根据方法元数据,找到对应的MethodHandler
            return dispatch.get(method).invoke(args);
        }
    }

1.2 Proxy

然后使用JDK的Proxy.newProxyInstance()创建代理对象:

    // Proxy.java
    
    public static Object newProxyInstance(ClassLoader loader,Class<?>[] interfaces,InvocationHandler h) throws IllegalArgumentException
    {
        Objects.requireNonNull(h);
    
        final Class<?>[] intfs = interfaces.clone();
        final SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            checkProxyAccess(Reflection.getCallerClass(), loader, intfs);
        }
    
        /*
         * Look up or generate the designated proxy class.
         */
        Class<?> cl = getProxyClass0(loader, intfs);
    
        /*
         * Invoke its constructor with the designated invocation handler.
         */
        try {
            if (sm != null) {
                checkNewProxyPermission(Reflection.getCallerClass(), cl);
            }
    
            final Constructor<?> cons = cl.getConstructor(constructorParams);
            final InvocationHandler ih = h;
            if (!Modifier.isPublic(cl.getModifiers())) {
                AccessController.doPrivileged(new PrivilegedAction<Void>() {
                    public Void run() {
                        cons.setAccessible(true);
                        return null;
                    }
                });
            }
            return cons.newInstance(new Object[]{h});
        } catch (IllegalAccessException|InstantiationException e) {
            throw new InternalError(e.toString(), e);
        } catch (InvocationTargetException e) {
            Throwable t = e.getCause();
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new InternalError(t.toString(), t);
            }
        } catch (NoSuchMethodException e) {
            throw new InternalError(e.toString(), e);
        }
    }

当我们调用代理对象的任何方法时,都会触发InvocationHandler.invoke()的执行。

二、请求流程

了解JDK动态代理,我们来看下Feign的整个请求调用流程,我用下面这张图表述:

202308072153364461.png

比如,我调用了ServiceAClient.sayHello(),则会触发FeignInvocationHandler.invoke()的执行:

    // FeignInvocationHandler.java
    
    private final Map<Method, MethodHandler> dispatch;
    
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if ("equals".equals(method.getName())) {
            try {
                Object otherHandler =
                    args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
                return equals(otherHandler);
            } catch (IllegalArgumentException e) {
                return false;
            }
        } else if ("hashCode".equals(method.getName())) {
            return hashCode();
        } else if ("toString".equals(method.getName())) {
            return toString();
        }
    
        return dispatch.get(method).invoke(args);
    }

最终由内部的dispatch这个Map,根据方法元数据,找到对应的MethodHandler.invoke()进行处理,而MethodHandler的实现类就是SynchronousMethodHandler

2.1 SynchronousMethodHandler

SynchronousMethodHandler内部封装了Feign的各种组件,它控制了整个Feign的请求调用流程:

  1. 先构造请求模板工具类;
  2. 然后,解析请求URL,比如我们调用ServiceAClient.sayHello(),最终解析出来的URL是:GET http://ServiceA/sayHello/25 HTTP/1.1之类的形式;
  3. 接着,交给LoadBalancerFeignClient执行调用;
  4. 最后,解析响应结果。
    // SynchronousMethodHandler.java
    
    final class SynchronousMethodHandler implements MethodHandler {
      private static final long MAX_RESPONSE_BUFFER_SIZE = 8192L;
    
      private final MethodMetadata metadata;
      private final Target<?> target;
      private final Client client;
      private final Retryer retryer;
      private final List<RequestInterceptor> requestInterceptors;
      private final Logger logger;
      private final Logger.Level logLevel;
      private final RequestTemplate.Factory buildTemplateFromArgs;
      private final Options options;
      private final ExceptionPropagationPolicy propagationPolicy;
      private final Decoder decoder;
      private final AsyncResponseHandler asyncResponseHandler;
    
      @Override
      public Object invoke(Object[] argv) throws Throwable {
        // 构造请求模板工具类
        RequestTemplate template = buildTemplateFromArgs.create(argv);
        Options options = findOptions(argv);
        // Regtry组件,默认为Retryer.NEVER_RETRY
        Retryer retryer = this.retryer.clone();
        while (true) {
          try {
            // 核心:调用这个方法进行处理
            return executeAndDecode(template, options);
          } catch (RetryableException e) {
            try {
              retryer.continueOrPropagate(e);
            } catch (RetryableException th) {
              Throwable cause = th.getCause();
              if (propagationPolicy == UNWRAP && cause != null) {
                throw cause;
              } else {
                throw th;
              }
            }
            if (logLevel != Logger.Level.NONE) {
              logger.logRetry(metadata.configKey(), logLevel);
            }
            continue;
          }
        }
      }
    
      Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
        // 1.创建一个请求类,内部会用拦截器对请求URL做各种处理
        // 最终URL为:GET http://ServiceA/sayHello HTTP/1.1之类的形式
        Request request = targetRequest(template);
    
        // 2.记录日志 
        if (logLevel != Logger.Level.NONE) {
          logger.logRequest(metadata.configKey(), logLevel, request);
        }
    
        Response response;
        long start = System.nanoTime();
        try {
          // 3.发送请求,client默认为LoadBalancerFeignClient
          response = client.execute(request, options);
          // 4.解析请求结果
          response = response.toBuilder()
              .request(request)
              .requestTemplate(template)
              .build();
        } catch (IOException e) {
          if (logLevel != Logger.Level.NONE) {
            logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
          }
          throw errorExecuting(request, e);
        }
        long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    
    
        // 5.解码
        if (decoder != null)
          return decoder.decode(response, metadata.returnType());
    
        // 6.异步响应处理
        CompletableFuture<Object> resultFuture = new CompletableFuture<>();
        asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
            metadata.returnType(),
            elapsedTime);
    
        try {
          if (!resultFuture.isDone())
            throw new IllegalStateException("Response handling not done");
    
          return resultFuture.join();
        } catch (CompletionException e) {
          Throwable cause = e.getCause();
          if (cause != null)
            throw cause;
          throw e;
        }
      }
    
      long elapsedTime(long start) {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
      }
    
      Request targetRequest(RequestTemplate template) {
        for (RequestInterceptor interceptor : requestInterceptors) {
          interceptor.apply(template);
        }
        return target.apply(template);
      }
    
      Options findOptions(Object[] argv) {
        if (argv == null || argv.length == 0) {
          return this.options;
        }
        return Stream.of(argv)
            .filter(Options.class::isInstance)
            .map(Options.class::cast)
            .findFirst()
            .orElse(this.options);
      }
    }

2.2 LoadBalancerFeignClient

我们关键看下LoadBalancerFeignClient是如何处理请求的,它的思路如下:

  1. 对请求URL进行处理,构造一个基于Ribbon的LoadBalancer;
  2. 基于Ribbon的负载均衡策略发起请求。
    // LoadBalancerFeignClient.java
    
    private CachingSpringLoadBalancerFactory lbClientFactory;
    
    public Response execute(Request request, Request.Options options) throws IOException {
        try {
            // asUri:http://ServiceA/sayHello/25
            URI asUri = URI.create(request.url());
    
            // clientName就是服务名称,比如ServiceA
            String clientName = asUri.getHost();
    
            // uriWithoutHost:http:///sayHello/25
            URI uriWithoutHost = cleanUrl(request.url(), clientName);
    
            // 构造一个RibbonRequest请求类
            FeignLoadBalancer.RibbonRequest请求类 ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                this.delegate, request, uriWithoutHost);
    
            // 获取Client配置
            IClientConfig requestConfig = getClientConfig(options, clientName);
    
            // 关键:构造FeignLoadBalancer,执行请求
            return lbClient(clientName).executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
        }
        catch (ClientException e) {
            IOException io = findIOException(e);
            if (io != null) {
                throw io;
            }
            throw new RuntimeException(e);
        }
    }
    
    private FeignLoadBalancer lbClient(String clientName) {
        return this.lbClientFactory.create(clientName);
    }

上述的lbClient(clientName)就是创建了一个基于Ribbon的LoadBalancer:

    // CachingSpringLoadBalancerFactory.java
    
    public FeignLoadBalancer create(String clientName) {
        // 先看看缓存中有没有
        FeignLoadBalancer client = this.cache.get(clientName);
        if (client != null) {
            return client;
        }
    
        // Ribbon组件
        IClientConfig config = this.factory.getClientConfig(clientName);
        ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
        ServerIntrospector serverIntrospector = this.factory.getInstance(clientName,
                                                                         ServerIntrospector.class);
         // 将Ribbon的LoadBalancer包装成FeignLoadBalancer
         client = this.loadBalancedRetryFactory != null
            ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
                                             this.loadBalancedRetryFactory)
            : new FeignLoadBalancer(lb, config, serverIntrospector);
        this.cache.put(clientName, client);
        return client;
    }

FeignLoadBalancer仅仅是对Ribbon的LoadBalancer做了一层很简单的封装。

我们来看下FeignLoadBalancer.executeWithLoadBalancer(),这是执行请求的地方。实际上就是创建了一个LoadBalancerCommand对象,然后提交了一个任务:

    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        // 1.创建一个LoadBalancerCommand
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
    
        try {
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        // 这里传进来一个Server,是基于Ribbon的负载均衡策略选出来的
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            // 这里使用了RXJava
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }
    
    }

2.3 LoadBalancerCommand

我们重点关注LoadBalancerCommand.submit(),里面有一大坨代码, 用了RXJava的各种技术,忽略各种细节,关注selectServer()这个方法即可:

    // LoadBalancerCommand.java
    
    public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
    
        if (listenerInvoker != null) {
            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
                return Observable.error(e);
            }
        }
    
        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
    
        // 这里基于Ribbon的负载均衡策略选择一个Server
        Observable<T> o = (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                // Called for each server being selected
                public Observable<T> call(Server server) {
                    context.setServer(server);
                    final ServerStats stats = loadBalancerContext.getServerStats(server);
    
                    // Called for each attempt and retry
                    Observable<T> o = Observable
                        .just(server)
                        .concatMap(new Func1<Server, Observable<T>>() {
                            @Override
                            public Observable<T> call(final Server server) {
                                context.incAttemptCount();
                                loadBalancerContext.noteOpenConnection(stats);
    
                                if (listenerInvoker != null) {
                                    try {
                                        listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                    } catch (AbortExecutionException e) {
                                        return Observable.error(e);
                                    }
                                }
    
                                final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
    
                                return operation.call(server).doOnEach(new Observer<T>() {
                                    private T entity;
                                    @Override
                                    public void onCompleted() {
                                        recordStats(tracer, stats, entity, null);
                                        // TODO: What to do if onNext or onError are never called?
                                    }
    
                                    @Override
                                    public void onError(Throwable e) {
                                        recordStats(tracer, stats, null, e);
                                        logger.debug("Got error {} when executed on server {}", e, server);
                                        if (listenerInvoker != null) {
                                            listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                        }
                                    }
    
                                    @Override
                                    public void onNext(T entity) {
                                        this.entity = entity;
                                        if (listenerInvoker != null) {
                                            listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                        }
                                    }                            
    
                                    private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                        tracer.stop();
                                        loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                    }
                                });
                            }
                        });
    
                    if (maxRetrysSame > 0) 
                        o = o.retry(retryPolicy(maxRetrysSame, true));
                    return o;
                }
            });
    
        if (maxRetrysNext > 0 && server == null) 
            o = o.retry(retryPolicy(maxRetrysNext, false));
    
        return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
            @Override
            public Observable<T> call(Throwable e) {
                if (context.getAttemptCount() > 0) {
                    if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                                                "Number of retries on next server exceeded max " + maxRetrysNext
                                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                    else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                                                "Number of retries exceeded max " + maxRetrysSame
                                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                }
                if (listenerInvoker != null) {
                    listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
                }
                return Observable.error(e);
            }
        });
    }

其实就是利用了Ribbon的组件——LoadBalancerContext,最终基于负载均衡策略返回一个可用的Server:

    // LoadBalancerCommand.java
    
    private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                    // 这里用了Ribbon的组件选择Server
                    Server server = loadBalancerContext
                        .getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }

关于Ribbon的源码讲解,我在Ribbon系列中已经详细讲解过了,这里不再赘述。

三、总结

本章,我讲解了Feign的请求调用的整体流程,核心就是基于JDK动态代理,通过SynchronousMethodHandler拦截方法调用,然后构造请求报文,最后基于底层组装的Ribbon核心组件,从应用实例列表中选择一个Server,发起调用。

阅读全文
  • 点赞