2 回答
TA贡献1824条经验 获得超8个赞
在与我的同事考虑后,我们决定实施和使用_ingestAPI,而不是在 ES 中创建一个管道,该管道在每个文档上插入服务器文档摄取日期。
脚步:
创建时间戳管道
PUT _ingest/pipeline/timestamp_pipeline
{
"description" : "Inserts timestamp field for all documents",
"processors" : [
{
"set" : {
"field": "insert_date",
"value": "{{_ingest.timestamp}}"
}
}
]
}
更新索引以添加新的默认字段
PUT /*/_settings
{
"index" : {
"default_pipeline": "timestamp_pipeline"
}
}
在 Python 中,我会_scroll像这样使用 API:
es = Elasticsearch(cfg.esUrl, port = cfg.esPort, timeout = 200)
doc = {
"query": {
"range": {
"insert_date": {
"gte": lastRowDateOffset
}
}
}
}
res = es.search(
index = Index,
sort = "insert_date:asc",
scroll = "2m",
size = NumberOfResultsPerPage,
body = doc
)
lastRowDateOffset最后一次跑步的日期在哪里
TA贡献1871条经验 获得超8个赞
我想您正在寻找的是一个流式传输/更改 API,@Val 在这里对此进行了很好的描述,还有一个开放的功能请求。
与此同时,您不能真正依赖size
和from
参数——您可能会进行冗余查询并在重复项到达您的数据仓库之前对其进行处理。
另一种选择是在这方面跳过 ES 并直接流式传输到仓库吗?我的意思是,在给定时间之前拍摄一次 ES 快照(这样您就可以保留历史数据),将其提供给仓库,然后直接从您获取数据的地方流式传输到仓库。
附录
AFAIK 默认排序是按插入日期。但是没有内部_insertTime
或类似的东西。你可以使用游标——它被称为滚动,这是一个 py实现。但这是从“最新”文档到“第一个”文档,反之亦然。所以它会给你所有现有的文档,但我不太确定你滚动时新添加的文档。然后你想再次运行滚动,这是次优的。
您还可以预先排序您的索引,当结合滚动时,它应该非常适合您的用例。
添加回答
举报