3 回答

TA贡献1876条经验 获得超5个赞
我们让instrumentList实例以非同步方式逃逸,即访问/操作在列表上发生而没有任何同步。通过将副本传递给其他方法来修复相同的问题就可以了。
以下代码行是发生此问题的代码行
recordSaver.persist(instrumentList); 仪器列表.clear();
在这里,我们允许instrumentList实例以非同步方式逃逸,即它被传递到另一个类(recordSaver.persist),在该类中对其进行操作,但我们也在下一行(在 Aggregator 类中)清除列表,并且所有这些都以非同步方式发生。无法在记录保护程序中预测列表状态......一个非常愚蠢的错误。
我们通过将instrumentList的克隆副本传递给 recordSaver.persist(...) 方法来解决此问题。这样,instrumentList.clear()不会影响 recordSaver 中可用的列表以进行进一步操作。

TA贡献1788条经验 获得超4个赞
看起来这是对不需要的优化的尝试。在这种情况下,越少越好,越简单越好。在下面的代码中,仅使用了两个并发概念:synchronized确保正确更新共享列表并final确保所有线程看到相同的值。
import java.util.ArrayList;
import java.util.List;
public class Aggregator<T> implements Runnable {
private final List<T> instruments = new ArrayList<>();
private final RecordSaver recordSaver;
private final int batchSize;
public Aggregator(RecordSaver recordSaver, int batchSize) {
super();
this.recordSaver = recordSaver;
this.batchSize = batchSize;
}
public synchronized void addAll(List<T> moreInstruments) {
instruments.addAll(moreInstruments);
if (instruments.size() >= batchSize) {
storeInstruments();
}
}
public synchronized void storeInstruments() {
if (instruments.size() > 0) {
// in case recordSaver works async
// recordSaver.persist(new ArrayList<T>(instruments));
// else just:
recordSaver.persist(instruments);
instruments.clear();
}
}
@Override
public void run() {
while (true) {
try { Thread.sleep(1L); } catch (Exception ignored) {
break;
}
storeInstruments();
}
}
class RecordSaver {
void persist(List<?> l) {}
}
}

TA贡献1802条经验 获得超4个赞
我明白了,您parallelStream
在锁中使用了 ConcurrentHashMap。我不了解 Java 8+ 流支持,但快速搜索显示,
ConcurrentHashMap 是一种复杂的数据结构,过去曾经存在并发错误
并行流必须遵守复杂且记录不充分的使用限制
您正在并行流中修改数据
基于这些信息(以及我的直觉驱动的并发错误检测器™),我敢打赌,删除对的调用parallelStream
可能会提高代码的健壮性。此外,正如@Slaw 所提到的,如果所有instrumentMap
使用都已被锁保护,则应该使用普通 HashMap 代替 ConcurrentHashMap 。
当然,由于您没有发布 的代码recordSaver
,因此它也有可能存在错误(不一定是与并发相关的错误)。特别是,您应该确保从持久存储中读取记录的代码(用于检测记录丢失的代码)是安全、正确的,并且与系统的其余部分正确同步(最好使用健壮的, 行业标准的 SQL 数据库)。
添加回答
举报