1 回答
TA贡献1847条经验 获得超11个赞
这些参数决定 将使用多少个线程。这就是为什么默认情况下,可用 CPU 内核计数是:parallelism
ForkJoinPool
parallelism
Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors())
在你的情况下,装瓶工应该检查文件是否存在并将其上传到S3。这里的时间将取决于至少几个因素:CPU,网卡和驱动程序,操作系统,其他。在您的案例中,S3 网络操作时间似乎没有 CPU 限制,因为您正在通过创建更多模拟工作线程来观察改进,也许网络请求由操作系统排队。
的正确值因工作负荷类型而异。由于上下文切换的负面影响,CPU 密集型工作流最好使用默认值等于 CPU 内核。像您这样的非CPU密集型工作负载可以通过更多的工作线程来加速,假设工作负载不会阻塞CPU,例如通过忙于等待。parallelism
parallelism
中没有一个单一的理想值。parallelism
ForkJoinPool
由于您所有有用的建议和解释,我设法减少到8秒。
由于瓶颈是上传到 aws s3,并且您提到了 aws 上的非阻塞 API,因此经过一些研究,我发现类 TransferManager 包含非阻塞上传。
转移管理器类
因此,我没有使用分叉连接池来增加线程数,而是保留了简单的并行流:
dtos.parallelStream().forEach(dto -> {
try {
processDTO(dealerCode, yearPeriod, monthPeriod, dto);
} catch (FileAlreadyExistsInS3Exception e) {
failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
}
});
上传到S3方法改变了一点,我没有使用亚马逊S3,而是使用了转移管理器:
public Upload uploadAsyncFileToS3(String fileName, String fileContent) throws FileAlreadyExistsInS3Exception {
if (s3client.doesObjectExist(bucketName, fileName)) {
throw new FileAlreadyExistsInS3Exception(ErrorMessages.FILE_ALREADY_EXISTS_IN_S3.getMessage());
}
InputStream targetStream = new ByteArrayInputStream(fileContent.getBytes());
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(fileContent.getBytes().length);
return transferManager.upload(bucketName, fileName, targetStream, metadata);
}
这样,当调用上载时,它不会等待它完成,从而允许处理另一个 DTO。处理完所有 DTO 后,我会检查其上传状态以查看可能的错误(在第一个 Each 之外)
添加回答
举报