作者: whooyun发表于: 2025-07-08 18:44
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderMapper orderMapper;
// 使用Java 21虚拟线程池
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
public List<Order> batchConcurrentQuery(List<String> orderNumberList) {
// 1. 拆分订单号列表(每50个一组)
List<List<String>> chunks = partitionList(orderNumberList, 50);
List<Order> allResults = new ArrayList<>();
int batchSize = 3; // 每批并发数
for (int i = 0; i < chunks.size(); i += batchSize) {
// 2. 获取当前批次(最多3个查询)
int end = Math.min(i + batchSize, chunks.size());
List<List<String>> currentBatch = chunks.subList(i, end);
// 3. 并发执行当前批次的查询
List<CompletableFuture<List<Order>>> futures = new ArrayList<>();
for (List<String> chunk : currentBatch) {
futures.add(CompletableFuture.supplyAsync(
() -> queryDatabase(chunk), executor
));
}
// 4. 等待当前批次所有查询完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// 5. 收集结果
for (CompletableFuture<List<Order>> future : futures) {
allResults.addAll(future.join());
}
// 6. 批次间隔处理(最后一组不等待)
if (end < chunks.size()) {
try {
TimeUnit.SECONDS.sleep(1); // 等待1秒
System.out.println("批次完成,等待1秒后继续...");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
return allResults;
}
// 数据库查询方法
private List<Order> queryDatabase(List<String> orderNumbers) {
System.out.println("执行查询,数量: " + orderNumbers.size() +
" | 线程: " + Thread.currentThread());
if (orderNumbers.isEmpty()) {
return Collections.emptyList();
}
QueryWrapper<Order> queryWrapper = new QueryWrapper<>();
queryWrapper.in("order_number", orderNumbers);
return orderMapper.selectList(queryWrapper);
}
// 手动实现列表拆分(避免Guava依赖)
private <T> List<List<T>> partitionList(List<T> list, int size) {
List<List<T>> partitions = new ArrayList<>();
for (int i = 0; i < list.size(); i += size) {
partitions.add(list.subList(i, Math.min(i + size, list.size())));
}
return partitions;
}
}
=======================
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderMapper orderMapper;
// 使用虚拟线程池(Java 21特性)
private final ExecutorService virtualThreadExecutor =
Executors.newVirtualThreadPerTaskExecutor();
public ListconcurrentQuery(ListorderNumberList) {
// 1. 拆分订单号列表(每50个一组)
Listchunks = Lists.partition(orderNumberList, 50);
// 2. 创建信号量控制并发度(每秒3个查询)
Semaphore rateLimiter = new Semaphore(3);
// 3. 提交查询任务
List