zoukankan      html  css  js  c++  java
  • 异步线程池如何做同步业务

    需求描述

    简化下,数据源是kafka消息,目标是将消息按主键(一种可区分消息重复方法)插入pg表(有则更新,无则插入);

    源消息确定有重复的,尽量保证入库速度;另外,producer端已经保证key相同的消息入到了同一个分区,数据量8000万消息

    业务分析

    首先key相同的消息在同一个分区,那么consumer可以按分区个数,起多个实例,一一对应;

    对于某一个consumer,单批消费下来N条消息,将N条数据拆分成K份,提交给线程池异步执行upsert入库;

    这里要考虑的一个问题是,如果上一批消息的某些线程执行upsert时尚未结束,下一次poll消息执行入库又开始了,

    可能发生的问题是,两个批次中,有相同key的消息,那么将会出现>=2个线程对同一条记录执行update,产生死锁,

    实际在生产环境确实发生了,和服务器性能关系较大,性能稍差的服务器,入库速度显著慢一些,更容易发生

    因此,这个地方需要设计成:poll一批消息之前,必须阻塞,等待上一批消息完全处理完成

    然后因为一次插入N(假设消费N完全不重复,N可配置,可以是很大)条数据入库耗时长,改进为均分后用多个线程并行入库

    线程使用的是初始化好的线程池

    项目框架

    springboot+maven+postgres

    需要开启的功能:异步(启动类@EnableAsync)线程池(配置注入方式)

    关键步骤代码

    一、注入一个线程池,这里可以用spring自带的,也可以用java原生的,spring自带的,类名多一个单词task:ThreadPoolTaskExecutor

    其他的注解的作用,可配置的参数,根据需要查阅配置

    @Configuration
    public class ExecutorConfig {
    
        @Value("${thread.pool.core.pool.size}")
        private int corePoolSize;
        @Value("${thread.pool.max.pool.size}")
        private int maxPoolSize;
        @Value("${thread.pool.queue.capacity}")
        private int queueCapacity;
        @Value("${thread.pool.name.prefix}")
        private String threadNamePrefix;
    
         /**
          * spring自带的
          */
        @Bean(name = "springThreadPool")
        public Executor springThreadPoolConfig() {       
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();       
            executor.setCorePoolSize(corePoolSize);       
            executor.setMaxPoolSize(maxPoolSize);       
            executor.setQueueCapacity(queueCapacity);
            executor.setThreadNamePrefix(threadNamePrefix);
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());       
            executor.initialize();
            return executor;
        }
    
        /**
         * java原生的
         */
        @Bean(name = "javaThreadPool")
        public Executor javaThreadPoolConfig() {
            return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }
    
    }

    二、将需要执行入库的业务逻辑异步放入线程池执行

    @Service
    public class PhotoServiceImpl{
    
        @Autowired
        private PhotoDao photoDao;
    
        /**
         * 这里使用异步,并定义异步执行的线程池标志
         * 返回必须是Future<Void> 或 Future<T>
         * 前者仅表示异步线程执行后,需要拿到执行结果
         */
        @Override
        @Async("springThreadPool")
        public Future<Void> batchUpsert(List<PhotoModel> list) {
            if (!CollectionUtils.isEmpty(list)) {
                photoDao.batchUpsert(list);
                log.debug("batch upsert photo,size:{}", list.size());
            }
            return new AsyncResult<>(null);
        }
    }

    三、消费下来的消息预处理:包括内存中去重覆盖,分组,入口方法是下面的execute(),是一个List<Future<Void>>

    它的size取决于这个批次分成了多少组(每组都是一个线程去执行入库,在for中)

    @Service
    public class BatchProcessPhotoService {
    
        //单个线程最大处理记录数
        @Value("${batch.upsert.max.record.size}")
        private int batchUpsertMaxRecordSize;
        //批量提交开启的线程数
        @Value("${batch.upsert.thread.size}")
        private int threadSize;
    
        @Autowired
        private PhotoService photoService;
    
        public List<Future<Void>> execute(List<PhotoModel> list) {
            log.info("batch upsert dw_human_photo start");
            List<List<PhotoModel>> splitList = split(list);
            List<Future<Void>> futures = new ArrayList<>();
            for (List<PhotoModel> everyList : splitList) {
                Future<Void> future = photoService.batchUpsert(everyList);
                futures.add(future);
            }
            return futures;
        }
    
        /**
         * 将去重后的数据分段:总数/线程数
         */
        private List<List<PhotoModel>> split(List<PhotoModel> list) {
            List<PhotoModel> distinctList = coverDuplicate(list);
            int start, end;
            int total = distinctList.size();
            List<List<PhotoModel>> splitList = new ArrayList<>();
            if (total <= batchUpsertMaxRecordSize) {
                splitList.add(distinctList);
                return splitList;
            }
            for (int i = 0; i < threadSize; i++) {
                start = total / threadSize * i;
                end = total / threadSize * (i + 1);
                if (i == threadSize - 1) {
                    end = total;
                }
                splitList.add(distinctList.subList(start, end));
            }
            return splitList;
        }
    
        /**
         * 将personId重复的,用后面的覆盖前面的
         */
        private List<PhotoModel> coverDuplicate(List<PhotoModel> sourceList) {
            if (CollectionUtils.isEmpty(sourceList)) {
                return new ArrayList<>();
            }
            List<PhotoModel> distinctList = sourceList.stream().collect(
                    Collectors.toMap(PhotoModel::getHumanPicId, Function.identity(), (e1, e2) -> e2)
            ).values().stream().collect(Collectors.toList());
            return distinctList;
        }
    }

    四、consumer消费消息

        @KafkaListener(id = "xxxKafka", groupId = "${config.kafka.humanGroupId}", topics = {"${config.kafka.humanTopic}"}, containerFactory = "batchFactoryHuman")
        public void xxxListener(List<ConsumerRecord<?, String>> list, Acknowledgment ack) {
            log.info("consume data,size:{},offset:{}", list.size(), list.get(0).offset());
            List<HumanInfoKafkaModel> humanKafkaList = buildHumanKafkaList(list);
            if (CollectionUtils.isNotEmpty(humanKafkaList)) {
                ......一大堆消息解析和处理逻辑,返回photoListForUpsert
                try {                             
                    List<Future<Void>> futures = batchProcessPhotoService.execute(photoListForUpsert);               
                    if (CollectionUtils.isNotEmpty(futures)) {
                        for (Future<Void> future : futures) {
                           //这里需要调用get()同步起来
                           future.get();
                        }
                    }                
                } catch (Exception e) {
                    //photoListForUpsert异常处理,入库,发到另一个topic单独一条条消费均可
                }
            }
            //全部逻辑执行正常,全部数据入库完毕,提交offset
            ack.acknowledge();
        }

    五、配置Kafka消费工厂,可配置参数可查阅配置

    @Configuration
    @EnableKafka
    public class KafkaConfig {
    
        @Autowired
        private Config config;
    
        @Value("${kafka.max.poll.records}")
        private String maxPollRecords;
    
        @Value("${kafka.max.partition.fetch.bytes}")
        private String maxPartitionFetchBytes;
    
        @Value("${kafka.consumer.thread.size}")
        private Integer threadRecord;
    
    
        /**
         * POOL_PHOTO_IMPORT_TOPIC导入 kafka属性配置
         */
        public Map<String, Object> consumerConfigsPhoto() {
            Map<String, Object> props = Maps.newHashMap();
            props.put(KafkaConstants.KAFKA_CONNECT_BROKERS, config.getKafka().getPhotoBroker());
            props.put(KafkaConstants.KAFKA_CONNECT_GROUP_ID, config.getKafka().getPhotoGroupId());
            buildProp(props, true);
            return props;
        }
    
        /**
         * POOL_PHOTO_IMPORT_TOPIC导入  消费工厂
         * containerFactory = "batchFactoryPhoto"
         */
        @Bean
        public KafkaListenerContainerFactory<?> batchFactoryPhoto() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigsPhoto()));
            // 设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
            factory.setBatchListener(true);
            //消费线程数量设置,这里采用的是单线程消费+后续worker线程池处理
            factory.setConcurrency(threadRecord);
            factory.setAutoStartup(Boolean.valueOf(config.getKafka().getPhotoSwitch()));
            factory.getContainerProperties().setAckMode((ContainerProperties.AckMode.MANUAL_IMMEDIATE));
            return factory;
        }
    }

    总结

    consumer端,如果是要保证顺序,最好是多实例(每个实例默认单线程消费)

    消费下来的消息需要做预处理,可以先内部去重覆盖,然后分组

    每个组一个线程,给线程池统一处理;异步执行需要指定注入的线程池名,并且要有返回

    在for循环中必须调用Future的get方法同步起来,保证每一批消息彻底处理完毕再接着消费

  • 相关阅读:
    CVPR-2021-Papers
    caffe中使用多个GPU的方法
    大道至简!深度解读CVPR2021论文RepVGG
    195上的rknn
    RKNN1.6下载路径
    基于深度学习的2D和3D仿射变换配准
    涵盖18+ SOTA GAN实现,开源工程StudioGAN火了
    基于opencv实战眼睛控制鼠标
    S7-200SMART PLC与变频器MODBUS RTU通讯与SMART LINE系列屏控制(案例三)
    Java异常处理学习
  • 原文地址:https://www.cnblogs.com/yb38156/p/14605676.html
Copyright © 2011-2022 走看看