为了账号安全,请及时绑定邮箱和手机立即绑定

如何模拟 KafkaTemplate 的结果

如何模拟 KafkaTemplate 的结果

Qyouu 2023-06-04 17:38:00
我有一种发送 kafka 消息的方法是这样的:@Asyncpublic void sendMessage(String topicName, Message message) {    ListenableFuture<SendResult<String, Message >> future = kafkaTemplate.send(topicName, message);    future.addCallback(new ListenableFutureCallback<>() {        @Override        public void onSuccess(SendResult<String, Message > result) {            //do nothing        }        @Override        public void onFailure(Throwable ex) {            log.error("something wrong happened"!);        }    });}现在我正在为它编写单元测试。我还想测试这两个回调方法onSuccess和onFailure方法,所以我的想法是模拟 KafkaTemplate,例如:KafkaTemplate kafkaTemplate = Mockito.mock(KafkaTemplate.class);但是现在我陷入了这两种情况的模拟结果:when(kafkaTemplate.send(anyString(), any(Message.class))).thenReturn(????);thenReturn我应该在案例成功和案例失败的方法中输入什么?有人有想法吗?非常感谢!
查看完整描述

1 回答

?
holdtom

TA贡献1805条经验 获得超10个赞

您可以模拟模板,但最好模拟界面。


    Sender sender = new Sender();

    KafkaOperations template = mock(KafkaOperations.class);

    SettableListenableFuture<SendResult<String, String>> future = new SettableListenableFuture<>();

    when(template.send(anyString(), any(Message.class))).thenReturn(future);

    sender.setTemplate(template);

    sender.send(...);


    future.set(new SendResult<>(...));

    

    ...or...


    future.setException(...

编辑


更新为CompletableFuture(Apache Kafka 3.0.x 及更高版本的 Spring)...


public class Sender {


    private  KafkaOperations<String, String> template;


    public void setTemplate(KafkaOperations<String, String> template) {

        this.template = template;

    }


    public void send(String topic, Message<?> data) {

        CompletableFuture<SendResult<String, String>> future = this.template.send(data);

        future.whenComplete((result, ex) -> {

            if (ex == null) {

                System.out.println(result);

            }

            else {

                System.out.println(ex.getClass().getSimpleName() + "(" + ex.getMessage() + ")");

            }

        });

    }


}


@ExtendWith(OutputCaptureExtension.class)

public class So57475464ApplicationTests {


    @Test

    public void test(CapturedOutput captureOutput) {

        Message message = new GenericMessage<>("foo");

        Sender sender = new Sender();

        KafkaOperations template = mock(KafkaOperations.class);

        CompletableFuture<SendResult<String, String>> future = new CompletableFuture<>();

        given(template.send(any(Message.class))).willReturn(future);

        sender.setTemplate(template);

        sender.send("foo", message);

        future.completeExceptionally(new RuntimeException("foo"));

        assertThat(captureOutput).contains("RuntimeException(foo)");

    }


}


查看完整回答
反对 回复 2023-06-04
  • 1 回答
  • 0 关注
  • 165 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信