并行执行任务

标签: 并行  Java  多线程  线程池

需求

在app列表首页,展示多个item,并有分页;而每个item里后台都会调用一个http请求,判断当前item的状态

分析

为了更好的用体验,无疑需要使用多线程并行处理http请求,而且还需要拿到每个线程的执行结果.
上面的分析,有两个问题需要解决:
1. 如何创建线程池
2. 如何拿到所有线程的执行结果
对于第一个问题,还是很好解决的,使用并发包(java.util.concurrent)下面的ThreadPoolExecutor类创建线程池,阿里巴巴Java开发手册上推荐使用该类创建线程池:https://note.youdao.com/yws/api/personal/file/WEB8ff09313c73335f1ad8ff8697dc18776?method=download&shareKey=122bd8452d30e4d6e9097a3f63205a1e,根据统计,该首页的qps最大为3以及服务器的配置后,线程池创建如下:

 protected static class ThreadFactory {
        static ThreadPoolExecutor executor = new ThreadPoolExecutor(ThreadNumEnum.CORE_POOL_SIZE.getNum(), ThreadNumEnum.MAX_IMUM_POOL_SIZE.getNum(), TimeOutEnum.FiveSecond.getSeconds(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(),new ThreadPoolExecutor.DiscardPolicy());
        private ThreadFactory() {

        }
        public static ExecutorService getThreadPool() {
            return executor;
        }
    }

如何能拿到线程的执行结果呢,传统的Thread无法拿到执行结果,由于run方法无返回值,通过ThreadPoolExecutor类图发现:

https://note.youdao.com/yws/api/personal/file/WEB4ef3858d7f413c8ad003fff211386730?method=download&shareKey=4b734c076ea0b02705e85df1a6be5d1b
继承了AbstractExecutorService、ExecutorService,对ExecutorService中的invokeAll方法产生极大的兴趣,仔细阅读注释,其实这个方法用来并行执行任务:

/**
     * Executes the given tasks, returning a list of Futures holding
     * their status and results
     * when all complete or the timeout expires, whichever happens first.
     * {@link Future#isDone} is {@code true} for each
     * element of the returned list.
     * Upon return, tasks that have not completed are cancelled.
     * Note that a <em>completed</em> task could have
     * terminated either normally or by throwing an exception.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
     *
     * @param tasks the collection of tasks
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @param <T> the type of the values returned from the tasks
     * @return a list of Futures representing the tasks, in the same
     *         sequential order as produced by the iterator for the
     *         given task list. If the operation did not time out,
     *         each task will have completed. If it did time out, some
     *         of these tasks will not have completed.
     * @throws InterruptedException if interrupted while waiting, in
     *         which case unfinished tasks are cancelled
     * @throws NullPointerException if tasks, any of its elements, or
     *         unit are {@code null}
     * @throws RejectedExecutionException if any task cannot be scheduled
     *         for execution
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

到这里,第二个问题的解决思路已经有了.

编码实现

invokeAll方法的入参分别为任务列表list、超时时间、时间单位,所以首先我们需要创建任务列表:

 List<BasicUserFilter> list = new ArrayList<>();

超时时间为每个FutureTask执行超时时间,这里设置成3s,这里的3s超时时间是针对的所有tasks,而不是单个task的超时时间,如果超时,会取消没有执行完的所有任务,并抛出超时异常,源码如下:

for (int i = 0; i < size; i++) {
                execute((Runnable)futures.get(i));
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L)
                    return futures;
            }

            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    if (nanos <= 0L)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    nanos = deadline - System.nanoTime();
                }
            }

BasicUserFilter需要实现Callable接口,因为在方法call里能拿到线程的执行结果,
下面就是并行执行任务了:

        ExecutorService executor = ThreadFactory.getThreadPool();
        List<XXX> userFilterDtoList = new ArrayList<>();
        try {
            List<Future<XXX>> futureList = executor.invokeAll(list, TimeOutEnum.FourSecond.getSeconds(), TimeUnit.MILLISECONDS);
            futureList.stream().forEach(p -> {
                try {
                    Future<XXX> filterDtoFuture = p;
                   //拿到线程执行结果  
                   userFilterDtoList.add(filterDtoFuture.get());
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                } catch (ExecutionException e) {
                    logger.error(e.getMessage(), e);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            });
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }

关于多线程笔者还有很多需要去学习,上面是一个工作笔记,关于invokeAll的执行流程、神奇的Future模式,感兴趣的可以阅读源码就能找到答案.

更多文章请关注微信公众号:
https://note.youdao.com/yws/api/personal/file/WEB6c0f1a891fdea03ecc708833e0df9b3b?method=download&shareKey=45f2c5279d958cfcdc96b6c92be0ad96

版权声明:本文为BAT_os原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/BAT_os/article/details/80879085

智能推荐

OpenCV学习之路(五)图像的几何变换

在这一章将要学习图像的移动、旋转,仿射变换等 扩展缩放 我们如果想要改变图像的大小,我们就需要对图像进行扩展缩放,opencv提供给我们控制扩展缩放的函数: 参数解释: src:进行扩展缩放的原图片 dst:可以在此处设置缩放因子,也可手动设置尺寸 interpolation:在缩放时我们推荐使用cv2.INTER_AREA, 在扩展时我们推荐使用cv2.INTER_CUBIC(慢) 和 cv2....

2018.8.27

2018.8.27...

HTML 表单元素的基本样式

HTML 表单元素的基本样式 原创 ixygj197875 发布于2018-02-22 17:48:53 阅读数 2296 收藏 更新于2018-05-20 15:35:58 分类专栏: 揭秘 CSS 揭秘 CSS 收起 表单元素主要包括 label、input、textarea、select、datalist、******、progress、meter、output等,以及对表单元素进行分组的 ...

php输出语句

php输出语句 常见的输出语句 echo(): 可以一次输出多个值,多个值之间用逗号分隔。echo是语言结构(language construct),而并不是真正的函数,因此不能作为表达式的一部分使用。 print(): 函数print()打印一个值(它的参数),如果字符串成功显示则返回true,否则返回false。 print_r(): 可以把字符串和数字简单地打印出来,而数组则以括起来的键和值...

工厂模式

简介 常见的实例化对象模式。 用工厂方法替代new操作的一种模式。 当我们使用new操作实例化对象时,调用构造函数完成初始化。若初始化仅是进行赋值等简单的操作,写入构造函数即可。但如果初始化时需要执行一长串复杂的代码,将多个工作装入一个方法,是不妥的。 创建实例与使用实例分离。将创建实例所需的大量初始化工作从基类的构造函数中分离出去。 简单工厂模式、工厂方法模式针对的是一个产品等级结构;而抽象工厂...

猜你喜欢

B1105 Spiral Matrix (画图)

B1105 Spiral Matrix (25分) //第一次只拿了21分 矩阵的长和宽,求最大因子,从sqrt(num)开始枚举. 每次循环一次,s++,t--,d--,r++ 测试点四运行超时,是因为输入一个数字的时候,需要直接输出这个数字。//1分 测试点二运行超时,最后一个数字不必再while循环一次,直接输出即可。//3分 最后一个测试点卡了好久/(ㄒoㄒ)/~~ 螺旋矩阵...

Java基础=>String,StringBuffer与StringBuilder的区别

字符串常量池 什么是字符串常量池? JVM为了减少字符串对象的重复创建,其维护了一块特殊的内存,这段内存被称为字符串常量池(存储在方法区中)。 具体实现 当代码中出现字符串时,JVM首先会对其进行检查。 如果字符串常量池中存在相同内容的字符串对象,如果有,则不再创建,直接返回这个对象的地址返回。 如果字符串常量池中不存在相同内容的字符串对象,则创建一个新的字符串对象并放入常量池,并返回新创建的字符...

java调用其他java项目的Https接口

项目中是这样的: 用户拿出二维码展示,让机器识别二维码, 机器调用开门的后台系统接口, 然后开门的后台系统接口需要调用管理系统的接口, 管理系统需要判断能不能开门.这两个系统是互相独立的.当时使用http调用是没有问题的.当时后来要求必须用https.废话不说,直接代码: 我的项目中调用的是 HttpsUtils.Get(utlStr) 这个接口 开门系统接口如下图:   管理系统的接口...

Hadoop1.2.1全分布式模式配置

一 集群规划 主机名            IP                               安装的软件 &nbs...

Go语言gin框架的安装

尝试安装了一下gin,把遇到的一些小问题来记录一下 安装步骤 首先来看看官方文档,链接点这里 可以看到安装步骤很简单,就一句话 在命令行中输入这句话运行等待就好。 问题来了,因为墙的问题,go get会很慢,所以命令行里面半天什么反应也没有,不要急,慢慢等着就会看到gin-gonic/gin这个目录出现 这个时候命令行还是没有结束,表示还在下一些东西。有的时候可能心急的人就停了(比如我),然后写个...