大型商城:商城业务-异步&线程池

线程回顾

1.初始化线程的四种方式

1)、继承 Thread
2)、实现 Runnable 接口
3)、实现 Callable 接口 + FutureTask (可以拿到返回结果,可以处理异常)
4)、线程池

通过如下两种方式初始化线程池
Executors.newFiexedThreadPool(3);
//或者
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit unit, workQueue, threadFactory, handler);

通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果。

 

线程池详解

线程池的七大参数

  1. corePoolSize:[5] 核心线程数(一直存在,除非allowCoreTreadTimeOut);线程池,创建好以后就准备就绪
  2. maximumPoolSize:[200] 最大的线程数量,控制资源
  3. keepAliveTime:存活时间,如果当前的线程数量大于core数量,释放空闲的线程(不释放核心线程),只要线程空闲大于指定的keepAliveTime
  4. unit:时间单位
  5. workQueue:阻塞队列,如果任务很多,就会将目前多的任务放在队列里面,只要线程空闲,就回去队列里面取出新的任务继续执行!
  6. threadFactory:线程的创建工程
  7. handler:如果队列满了,就按照我们指定的拒绝策略拒绝执行任务

运行流程:

1、线程池创建,准备好 core 数量的核心线程,准备接受任务

2、新的任务进来,用 core 准备好的空闲线程执行。

(1) 、core 满了,就将再进来的任务放入阻塞队列中。空闲的 core 就会自己去阻塞队
列获取任务执行

(2) 、阻塞队列满了,就直接开新线程执行,最大只能开到 max 指定的数量

(3) 、max 都执行好了。Max-core 数量空闲的线程会在 keepAliveTime 指定的时间后自
动销毁。最终保持到 core 大小

(4) 、如果线程数开到了 max 的数量,还有新任务进来,就会使用 reject 指定的拒绝策
略进行处理

3、所有的线程创建都是由指定的 factory 创建的。

线程池例题:

一个线程池 core 7; max 20 ,queue:50,100 并发进来怎么分配的;
先有 7 个能直接得到执行,接下来 50 个进入队列排队,在多开 13 个继续执行。现在 70 个被安排上了。剩下 30 个默认拒绝策略。

常见的4种线程池

newCachedThreadPool(core是0,所有线程都可回收)

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

newFixedThreadPool(固定大小,core=max,都不可回收)

创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

newScheduledThreadPool(定时任务的线程池)

创建一个定长线程池,支持定时及周期性任务执行。

newSingleThreadExecutor(单线程的线程池,后台从队列里面获取任务,挨个执行)

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

开发中为什么使用线程池

降低资源的消耗
 通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
 提高响应速度
 因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务
的状态,当任务来时无需创建新的线程就能执行
 提高线程的可管理性
 线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来
的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使
用线程池进行统一分配

CompletableFuture 异步编排

业务场景:
查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间

假如商品详情页的每个查询,需要如下标注的时间才能完成那么,用户需要 5.5s 后才能看到商品详情页的内容。很显然是不能接受的。如果有多个线程同时完成这 6 步操作,也许只需要 1.5s 即可完成响应。

1.创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

1、runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的
2、可以传入自定义的线程池,否则就用默认的线程池;

public class ThreadTest {
    public static ExecutorService exexutor = Executors.newFixedThreadPool(10);
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
            System.out.println("当先线程:"+Thread.currentThread().getId());
            int i = 10/0;
            System.out.println("运行结果:"+i);
            return i;
        },exexutor).whenComplete((res,excption)->{
            //虽然能得到异常信息,但是没法修改返回数据
            System.out.println("异步任务完成,结果是:"+res+";异常是:"+excption);
        }).exceptionally(throwable -> {
            //可以感知异常。同时返回默认值
            return 10;
        });
        Integer integer = future.get();
        System.out.println("结束"+integer);
    }
}

计算完成时回调方法

whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况

public class ThreadTest {
    public static ExecutorService exexutor = Executors.newFixedThreadPool(10);
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
            System.out.println("当先线程:"+Thread.currentThread().getId());
            int i = 10/0;
            System.out.println("运行结果:"+i);
            return i;
        },exexutor).whenComplete((res,excption)->{
            //虽然能得到异常信息,但是没法修改返回数据
            System.out.println("异步任务完成,结果是:"+res+";异常是:"+excption);
        }).exceptionally(throwable -> {
            //可以感知异常。同时返回默认值
            return 10;
        });
        Integer integer = future.get();
        System.out.println("结束"+integer);
    }
}

whenComplete 和 whenCompleteAsync 的区别:

whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

handle方法

和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。

线程串行化方法

  • thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
  • thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
  • thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行

thenRun 的后续操作
带有 Async 默认是异步执行的。同之前。
以上都要前置任务成功完成。
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型

两任务组合 - 都要完成

两个任务必须都完成,触发该任务。

  • thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值
  • thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。
  • runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后,处理该任务。

两任务组合 - 一个完成

当两个任务中,任意一个 future 任务完成的时候,执行任务。

  • applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
  • acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
  • runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值。

多任务组合

allOf:等待所有任务完成
anyOf:只要有一个任务完成

假设有三个线程:futureImg,futureAttr,futureDesc

allof等三个线程全部执行完毕再往下进行,,anyOf等其中一个完成就往下进行!

 

阅读剩余
THE END