1、CompletableFuture异步编列

1.1 为什么需求异步编列

问题:查询产品概况页的逻辑十分复杂,数据的获取都需求长途调用,必然需求花费更多的时刻。

现在我事务中产品概况页包括如下7个办法:

获取sku的根本概况和图片列表

获取实时价格

获取三级分类

获取出售属性和选中状况

获取产品切换数据

获取海报信息

获取渠道信息

上面查询进程都是用OpenFeign完结的,假定每个长途调用需求1s时刻,那么全部履行完需求7s,这对用户来说是难以接受的。

那如果有多个线程同时履行这7步操作呢,时刻是不是就更短了。


1.2 CompletableFuture介绍

FutureJava 5增加的类,用来描绘一个异步计算的成果。你能够运用isDone办法检查计算是否完结,或者运用get堵塞住调用线程,直到计算完结回来成果,你也能够运用cancel办法停止使命的履行。

在Java 8中, 新增加了一个包括50个办法左右的类: CompletableFuture,供给了十分强壮的Future的扩展功能,能够帮助咱们简化异步编程的复杂性,供给了函数式编程的才能,能够经过回调的方式处理计算成果,并且供给了转化和组合CompletableFuture的办法。

CompletableFuture类完结了Future接口,所以你仍是能够像曾经相同经过get办法堵塞或者轮询的方式取得成果,但是这种方式不引荐运用。

CompletableFutureFutureTask同属于Future接口的完结类,都能够获取线程的履行成果。

CompletableFuture异步编排

1.3 创立异步目标

CompletableFuture 供给了四个静态办法来创立一个异步操作。

CompletableFuture异步编排

没有指定Executor的办法会运用ForkJoinPool.commonPool() 作为它的线程池履行异步代码。

  • runAsync办法不支撑回来值。

  • supplyAsync能够支撑回来值。

whenComplete能够处理正常或反常的计算成果,exceptionally处理反常情况。BiConsumer<? super T,? super Throwable>能够定义处理事务


whenCompletewhenCompleteAsync 的区别:

whenComplete:是履行当时使命的线程履行持续履行 whenComplete 的使命。

whenCompleteAsync:是履行把 whenCompleteAsync 这个使命持续提交给线程池来进行履行。

办法不以Async结尾,意味着Action运用相同的线程履行,而Async可能会运用其他线程履行(如果是运用相同的线程池,也可能会被同一个线程选中履行)

代码演示:

public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //创立一个没有回来值的异步目标
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("没有回来值成果");
        });
        System.out.println(future.get());
        //创立一个有回来值的异步目标
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                int a=1/0;
                return 404;
            }
        }).whenComplete(new BiConsumer<Integer, Throwable>() {
            /**
             *whenComplete 和异步目标运用用一个线程
             * @param integer   异步目标履行后的回来值成果
             * @param throwable 反常目标
             */
            @Override
            public void accept(Integer integer, Throwable throwable) {
                System.out.println("whenComplete:"+integer);
                System.out.println("whenComplete:"+throwable);
            }
        }).exceptionally(new Function<Throwable, Integer>() {
            /**
             * 只处理反常的回调
             * @param throwable
             * @return
             */
            @Override
            public Integer apply(Throwable throwable) {
                return null;
            }
        }).whenCompleteAsync(new BiConsumer<Integer, Throwable>() {
            /**
             * whenCompleteAsync跟异步目标有可能不适用同一个线程,由线程池重新分配
             * @param integer
             * @param throwable
             */
            @Override
            public void accept(Integer integer, Throwable throwable) {
            }
        });
    }
}

CompletableFuture异步编排

1.4 线程串行化与并行化办法

thenApply 办法:当一个线程依赖另一个线程时,获取上一个使命回来的成果,并回来当时使命的回来值。

CompletableFuture异步编排

thenAccept办法:消费处理成果。接纳使命的处理成果,并消费处理,无回来成果。

CompletableFuture异步编排

thenRun办法:只要上面的使命履行完结,就开端履行thenRun,仅仅处理完使命后,履行 thenRun的后续操作

CompletableFuture异步编排

带有Async默许是异步履行的。这里所谓的异步指的是不在当时线程内履行。

Function<? super T,? extends U>
T:上一个使命回来成果的类型 
U:当时使命的回来值类型

代码演示:

public class CompletableFutureDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(
                        50,
                        500,
                        30,
                        TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(10000)
                );
        //创立一个异步使命目标A
        CompletableFuture<Object> futureA = CompletableFuture.supplyAsync(new Supplier<Object>() {
            @Override
            public Object get() {
                return "404";
            }
        },threadPoolExecutor);
        //创立一个B
        futureA.thenAcceptAsync(new Consumer<Object>() {
            @SneakyThrows
            @Override
            public void accept(Object o) {
                    Thread.sleep(500);
                    System.out.println("我是B");
            }
        },threadPoolExecutor);
        //创立一个C
        futureA.thenAcceptAsync(new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                System.out.println("我是C");
            }
        },threadPoolExecutor);
    }
}

CompletableFuture异步编排

这里是测验看是否是并行化,咱们让B休眠一会,能够看到先输出C再输出B,阐明是并行化。

由于如果是串行化的化,那么即便B休眠一会,那么C也会一向等着,输出顺序为B、C

1.5 多使命组合

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

allOf:等待一切使命完结。

anyOf:只要有一个使命完结。

1.6 优化产品概况页(事务代码)

1.6.1 未优化之前的代码

@Service
@SuppressWarnings("all")
public class ItemServiceImpl implements ItemService {
    @Autowired
    private ProductFeignClient productFeignClient;
    //获取产品概况数据
    @Override
    public HashMap<String, Object> getItem(Long skuId) {
        HashMap<String, Object> resultMap=new HashMap<>();
        //获取sku的根本概况和图片列表
        SkuInfo skuInfo = productFeignClient.getSkuInfo(skuId);
        //获取实时价格
        BigDecimal skuPrice = productFeignClient.getSkuPrice(skuId);
        //判别
        if(skuInfo!=null){
            //获取三级分类
            BaseCategoryView categoryView = productFeignClient.getCategoryView(skuInfo.getCategory3Id());
            //获取出售属性和选中状况
            List<SpuSaleAttr> spuSaleAttrListCheckBySku = productFeignClient.getSpuSaleAttrListCheckBySku(skuId, skuInfo.getSpuId());
            //获取产品切换数据
            Map skuValueIdsMap = productFeignClient.getSkuValueIdsMap(skuInfo.getSpuId());
            //获取海报信息
            List<SpuPoster> spuPosterBySpuId = productFeignClient.findSpuPosterBySpuId(skuInfo.getSpuId());
            resultMap.put("categoryView",categoryView);
            resultMap.put("spuSaleAttrList",spuSaleAttrListCheckBySku);
            resultMap.put("valuesSkuJson", JSON.toJSONString(skuValueIdsMap));
            resultMap.put("spuPosterList",spuPosterBySpuId);
        }
        //获取渠道信息
        List<BaseAttrInfo> attrList = productFeignClient.getAttrList(skuId);
        //处理数据符合要求 List  Obj  key attrName value attrValue
        List<Map<String, String>> spuAttrList = attrList.stream().map(baseAttrInfo -> {
            Map<String, String> map = new HashMap<>();
            map.put("attrName", baseAttrInfo.getAttrName());
            map.put("attrValue", baseAttrInfo.getAttrValueList().get(0).getValueName());
            return map;
        }).collect(Collectors.toList());
        //存储数据
        resultMap.put("skuInfo",skuInfo);
        resultMap.put("price",skuPrice);
        resultMap.put("skuAttrList",spuAttrList);
        return resultMap;
    }
}

1.6.2 运用CompletableFuture异步编列

装备线程池:

@Configuration
public class ThreadPoolConfig {
    /**
     * 中心线程数
     * 最大线程数
     * 闲暇存活时刻
     * 时刻单位
     * 堵塞队列
     * 默许:
     *  线程工厂
     *  回绝策略
     * @return
     */
    @Bean
    public ThreadPoolExecutor threadPoolExecutor(){
        return new ThreadPoolExecutor(
                50,
                500,
                30,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10000)
        );
    }
}

完结类改造:

@Service
@SuppressWarnings("all")
public class ItemServiceImpl implements ItemService {
    @Autowired
    private ProductFeignClient productFeignClient;
    @Autowired
    private ThreadPoolExecutor executor;
    //获取产品概况数据
    @Override
    public HashMap<String, Object> getItem(Long skuId) {
        HashMap<String, Object> resultMap=new HashMap<>();
        CompletableFuture<SkuInfo> skuInfoCompletableFuture = CompletableFuture.supplyAsync(new Supplier<SkuInfo>() {
            @Override
            public SkuInfo get() {
                //获取sku的根本概况和图片列表
                SkuInfo skuInfo = productFeignClient.getSkuInfo(skuId);
                resultMap.put("skuInfo", skuInfo);
                return skuInfo;
            }
        }, executor);
        CompletableFuture<Void> skuPriceCompletableFuture = CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                //获取实时价格
                BigDecimal skuPrice = productFeignClient.getSkuPrice(skuId);
                resultMap.put("price", skuPrice);
            }
        }, executor);
        //判别
        CompletableFuture<Void> categoryViewCompletableFuture = skuInfoCompletableFuture.thenAcceptAsync(new Consumer<SkuInfo>() {
            @Override
            public void accept(SkuInfo skuInfo) {
                //获取三级分类
                BaseCategoryView categoryView = productFeignClient.getCategoryView(skuInfo.getCategory3Id());
                resultMap.put("categoryView",categoryView);
            }
        }, executor);
        CompletableFuture<Void> spuSaleAttrListCheckBySkuCompletableFuture = skuInfoCompletableFuture.thenAcceptAsync(new Consumer<SkuInfo>() {
            @Override
            public void accept(SkuInfo skuInfo) {
                //获取出售属性和选中状况
                List<SpuSaleAttr> spuSaleAttrListCheckBySku = productFeignClient.getSpuSaleAttrListCheckBySku(skuId, skuInfo.getSpuId());
                resultMap.put("spuSaleAttrList",spuSaleAttrListCheckBySku);
            }
        }, executor);
        CompletableFuture<Void> skuValueIdsMapCompletableFuture = skuInfoCompletableFuture.thenAcceptAsync(new Consumer<SkuInfo>() {
            @Override
            public void accept(SkuInfo skuInfo) {
                //获取产品切换数据
                Map skuValueIdsMap = productFeignClient.getSkuValueIdsMap(skuInfo.getSpuId());
                resultMap.put("valuesSkuJson", JSON.toJSONString(skuValueIdsMap));
            }
        }, executor);
        CompletableFuture<Void> findSpuPosterBySpuIdCompletableFuture = skuInfoCompletableFuture.thenAcceptAsync(new Consumer<SkuInfo>() {
            @Override
            public void accept(SkuInfo skuInfo) {
                //获取海报信息
                List<SpuPoster> spuPosterBySpuId = productFeignClient.findSpuPosterBySpuId(skuInfo.getSpuId());
                resultMap.put("spuPosterList",spuPosterBySpuId);
            }
        }, executor);
        CompletableFuture<Void> attrListCompletableFuture = CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                //获取渠道信息
                List<BaseAttrInfo> attrList = productFeignClient.getAttrList(skuId);
                //处理数据符合要求 List  Obj  key attrName value attrValue
                List<Map<String, String>> spuAttrList = attrList.stream().map(baseAttrInfo -> {
                    Map<String, String> map = new HashMap<>();
                    map.put("attrName", baseAttrInfo.getAttrName());
                    map.put("attrValue", baseAttrInfo.getAttrValueList().get(0).getValueName());
                    return map;
                }).collect(Collectors.toList());
                //存储数据
                resultMap.put("skuAttrList", spuAttrList);
            }
        }, executor);
        //多使命组合 -- 一切的异步使命履行完结才是完结
        CompletableFuture.allOf(
                skuInfoCompletableFuture,
                skuPriceCompletableFuture,
                categoryViewCompletableFuture,
                spuSaleAttrListCheckBySkuCompletableFuture,
                skuValueIdsMapCompletableFuture,
                findSpuPosterBySpuIdCompletableFuture,
                attrListCompletableFuture
        ).join();
        return resultMap;
    }
}

根据是否有回来值决议调用哪个API,然后看有没有依赖关系,有好几个都依赖SkuInfo,所以要用skuInfoCompletableFuture去创立。

咱们需求等待每个使命履行完毕之后在回来,所以最后运用allOf办法进行多使命组合。

1.6.3 测验功能是否正常

这种异步作用其实在高并发下环境下测比较好,咱们这里验证功能是否正常就行。

拜访产品概况页:

CompletableFuture异步编排

检查Redis中的数据

CompletableFuture异步编排

能够看到,有6个key被缓存,由于咱们的价格是实时价格,所以一向查的是数据库,千万别用缓存。