为了账号安全,请及时绑定邮箱和手机立即绑定

如何在Java中使Vertx MongoClient操作同步但不阻塞事件循环?

如何在Java中使Vertx MongoClient操作同步但不阻塞事件循环?

桃花长相依 2022-09-01 16:29:13
我正在尝试使用Vertx MongoClient将新文档保存到MongoDB,如下所示:MongoDBConnection.mongoClient.save("booking", query, res -> {    if(res.succeeded()) {        documentID = res.result();        System.out.println("MongoDB inserted successfully. + document ID is : " + documentID);    }    else {        System.out.println("MongoDB insertion failed.");    }});if(documentID != null) {    // MongoDB document insertion successful. Reply with a booking ID    String resMsg = "A confirmed booking has been successfully created with booking id as " + documentID +         ". An email has also been triggered to the shared email id " + emailID;    documentID = null;    return new JsonObject().put("fulfillmentText", resMsg);}else {    // return intent response    documentID = null;    return new JsonObject().put("fulfillmentText",         "There is some issues while booking the shipment. Please start afreash.");}上面的代码成功地将查询jsonObject写入MongoDB集合。但是,包含此代码的函数始终以 返回 。bookingThere is some issues while booking the shipment. Please start afreash发生这种情况可能是因为MongoClient处理程序“res”是异步的。但是,我想返回基于成功操作和失败保存操作的条件响应。save()save()如何在Vertx Java中实现它?
查看完整描述

2 回答

?
幕布斯6054654

TA贡献1876条经验 获得超7个赞

您的假设是正确的,您不会等待来自数据库的异步响应。你能做的,就是把它包装在一个像这样的未来里:


  public Future<JsonObject> save() {

    Future<JsonObject> future = Future.future();

    MongoDBConnection.mongoClient.save("booking", query, res -> {

      if(res.succeeded()) {

        documentID = res.result();

        if(documentID != null) {

          System.out.println("MongoDB inserted successfully. + document ID is : " + documentID);

          String resMsg = "A confirmed booking has been successfully created with booking id as " + documentID +

            ". An email has also been triggered to the shared email id " + emailID;

          future.complete(new JsonObject().put("fulfillmentText", resMsg));

        }else{

          future.complete(new JsonObject().put("fulfillmentText",

            "There is some issues while booking the shipment. Please start afreash."))

        }

      } else {

        System.out.println("MongoDB insertion failed.");

        future.fail(res.cause());

      }

    });

    return future;

  }

然后我假设你有和端点最终调用它,例如:


router.route("/book").handler(this::addBooking);

...然后,您可以调用 save 方法并根据结果提供不同的响应


public void addBooking(RoutingContext ctx){

    save().setHandler(h -> {

        if(h.succeeded()){

            ctx.response().end(h.result());

        }else{

            ctx.response().setStatusCode(500).end(h.cause());

        }

    })

}


查看完整回答
反对 回复 2022-09-01
?
梵蒂冈之花

TA贡献1900条经验 获得超5个赞

您可以使用 RxJava 2 和反应式 Mongo 客户端 (io.vertx.reactivex.ext.mongo.MongoClient)

下面是一个代码片段:

部署程序

public class Deployer extends AbstractVerticle {


   private static final Logger logger = getLogger(Deployer.class);


   @Override

   public void start(Future<Void> startFuture) {

      DeploymentOptions options = new DeploymentOptions().setConfig(config());


      JsonObject mongoConfig = new JsonObject()

            .put("connection_string",

                  String.format("mongodb://%s:%s@%s:%d/%s",

                        config().getString("mongodb.username"),

                        config().getString("mongodb.password"),

                        config().getString("mongodb.host"),

                        config().getInteger("mongodb.port"),

                        config().getString("mongodb.database.name")));


      MongoClient client = MongoClient.createShared(vertx, mongoConfig);


      RxHelper.deployVerticle(vertx, new BookingsStorage(client), options)

            .subscribe(e -> {

               logger.info("Successfully Deployed");

               startFuture.complete();

            }, error -> {

               logger.error("Failed to Deployed", error);

               startFuture.fail(error);

            });

   }

}

预订存储


public class BookingsStorage extends AbstractVerticle {


   private MongoClient mongoClient;


   public BookingsStorage(MongoClient mongoClient) {

      this.mongoClient = mongoClient;

   }


   @Override

   public void start() {

      var eventBus = vertx.eventBus();

      eventBus.consumer("GET_ALL_BOOKINGS_ADDRESS", this::getAllBookings);

   }


   private void getAllBookings(Message msg) {

      mongoClient.rxFindWithOptions("GET_ALL_BOOKINGS_COLLECTION", new JsonObject(), sortByDate())

            .subscribe(bookings -> {

                     // do something with bookings

                     msg.reply(bookings);

                  },

                  error -> {

                     fail(msg, error);

                  }

            );

   }


   private void fail(Message msg, Throwable error) {

      msg.fail(500, "An unexpected error occurred: " + error.getMessage());

   }


   private FindOptions sortByDate() {

      return new FindOptions().setSort(new JsonObject().put("date", 1));

   }

}

HttpRouterVerticle


// inside a router handler: 


 vertx.eventBus().rxSend("GET_ALL_BOOKINGS_ADDRESS", new JsonObject())

                 .subscribe(bookings -> {

                     // do something with bookings

                  },

                    e -> {

                      // handle error

                 }); 


查看完整回答
反对 回复 2022-09-01
  • 2 回答
  • 0 关注
  • 98 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信