为了账号安全,请及时绑定邮箱和手机立即绑定

每个异步线程都运行一个无限循环

每个异步线程都运行一个无限循环

交互式爱情 2021-10-20 15:53:15
我正在实施一个包含不同任务的程序,并且都实现了 Runnable。例如,有一个任务在数据库上工作并将一些元组发送到同步共享内存,随后,有另一个线程检查共享内存并将消息发送到队列。此外,这两个线程在无限 while 循环中进行迭代。我已经使用了 fixedThreadPool 来执行这些线程。问题在于,有时程序控制权仍保留在第一个正在运行的线程中,而第二个线程永远没有机会进入其运行状态。这是我的类似示例代码:public class A implements Runnable {    @Override    public void run() {        while(true) {        //do something        }    }}public class B implements Runnable {    @Override    public void run() {        while(true) {        //do something        }    }}public class Driver {    public static void main(String[] args) {        ExecutorService executorService = Executors.newFixedThreadPool(2);        A a = new A();        executorService.execute(a);        B b = new B();        executorService.execute(b);    }}我还做了一些棘手的事情,让第一个线程在运行一小段时间后休眠一次。结果,它使第二个线程找到了运行的机会。但是对于这个问题,有没有什么格式良好的解决方案呢?您认为问题出在哪里?
查看完整描述

2 回答

?
汪汪一只猫

TA贡献1898条经验 获得超8个赞

这是生产者/消费者模式的一个很好的例子。有很多方法可以实现这一点。这是使用wait/notify模式的一个简单实现。


public class A implements Runnable {

    private Queue<Integer> queue;

    private int maxSize;


    public A(Queue<Integer> queue, int maxSize) {

        super();

        this.queue = queue;

        this.maxSize = maxSize;

    }


    @Override

    public void run() {

        while (true) {

            synchronized (queue) {

                while (queue.size() == maxSize) {

                    try {

                        System.out.println("Queue is full, " + "Producer thread waiting for "

                                + "consumer to take something from queue");

                        queue.wait();

                    } catch (Exception ex) {

                        ex.printStackTrace();

                    }

                }


                Random random = new Random();

                int i = random.nextInt();

                System.out.println("Producing value : " + i);

                queue.add(i);

                queue.notifyAll();

            }


        }

    }


}


public class B implements Runnable {

    private Queue<Integer> queue;


    public B(Queue<Integer> queue) {

        super();

        this.queue = queue;

    }


    @Override

    public void run() {

        while (true) {

            synchronized (queue) {

                while (queue.isEmpty()) {

                    System.out.println("Queue is empty," + "Consumer thread is waiting"

                            + " for producer thread to put something in queue");

                    try {

                        queue.wait();

                    } catch (Exception ex) {

                        ex.printStackTrace();

                    }


                }

                System.out.println("Consuming value : " + queue.remove());

                queue.notifyAll();

            }


        }


    }


}

这里很热,我们设置了一些东西。


public class ProducerConsumerTest {


    public static void main(String[] args) {

        Queue<Integer> buffer = new LinkedList<>();

        int maxSize = 10;


        Thread producer = new Thread(new A(buffer, maxSize));

        Thread consumer = new Thread(new B(buffer));


        ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService.submit(producer);

        executorService.submit(consumer);

    }


}

在这种情况下,它Queue充当共享内存。您可以将其替换为适合您需要的任何其他数据结构。这里的诀窍是你必须仔细地在线程之间进行协调。这就是你上面的实现所缺乏的。


查看完整回答
反对 回复 2021-10-20
?
天涯尽头无女友

TA贡献1831条经验 获得超9个赞

我知道这听起来可能很激进,但是异步代码库的非框架部分应该尽量避免while(true)手动编码循环,而是将其建模为(可能是自我重新调度的)回调到执行程序中


这允许更公平的资源利用和最重要的每次迭代监控工具。


当代码不等待临界(或只是原型时),最简单的方法是做到这一点的Executors,可能CompletableFuture秒。


class Participant implements Runnable {

    final Executor context;


    @Override

    public void run() {

        final Item work = workSource.next();

        if (workSource.hasNext()) {

            context.execute(this::run);

        }

    }

}


查看完整回答
反对 回复 2021-10-20
  • 2 回答
  • 0 关注
  • 194 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信