为了账号安全,请及时绑定邮箱和手机立即绑定

Spring Integration 任务执行器在测试过程中大约 1000 毫秒后毫无警告地终止

Spring Integration 任务执行器在测试过程中大约 1000 毫秒后毫无警告地终止

猛跑小猪 2023-10-19 21:04:22
我使用 Spring Integration 构建以下流程:输入通道 -> 拆分器 -> 转换器 -> 服务激活器 -> 聚合器Transformer 和 Service Activator 使用任务执行器链接并执行。在应用程序执行期间,没有任何问题。但是,当我尝试运行单元测试时,如果存在长时间运行的任务,则执行服务激活器的执行程序线程会神秘退出。为了演示这一点,我创建了一个具有以下配置的示例项目:<task:executor id="executor" pool-size="20" keep-alive="120" queue-capacity="100"/><jms:message-driven-channel-adapter id="helloWorldJMSAdapater" destination="helloWorldJMSQueue"    channel="helloWorldChannel"/>    <int:channel id="helloWorldChannel"/><int:splitter id="splitter" input-channel="helloWorldChannel" output-channel="execChannel">    <bean id="stringSplitter" class="hello.Splitter"></bean></int:splitter><int:channel id="execChannel">    <int:dispatcher task-executor="executor"></int:dispatcher></int:channel><int:chain input-channel="execChannel" output-channel="aggregatorChannel">    <int:transformer>        <bean id="stringTransformer" class="hello.Transformer"></bean>    </int:transformer>    <int:service-activator id="helloWorldServiceActivator" ref="helloWorldAmqService" method="processMsg"/></int:chain><int:aggregator input-channel="aggregatorChannel" output-channel="errorChannel">    <bean class="hello.ResponseAggregator"/></int:aggregator>这是 Splitter 类:public class Splitter {public List<String> splitMessage(Message message)  {    String msg = message.getPayload().toString();    return Arrays.asList(msg.split(","));}}这是 Transformer 类:public class Transformer {public String transform(Message message)  {    String msg = message.getPayload().toString();    return msg+"t";}}为了模拟长时间运行的任务,我Thread.sleep()在 processMsg 方法中添加了一个。
查看完整描述

1 回答

?
呼啦一阵风

TA贡献1802条经验 获得超6个赞

您的问题是您错过了一个事实,即您的应用程序是一个async,但您的测试与处理多线程解决方案无关。


您在测试方法中发送一条消息,并且不执行任何操作来等待输出消息。因此,启动测试执行的主线程就存在,而将其他线程中的所有执行抛在后面。


您的想法是发送到而helloWorldChannel不是处理 JMS 目的地是一个不错的选择。唯一的问题是聚合后不等待流结果。


将端点的输出放入 中也很奇怪errorChannel,但您可以在生成消息之前从测试用例中订阅它:


@Autowired

private SubscribableChannel errorChannel;



@Test

public void test() {

    SettableListenableFuture<Message<?>> messageFuture = new SettableListenableFuture<>();

    this.errorChannel.subscribe((message) -> messageFuture.set(message));

    helloWorldChannel.send(MessageBuilder.withPayload("1,2,3,4,6").build());

    Message<?> messageToAssert = messageFuture.get(10, TimeUnit.SECONDS);

    ...

}

这样,您的主 JUnit 线程将等待该 中的结果,而与流行为无关Future。


查看完整回答
反对 回复 2023-10-19
  • 1 回答
  • 0 关注
  • 103 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信