Files
Hui-s-notebook/CompletableFuture 组合式异步编程.md
2023-09-10 10:50:53 +08:00

10 KiB
Raw Blame History

Future接口

对将来的某个时刻会发生的结果进行建模, 建模了一种异步运算, 返回一个被执行运算结果的引用, 当运算结束后,这个引用被返回给调用方

Future 接口的局限性

使用 isDone 检测异步计算是否结束, 很难描述 Future 结果之间的依赖性

  • 把两个异步计算合并为一个
  • 等待 Futrure 集合中所有任务完成
  • 仅等待 Future 集合中最快结束的任务完成,并返回结果
  • 通过编程方式完成一个 Future 任务的执行
  • 应对 Future 的完成事件(当 Future 的完成事件发生时会收到通知,并能使用 Future 计算的结果进行下一步操作,而不是简单的阻塞等待操作的结果)

使用 CompletableFuture 构建异步应用

  • 如何为你的客户提供异步 API
  • 让使用了同步 API 的代码变为非阻塞代码
  • 以响应式的方式处理异步操作的完成事件

实现异步 API

public double getPrice(String product) {
	return calculatePrice(product);
}
private double calculatePrice(String product) {
	delay();
	return random.nextDouble()  product.charAt(0)+product.charAt(1);
}

将同步方法转换为异步方法

public Future<Double> getPriceAsync(String price) {
	CompletabelFuture<Double> futurePrice = new CompletableFuture<>();
	new Thread( () -> {
		double price = calculatePrice(produce);
		futurePrice.complete(price);
	}).start();
	returen futurePrice();
}

Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
long invocationTime = ((System.nanoTime() - start) / 1000000);
System.out.println("Invocation returned after " + InvocationTime + "msecs");

doSomethingElse();

try{
	double price = futurePrice.get();
	System.out.println("Price is %.2f%n", price);
} catch (Exception e) {
	throw new RuntimeException(e);
}
long retrievalTime = ( (System.nanoTime() - start) / 1000000) ;
System.out.println("Price returned after " + retrievalTime + " mseGs");

错误处理

public Future<Double> getPriceAsync(String price) {
	CompletabelFuture<Double> futurePrice = new CompletableFuture<>();
	new Thread( () -> {
		try {
			double price ± calculatePrice(product)
			futurePrice.complete (price) ;
		} catch (Exception ex)
			futurePrice.completeExceptionally (ex);
		}
	}).start();
	returen futurePrice();
}

使用工厂方法 supplyAsync 创建 CompletableFuture

public Future<Double> getPriceAsync(String product) {
	return CompletableFuture.supplyAsync(()-> calculatePrice(product));
}

让你的代码免受阻塞之苦

List<Shop> shops=Arrays.asList(new Shop("BestPrice"),
								  new Shop("LetsSaveBig"),
								  new Shop("MyFavoriteShop"),
								  new Shop("BuyItAll"));
public List<String> findPrices(String product) {
	return shops.stream()
		.map(shop -> String.format("gs price is g.2f", shop.getName(), shop-getPrice(product))
		.collect (toList ());
}

使用并行流对请求进行并行操作

public List<String> findPrices(String product) {
	return shops.parallelStream()
		.map(shop -> String.format("gs price is g.2f", shop.getName(), shop-getPrice(product))
		.collect (toList ());
}

使用 CompletableFuture 发起异步请求

public List<String> findPrices(String product) {
	List<CompletableFuture<String>> priceFutures =  
		shops.stream()
		.map(shop -> String.format("gs price is g.2f", shop.getName(), shop-getPrice(product))
		.collect (toList());
	return priceFutures.stream().map(CompletabelFuture::join).collect(toList());
}

寻找更好的方案

它们内部采用的是同样的通用线程池,默认都使用固定数目的线程,具体线程数取决于 Runtime. getRuntime().availableProcessors()的返回值

CompletableFuture 具有一定的优势因为它允许你对执行器Executor进行配置尤其是线程池的大小让它以更适合应用需求的方式进行配置满足程序的要求而这是并行流 API 无法提供的

使用定制的执行器

创建一个配有线程池的执行器

线程池大小与处理器的利用率之比可以使用下面的公式进行估算:

Nthreads=NCPU UCPU (1+W/C)

处理器核数目*期望 CPU 利用率 (0 和 1 之间)*(1+等待时间与计算时间的比率)

private final Executor executor = Executor.newFixedThreadPool(Math.min(shop.size(), 100), new ThreadFactory() {
	public Thread new Thread(Runnable r) {
		Thread t = new Thread(r);
		t.setDaemon(true);
		// 使用守护线程——这种方式不会阻止程序的关停
		return t;
	}
}

CompletableFuture.supplyAsync(()-> shop.getName()+" price is "+shop.getPrice(product), executor);

使用流还是 CompletableFutures

  • 如果进行的时计算密集型的操作,并且没有 I/O推荐使用 Stream 接口,因为实现简单,同时效率也可能是最高的
  • 并行的工作单元涉及等待 I/O 操作,使用 CompletableFuture 灵活性更好,可设置线程数

对多个异步任务进行流水线操作

public class Discount {
	public enum Code {
		NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
		private final int percentage;
		Code(int percentage) {
			this.percentage=percentage;
		}
	}
	// Discount类的具体实现这里暂且不表示参见代码清单11-14
}

public String getPrice(String product) {
	double price=calculatePrice(product);
	Discount.Code code=Discount.Code.values()[
							  random.nextInt(Discount.Code.values().length)];
	return String.format("%s:%.2f:%s", name, price, code);
}

private double calculatePrice(String product) {
	delay();
	return random.nextDouble()  product.charAt(0)+product.charAt(1);
}

实现折扣服务

将对商店返回字符串的解析操作封装到 Quote 类中

public class Quote {

	private final String shopName;
	private final double price;
	private final Discount.Code discountCode;
	public Quote(String shopName, double price, Discount.Code code) {
		this.shopName=shopName;
		this.price=price;
		this.discountCode=code;
  }

  public static Quote parse(String s) {
	  String[] split=s.split(":");
	  String shopName=split[0];
	  double price=Double.parseDouble(split[1]);
	  Discount.Code discountCode=Discount.Code.valueOf(split[2]);
	  return new Quote(shopName, price, discountCode);
  }

  public String getShopName() { return shopName; }
  public double getPrice() { return price; }
  public Discount.Code getDiscountCode() { return discountCode; }
}

----------------------------------------------------------
public class Discount{
	public enum Code {
	}

	public static String applyDiscount(Quote quote) {
		return quote.getShopName() + " price is " + Discount.apply(quote.getPrice() + quote.getDiscountCode());
	}

	private static double apply(double price, Code code) {
		delay();
		return format(price * (100 - code.percentage) / 100)
	}
}

使用 Discount 服务

  • 第一个操作将每个 shop 对象转换成了一个字符串, 该字符串包含了该 shop 中指定商品价格和折扣代码
  • 第二个对象对这些字符串进行解析, 在 Quote 对象中转换
  • 第三个 map 操作联系远程 Discount 服务, 计算最终折扣价格, 返回该价格以及提供该商品的shop
public List<String> findPrices(String produce) {
	return shop.stream()
			   .map(shop -> shop.getPrice(Product))
			   .map(Quote::parse)
			   .map(Discount::applyDiscount)
			   .collect(toList());
}

构造同步和异步操作

!Pasted image 20230705093951.png

public List<String> findPrices(String product) {
	List<CompletableFuture<String>> priceFutures = 
		shops.stream()
			 .map(shop -> CompletableFuture.supplyAsync(
				 () -> shop.getPrice(product), executor))
			 .map(future -> future.thenApply(Quote::parse))
			 .map(future -> future.thenCompose(quote -> 
				 CompletableFuture.supplyAsync(
					 () -> Discount.applyDiscount(quote), executor)))
			 .collect(toList());
			
	return priceFutures.stream()
			.map(CompletableFuture::join).
			.collect(toList());
}
  1. 获取价格
  2. 解析报价
  3. 为计算折扣价格构造 Future
  • 从 shop 对象中获取价格, 把价格转换为 Quote
  • 拿到返回的 Quote 对象, 将其作为参数传递给 Discount 服务, 取得最终折扣价格

thenCompose 允许对两个异步操作进行流水线, 第一个操作完成将其结果作为参数传递给第二个操作

将两个 Completable-Future 对象整合起来,无论它们是否存在依赖

使用 thenCombine 方法, 接受 BiFunction 的第二参数

Future<Double> futurePriceInUSD = 
	CompletabelFuture.supplyAsync(() -> shop.getPrice(product))
	.thenCombine(CpmpletabelFuture.supplyAsync(() -> 
		exchangeService.getRate(Money.EUR, Money.USD)),
		(price, rate) -> price * rate	
	);

!Pasted image 20230705104736.png

对 Future 和 Completable-Future 的回顾

响应 CompletableFuture 的 completion 事件

只要有商店返回商品价格就在第一时间现实返回值, 不再等待那些还未返回的商店

对最佳价格查询器应用的优化

public Stream<CompletableFuture<String>> findPricesStream(String product) {
	return shops.stream()
			.map(shop-> CompletableFuture.supplyAsync(
					()-> shop.getPrice(product), executor))
			.map(future-> future.thenApply(Quote::parse))
			.map(future-> future.thenCompose(quote->
					CompletableFuture.supplyAsync(
					  ()-> Discount.applyDiscount(quote), executor)));
}

通过 thenAccept 方法提供这一功能, 接受 CompletableFuture 执行完毕后的返回值作为参数, thenAccept 已经定义了如何处理结果, 一旦得到计算结果, 返回一个 CompletabelFuture<Void>

findPricesStream("myPhone").map(f-> f.thenAccept(System.out::println));

CompletableFuture[] futures = findPricesStream("myPhone")
	.map(f-> f.thenAccept(System.out::println))
	.toArray(size-> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();

付诸实践

long start=System.nanoTime();
CompletableFuture[] futures=findPricesStream("myPhone27S")
	.map(f-> f.thenAccept(
		s-> System.out.println(s+" (done in "+
			((System.nanoTime() - start) / 1_000_000)+" msecs)")))
	.toArray(size-> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
System.out.println("All shops have now responded in "
				 +((System.nanoTime() - start) / 1_000_000)+" msecs");