10 KiB
Future接口
对将来的某个时刻会发生的结果进行建模, 建模了一种异步运算, 返回一个被执行运算结果的引用, 当运算结束后,这个引用被返回给调用方
Future 接口的局限性
使用 isDone 检测异步计算是否结束, 很难描述 Future 结果之间的依赖性
- 把两个异步计算合并为一个
- 等待 Futrure 集合中所有任务完成
- 仅等待 Future 集合中最快结束的任务完成,并返回结果
- 通过编程方式完成一个 Future 任务的执行
- 应对 Future 的完成事件(当 Future 的完成事件发生时会收到通知,并能使用 Future 计算的结果进行下一步操作,而不是简单的阻塞等待操作的结果)
使用 CompletableFuture 构建异步应用
- 如何为你的客户提供异步 API
- 让使用了同步 API 的代码变为非阻塞代码
- 以响应式的方式处理异步操作的完成事件
实现异步 API
public double getPrice(String product) {
return calculatePrice(product);
}
private double calculatePrice(String product) {
delay();
return random.nextDouble() * product.charAt(0)+product.charAt(1);
}
将同步方法转换为异步方法
public Future<Double> getPriceAsync(String price) {
CompletabelFuture<Double> futurePrice = new CompletableFuture<>();
new Thread( () -> {
double price = calculatePrice(produce);
futurePrice.complete(price);
}).start();
returen futurePrice();
}
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
long invocationTime = ((System.nanoTime() - start) / 1000000);
System.out.println("Invocation returned after " + InvocationTime + "msecs");
doSomethingElse();
try{
double price = futurePrice.get();
System.out.println("Price is %.2f%n", price);
} catch (Exception e) {
throw new RuntimeException(e);
}
long retrievalTime = ( (System.nanoTime() - start) / 1000000) ;
System.out.println("Price returned after " + retrievalTime + " mseGs");
错误处理
public Future<Double> getPriceAsync(String price) {
CompletabelFuture<Double> futurePrice = new CompletableFuture<>();
new Thread( () -> {
try {
double price ± calculatePrice(product)
futurePrice.complete (price) ;
} catch (Exception ex)
futurePrice.completeExceptionally (ex);
}
}).start();
returen futurePrice();
}
使用工厂方法 supplyAsync 创建 CompletableFuture
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(()-> calculatePrice(product));
}
让你的代码免受阻塞之苦
List<Shop> shops=Arrays.asList(new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll"));
public List<String> findPrices(String product) {
return shops.stream()
.map(shop -> String.format("gs price is g.2f", shop.getName(), shop-getPrice(product))
.collect (toList ());
}
使用并行流对请求进行并行操作
public List<String> findPrices(String product) {
return shops.parallelStream()
.map(shop -> String.format("gs price is g.2f", shop.getName(), shop-getPrice(product))
.collect (toList ());
}
使用 CompletableFuture 发起异步请求
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> String.format("gs price is g.2f", shop.getName(), shop-getPrice(product))
.collect (toList());
return priceFutures.stream().map(CompletabelFuture::join).collect(toList());
}
寻找更好的方案
它们内部采用的是同样的通用线程池,默认都使用固定数目的线程,具体线程数取决于 Runtime. getRuntime().availableProcessors()的返回值
CompletableFuture 具有一定的优势,因为它允许你对执行器(Executor)进行配置,尤其是线程池的大小,让它以更适合应用需求的方式进行配置,满足程序的要求,而这是并行流 API 无法提供的
使用定制的执行器
创建一个配有线程池的执行器
线程池大小与处理器的利用率之比可以使用下面的公式进行估算:
Nthreads=NCPU* UCPU* (1+W/C)
处理器核数目*期望 CPU 利用率 (0 和 1 之间)*(1+等待时间与计算时间的比率)
private final Executor executor = Executor.newFixedThreadPool(Math.min(shop.size(), 100), new ThreadFactory() {
public Thread new Thread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
// 使用守护线程——这种方式不会阻止程序的关停
return t;
}
}
CompletableFuture.supplyAsync(()-> shop.getName()+" price is "+shop.getPrice(product), executor);
使用流还是 CompletableFutures?
- 如果进行的时计算密集型的操作,并且没有 I/O,推荐使用 Stream 接口,因为实现简单,同时效率也可能是最高的
- 并行的工作单元涉及等待 I/O 操作,使用 CompletableFuture 灵活性更好,可设置线程数
对多个异步任务进行流水线操作
public class Discount {
public enum Code {
NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
private final int percentage;
Code(int percentage) {
this.percentage=percentage;
}
}
// Discount类的具体实现这里暂且不表示,参见代码清单11-14
}
public String getPrice(String product) {
double price=calculatePrice(product);
Discount.Code code=Discount.Code.values()[
random.nextInt(Discount.Code.values().length)];
return String.format("%s:%.2f:%s", name, price, code);
}
private double calculatePrice(String product) {
delay();
return random.nextDouble() * product.charAt(0)+product.charAt(1);
}
实现折扣服务
将对商店返回字符串的解析操作封装到 Quote 类中
public class Quote {
private final String shopName;
private final double price;
private final Discount.Code discountCode;
public Quote(String shopName, double price, Discount.Code code) {
this.shopName=shopName;
this.price=price;
this.discountCode=code;
}
public static Quote parse(String s) {
String[] split=s.split(":");
String shopName=split[0];
double price=Double.parseDouble(split[1]);
Discount.Code discountCode=Discount.Code.valueOf(split[2]);
return new Quote(shopName, price, discountCode);
}
public String getShopName() { return shopName; }
public double getPrice() { return price; }
public Discount.Code getDiscountCode() { return discountCode; }
}
----------------------------------------------------------
public class Discount{
public enum Code {
}
public static String applyDiscount(Quote quote) {
return quote.getShopName() + " price is " + Discount.apply(quote.getPrice() + quote.getDiscountCode());
}
private static double apply(double price, Code code) {
delay();
return format(price * (100 - code.percentage) / 100);
}
}
使用 Discount 服务
- 第一个操作将每个 shop 对象转换成了一个字符串, 该字符串包含了该 shop 中指定商品价格和折扣代码
- 第二个对象对这些字符串进行解析, 在 Quote 对象中转换
- 第三个 map 操作联系远程 Discount 服务, 计算最终折扣价格, 返回该价格以及提供该商品的shop
public List<String> findPrices(String produce) {
return shop.stream()
.map(shop -> shop.getPrice(Product))
.map(Quote::parse)
.map(Discount::applyDiscount)
.collect(toList());
}
构造同步和异步操作
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor)))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join).
.collect(toList());
}
- 获取价格
- 解析报价
- 为计算折扣价格构造 Future
- 从 shop 对象中获取价格, 把价格转换为 Quote
- 拿到返回的 Quote 对象, 将其作为参数传递给 Discount 服务, 取得最终折扣价格
thenCompose 允许对两个异步操作进行流水线, 第一个操作完成将其结果作为参数传递给第二个操作
将两个 Completable-Future 对象整合起来,无论它们是否存在依赖
使用 thenCombine 方法, 接受 BiFunction 的第二参数
Future<Double> futurePriceInUSD =
CompletabelFuture.supplyAsync(() -> shop.getPrice(product))
.thenCombine(CpmpletabelFuture.supplyAsync(() ->
exchangeService.getRate(Money.EUR, Money.USD)),
(price, rate) -> price * rate
);
对 Future 和 Completable-Future 的回顾
响应 CompletableFuture 的 completion 事件
只要有商店返回商品价格就在第一时间现实返回值, 不再等待那些还未返回的商店
对最佳价格查询器应用的优化
public Stream<CompletableFuture<String>> findPricesStream(String product) {
return shops.stream()
.map(shop-> CompletableFuture.supplyAsync(
()-> shop.getPrice(product), executor))
.map(future-> future.thenApply(Quote::parse))
.map(future-> future.thenCompose(quote->
CompletableFuture.supplyAsync(
()-> Discount.applyDiscount(quote), executor)));
}
通过 thenAccept 方法提供这一功能, 接受 CompletableFuture 执行完毕后的返回值作为参数, thenAccept 已经定义了如何处理结果, 一旦得到计算结果, 返回一个 CompletabelFuture<Void>
findPricesStream("myPhone").map(f-> f.thenAccept(System.out::println));
CompletableFuture[] futures = findPricesStream("myPhone")
.map(f-> f.thenAccept(System.out::println))
.toArray(size-> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
付诸实践
long start=System.nanoTime();
CompletableFuture[] futures=findPricesStream("myPhone27S")
.map(f-> f.thenAccept(
s-> System.out.println(s+" (done in "+
((System.nanoTime() - start) / 1_000_000)+" msecs)")))
.toArray(size-> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
System.out.println("All shops have now responded in "
+((System.nanoTime() - start) / 1_000_000)+" msecs");

