Java CompletableFuture:allOf等待所有线程任务结束(4)
private void method() throws ExecutionException, InterruptedException {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
return "f1";
f1.whenCompleteAsync(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable throwable) {
System.out.println(System.currentTimeMillis() + ":" + s);
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
return "f2";
f2.whenCompleteAsync(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable throwable) {
System.out.println(System.currentTimeMillis() + ":" + s);
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);
//阻塞,直到所有任务结束。
System.out.println(System.currentTimeMillis() + ":阻塞");
all.join();
System.out.println(System.currentTimeMillis() + ":阻塞结束");
//一个需要耗时2秒,一个需要耗时3秒,只有当最长的耗时3秒的完成后,才会结束。
System.out.println("任务均已完成。");
06-12 20:16:37.400 31142-31142/zhangphil.test I/System.out: 1528805797400:阻塞
06-12 20:16:39.406 31142-31171/zhangphil.test I/System.out: 1528805799406:f2
06-12 20:16:40.404 31142-31170/zhangphil.test I/System.out: 1528805800404:f1
06-12 20:16:40.404 31142-31142/zhangphil.test I/System.out: 1528805800404:阻塞结束
任务均已完成。
可以看到f2很快就返回,是因为f2仅耗时2秒。f1需要耗时3秒,因此在f2结束后一秒,f1也返回。此时才执行join后的代码。
CompletableFuture:allOf等待所有线程任务结束(4) private void method() throws ExecutionException, InterruptedException { CompletableFuture&lt;String&gt; f1 = CompletableFuture.supplyAsync(() -&gt; { ...
ExecutorService pool = Executors.newFixedThreadPool(1000);
CompletableFuture[] futureList = list.parallelStream().map(item-> CompletableFuture.supplyAsync(() -> {
for(int i=0;i<24;i++){
for(int j=0;j<count;j++){
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Random;
import java.util.concurrent.*;
public clas
private void method() throws ExecutionException, InterruptedException {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
try {
Time...
static void allOfExample() {
StringBuilder result = new StringBuilder();
List messages = Arrays.asList("a", "b", "c");
List<CompletableFuture> futures = mes
场景:当有一批任务交给线程池执行,我们需要获取所有线程的返回结果。
Future的get()时阻塞的,如果循环get()每一个线程的结果,一个线程会卡住后面所有线程
CompletionService的take().get()虽然不会因为某个线程阻塞后面的线程,但是功能不丰富
CompletableFuture提供的功能丰富,使用简单,代码优雅
二、测试案例
一串数字1, 2, 3, 4, 5, 6, 7, 8, 9, 10
开启线程,执行乘以
//import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
@Test
void contextLoads() throws InterruptedException {
RestClient client = new RestClient("http://121.36.151.190:9090");
client.login("admin@abcd.com
int nThreads = 5;
int unit = quotaSettleList.size() % nThreads > 0 ? quotaSettleList.size() / nThreads : quotaSettleList.size() / nThreads + 1;
Li...
同步:就是在发出一个功能调用时,在没有得到结果之前,将一直处于
等待中 即阻塞状态。也就是必须一件一件事做,等前一件做完了才能做下一件事
异步:
异步通常意味着非阻塞,可以使得我们的
任务单独运行在与主
线程分离的其他
线程中,并且通过回调可以在主
线程中得到
异步任务的执行状态,是否完成,和是否异常等信息。
没有返回值的
异步任务
可以过滤掉失败的继续完成
CompletableFuture.allOf(Arrays.stream(arrayOfFutures).
filter(element -> !element.isDone() || element.isCompletedExceptionally()).
toArray(CompletableFuture[]::new)).t...