复制代码

为懒人提供无限可能,生命不息,code不止

人类感性的情绪,让我们知难行难
我思故我在
日拱一卒,功不唐捐
  • 首页
  • 前端
  • 后台
  • 数据库
  • 运维
  • 资源下载
  • 实用工具
  • 接口文档工具
  • 登录
  • 注册

JAVA代码

【原创】商品品态调整线程池案例

作者: whooyun发表于: 2023-04-11 17:44

    /**
     * 当需要调整品态的SKU比较多时,使用线程池提升处理效率
     *
     * @param acvGdsAdjustStateList
    */
    private void orgGdsAdjustState(List<AcvGdsAdjustState> acvGdsAdjustStateList) {
        if (!CollectionUtils.isEmpty(acvGdsAdjustStateList)) {
            ExecutorService executor = new ThreadPoolExecutor(5, 10, 5000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());

            log.info("品态调整,线程池创建成功:{}", executor);
            acvGdsAdjustStateList.forEach(item -> {
                AsyncAcvGdsAdjustStateGoodsTask task = new AsyncAcvGdsAdjustStateGoodsTask();
                task.setAcvGdsAdjustState(item);
                executor.execute(task);
            });
            executor.shutdown();
            try {
                if (!executor.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
                    // 超时后强制关闭所有线程
                    executor.shutdownNow();
                    log.info("==>品态调整,线程池已关闭");
                }
            } catch (InterruptedException e) {
                log.info("==>品态调整,线程池关闭异常:{}", e.getMessage());
                e.printStackTrace();
            }
        }

	}

==============================================

//创建的对象无法被自动注入,只能通过spring上线文去拿
@Slf4j
public class AsyncAcvGdsAdjustStateGoodsExecutor implements Runnable{


    //处理state的原子性和可见性问题
    private final AtomicReference<AcvGdsAdjustState> stateRef = new AtomicReference<>();;
    public void setAcvGdsAdjustState(AcvGdsAdjustState state) {
        stateRef.set(state);
    }

    //业务无线程安全需求,所以run内的方法无需处理
    @Override
    public void run() {
        AcvGdsAdjustStateMapper  acvGdsAdjustStateMapper=  ApplicationContextProvider.getBeanByClass(AcvGdsAdjustStateMapper.class);

        AcvGdsAdjustStateGoodsMapper  acvGdsAdjustStateGoodsMapper=  ApplicationContextProvider.getBeanByClass(AcvGdsAdjustStateGoodsMapper.class);

        AcvOrgGroupMapper  acvOrgGroupMapper=  ApplicationContextProvider.getBeanByClass(AcvOrgGroupMapper.class);


        //log.debug("==> 需要调整品态的机构单据:{}", item);
        AcvGdsAdjustState item = stateRef.get();
        log.info("当前线程:"+Thread.currentThread().getName()+",拿到的参数:"+item);
        log.info("---------------------------------");
        QueryWrapper orgGroupQuery = new QueryWrapper();
        orgGroupQuery.eq("biz_uid", item.getBizUid());
        orgGroupQuery.eq("corp_id", item.getCorpId());
        List<AcvOrgGroup> acvOrgGroupList = acvOrgGroupMapper.selectList(orgGroupQuery);
        log.info("需要更新的机构数量:{}",acvOrgGroupList);
        List<Integer> orgIdList = new ArrayList<>();
        acvOrgGroupList.forEach(i -> {
            orgIdList.add(i.getGroupOrgId());
        });
        QueryWrapper stateGoodsQuery = new QueryWrapper();
        stateGoodsQuery.eq("bill_id", item.getId());
        stateGoodsQuery.eq("corp_id", item.getCorpId());
        List<AcvGdsAdjustStateGoods> stateGoodsList = acvGdsAdjustStateGoodsMapper.selectList(stateGoodsQuery);
        log.info("需要更新的商品数量获取完成");
        stateGoodsList.forEach(i -> {
            this.updateOrgGoodsState(Long.valueOf(i.getNewStatusDic()), i.getGoodsId(), orgIdList);
        });
        log.info("更新商品状态完成");
        AcvGdsAdjustState acvGdsAdjustState = new AcvGdsAdjustState();
        acvGdsAdjustState.setId(item.getId());
        ////状态,代码类型:1,代码值:1-草稿,2-审核中,3-待执行,4-已生效,5-已作废,6-已终止
        acvGdsAdjustState.setBillStatusDic(CommonConstant.MATH_BYTE_4);
        acvGdsAdjustState.setModifiedDtm(LocalDateTime.now());
        UpdateWrapper updateWrapper= new UpdateWrapper();
        updateWrapper.eq("id", acvGdsAdjustState.getId());
        updateWrapper.eq("corp_id", item.getCorpId());
        acvGdsAdjustStateMapper.update(acvGdsAdjustState, updateWrapper);
        log.info("==>品态调整单:{},处理完毕", acvGdsAdjustState);
        log.info("当前线程结束:"+Thread.currentThread().getName()+",拿到的参数:"+item);
    }


    /**
     * 修改机构商品状态
     *
     * @param stateId
     * @param goodsId
     * @param idList
     */
    private void updateOrgGoodsState(final long stateId, final long goodsId, final List idList) {

        AcvGdsStateMapper  acvGdsStateMapper=  ApplicationContextProvider.getBeanByClass(AcvGdsStateMapper.class);

        AcvOrgGoodsMapper  acvOrgGoodsMapper=  ApplicationContextProvider.getBeanByClass(AcvOrgGoodsMapper.class);

        AcvGdsState state = acvGdsStateMapper.selectById(stateId);
        log.info("==>获取品态业务数据:{}", state);
        UpdateWrapper stateUpdateWrapper = new UpdateWrapper();
      /*  stateUpdateWrapper.set("state_no", state.getStateNo());
        stateUpdateWrapper.set("life_dic", state.getLifeDic());
        stateUpdateWrapper.set("modified_dtm",LocalDateTime.now());*/
        stateUpdateWrapper.eq("goods_id", goodsId);
        stateUpdateWrapper.eq("is_deleted", CommonConstant.MATH_1);
        stateUpdateWrapper.in("goods_org_id", idList);
        stateUpdateWrapper.eq("corp_id", state.getCorpId());
        AcvOrgGoods acvOrgGoods = new AcvOrgGoods();
        acvOrgGoods.setStateNo(state.getStateNo());
        acvOrgGoods.setLifeDic(state.getLifeDic().byteValue());
        acvOrgGoods.setModifiedDtm(LocalDateTime.now());
        log.info("==>更新机构商品品态:{}", acvOrgGoods);
        int resultCount = acvOrgGoodsMapper.update(acvOrgGoods, stateUpdateWrapper);
        log.info("==>更新机构商品:{},品态完成,修改条数:{}",goodsId,resultCount);
    }
}
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
@Component
public class ApplicationContextProvider implements ApplicationContextAware {
    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        if (context == null) {
            throw new IllegalStateException("ApplicationContext not initialized yet.");
        }
        return context;
    }

    public static <T> T getBean(Class<T> beanClass) {
        return context.getBean(beanClass);
    }

    public static Object getBean(String beanName) {
        return context.getBean(beanName);
    }

}

=====================生产环境出现了多个线程池,且线程池内存无法被GVM回收的情况

原因
(一)AsyncAcvGdsAdjustStateGoodsExecutor 在执行过程中抛出未捕获的异常,线程可能不会正常结束,从而阻止线程池关闭
(二)orgGdsAdjustState方法被多个线程同时调用,会导致同时创建多个线程池实例

想法初衷:
1、因为是被xxl-job调用,且是串行,所以默认以为线程池只会有一个
2、线程池代码缺陷,知识层面不够,代码缺陷

==========================优化方案===============================

使用单例线程池:创建一个全局的线程池实例,在应用程序启动时初始化,并在应用程序关闭时统一关闭。这样可以避免每次方法调用时创建和销毁线程池的开销。
正确处理异常:确保AsyncAcvGdsAdjustStateGoodsExecutor任务能够正确处理异常,并且在异常发生时能够优雅地结束。
恢复中断状态:在捕获InterruptedException后,调用Thread.currentThread().interrupt()来恢复中断状态。
增加超时时间:根据任务的实际执行时间,适当增加awaitTermination的超时时间,以确保大多数情况下线程池可以正常关闭

import java.util.concurrent.*;

public class ThreadPoolManager {

    // 使用volatile关键字确保可见性和禁止指令重排
    private volatile static ExecutorService executor;

    private ThreadPoolManager() {
        // 私有构造函数防止外部实例化
    }

    public static ExecutorService getExecutor() {
        // 第一次检查,如果已经初始化则直接返回
        if (executor == null) {
            synchronized (ThreadPoolManager.class) {
                // 第二次检查,确保只有一个线程能够进入这里初始化线程池
                if (executor == null) {
                    executor = new ThreadPoolExecutor(
                            5, 10, 5000L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<>(100),
                            new ThreadPoolExecutor.CallerRunsPolicy()
                    );
                }
            }
        }
        return executor;
    }
}

// 如果使用Spring框架,可以将ThreadPoolManager注册为Bean
@Configuration
public class AppConfig {

    @Bean(destroyMethod = "shutdown")
    public ExecutorService threadPoolExecutor() {
        return new ThreadPoolExecutor(
                5, 10, 5000L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(100),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
}