2 answers
Contributor with 1876 experience points, more than 7 upvotes
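Your assumption is correct: you are not waiting for the asynchronous response from the database. What you can do is wrap the call in a Future, like this: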
public Future<JsonObject> save() {
    // The future is completed later, from inside the asynchronous save callback.
    Future<JsonObject> future = Future.future();
    MongoDBConnection.mongoClient.save("booking", query, res -> {
        if (res.succeeded()) {
            documentID = res.result();
            if (documentID != null) {
                System.out.println("MongoDB insert succeeded. Document ID: " + documentID);
                String resMsg = "A confirmed booking has been successfully created with booking id as " + documentID +
                        ". An email has also been triggered to the shared email id " + emailID;
                future.complete(new JsonObject().put("fulfillmentText", resMsg));
            } else {
                future.complete(new JsonObject().put("fulfillmentText",
                        "There was an issue while booking the shipment. Please start afresh."));
            }
        } else {
            System.out.println("MongoDB insertion failed.");
            future.fail(res.cause());
        }
    });
    // Returned immediately; the caller attaches a handler to receive the result.
    return future;
}
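The wrapping idea above does not depend on Vert.x. As a minimal sketch, the same pattern can be shown with the JDK's `CompletableFuture` standing in for the Vert.x `Future`. The `saveAsync` method is a hypothetical callback-style API that plays the role of `mongoClient.save`; it is not part of any library.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class CallbackToFuture {

    // Hypothetical callback-style API standing in for mongoClient.save(...):
    // it returns immediately and invokes the callback later on another thread.
    static void saveAsync(String document, BiConsumer<String, Throwable> callback) {
        CompletableFuture.runAsync(() -> {
            if (document.isEmpty()) {
                callback.accept(null, new IllegalArgumentException("empty document"));
            } else {
                callback.accept("id-" + document.length(), null);
            }
        });
    }

    // Wrap the callback in a future so the caller receives the result when it arrives.
    static CompletableFuture<String> save(String document) {
        CompletableFuture<String> future = new CompletableFuture<>();
        saveAsync(document, (id, error) -> {
            if (error != null) {
                future.completeExceptionally(error);
            } else {
                future.complete("Booking created with id " + id);
            }
        });
        return future; // returned before the callback has necessarily run
    }

    public static void main(String[] args) {
        // join() blocks and is used only to keep this demo short;
        // on a Vert.x event loop you would attach a handler instead.
        System.out.println(save("booking").join());
    }
}
```

The caller never reads a field that the callback may not have set yet. It only reacts once the future has been completed or failed.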
Then I assume you have an endpoint that eventually calls it, for example:
router.route("/book").handler(this::addBooking);
...then you can call the save method and send a different response depending on the result:
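public void addBooking(RoutingContext ctx) {
    save().setHandler(h -> {
        if (h.succeeded()) {
            // end() accepts a String or Buffer, so encode the JsonObject first
            ctx.response().end(h.result().encode());
        } else {
            // send the failure message rather than the Throwable itself
            ctx.response().setStatusCode(500).end(String.valueOf(h.cause().getMessage()));
        }
    });
}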
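The handler above maps each outcome of the future to a response. Below is a rough JDK-only sketch of that mapping, again using `CompletableFuture` in place of the Vert.x `Future`. The `Response` class is a hypothetical stand-in for the status code and body written to `ctx.response()`.

```java
import java.util.concurrent.CompletableFuture;

class OutcomeToResponse {

    // Hypothetical stand-in for what the handler writes to ctx.response().
    static final class Response {
        final int status;
        final String body;

        Response(int status, String body) {
            this.status = status;
            this.body = body;
        }
    }

    // Turn either outcome of the future into a response: 200 with the result on
    // success, 500 with the error message on failure.
    static CompletableFuture<Response> respond(CompletableFuture<String> saved) {
        return saved.handle((result, error) ->
                error == null
                        ? new Response(200, result)
                        : new Response(500, String.valueOf(error.getMessage())));
    }

    public static void main(String[] args) {
        CompletableFuture<String> failing = new CompletableFuture<>();
        failing.completeExceptionally(new IllegalStateException("db down"));

        Response ok = respond(CompletableFuture.completedFuture("saved")).join();
        Response bad = respond(failing).join();
        System.out.println(ok.status + " " + ok.body);
        System.out.println(bad.status + " " + bad.body);
    }
}
```

Handling both branches in one place makes sure the HTTP response is always ended, whichever way the save turns out.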
Contributor with 1900 experience points, more than 5 upvotes
You can use RxJava 2 and the reactive Mongo client (io.vertx.reactivex.ext.mongo.MongoClient).
Here is a code snippet:
Deployer
public class Deployer extends AbstractVerticle {
    private static final Logger logger = getLogger(Deployer.class);

    @Override
    public void start(Future<Void> startFuture) {
        DeploymentOptions options = new DeploymentOptions().setConfig(config());
        // Build the MongoDB connection string from the verticle configuration.
        JsonObject mongoConfig = new JsonObject()
                .put("connection_string",
                        String.format("mongodb://%s:%s@%s:%d/%s",
                                config().getString("mongodb.username"),
                                config().getString("mongodb.password"),
                                config().getString("mongodb.host"),
                                config().getInteger("mongodb.port"),
                                config().getString("mongodb.database.name")));
        MongoClient client = MongoClient.createShared(vertx, mongoConfig);
        // Complete or fail the start future once the storage verticle is deployed.
        RxHelper.deployVerticle(vertx, new BookingsStorage(client), options)
                .subscribe(e -> {
                    logger.info("Successfully deployed");
                    startFuture.complete();
                }, error -> {
                    logger.error("Failed to deploy", error);
                    startFuture.fail(error);
                });
    }
}
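One caveat about the connection string built above: MongoDB requires reserved characters in the username and password (such as `@`, `:` and `/`) to be percent-encoded. Here is a small sketch of how that could be handled with the JDK alone. The `MongoUri` class and its method names are hypothetical helpers, not part of Vert.x or the MongoDB driver.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class MongoUri {

    // Percent-encode a credential. URLEncoder targets form encoding and turns
    // spaces into "+", so those are rewritten to "%20" afterwards.
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    // Same format string as in the Deployer, with encoded credentials.
    static String connectionString(String user, String password,
                                   String host, int port, String database) {
        return String.format("mongodb://%s:%s@%s:%d/%s",
                encode(user), encode(password), host, port, database);
    }

    public static void main(String[] args) {
        System.out.println(
                connectionString("app", "p@ss:w/rd", "localhost", 27017, "bookings"));
    }
}
```

Without the encoding, a password containing `@` or `/` would be parsed as part of the host or path.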
BookingsStorage
public class BookingsStorage extends AbstractVerticle {
    private final MongoClient mongoClient;

    public BookingsStorage(MongoClient mongoClient) {
        this.mongoClient = mongoClient;
    }

    @Override
    public void start() {
        var eventBus = vertx.eventBus();
        eventBus.consumer("GET_ALL_BOOKINGS_ADDRESS", this::getAllBookings);
    }

    private void getAllBookings(Message<JsonObject> msg) {
        mongoClient.rxFindWithOptions("GET_ALL_BOOKINGS_COLLECTION", new JsonObject(), sortByDate())
                .subscribe(bookings -> {
                            // do something with bookings
                            // wrap the List in a JsonArray: the event bus has no default codec for List
                            msg.reply(new JsonArray(bookings));
                        },
                        error -> {
                            fail(msg, error);
                        }
                );
    }

    private void fail(Message<JsonObject> msg, Throwable error) {
        msg.fail(500, "An unexpected error occurred: " + error.getMessage());
    }

    private FindOptions sortByDate() {
        // sort ascending by the "date" field
        return new FindOptions().setSort(new JsonObject().put("date", 1));
    }
}
HttpRouterVerticle
// inside a router handler:
vertx.eventBus().rxSend("GET_ALL_BOOKINGS_ADDRESS", new JsonObject())
        .subscribe(reply -> {
                    // reply.body() holds the bookings; do something with them
                },
                e -> {
                    // handle error
                });