2023-08-17  阅读(340)
原文作者:LifeIsForSharing 原文地址:https://solang.blog.csdn.net/article/details/105101316

并发:功能强大而简单的抽象,让编写正确的并发代码更加容易。

ListenableFuture:完成后回调的Future
Service:启动和关闭的服务,为你处理复杂的状态逻辑。

1.ListenableFuture

并发是一个困难的问题,但是通过使用功能强大且简单的抽象可以大大简化并发。为了简化问题,Guava使用ListenableFuture扩展了JDK的Future接口。

我们强烈建议你在所有代码中始终使用ListenableFuture而不是Future ,因为:

  • 大多数Futures方法都需要它。
  • 比以后更改为ListenableFuture更容易。
  • 工具方法的提供者无需提供其方法的FutureListenableFuture变体。

1.1接口

传统的Future表示异步计算的结果:可能已经或可能尚未完成产生结果的计算。Future可以作为正在进行的计算的句柄,是服务向我们提供结果的承诺。

ListenableFuture允许你在计算完成后或在计算已经完成时立即注册要执行的回调。这个简单的附加功能使它可以有效地支持基本Future接口无法支持的许多操作。

ListenableFuture添加的基本操作是addListener(Runnable, Executor),它指定当此Future表示的计算完成时,指定的Runnable将在指定的Executor上运行。

1.2添加回调

大多数用户更喜欢使用Futures.addCallback(ListenableFuture, FutureCallback, Executor)FutureCallback实现两种方法:

1.3创建

对应于JDK的ExecutorService.submit(Callable)方法来启动异步计算,Guava提供了ListeningExecutorService接口,该接口在ExecutorService返回正常Future的任何地方都返回ListenableFuture。要将ExecutorService转换为ListeningExecutorService,只需使用MoreExecutors.listeningDecorator(ExecutorService)

    ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
    ListenableFuture<Explosion> explosion = service.submit(
        new Callable<Explosion>() {
          public Explosion call() {
            return pushBigRedButton();
          }
        });
    Futures.addCallback(
        explosion,
        new FutureCallback<Explosion>() {
          // we want this handler to run immediately after we push the big red button!
          public void onSuccess(Explosion explosion) {
            walkAwayFrom(explosion);
          }
          public void onFailure(Throwable thrown) {
            battleArchNemesis(); // escaped the explosion!
          }
        },
        service);

另外,如果你要从基于FutureTask的API进行转换,则Guava提供了ListenableFutureTask.create(Callable)ListenableFutureTask.create(Runnable, V)。与JDK不同,ListenableFutureTask不能直接扩展。

如果你更喜欢抽象的方式设置future值,而不是实现一种计算该值的方法,请考虑扩展AbstractFuture或直接使用SettableFuture

如果必须将另一个API提供的Future转换为ListenableFuture,则别无选择,只能使用重量级的JdkFutureAdapters.listenInPoolThread(Future)Future转换为ListenableFuture。只要有可能,最好修改原始代码以返回ListenableFuture

1.4应用

使用ListenableFuture的最重要原因是可以拥有复杂的异步操作链。

    ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
    AsyncFunction<RowKey, QueryResult> queryFunction =
      new AsyncFunction<RowKey, QueryResult>() {
        public ListenableFuture<QueryResult> apply(RowKey rowKey) {
          return dataService.read(rowKey);
        }
      };
    ListenableFuture<QueryResult> queryFuture =
        Futures.transformAsync(rowKeyFuture, queryFunction, queryExecutor);

ListenableFuture可以有效地支持许多其他操作,而单独的Future不能支持。不同的执行者可以执行不同的操作,并且单个ListenableFuture可以有多个操作在等待它。

当多个操作应该在另一个操作启动时立即开始时——“扇出”——ListenableFuture只起作用:它触发所有请求的回调。稍微多做一些工作,我们可以“扇入”或触发一个ListenableFuture,以便在其他几个future都完成后立即进行计算:有关示例,请参见Futures.allAsList的实现

方法 描述 参见
transformAsync(ListenableFuture,AsyncFunction,Executor)* 返回一个新的ListenableFuture,其结果是将给定的AsyncFunction应用于给定ListenableFuture的结果的产物。 transformAsync(ListenableFuture,AsyncFunction)
transform(ListenableFuture,Function,Executor) 返回一个新的ListenableFuture,其结果是将给定的Function应用于给定ListenableFuture的结果的产物。 transform(ListenableFuture,Function)
allAsList(Iterable>) 返回一个ListenableFuture,其值是按顺序包含每个输入future的值的列表。如果任何一个输入future失败或被取消,则该future失败或被取消。 allAsList(ListenableFuture...)
successfulAsList(Iterable>) 返回一个ListenableFuture,其值是按顺序包含每个成功输入future的值的列表。与失败或取消的future相对应的值将替换为null。 successfulAsList(ListenableFuture...)

* AsyncFunction<A, B>提供一个方法ListenableFuture<B> apply(A input)。它可以用于异步转换值。

    List<ListenableFuture<QueryResult>> queries;
    // The queries go to all different data centers, but we want to wait until they're all done or failed.
    
    ListenableFuture<List<QueryResult>> successfulQueries = Futures.successfulAsList(queries);
    
    Futures.addCallback(successfulQueries, callbackOnSuccessfulQueries);

1.5避免嵌套Future

在代码调用通用接口并返回Future的情况下,可能会以嵌套的Futures结尾。例如:

    executorService.submit(new Callable<ListenableFuture<Foo>() {
      @Override
      public ListenableFuture<Foo> call() {
        return otherExecutorService.submit(otherCallable);
      }
    });

将返回一个ListenableFuture<ListenableFuture<Foo>>。这段代码是不正确的,因为如果外部future的取消与外部future的完成进行竞争,则该取消将不会传播到内部future。使用get()或监听器检查另一个future是否失败也是常见的错误,但是除非特别小心,否则从otherCallable抛出的异常将被抑制。为了避免这种情况,Guava的所有future处理方法(以及JDK中的某些方法)都具有*Async版本,可以安全地解开此嵌套——transform(ListenableFuture, Function, Executor)transformAsync(ListenableFuture, AsyncFunction, Executor)ExecutorService.submit(Callable)submitAsync(AsyncCallable, Executor)等等。

2.Service

Guava Service接口表示一个具有操作状态的对象,并带有启动和停止的方法。例如,Web服务器,RPC服务器和计时器可以实现Service接口。管理这些服务的状态(需要适当的启动和关闭管理)并非易事,特别是在涉及多线程或日程调度schedule的情况下。Guava提供了一些框架来为你管理状态逻辑和同步细节。

2.1使用Service

服务Service的正常生命周期是

已停止的服务无法重新启动。如果服务在启动、运行或停止的地方失败,它将进入Service.State.FAILED状态。

如果服务是NEW,则可以使用startAsync()异步启动服务。因此,你应该将应用程序结构化为在每个服务启动时都有唯一的位置(统一)。

使用异步stopAsync()方法来停止服务也是类似的。但是与startAsync()不同,多次调用此方法是安全的。这使得处理关闭服务时可能发生的竞争成为可能。

服务还提供了几种方法来等待服务转换完成。

  • 异步使用addListener()addListener()允许你添加一个Service.Listener,它将在服务的每个状态转换时调用。注意:如果在添加监听器时服务不是NEW新建的,那么任何已经发生的状态转换都不会在监听器上重新触发。
  • 同步使用awaitRunning()。这是不中断的,不会抛出已检查的异常,并在服务启动完成后返回。如果服务启动失败,则会抛出IllegalStateException。同样,awaitTerminated()等待服务达到终端状态(TERMINATEDFAILED)。两种方法都具有重载的允许指定超时时间。

Service接口是微妙而复杂的。我们不建议直接实现它。相反,请使用guava中的抽象基类之一作为实现的基础。每个基类都支持特定的线程模型。

2.2实现

2.2.1AbstractIdleService

AbstractIdleService框架实现了Service,该服务在处于“运行”状态时不需要执行任何操作——因此在运行时不需要线程——但具有要执行的启动和关闭操作。实现这样的服务与扩展AbstractIdleService以及实现startUp()shutDown()方法一样容易。

    protected void startUp() {
      servlets.add(new GcStatsServlet());
    }
    protected void shutDown() {}

请注意,对GcStatsServlet的任何查询都已经有一个在运行的线程。在服务运行时,我们不需要该服务自行执行任何操作。

2.2.2AbstractExecutionThreadService

AbstractExecutionThreadService在单个线程中执行启动、运行和关闭操作。你必须重写run()方法,并且它必须响应停止请求。例如,你可以在工作循环中执行操作:

    public void run() {
      while (isRunning()) {
        // perform a unit of work
      }
    }

或者,你可以以任何方式重写,从而使run()返回。

重写startUp()shutDown()是可选的,但是将为你管理服务状态。

    protected void startUp() {
      dispatcher.listenForConnections(port, queue);
    }
    protected void run() {
      Connection connection;
      while ((connection = queue.take() != POISON)) {
        process(connection);
      }
    }
    protected void triggerShutdown() {
      dispatcher.stopListeningForConnections(queue);
      queue.put(POISON);
    }

请注意,start()调用你的startUp()方法,为你创建一个线程,并在该线程中调用run()stop()调用triggerShutdown()方法并等待线程死亡。

2.2.3AbstractScheduledService

AbstractScheduledService在运行时执行一些周期性任务。子类实现runOneIteration()来指定任务的一次迭代,以及熟悉的startUp()shutDown()方法。

要描述执行日程调度schedule,你必须实现scheduler()方法。通常,你将使用AbstractScheduledService.Scheduler提供的日程schedule之一,newFixedRateSchedule(initialDelay, delay, TimeUnit)newFixedDelaySchedule(initialDelay, delay, TimeUnit),与ScheduledExecutorService中熟悉的方法相对应。可以使用CustomScheduler来实现自定义日程调度schedule;有关详细信息,请参见Javadoc。

2.2.4AbstractService

当你需要执行自己的手动线程管理时,请直接重写AbstractService。通常,上述实现之一应该可以为你提供良好的服务,但是当你在建模某种提供自己的线程语义作为Service时,建议你实现AbstractService,因为你有自己特定的线程需求。

要实现AbstractService,必须实现2个方法。

  • doStart()doStart()是第一次调用startAsync()直接调用的,你的doStart()方法应执行所有的初始化,如果启动成功,则最终调用notifyStarted(),如果启动失败,则最终调用notifyFailed()
  • doStop()doStop()是由第一次调用stopAsync()直接调用的,你的doStop()方法应关闭服务,如果关闭成功,则最终调用notifyStopped(),如果关闭失败,则最终调用notifyFailed()

你的doStartdoStop方法应该是快速的。如果你需要进行昂贵的初始化,例如读取文件、打开网络连接或任何可能阻塞的操作,则应考虑将该工作移至另一个线程。

2.3使用ServiceManager

除了Service框架实现之外,Guava还提供了ServiceManager类,它使涉及多个服务实现的某些操作更加容易。使用Services集合创建一个新的ServiceManager。然后,你可以管理它们:

或检查它们:

  • 如果所有服务都是RUNNING,则isHealthy()返回true。
  • servicesByState()返回按状态索引的所有服务的一致快照。
  • startupTimes()返回管理下的Service到该服务启动所需的时间(以毫秒为单位)的映射。返回的映射保证按启动时间排序。

虽然建议通过ServiceManager管理服务生命周期,但是通过其他机制启动的状态转换 不会影响其方法的正确性 。例如,如果服务是由startAsync()之外的某种机制启动的,则监听器将在适当的时候被调用,而awaitHealthy()仍将按预期工作。ServiceManager强制执行的唯一要求是,在构造ServiceManager时,所有Service都必须是NEW

本文参考:
ListenableFutureExplained
ServiceExplained
guava-tests-concurrent

阅读全文
  • 点赞