说明:本系列笔记总结自雷丰阳老师教学项目《谷粒商城》


一、开启线程的四种方法

1. 继承Thread类

Thread类实现了Runnable接口

public class Thread_create_01 {

    @SneakyThrows
    public static void main(String[] args) {
        
        System.out.println("开始");

        thread01 thread01 = new thread01();
        thread01.start();

        System.out.println("结束...");
    }

    public static class thread01 extends Thread{
        @Override
        public void run() {
            System.out.println("进入异步");
            System.out.println("线程号:"+Thread.currentThread().getId());
        }
    }
}
开始
结束...
进入异步
线程号:18

2. 实现Runnable接口

public class Thread_create_01 {

    @SneakyThrows
    public static void main(String[] args) {
        
        System.out.println("开始");

        runnable01 runnable01 = new runnable01();
        new Thread(runnable01).start();
        //new Thread(()-> System.out.println("使用lambda表达式")).start();

        System.out.println("结束...");
    }

    public static class runnable01 implements Runnable{
        @Override
        public void run() {
            System.out.println("进入异步");
            System.out.println("线程号:"+Thread.currentThread().getId());
        }
    }
}

3. 实现Callable接口

可以拿到返回值,可以处理异常

public class Thread_create_01 {

    @SneakyThrows
    public static void main(String[] args) {

        System.out.println("开始");

        FutureTask<String> futureTask = new FutureTask<>(new callable01());
        new Thread(futureTask).start();
        String result = futureTask.get();   //阻塞等待整个线程执行完成,获取返回值

        System.out.println("结束...");
    }

    public static class callable01 implements Callable<String>{
        @Override
        public String call() throws Exception {
            System.out.println("进入异步");
            System.out.println("线程号:"+Thread.currentThread().getId());
            return Thread.currentThread().getName();
        }
    }
}

4. 使用线程池

给线程池直接提交任务

public class Thread_create_01 {

    //每个系统中池只有一两个,每个异步任务都交给线程池去(排队)执行
    //这里使用Executors工具类创建
    public static ExecutorService service = Executors.newFixedThreadPool(10);

    @SneakyThrows
    public static void main(String[] args) {

        System.out.println("开始");

        service.execute(new runnable01());  //提交线程到线程池执行,没有返回值
        //Future<?> submit = service.submit(new runnable01());    //有返回值
        //System.out.println(submit);

        System.out.println("结束...");
    }

    public static class runnable01 implements Runnable{
        @Override
        public void run() {
            System.out.println("进入异步");
            System.out.println("线程号:"+Thread.currentThread().getId());
        }
    }
}

5. 四种方式区别

1、2不能得到返回值,3可以拿到
1、2、3都不能控制资源
4可以控制资源,性能稳定,业务代码中使用4,将所有的线程异步任务都交给线程池去执行

二、启动线程任务

1. runAsync

没有返回值

public class CompletableFuture_create_02 {

    public static ExecutorService service = Executors.newFixedThreadPool(10);

    @SneakyThrows(value = {ExecutionException.class, InterruptedException.class})
    public static void main(String[] args) {
        System.out.println("开始");

        CompletableFuture<Void> future0 = CompletableFuture.runAsync(() -> {   //没有返回值
            System.out.println("进入异步");
            System.out.println("线程号:" + Thread.currentThread().getId());
        }, service);

        System.out.println("结束...");
    }
}

2. supplyAsync

supplyAsync->whenComplete->exceptionally

有返回值

方法执行完成后的感知

public class CompletableFuture_create_02 {

    public static ExecutorService service = Executors.newFixedThreadPool(10);

    @SneakyThrows(value = {ExecutionException.class, InterruptedException.class})
    public static void main(String[] args) {
        System.out.println("开始");

        //方法执行完成后的感知
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {    //有返回值
            System.out.println("进入异步2");
            System.out.println("线程号:" + Thread.currentThread().getId());
            //int i = 10/0;
            return Thread.currentThread().getName() + "哈哈哈";
        }, service).whenComplete((res, exception) -> {
            //计算完成时回调方法,没法修改返回数据
            System.out.println("异步任务完成了,结果是:" + res + ";异常是:" + exception);
        }).exceptionally(throwable -> {
            //可以感知异常,同时返回默认值
            return "异常了";
        });
        String result = future1.get();   //阻塞拿到返回结果
        
        System.out.println("结束..." + result);
    }
}

3. supplyAsync

supplyAsync->handle->同上

有返回值

方法执行完成后的处理

public class CompletableFuture_create_02 {

    public static ExecutorService service = Executors.newFixedThreadPool(10);

    @SneakyThrows(value = {ExecutionException.class, InterruptedException.class})
    public static void main(String[] args) {
        System.out.println("开始");

        //方法执行完成后的处理
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("进入异步");
            System.out.println("线程号:" + Thread.currentThread().getId());
            int i = 10 / 3;
            return Thread.currentThread().getName() + " 哈哈哈 ";
        }, service).handle((res, exception) -> {
            if (res != null) {
                return res + "没有异常的结果";
            }
            if (exception != null) {
                return exception + "有异常";
            }
            return "默认返回";
        });
        String result2 = future2.get();   //拿到返回结果

        System.out.println("结束..." + result2);
    }
}

三、线程串行化

1. thenRunAsync

不能获取到上一步的执行结果,无返回值

2. thenAcceptAsync

能获取到上一步的执行结果,但无返回值

3. thenApplyAsync

能获取到上一步的执行结果,有返回值

测试代码

public class CompletableFuture_then_03 {

    public static ExecutorService service = Executors.newFixedThreadPool(10);

    @SneakyThrows
    public static void main(String[] args) {
        System.out.println("开始");

        //thenRunAsync 不能获取到上一步的执行结果,无返回值
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("进入异步");
            return 20 / 5;
        }, service).thenRunAsync(() -> {
            System.out.println("任务2启动了。。。");
        }, service);

        //thenAcceptAsync 能获取到上一步的执行结果,但无返回值
        CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("进入异步");
            return 20 / 5;
        }, service).thenAcceptAsync((res) -> {
            //void accept(T t);
            System.out.println("任务2启动了。。。"+"上一步的结果:"+res);
        }, service);

        //thenApplyAsync 能获取到上一步的执行结果,有返回值
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("进入异步");
            return 20 / 5;
        }, service).thenApplyAsync((res) -> {
            //R apply(T t);
            System.out.println("任务2启动了。。。" + "上一步的结果:" + res);
            return "这一步运算的结果:" + res + 10;
        }, service);

        System.out.println("结束..."+future.get()+future2.get()+future3.get());
    }
}

四、任务组合编排

1. 完成任意一个后开启新任务

三种方式

  • runAfterEitherAsync 不感知结果,无返回值
  • acceptEitherAsync 感知结果,无返回值
  • applyToEitherAsync 感知结果,有返回值
public class CompletableFuture_combination_04 {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    @SneakyThrows
    public static void main(String[] args) {
        System.out.println("开始");

        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务线程1:" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("任务1结束。。。");
            return i;
        }, executor);

        CompletableFuture<Integer> future02 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务线程2:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务2结束。。。");
            return i;
        }, executor);

        /*
            两个完成任意一个后开启新任务
            runAfterEitherAsync 不感知结果,无返回值
            acceptEitherAsync   感知结果,无返回值
            applyToEitherAsync   感知结果,有返回值
         */
        future01.runAfterEitherAsync(future02,()->{
            System.out.println("测试2-任务3开始...");
        },executor);

        future01.acceptEitherAsync(future02,(res)->{
            System.out.println("测试2-任务3开始之前的结果..."+res);
        },executor);

        CompletableFuture<String> future000 = future01.applyToEitherAsync(future02, (res) -> {
            System.out.println("测试2-任务3开始之前的结果..." + res);
            return res + "->哈哈";
        }, executor);
        System.out.println(future000.get());

        System.out.println("结束...");
    }
}

2. 两个都完成后开启新任务

三种方式

  • runAfterBothAsync 不能获取到前两步的执行结果,无返回值
  • thenAcceptBothAsync 能获取到前两步的执行结果,无返回值
  • thenCombineAsync 能获取到前两步的执行结果,有返回值
public class CompletableFuture_combination_04 {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    @SneakyThrows
    public static void main(String[] args) {
        System.out.println("开始");

        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务线程1:" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("任务1结束。。。");
            return i;
        }, executor);

        CompletableFuture<Integer> future02 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务线程2:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务2结束。。。");
            return i;
        }, executor);

        /*
            两个都完成后开启新任务
         */
        //不能获取到前两步的执行结果,无返回值
        future01.runAfterBothAsync(future02,()->{
            System.out.println("任务3开始...001");
        },executor);

        //能获取到前两步的执行结果,无返回值     void accept(T t, U u);
        future01.thenAcceptBothAsync(future02,(f1,f2)->{
            System.out.println("任务3开始之前的结果...002--"+f1+"-->"+f2);
        },executor);

        //能获取到前两步的执行结果,有返回值     R apply(T t, U u);
        CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {
            System.out.println("任务3开始之前的结果...003--" + f1 + "-->" + f2);
            return f1 + "<-->" + f2;
        }, executor);
        System.out.println("任务3的返回结果"+future.get());

        System.out.println("结束...");
    }
}

五、多任务组合

  • allOf 等待所有结果完成后才会结束放行
  • anyOf 等待任一结果完成后就会继续执行
public class CompletableFuture_Multiple_05 {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    @SneakyThrows
    public static void main(String[] args) {
        System.out.println("开始");

        CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> {
            System.out.println("查询商品的图片信息");
            return "hello.jpg";
        }, executor);

        CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("查询商品的属性");
            return "黑色_256G";
        }, executor);

        CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("查询商品的描述");
            return "苹果13";
        }, executor);

//        CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);
//        allOf.get();    //等待所有结果完成后才会结束放行

        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc);
        anyOf.get();    //等待任一结果完成后就会继续执行

        System.out.println("结束...");
    }
}

六、谷粒业务

服务地址:

配置线程池:

@Configuration
//@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
public class MyThreadConfig {
    @Bean
    public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
        return new ThreadPoolExecutor(pool.getCoreSize(), pool.getMaxSize(), pool.getKeepAliveTime(), TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(100000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
    }
}

从配置文件动态读取线程池配置:

@ConfigurationProperties(prefix = "gulimall.thread")
@Component
@Data
public class ThreadPoolConfigProperties {
    private Integer coreSize;
    private Integer maxSize;
    private Integer keepAliveTime;
}

查询商品详情页业务代码:

@SneakyThrows
@Override
public SkuItemVo item(Long skuId) {

    SkuItemVo skuItemVo = new SkuItemVo();
    CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
        //1.sku基本信息获取 pms_sku_info
        SkuInfoEntity info = getById(skuId);
        skuItemVo.setInfo(info);
        return info;
    }, executor);
    CompletableFuture<Void> saleFuture = infoFuture.thenAcceptAsync((res) -> {
        //spu的销售属性组合
        List<SkuItemSaleAttrsVo> saleAttrsVos = skuSaleAttrValueService.getSaleAttrsBySpuId(res.getSpuId());
        skuItemVo.setSaleAttr(saleAttrsVos);
    }, executor);
    CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
        //spu的详细介绍
        SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
        skuItemVo.setDesp(spuInfoDescEntity);
    }, executor);
    CompletableFuture<Void> baseFuture = infoFuture.thenAcceptAsync((res) -> {
        //规格参数
        List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());
        skuItemVo.setGroupAttrs(attrGroupVos);
    }, executor);
    CompletableFuture<Void> imgFuture = CompletableFuture.runAsync(() -> {
        //sku图片信息 pms_sku_images
        List<SkuImagesEntity> images = imagesService.getImagesBySkuId(skuId);
        skuItemVo.setImagesEntites(images);
    }, executor);

    //等待所有任务都完成
    CompletableFuture.allOf(saleFuture,descFuture,baseFuture,imgFuture).get();
    return skuItemVo;
}

七、线程池详解

见:Java多线程:彻底搞懂线程池