为了账号安全,请及时绑定邮箱和手机立即绑定

单生产者多消费者模型中,如果一个消费者出现异常,怎么通知其他消费者停止处理任务?

单生产者多消费者模型中,如果一个消费者出现异常,怎么通知其他消费者停止处理任务?

MM们 2019-03-01 10:34:10
如题,在处理海量数据的时候,生产者生产数据的速度远大于消费者,故为了平衡处理能力,增加多个消费者进行处理,但是这里有个问题就是,如果这批数据一旦有那个消费者处理出现问题,其他消费者要停止处理,等待下一批数据,有什么办法能做到吗?
查看完整描述

3 回答

?
小唯快跑啊

TA贡献1863条经验 获得超2个赞

生产者类DataProducer

import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.BlockingQueue;

public class DataProducer implements Observer, Runnable {

    private BlockingQueue<String> blockingQueue;

    private boolean handling = true;

    public DataProducer(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (handling) {
            try
            {
                String str =  handleData();
                blockingQueue.offer(str);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private String handleData() {
        return null;
    }

    @Override
    public void update(Observable o, Object arg) {
        if (arg.toString().equals("stopHandling")) {
            System.out.println("stopHandling data");
            handling = false;
            blockingQueue.clear();
        }
    }
}

消费者类DataConsumer

import java.util.Observable;
import java.util.concurrent.BlockingQueue;

public class DataConsumer extends Observable implements Runnable {

    private BlockingQueue<String> blockingQueue;

    public DataConsumer(BlockingQueue<String> blockingDeque) {
        this.blockingQueue = blockingDeque;
    }

    @Override
    public void run() {
        while (true) {
            try {
                String str = blockingQueue.take();
                handleData(str);
            } catch (Exception e) {
                notifyObservers("stopHandling");
            }
        }

    }

    private void handleData(String str) {
    }

    @Override
    public void notifyObservers(Object arg) {
        super.setChanged();

        super.notifyObservers(arg);

    }
}

主线程启动任务,测试程序

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;

public class TestData {
    private static final ExecutorService executor = Executors.newFixedThreadPool(5);

    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new LinkedTransferQueue<>();
        DataProducer dataProducer = new DataProducer(blockingQueue);
        executor.execute(dataProducer);
        for (int i = 0; i < 4; i++) {
            executor.execute(new DataConsumer(blockingQueue));
        }
    }
}
查看完整回答
反对 回复 2019-03-01
?
温温酱

TA贡献1752条经验 获得超4个赞

在传递给消费者的参数里加上状态,一但某个消费者处理出问题,就修改其状态标记问题,其它消费者检查到问题标记之后就不继续处理了(但是已经在处理过程中的仍然会继续,除非过程中多次检查状态)

查看完整回答
反对 回复 2019-03-01
?
大话西游666

TA贡献1817条经验 获得超14个赞

可以在队列里面塞一个‘毒丸’ 元素 ,消费者检测到该对象就停止消费

查看完整回答
反对 回复 2019-03-01
  • 3 回答
  • 0 关注
  • 771 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信