为了账号安全,请及时绑定邮箱和手机立即绑定

PyMongo 游标batch_size

PyMongo 游标batch_size

婷婷同学_ 2021-11-16 09:35:09
使用 PyMongo 3.7.2,我尝试通过在 MongoDB 游标上使用 batch_size 来分块读取集合,如here所述。基本思想是在集合对象上使用find()方法,以batch_size为参数。但是无论我尝试什么,游标总是返回我集合中的所有文档。我的代码的一个基本片段如下所示(该集合有超过 10K 的文档):import pymongo as pmclient = pm.MongoClient()coll = client.get_database('db').get_collection('coll')cur = coll.find({}, batch_size=500)但是,游标总是立即返回完整的集合大小。我正在按照文档中的描述使用它。有谁知道我如何正确地批量迭代集合?有多种方法可以遍历 find() 方法的输出,但这仍会首先获取完整集合,并且只会遍历内存中已拉取的文档。batch_size 参数应该每次都获取一个批次并往返于服务器,以节省内存空间。
查看完整描述

2 回答

?
Cats萌萌

TA贡献1805条经验 获得超9个赞

Pymongo 为Cursor类提供了一些生活质量助手,因此它会自动为您进行批处理,并将结果以文档形式返回给您。


该batch_size设定完成,但这个想法是,你只需要设置它的find()方法,而不必做手工低水平电话或通过批次迭代。


例如,如果我的集合中有 100 个文档:


> db.test.count()

100

然后我设置分析级别以记录所有查询:


> db.setProfilingLevel(0,-1)

{

  "was": 0,

  "slowms": 100,

  "sampleRate": 1,

  "ok": 1,

...

然后我使用 pymongo 指定batch_size10:


import pymongo

import bson


conn = pymongo.MongoClient()

cur = conn.test.test.find({}, {'txt':0}, batch_size=10)

print(list(cur))

运行该查询,我在 MongoDB 日志中看到:


2019-02-22T15:03:54.522+1100 I COMMAND  [conn702] command test.test command: find { find: "test", filter: {} ....

2019-02-22T15:03:54.523+1100 I COMMAND  [conn702] command test.test command: getMore { getMore: 266777378048, collection: "test", batchSize: 10, .... 

(getMore repeated 9 more times)

所以查询是以指定的批次从服务器获取的。它只是通过Cursor课程对您隐藏。


编辑


如果真的需要批量获取文档,find_raw_batches()Collection下面有一个功能(doc link)。此方法的工作方式find()与此类似,并接受相同的参数。但是请注意,它将返回需要由应用程序在单独的步骤中解码的原始 BSON。值得注意的是,此方法不支持session。


话虽如此,如果目标是降低应用程序的内存使用量,则值得考虑修改查询,使其使用范围。例如:


find({'$gte': <some criteria>, '$lte': <some other criteria>})

范围查询更容易优化,可以使用索引,并且(在我看来)更容易调试和更容易在查询中断时重新启动。这在使用批处理时不太灵活,您必须从头开始重新启动查询,如果它被中断,则再次检查所有批处理。


查看完整回答
反对 回复 2021-11-16
?
不负相思意

TA贡献1777条经验 获得超10个赞

我就是这样做的,它有助于将数据分块,但我认为会有更直接的方法来做到这一点。我创建了一个 yield_rows 函数,它可以让您生成和生成块,它确保删除使用的块。


import pymongo as pm


CHUNK_SIZE = 500

client = pm.MongoClient()

coll = client.get_database('db').get_collection('coll')

cursor = coll.find({}, batch_size=CHUNK_SIZE)


def yield_rows(cursor, chunk_size):

    """

    Generator to yield chunks from cursor

    :param cursor:

    :param chunk_size:

    :return:

    """

    chunk = []

    for i, row in enumerate(cursor):

        if i % chunk_size == 0 and i > 0:

            yield chunk

            del chunk[:]

        chunk.append(row)

    yield chunk


chunks = yield_rows(cursor, CHUNK_SIZE)

for chunk in chunks:

    # do processing here

    pass

如果我找到一种更清洁、更有效的方法来做到这一点,我会更新我的答案。


查看完整回答
反对 回复 2021-11-16
  • 2 回答
  • 0 关注
  • 461 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信