2 回答
TA贡献1895条经验 获得超7个赞
我通过 REST API 解决了同样的问题。为了在集成测试中创建/删除队列，我使用了以下配置文件（qpid-config.json）：
{
"name": "EmbeddedBroker",
"modelVersion": "8.0",
"authenticationproviders": [
{
"name": "anonymous",
"type": "Anonymous"
}
],
"ports": [
{
"name": "AMQP",
"bindingAddress": "localhost",
"port": "${qpid.amqp_port}",
"protocols": [ "AMQP_1_0" ],
"authenticationProvider": "anonymous",
"virtualhostaliases" : [ {
"name" : "nameAlias",
"type" : "nameAlias"
}, {
"name" : "defaultAlias",
"type" : "defaultAlias"
}, {
"name" : "hostnameAlias",
"type" : "hostnameAlias"
} ]
},
{
"name" : "HTTP",
"port" : "${qpid.http_port}",
"protocols" : [ "HTTP" ],
"authenticationProvider" : "anonymous"
}
],
"virtualhostnodes": [
{
"name": "default",
"defaultVirtualHostNode": "true",
"type": "Memory",
"virtualHostInitialConfiguration": "{\"type\": \"Memory\" }"
}
],
"plugins" : [
{
"type" : "MANAGEMENT-HTTP",
"name" : "httpManagement"
}
]
}
相关的 Gradle 依赖项:
// Test-scope dependencies for running an embedded Qpid broker and calling its REST API.
// Broker core; the other Qpid artifacts below share the same version.
testImplementation("org.apache.qpid:qpid-broker-core:${Versions.qpidBroker}") // tested with 8.0.0
// Matches the "AMQP_1_0" protocol enabled on the AMQP port in qpid-config.json.
testImplementation("org.apache.qpid:qpid-broker-plugins-amqp-1-0-protocol:${Versions.qpidBroker}")
// Matches the "Memory" virtual host node type used in qpid-config.json.
testImplementation("org.apache.qpid:qpid-broker-plugins-memory-store:${Versions.qpidBroker}")
// Matches the "MANAGEMENT-HTTP" plugin in qpid-config.json (serves the REST API used to manage queues).
testImplementation("org.apache.qpid:qpid-broker-plugins-management-http:${Versions.qpidBroker}")
// Brings in WebClient, which the queue delete/create helper uses.
testImplementation("org.springframework.boot:spring-boot-starter-webflux")
// NOTE(review): presumably needed by the reactive client stack; confirm it is still required.
testImplementation("org.projectreactor:reactor-spring:${Versions.reactorSpring}")
启动代理的代码(Kotlin):
/**
 * Starts the embedded Qpid broker from the `qpid-config.json` classpath resource.
 *
 * The `qpid.amqp_port` and `qpid.http_port` system properties are set before startup; the
 * configuration file refers to them through `${qpid.amqp_port}` / `${qpid.http_port}` placeholders.
 *
 * @throws IllegalStateException if `qpid-config.json` cannot be found on the classpath.
 */
private fun startQpidBroker() {
    // Fail with a descriptive message instead of a bare NullPointerException when the resource is missing.
    val initialConfig = checkNotNull(
        EmbeddedAMQPBroker::class.java.classLoader.getResource("qpid-config.json")
    ) { "qpid-config.json not found on the classpath" }

    System.setProperty("qpid.amqp_port", "$amqpPort")
    System.setProperty("qpid.http_port", "$httpPort")
    // needed to avoid "AMQP precondition failed" due to durable message being sent to non-durable queues
    System.setProperty("qpid.tests.mms.messagestore.persistence", "true")

    val attributes: MutableMap<String, Any> = mutableMapOf(
        "type" to "Memory",
        "initialConfigurationLocation" to initialConfig.toExternalForm(),
        "startupLoggedToSystemOut" to true,
    )
    broker.startup(attributes)
}
删除/创建队列的代码:
/**
 * Deletes and then re-creates the queue [queueName] on the embedded broker via its HTTP REST API.
 *
 * A 404 on the delete request is ignored because the queue may not exist yet; any other
 * error response from either request is propagated.
 *
 * @param queueName name of the queue to recreate.
 * @throws WebClientResponseException if the delete fails with a status other than 404,
 *   or if the create request fails.
 */
private fun recreateQueue(queueName: String) {
    val client = WebClient.create("http://localhost:${EmbeddedAMQPBroker.httpPort}")
    // Delete and create must address the same resource. The original delete used a shorter path
    // (one "default" segment fewer) than the create, so it could not target the queue created here.
    // NOTE(review): path is assumed to be <virtualhostnode>/<virtualhost>/<queue>, both "default" - confirm.
    val queueUri = "/api/latest/queue/default/default/$queueName"

    try {
        client.method(HttpMethod.DELETE)
            .uri(queueUri)
            .retrieve()
            .toBodilessEntity()
            .block()
    } catch (e: WebClientResponseException) {
        if (e.statusCode != HttpStatus.NOT_FOUND) { // queue might not yet exist so 404 is acceptable
            throw e
        }
    }

    client.method(HttpMethod.PUT)
        .uri(queueUri)
        .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
        .body(BodyInserters.fromValue(mapOf("name" to queueName, "type" to "standard")))
        .retrieve()
        .toBodilessEntity()
        .block()
}
TA贡献1789条经验 获得超10个赞
我设法通过使用连接工厂解决了这个问题:
@Autowired
ConnectionFactory factory;
....
// Point the injected connection factory at the locally running broker.
factory.setHost("localhost");
factory.setPort(qpid_server_port);
// try-with-resources closes the channel and the connection once the queue has been declared.
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
String queue = "queue-x";
// NOTE(review): assuming the RabbitMQ Java client, the flags are durable=true, exclusive=false,
// autoDelete=false, with no extra arguments - confirm against the client in use.
channel.queueDeclare(queue, true, false, false, null);
// Optional binding of the queue to an exchange, left disabled.
//channel.queueBind(queue, "exchange-x" , "routing-key-x");
} catch (Exception e) {
// NOTE(review): failures are only printed, so a failed declaration will not stop the caller.
e.printStackTrace();
}
添加回答
举报