2 回答

TA贡献1780条经验 获得超5个赞
在花了一些时间寻找更好的解决方案后,我坚持以下几点:
package test;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.AwsRegionProvider;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import test.TestDto;
import test.CustomQueueMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationMessage;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationSubject;
import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;
@Component
public class TestQueue {
private static final String QUEUE_NAME = "TestQueue";
private static final Logger log = LoggerFactory.getLogger(TestQueue.class);
public TestQueue(AWSCredentialsProvider credentialsProvider, AwsRegionProvider regionProvider) {
AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion(regionProvider.getRegion())
.build();
// custom QueueMessageHandler to initialize only this queue
CustomQueueMessageHandler queueMessageHandler = new CustomQueueMessageHandler();
queueMessageHandler.init(this);
queueMessageHandler.afterPropertiesSet();
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(client);
factory.setQueueMessageHandler(queueMessageHandler);
SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
try {
simpleMessageListenerContainer.afterPropertiesSet();
} catch (Exception e) {
throw new RuntimeException(e);
}
simpleMessageListenerContainer.start();
}
@SqsListener(value = QUEUE_NAME, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
log.info("Received SQS Message: \nSubject: {} \n{}", subject, dto);
}
}
和习俗QueueMessageHandler:
package test;
import java.util.Collections;
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
public class CustomQueueMessageHandler extends QueueMessageHandler {
public void init(Object handler) {
detectHandlerMethods(handler);
}
}
的唯一目的CustomQueueMessageHandler是传递一个应该扫描 SQS 注释的对象。由于我不使用 Spring Context 启动它,因此它不会在每个 bean 中搜索注释@SqsListener。但所有的初始化都隐藏在受保护的方法后面。这就是为什么我需要覆盖该类以访问这些 init 方法。
我不认为这是一个非常优雅的解决方案,手动创建所有 AWS 客户端内容并调用 bean init 方法。但这是我能找到的唯一仍然可以访问 AWS SQS 库的所有功能的解决方案,例如转换传入消息和通知、删除策略、队列轮询(包括故障处理)等。

TA贡献1744条经验 获得超4个赞
@Bean您可以为您希望使用 custom 的帐户声明不同的s @Qualifier。假设您在两个不同的帐户中有两个 SQS 队列。然后声明两个类型为 的 bean AmazonSQS。
@Qualifier("first")
@Bean public AmazonSQS amazonSQSClient() {
return AmazonSQSClient.builder()
.withCredentials(credentialsProvider())
.withRegion(Regions.US_EAST_1)
.build();
}
@Qualifier("second")
@Bean public AmazonSQS amazonSQSClient() {
return AmazonSQSClient.builder()
.withCredentials(anotherCredentialsProvider())
.withRegion(Regions.US_EAST_1)
.build();
}
然后在你的服务中,你可以为@Autowired他们服务。
@Autowired @Qualifier("second") private AmazonSQS sqsSecond;
添加回答
举报