为了账号安全,请及时绑定邮箱和手机立即绑定

生产者-消费者。消费者等待所有生产者完成,毒丸

生产者-消费者。消费者等待所有生产者完成,毒丸

POPMUISE 2021-10-13 15:44:56
我有两个生产者和一个消费者:public class Main {    public static void main(String[] args) throws InterruptedException {        final BlockingQueue<Integer> integersQueue = new ArrayBlockingQueue<>(20);        final Thread producer = new Thread(() -> {            for (int i = 0; i < 10; i++) {                try {                    integersQueue.put(i);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        });        final Thread thread = new Thread(() -> {            while (integersQueue.size() > 0) { //Wait while all producers work                try {                    System.out.println("GET: " + integersQueue.take());                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        });        Thread thread1 = new Thread(producer);        Thread thread2 = new Thread(producer);        thread1.start();        thread2.start();        Thread.sleep(5000);        thread.start();    }}如果所有生产者都完成了,我正试图找到一种方法来阻止消费者。有多个生产者,但只有一个消费者。我需要一些毒丸,但如何从不同的生产商指定它?我发现了这个:https : //docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html但我不明白如何应用它?
查看完整描述

2 回答

?
饮歌长啸

TA贡献1951条经验 获得超3个赞

有人必须知道有多少生产者。

如果消费者知道,那么每个生产者在完成后发送一个毒丸,消费者计算毒丸,当计数等于生产者数量时结束。

如果生产者知道,用一个AtomicInteger计数,最后一个生产者送毒丸

如果只main知道,即它是“控制器”,那么它需要等待所有生产者线程结束,使用join(),然后main发送毒丸


查看完整回答
反对 回复 2021-10-13
?
胡说叔叔

TA贡献1804条经验 获得超8个赞

我如何处理毒丸的类型安全变体:


public sealed interface BaseMessage {


    final class ValidMessage<T> implements BaseMessage {


        @Nonnull

        private final T value;



        public ValidMessage(@Nonnull T value) {

            this.value = value;

        }


        @Nonnull

        public T getValue() {

            return value;

        }


        @Override

        public boolean equals(Object o) {

            if (this == o) return true;

            if (o == null || getClass() != o.getClass()) return false;

            ValidMessage<?> that = (ValidMessage<?>) o;

            return value.equals(that.value);

        }


        @Override

        public int hashCode() {

            return Objects.hash(value);

        }


        @Override

        public String toString() {

            return "ValidMessage{value=%s}".formatted(value);

        }

    }


    final class PoisonedMessage implements BaseMessage {


        public static final PoisonedMessage INSTANCE = new PoisonedMessage();



        private PoisonedMessage() {

        }


        @Override

        public String toString() {

            return "PoisonedMessage{}";

        }

    }

}


public class Producer implements Callable<Void> {


    @Nonnull

    private final BlockingQueue<BaseMessage> messages;


    Producer(@Nonnull BlockingQueue<BaseMessage> messages) {

        this.messages = messages;

    }


    @Override

    public Void call() throws Exception {

        messages.put(new BaseMessage.ValidMessage<>(1));

        messages.put(new BaseMessage.ValidMessage<>(2));

        messages.put(new BaseMessage.ValidMessage<>(3));

        messages.put(BaseMessage.PoisonedMessage.INSTANCE);

        return null;

    }

}


public class Consumer implements Callable<Void> {


    @Nonnull

    private final BlockingQueue<BaseMessage> messages;


    private final int maxPoisons;



    public Consumer(@Nonnull BlockingQueue<BaseMessage> messages, int maxPoisons) {

        this.messages = messages;

        this.maxPoisons = maxPoisons;

    }


    @Override

    public Void call() throws Exception {

        int poisonsReceived = 0;

        while (poisonsReceived < maxPoisons && !Thread.currentThread().isInterrupted()) {

            BaseMessage message = messages.take();

            if (message instanceof BaseMessage.ValidMessage<?> vm) {

                Integer value = (Integer) vm.getValue();

                System.out.println(value);

            } else if (message instanceof BaseMessage.PoisonedMessage) {

                ++poisonsReceived;

            } else {

                throw new IllegalArgumentException("Invalid BaseMessage type: " + message);

            }

        }

        return null;

    }

}


查看完整回答
反对 回复 2021-10-13
  • 2 回答
  • 0 关注
  • 139 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信