《Java源码分析》:线程池 Future、RunnableFuture、FutureTask
在使用ThreadPoolExecutor使用submit提交任务后然后交给线程池中的线程去执行,是吧
在ThreadPoolExecutor(其实是在AbstractExecutorService中)有如下几个submit方法,
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
submit然后调用executor方法,executor方法的内部实现在上篇博文已经分析过了哈。
这篇博文并不是想探讨submit方法,而是想讨论下submit的返回值Future对象.
在submit方法中我们看见,有一行这样的代码
RunnableFuture<T> ftask = newTaskFor(task);
这行代码的功能为:对我们的task进行了类型的转化,task类型是Runnable/Callable.转化成为了一个RunnableFuture对象.
根据task类型由于有两种Runnable/Callable,分别有两种不同的重载方法newTaskFor.如下:
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
从newTaskFor函数中可以看到,就是直接调用了FutureTask的有参构造函数.
FutureTask是继承了RunnableFuture类来实现的.如下:
public class FutureTask<V> implements RunnableFuture<V>
下面来看下RunnableFuture类的内容,如下:
/*
作为 Runnable 的 Future。成功执行 run 方法可以完成 Future 并允许访问其结果。
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
RunnableFuture接口比较简单,继承了Runnable、Future接口。并只有一个run方法
回到上面的所谈论的newTaskFor函数,如下:
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
看下FutureTask类的构造方法的内部实现如下:
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Callable}.
*
* @param callable the callable task
* @throws NullPointerException if the callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
/*
* Creates FutureTask that will, upon running, execute the
* given Runnable, and arrange that get will return the
* given result on successful completion.
创建一个FutureTask对象,执行的还是里面所包含的Runnable对象,
如果Runnable对象正常执行完成,则此FutureTask对象调用get方法的时候就会得到结果reulst。
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
在第二个构造函数中,我们看到了
this.callable = Executors.callable(runnable, result);
这行代码是将Ruunbale类型转换为了Callable类型。因此,我们看下Executors.callable(runnable, result)方法的实现,到底是如何转化的呢??
/*
* Returns a {@link Callable} object that, when
* called, runs the given task and returns {@code null}.
*
*/
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}
将Runnable适配为一个Callable对象, 转化为的对象虽然是Callable对象了,但是调用此对象的call方法其实就是调用了Runnable接口的run方法并且返回值是null。
继续往下面看
看下RunnableAdapter类,此类实现了Callable接口。
// Non-public classes supporting the public methods
/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
总结一下上面的逻辑
1、首先,在我们写程序的时候,我们可能在线程池中的submit方法来提交一个任务task,这个任务task可能是Runnable对象,也可能是Callable对象。为便于处理,我们需要将Runnable、Callable统一起来,因此,就借助了Executors类中的RunnableAdapter类(此类为一个适配器)来将Runnable对象适配为一个Callable对象。这一适配过程在FutureTask类的构造方法中完成。如何适配的呢?看上面的源代码。
2、而submit方法要求返回一个Future对象,我们可以通过这个对象来获取任务的运行结果。而FutureTask作为Future的实现类实现了对任务task的封装,并且可以通过封装后的对象获取返回值。
上面的关系理清楚之后,我们就重点来看下FutureTask类。
FutureTask类分析
Future及其相关的类结构图如下:
即FutureTask实现了RunnableFuture接口,而RunnableFuture继承了Runnable, Future接口,因此FutureTask也是实现了Future接口的哈,即具有Future的所有特性。
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
不仅线程池有声明周期,回顾下,线程池的生命周期有:RUNNING SHUTDOWN STOP TIDYING TERMINATED 五个状态。
而FutureTask也存在生命周期。如下:
1、NEW(开始状态)
2、COMPLETING(正在运行状态)
3、NORMAL (正常运行完结束状态)
4、EXCEPTIONAL (异常状态)
5、CANCELLED (取消状态)
6、INTERRUPTING (中断)
7、INTERRUPTED (中断结束状态)
源码中对这几个状态以及状态之间的转换关系如下:
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
FutureTask类中get方法介绍
当我们些如下的代码时:
Future f = pool.submit(new Runnable(){...});
...
Object obj = f.get();//获取任务的返回结果
Future中的get方法获取结果时里面的内部实现是什么呢?下面一起来看看
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)//说明任务还正在执行,需要等待
//返回值为任务执行后的状态值,可能是正常执行完,也可能是中断抛出异常返回,也可能是超时返回
s = awaitDone(false, 0L);
return report(s);
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
/*
* Awaits completion or aborts on interrupt or timeout.
*等待完成或者是抛出异常或者是等待时间到了
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {//检测线程是否中断
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {//执行完了,可能是正常执行完,也可能是取消、中断了,等等
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet 正在执行
Thread.yield();//等待
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {//检查等待是否超时了,如果超时,则自己返回此时的状态,否则继续挂起等待
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {//如果超时,则自己返回此时的状态
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);//上面条件如果都不满足,则唤醒该线程
}
}
/*
如果正常执行完,则返回结果,否则根据任务的状态抛出相应的异常
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
get方法里面的逻辑相当的简单,就是检查任务是否已经执行完毕,如果没有,则等待期指定完毕,否则根据任务的状态来决定是否是抛相应的异常还是返回正确的结果。
FutureTask类中其它一些和任务状态有关的方法
//任务是否已经取消了
public boolean isCancelled() {
return state >= CANCELLED;
}
//只要不是开始状态都是返回true
public boolean isDone() {
return state != NEW;
}
FutureTask类中的cancel方法如下,
1、任务当前状态不为初始状态,则返回false
2、任务当前装填为初始状态,但是可能刚刚中断或者是取消而CAS操作不成功,也是返回false。
3、如果允许中断,则中断任务,返回true。
此方法返回后任务要么处于运行结束状态,要么处于取消状态。isDone()将永远返回true,如果cancel()方法返回true,isCancelled()始终返回true。
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {//唤醒等待的所有线程节点
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
以上就是关于FutureTask一点点的介绍。
小结
需要注意的是以下几点
1、Runnable对象可以通过RunnableAdapter适配器适配到Callable对象
2、Future对象可以通过get方法获取任务的返回值
3、FutureTask可以简单来看是对任务Runnable/Callable的封装。
参考资料
1、http://www.blogjava.net/xylz/archive/2011/02/13/344207.html