1. CompletableFuture 简介
1.1 概述
CompletableFuture是 Java 8 中引入的一个类,它完结了CompletionStage接口,供给了一组丰厚的办法来处理异步操作和多个使命的成果。它支撑链式操作,能够方便地处理使命的依靠联系和成果转化。相比于传统的Future接口,CompletableFuture更加灵活和强大。
1.2 优势与特色
CompletableFuture的运用具有以下优势和特色:
- 异步履行:CompletableFuture答应使命在后台线程中异步履行,不会堵塞主线程,进步了应用程序的呼应性和功用。
- 链式操作:经过CompletableFuture供给的办法,能够方便地对使命进行链式操作,构建杂乱的使命依靠联系,完结高效的使命调度和履行。
- 反常处理:CompletableFuture供给了丰厚的反常处理办法,能够处理使命履行过程中或许发生的反常,并完结灵活的错误处理和回退机制。
- 多使命组合:CompletableFuture支撑多个使命的并发履行和成果组合。能够轻松地完结多使命并发处理的场景,进步应用程序的功率和并发性。
2. CompletableFuture 的根本用法
2.1 创立 CompletableFuture 目标
运用CompletableFuture创立异步使命十分简单。能够运用CompletableFuture.supplyAsync() 或CompletableFuture.runAsync() 办法来创立CompletableFuture目标。
2.1.1 运用CompletableFuture.supplyAsync() 办法
运用CompletableFuture.supplyAsync() 办法来创立 CompletableFuture 目标的示例。该办法用于履行具有回来值的使命,并在使命完结时回来成果。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 履行具有回来值的使命
return "使命成果";
});
在上述示例中,咱们运用CompletableFuture.supplyAsync() 办法创立一个具有回来值的 CompletableFuture 目标,使命会在默许的 ForkJoinPool 中异步履行。
2.1.2 运用CompletableFuture.runAsync() 办法
除了CompletableFuture.supplyAsync() 办法,CompletableFuture 还供给了CompletableFuture.runAsync() 办法用于履行没有回来值的使命。
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 履行没有回来值的使命
});
在上述示例中,咱们运用CompletableFuture.runAsync() 办法创立一个没有回来值的 CompletableFuture 目标,使命会在默许的 ForkJoinPool 中异步履行。
2.1.3 指定自定义线程池
咱们还能够经过指定自定义线程池来创立 CompletableFuture 目标,以满意特定的并发需求。
ExecutorService customExecutor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 履行使命的代码
}, customExecutor);
在上述示例中,咱们经过Executors.newFixedThreadPool(10) 创立了一个固定巨细为 10 的自定义线程池,并将其传递给CompletableFuture.supplyAsync() 办法来履行异步使命。
2.2 获取使命成果
获取CompletableFuture使命的成果有多种办法。最常用的办法是运用join() 办法堵塞当时线程,直到使命完结并回来成果。
2.2.1 运用join() 办法
join() 办法是 CompletableFuture 类供给的一种获取使命成果的办法,它会堵塞当时线程,直到使命完结并回来成果。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 履行使命的代码
return "使命成果";
});
String result = future.join();
在上述示例中,咱们运用join() 办法获取使命的成果,并将成果赋值给result变量。假如使命还未完结,join() 办法会堵塞当时线程,直到使命完结。
join() 办法和get() 办法十分相似,但join() 办法不会抛出InterruptedException和ExecutionException反常,而是将反常包装在CompletionException中抛出。因而,它更适合在 Lambda 表达式或流式操作中运用。
2.2.2 运用get() 办法
get() 办法也是 CompletableFuture 类供给的一种获取使命成果的办法,它会堵塞当时线程,直到使命完结并回来成果。与join() 办法不同的是,get() 办法会抛出InterruptedException和ExecutionException反常,需求进行反常处理。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 履行使命的代码
return "使命成果";
});
try {
String result = future.get();
} catch (InterruptedException | ExecutionException e) {
// 反常处理逻辑
}
在上述示例中,咱们运用get() 办法获取使命的成果,并在或许抛出反常的情况下进行反常处理。假如使命还未完结,get() 办法会堵塞当时线程,直到使命完结。
get() 办法的反常处理较为繁琐,需求捕获InterruptedException和ExecutionException反常,并进行相应的处理。因而,在 Lambda 表达式或流式操作中,推荐运用join() 办法。
2.3 异步回调办法
CompletableFuture 供给了一系列办法来处理使命的完结事情,完结异步回调。咱们将逐个介绍这些办法的差异和用法。
thenApply()
办法签名:thenApply(Function<? super T, ? extends U> fn)
- 输入参数:上一阶段的使命成果类型为 T。
- 回来值:新阶段的使命成果类型为 U。
- 功用:对上一阶段的使命成果进行转化操作,并回来一个新的 CompletableFuture 目标。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42)
.thenApply(result -> result * 2)
.thenApply(result -> result + 1);
在上述示例中,咱们运用 thenApply()办法对上一阶段的成果进行转化,将成果乘以 2,并将转化后的成果加 1。每个 thenApply()办法都回来一个新的 CompletableFuture 目标,能够持续链式调用。
thenAccept()
办法签名:thenAccept(Consumer<? super T> action)
- 输入参数:上一阶段的使命成果类型为 T。
- 回来值:CompletableFuture,没有回来值。
- 功用:对上一阶段的使命成果进行消费操作,没有回来值。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42)
.thenAccept(result -> System.out.println("使命成果:" + result));
在上述示例中,咱们运用 thenAccept()办法对上一阶段的成果进行消费,将成果打印输出。thenAccept()办法没有回来值,仅用于消费使命成果。
thenRun()
办法签名:thenRun(Runnable action)
- 输入参数:无。
- 回来值:CompletableFuture,没有回来值。
- 功用:在上一阶段使命完结后履行给定的 Runnable 使命,没有输入参数和回来值。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42)
.thenRun(() -> System.out.println("使命履行结束"));
在上述示例中,咱们运用 thenRun()办法在上一阶段使命完结后履行一个 Runnable 使命,输出一条使命履行结束的音讯。
2.4 多使命组合回调
CompletableFuture 还供给了一些办法来组合多个使命的成果,完结更杂乱的异步处理逻辑。
thenCombine()
办法签名:thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
- 输入参数:另一个 CompletionStage 目标和一个 BiFunction 函数,函数的输入参数分别为上一阶段的使命成果类型 T 和另一个 CompletionStage 目标的使命成果类型 U,函数的回来值类型为 V。
- 回来值:新阶段的使命成果类型为 V。
- 功用:当两个 CompletionStage 目标都完结时,将它们的使命成果传递给给定的 BiFunction 函数进行组合处理,并回来一个新的 CompletableFuture 目标。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);
在上述示例中,咱们运用 thenCombine()办法将两个使命的成果进行组合,将它们的成果相加并回来新的 CompletableFuture 目标。
thenCompose()
办法签名:thenCompose(Function<? super T, ? extends CompletionStage> fn)
- 输入参数:一个 Function 函数,函数的输入参数为上一阶段的使命成果类型 T,函数的回来值为另一个 CompletionStage 目标。
- 回来值:新阶段的使命成果类型为 U。
- 功用:当上一阶段的使命完结后,将成果传递给给定的 Function 函数,该函数回来一个新的 CompletionStage 目标,新阶段的使命成果类型为 U。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = future1.thenCompose(result -> CompletableFuture.supplyAsync(() -> result * 2));
在上述示例中,咱们运用 thenCompose()办法将上一阶段的成果传递给一个 Function 函数,该函数回来一个新的 CompletionStage 目标。新阶段的使命成果为上一阶段成果的两倍。
allOf()
办法签名:allOf(CompletableFuture<?>… cfs)
- 输入参数:多个 CompletableFuture 目标。
- 回来值:CompletableFuture,没有回来值。
- 功用:等候一切给定的 CompletableFuture 目标都完结,回来一个新的 CompletableFuture 目标。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);
在上述示例中,咱们运用 allOf()办法等候一切的 CompletableFuture 目标都完结,回来一个新的 CompletableFuture 目标。这样咱们就能够在该目标上进跋涉一步的处理,例如获取各个 CompletableFuture 的成果。
3. 反常处理与错误处理
3.1 反常处理办法
CompletableFuture 供给了多种办法来处理异步使命履行中或许发生的反常。常用的办法有:
- exceptionally(Function<Throwable, ? extends T> fn) :当 CompletableFuture 履行过程中发生反常时,运用指定的函数进行反常处理,并回来一个新的 CompletableFuture 目标,其间包含处理成果或默许值。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("使命履行反常");
});
CompletableFuture<Integer> handledFuture = future.exceptionally(ex -> {
System.out.println("反常处理:" + ex.getMessage());
return 0; // 默许值
});
- handle(BiFunction<? super T, Throwable, ? extends U> fn) :当 CompletableFuture 履行完结时,运用指定的函数处理成果或反常,并回来一个新的 CompletableFuture 目标。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
if (ex != null) {
System.out.println("反常处理:" + ex.getMessage());
return "默许值";
} else {
return "成果:" + result;
}
});
3.2 错误处理与反常链
CompletableFuture 还支撑反常链,能够将多个 CompletableFuture 的反常连接起来,形成反常链。能够运用exceptionally() 或handle() 办法来完结反常链的处理。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("使命履行反常");
});
CompletableFuture<Integer> handledFuture = future.exceptionally(ex -> {
System.out.println("反常处理:" + ex.getMessage());
throw new CustomException("自定义反常", ex);
});
在上述示例中,咱们经过exceptionally() 办法处理使命的反常,并抛出一个自定义反常,并将原始反常作为反常链的一部分传递下去。
4. 自定义线程池与资源管理
4.1 默许线程池与 ForkJoinPool
CompletableFuture 默许运用 ForkJoinPool 线程池来履行异步使命。ForkJoinPool 是一种根据工作盗取算法的线程池,适用于使命分解和并行计算。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 异步使命的代码
});
在上述代码中,CompletableFuture 会在默许的 ForkJoinPool 中异步履行使命。
4.2 自定义线程池
除了运用默许线程池,咱们还能够自定义线程池来满意特定的需求。自定义线程池能够经过Executors类来创立。
ExecutorService customExecutor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 异步使命的代码
}, customExecutor);
在上述代码中,咱们创立了一个固定巨细为 10 的自定义线程池,并将其传递给 CompletableFuture 来履行异步使命。
4.3 资源管理与关闭线程池
在运用自定义线程池时,需求留意及时关闭线程池以释放资源。能够运用ExecutorService的shutdown() 或shutdownNow() 办法来关闭线程池。
ExecutorService customExecutor = Executors.newFixedThreadPool(10);
// 异步使命代码
customExecutor.shutdown();
在上述代码中,咱们在使命完结后调用了shutdown() 办法来关闭线程池。
5. 并发使命的调度与操控
5.1 异步使命的并发度操控
CompletableFuture 答应咱们操控并发使命的履行数量。能够经过自定义线程池的巨细来限制并发度。
ExecutorService customExecutor = Executors.newFixedThreadPool(5);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
// 异步使命1的代码
}, customExecutor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
// 异步使命2的代码
}, customExecutor);
// ...
在上述代码中,咱们经过自定义线程池的巨细为 5 来限制并发使命的数量。
5.2 使命的超时处理
CompletableFuture 还供给了超时处理的功用,能够操控使命的最大履行时间。能够运用completeOnTimeout() 办法来完结超时处理。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 异步使命的代码
}).completeOnTimeout("默许值", 5, TimeUnit.SECONDS);
在上述代码中,咱们指定使命的最大履行时间为 5 秒,假如使命在规定时间内没有完结,将回来默许值。
5.3 中止与撤销使命
在某些情况下,咱们或许需求中止或撤销正在履行的使命。CompletableFuture 供给了cancel() 办法来撤销使命的履行。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 异步使命的代码
});
boolean canceled = future.cancel(true);
在上述代码中,咱们调用cancel() 办法来撤销使命的履行,并传递一个布尔值表明是否中止正在履行的使命。
6. CompletableFuture 的进阶应用
6.1 CompletableFuture 与 IO 操作
CompletableFuture 在处理 IO 操作时十分有用。能够将 IO 操作封装为 CompletableFuture 使命,运用 CompletableFuture 的异步特性进步 IO 操作的功率。
CompletableFuture<String> readData = CompletableFuture.supplyAsync(() -> {
// 履行读取数据的IO操作
return "读取的数据";
});
CompletableFuture<Void> processData = readData.thenAccept(data -> {
// 处理读取到的数据
System.out.println("读取到的数据:" + data);
// 履行处理数据的操作
});
CompletableFuture<Void> writeData = processData.thenRun(() -> {
// 履行写入数据的IO操作
System.out.println("数据写入完结");
});
writeData.join();
在上述代码中,咱们运用 CompletableFuture 处理了一个包含读取数据、处理数据和写入数据的 IO 操作流程。经过异步履行和链式操作,能够有效地运用 CPU 和 IO 资源,进步程序的呼应性和吞吐量。
6.2 CompletableFuture 与网络恳求
CompletableFuture 也能够很好地与网络恳求结合运用。咱们能够运用 CompletableFuture 建议多个网络恳求,并在一切恳求完结后处理成果。
CompletableFuture<String> request1 = CompletableFuture.supplyAsync(() -> {
// 建议网络恳求1
return "恳求1成果";
});
CompletableFuture<String> request2 = CompletableFuture.supplyAsync(() -> {
// 建议网络恳求2
return "恳求2成果";
});
CompletableFuture<String> request3 = CompletableFuture.supplyAsync(() -> {
// 建议网络恳求3
return "恳求3成果";
});
CompletableFuture<Void> allRequests = CompletableFuture.allOf(request1, request2, request3);
allRequests.thenRun(() -> {
// 一切恳求完结后的处理逻辑
String result1 = request1.join();
String result2 = request2.join();
String result3 = request3.join();
// 对恳求成果进行处理
});
在上述代码中,咱们运用 CompletableFuture 建议了三个网络恳求,并经过allOf() 办法等候一切恳求完结。在一切恳求完结后,咱们能够运用join() 办法获取各个恳求的成果,并进行后续处理。
7. 实战事例
事务背景: 在电商项目的售后事务中,当客服接收到用户的售后申请时,需求进行一系列操作,包含查询订单信息、查询 ERP 中的商品信息、查询用户信息,以及创立售后工单。
代码完结:
public CompletableFuture<Void> processAfterSalesRequest(String orderId, String customerId) {
CompletableFuture<Order> orderFuture = CompletableFuture.supplyAsync(() -> getOrderInfo(orderId));
CompletableFuture<Inventory> inventoryFuture = CompletableFuture.supplyAsync(() -> getInventoryInfo(orderId));
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> getUserInfo(customerId));
return CompletableFuture.allOf(orderFuture, inventoryFuture, userFuture)
.thenApplyAsync(ignored -> {
Order order = orderFuture.join();
Inventory inventory = inventoryFuture.join();
User user = userFuture.join();
// 创立售后工单
createAfterSalesTicket(order, inventory, user);
return null;
});
}
private Order getOrderInfo(String orderId) {
// 查询订单信息的逻辑
// ...
return order;
}
private Inventory getInventoryInfo(String orderId) {
// 查询ERP中商品信息的逻辑
// ...
return inventory;
}
private User getUserInfo(String customerId) {
// 查询用户信息的逻辑
// ...
return user;
}
private void createAfterSalesTicket(Order order, Inventory inventory, User user) {
// 创立售后工单的逻辑
// ...
}
在上述代码中,咱们运用CompletableFuture.supplyAsync() 办法分别查询订单信息、ERP 中的商品信息和用户信息,然后运用CompletableFuture.allOf() 办法等候一切查询使命完结。完结后,咱们能够经过join() 办法获取各个查询使命的成果,并将成果传递给createAfterSalesTicket() 办法来创立售后工单。
8. 总结
CompletableFuture 是供给了丰厚的功用和办法。它能简化并发使命处理,进步系统功用和呼应性。经过了解其根本用法、进阶应用和最佳实践,咱们能够灵活处理异步回调、使命组合、反常处理和资源管理。
欢迎点赞收藏加重视