我有 2,000,000 条记录的集合> db.events.count(); │2000000 我使用 golang mongodb 客户端连接到数据库package mainimport ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options")func main() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27888").SetAuth(options.Credential{ Username: "mongoadmin", Password: "secret", })) if err != nil { panic(err) } defer func() { if err = client.Disconnect(ctx); err != nil { panic(err) } }() collection := client.Database("test").Collection("events") var bs int32 = 10000 var b = true cur, err := collection.Find(context.Background(), bson.D{}, &options.FindOptions{ BatchSize: &bs, NoCursorTimeout: &b}) if err != nil { log.Fatal(err) } defer cur.Close(ctx) s, n := runningtime("retrive db from mongo and publish to kafka") count := 0 for cur.Next(ctx) { var result bson.M err := cur.Decode(&result) if err != nil { log.Fatal(err) } bytes, err := json.Marshal(result) if err != nil { log.Fatal(err) } count++ msg := &sarama.ProducerMessage{ Topic: "hello", // Key: sarama.StringEncoder("aKey"), Value: sarama.ByteEncoder(bytes), } asyncProducer.Input() <- msg }但是该程序仅检索大约 600,000 条记录,而不是每次运行该程序时的 2,000,000 条。$ go run main.godonecount = 605426nErrors = 02020/09/18 11:23:43 End: retrive db from mongo and publish to kafka took 10.080603336s我不知道为什么?我想检索所有 2,000,000 条记录。谢谢你的帮助。
1 回答
慕娘9325324
TA贡献1783条经验 获得超4个赞
您获取结果的循环可能会提前结束,因为您使用相同的ctx上下文来迭代具有 10 秒超时的结果。
这意味着如果检索和处理 200 万条记录(包括连接)时间超过 10 秒,上下文将被取消,因此游标也会报告错误。
请注意,设置FindOptions.NoCursorTimeout为true只是为了防止光标因不活动而超时,它不会覆盖使用的上下文的超时。
使用另一个上下文来执行查询并迭代结果,一个没有超时的上下文,例如context.Background().
另请注意,要构造 的选项find,请使用辅助方法,因此它可能看起来像这样简单而优雅:
options.Find().SetBatchSize(10000).SetNoCursorTimeout(true)
所以工作代码:
ctx2 := context.Background()
cur, err := collection.Find(ctx2, bson.D{},
options.Find().SetBatchSize(10000).SetNoCursorTimeout(true))
// ...
for cur.Next(ctx2) {
// ...
}
// Also check error after the loop:
if err := cur.Err(); err != nil {
log.Printf("Iterating over results failed: %v", err)
}
- 1 回答
- 0 关注
- 134 浏览
添加回答
举报
0/150
提交
取消