如题,在处理海量数据的时候,生产者生产数据的速度远大于消费者,故为了平衡处理能力,增加多个消费者进行处理,但是这里有个问题就是,如果这批数据一旦有那个消费者处理出现问题,其他消费者要停止处理,等待下一批数据,有什么办法能做到吗?
3 回答
小唯快跑啊
TA贡献1863条经验 获得超2个赞
生产者类DataProducer
import java.util.Observable; import java.util.Observer; import java.util.concurrent.BlockingQueue; public class DataProducer implements Observer, Runnable { private BlockingQueue<String> blockingQueue; private boolean handling = true; public DataProducer(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { while (handling) { try { String str = handleData(); blockingQueue.offer(str); } catch (Exception e) { e.printStackTrace(); } } } private String handleData() { return null; } @Override public void update(Observable o, Object arg) { if (arg.toString().equals("stopHandling")) { System.out.println("stopHandling data"); handling = false; blockingQueue.clear(); } } }
消费者类DataConsumer
import java.util.Observable; import java.util.concurrent.BlockingQueue; public class DataConsumer extends Observable implements Runnable { private BlockingQueue<String> blockingQueue; public DataConsumer(BlockingQueue<String> blockingDeque) { this.blockingQueue = blockingDeque; } @Override public void run() { while (true) { try { String str = blockingQueue.take(); handleData(str); } catch (Exception e) { notifyObservers("stopHandling"); } } } private void handleData(String str) { } @Override public void notifyObservers(Object arg) { super.setChanged(); super.notifyObservers(arg); } }
主线程启动任务,测试程序
import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedTransferQueue; public class TestData { private static final ExecutorService executor = Executors.newFixedThreadPool(5); public static void main(String[] args) { BlockingQueue<String> blockingQueue = new LinkedTransferQueue<>(); DataProducer dataProducer = new DataProducer(blockingQueue); executor.execute(dataProducer); for (int i = 0; i < 4; i++) { executor.execute(new DataConsumer(blockingQueue)); } } }
温温酱
TA贡献1752条经验 获得超4个赞
在传递给消费者的参数里加上状态,一但某个消费者处理出问题,就修改其状态标记问题,其它消费者检查到问题标记之后就不继续处理了(但是已经在处理过程中的仍然会继续,除非过程中多次检查状态)
添加回答
举报
0/150
提交
取消