为了账号安全,请及时绑定邮箱和手机立即绑定

如何使用 RestTemplate 在 qpid 中创建队列?

如何使用 RestTemplate 在 qpid 中创建队列?

Smart猫小萌 2023-05-17 15:52:05
我正在尝试为使用 RabbitMQ 的应用程序编写集成测试,为此我正在使用 Qpid 代理。我设法启动了服务器并且我的测试正在连接到它,但我需要在启动前在 Qpid 中创建队列。因为我有大量队列,所以我动态创建了 bean:applicationContext.getBeanFactory().registerSingleton(queueName, queue);这需要在启动前创建队列。这是 qpid 配置文件:{  "name": "tst",  "modelVersion": "2.0",  "defaultVirtualHost" : "default",  "authenticationproviders" : [ {    "name" : "noPassword",    "type" : "Anonymous",    "secureOnlyMechanisms": []        },    {      "name" : "passwordFile",      "type" : "PlainPasswordFile",      "path" : "/src/test/resources/passwd.txt",      "secureOnlyMechanisms": [],      "preferencesproviders" : [{        "name": "fileSystemPreferences",        "type": "FileSystemPreferences",        "path" : "${qpid.work_dir}${file.separator}user.preferences.json"        }      ]    }   ],  "ports" : [    {      "name": "AMQP",      "port": "5673",      "authenticationProvider": "passwordFile",      "protocols": [        "AMQP_0_10",        "AMQP_0_8",        "AMQP_0_9",        "AMQP_0_9_1"      ]    }],  "virtualhostnodes" : [ {    "name" : "default",    "type" : "JSON",    "virtualHostInitialConfiguration" : "{ \"type\" : \"Memory\" }"  }]}从官方文档(https://qpid.apache.org/releases/qpid-broker-j-7.1.4/book/Java-Broker-Management-Channel-REST-API.html#d0e2130)我读到可以为 REST 调用创建队列,所以我尝试使用 RestTemplate 来实现这一点,但它似乎没有创建队列。    @BeforeClass    public static void startup() throws Exception {        brokerStarter = new BrokerManager();        brokerStarter.startBroker();        RestTemplate restTemplate = new RestTemplate();        restTemplate.put("http://localhost:5673/api/latest/queue/default/queue1", "");        restTemplate.put("http://localhost:5673/api/latest/queue/default/queue-2", "");    }有人可以解释我做错了什么吗?谢谢你!
查看完整描述

2 回答

?
人到中年有点甜

TA贡献1895条经验 获得超7个赞

我使用 REST API 解决了同样的问题。为了创建/删除用于集成测试目的的队列,我使用以下配置文件 ( qpid-config.json):


{

  "name": "EmbeddedBroker",

  "modelVersion": "8.0",

  "authenticationproviders": [

    {

      "name": "anonymous",

      "type": "Anonymous"

    }

  ],

  "ports": [

    {

      "name": "AMQP",

      "bindingAddress": "localhost",

      "port": "${qpid.amqp_port}",

      "protocols": [ "AMQP_1_0" ],

      "authenticationProvider": "anonymous",

      "virtualhostaliases" : [ {

        "name" : "nameAlias",

        "type" : "nameAlias"

      }, {

        "name" : "defaultAlias",

        "type" : "defaultAlias"

      }, {

        "name" : "hostnameAlias",

        "type" : "hostnameAlias"

      } ]

    },

    {

      "name" : "HTTP",

      "port" : "${qpid.http_port}",

      "protocols" : [ "HTTP" ],

      "authenticationProvider" : "anonymous"

    }

  ],

  "virtualhostnodes": [

    {

      "name": "default",

      "defaultVirtualHostNode": "true",

      "type": "Memory",

      "virtualHostInitialConfiguration": "{\"type\": \"Memory\" }"

    }

  ],

  "plugins" : [

    {

      "type" : "MANAGEMENT-HTTP",

      "name" : "httpManagement"

    }

  ]

}

相关的 Gradle 依赖项:


    testImplementation("org.apache.qpid:qpid-broker-core:${Versions.qpidBroker}") // tested with 8.0.0

    testImplementation("org.apache.qpid:qpid-broker-plugins-amqp-1-0-protocol:${Versions.qpidBroker}")

    testImplementation("org.apache.qpid:qpid-broker-plugins-memory-store:${Versions.qpidBroker}")

    testImplementation("org.apache.qpid:qpid-broker-plugins-management-http:${Versions.qpidBroker}")


    testImplementation("org.springframework.boot:spring-boot-starter-webflux")

    testImplementation("org.projectreactor:reactor-spring:${Versions.reactorSpring}")

启动代理的代码(Kotlin):


    private fun startQpidBroker() {

        val attributes: MutableMap<String, Any> = HashMap()

        val initialConfig = EmbeddedAMQPBroker::class.java.classLoader.getResource("qpid-config.json")!!

        attributes["type"] = "Memory"

        attributes["initialConfigurationLocation"] = initialConfig.toExternalForm()

        attributes["startupLoggedToSystemOut"] = true

        System.setProperty("qpid.amqp_port", "$amqpPort")

        System.setProperty("qpid.http_port", "$httpPort")

        // needed to avoid "AMQP precondition failed" due to durable message being sent to non-durable queues

        System.setProperty("qpid.tests.mms.messagestore.persistence", "true")

        broker.startup(attributes)

    }

删除/创建队列的代码:


    private fun recreateQueue(queueName: String) {

        val client = WebClient.create("http://localhost:${EmbeddedAMQPBroker.httpPort}");

        try {

            client.method(HttpMethod.DELETE)

                    .uri("/api/latest/queue/default/$queueName")

                    .retrieve()

                    .toBodilessEntity()

                    .block()

                    .statusCode

        } catch (e: WebClientResponseException) {

            if (e.statusCode != HttpStatus.NOT_FOUND) { // queue might not yet exist so 404 is acceptable

                throw e

            }

        }


        client.method(HttpMethod.PUT)

                .uri("/api/latest/queue/default/default/$queueName")

                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)

                .body(BodyInserters.fromValue(mapOf("name" to queueName, "type" to "standard")))

                .retrieve()

                .toBodilessEntity()

                .block()

                .statusCode

    }


查看完整回答
反对 回复 2023-05-17
?
至尊宝的传说

TA贡献1789条经验 获得超10个赞

我设法通过使用连接工厂解决了这个问题:


            @Autowired

            ConnectionFactory factory;


            ....

            factory.setHost("localhost");

            factory.setPort(qpid_server_port);

            try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {

                String queue = "queue-x";

                channel.queueDeclare(queue, true, false, false, null);

                //channel.queueBind(queue, "exchange-x" , "routing-key-x");


            } catch (Exception e) {

                e.printStackTrace();

            }


查看完整回答
反对 回复 2023-05-17
  • 2 回答
  • 0 关注
  • 120 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信