为了账号安全,请及时绑定邮箱和手机立即绑定

如何确保在 ES API 中捕获所有数据?

如何确保在 ES API 中捕获所有数据?

红颜莎娜 2023-05-09 15:12:39
我正在尝试在 Python 中创建一个 API 以从 ES 中提取数据并将其提供给数据仓库。数据是实时的并且每秒都会被填充,所以我将创建一个近乎实时的管道。当前的 URL 格式是{{url}}/{{index}}/_search,我发送的测试负载是:{   "from" : 0,   "size" : 5}在下一次刷新时,它将使用有效负载进行拉取:{   "from" : 6,   "size" : 5}以此类推,直到达到记录总数。PROD 环境有大约 250M 行,我将大小设置为每次提取 10K。我很担心,因为我不知道这些记录是否在 ES 中被重新排序。目前,有一个使用用户生成的时间戳的插件,但它存在缺陷,因为有时由于 json 可用于在 ES 中提取的延迟以及时间的生成方式可能导致文档被跳过。有谁知道使用提取数据时的默认排序是什么/_search?
查看完整描述

2 回答

?
有只小跳蛙

TA贡献1824条经验 获得超8个赞

在与我的同事考虑后,我们决定实施和使用_ingestAPI,而不是在 ES 中创建一个管道,该管道在每个文档上插入服务器文档摄取日期。


脚步:


创建时间戳管道

PUT _ingest/pipeline/timestamp_pipeline

{

  "description" : "Inserts timestamp field for all documents",

  "processors" : [

    {

      "set" : {

        "field": "insert_date",

        "value": "{{_ingest.timestamp}}"

      }

    }

  ]

}

更新索引以添加新的默认字段

PUT /*/_settings

{

  "index" : {

    "default_pipeline": "timestamp_pipeline"

  }

}

在 Python 中,我会_scroll像这样使用 API:

    es = Elasticsearch(cfg.esUrl, port = cfg.esPort, timeout = 200)

    doc = {

      "query": {

        "range": {

          "insert_date": {

            "gte": lastRowDateOffset

          }

        }

      }

    }


    res = es.search(

        index = Index,

        sort = "insert_date:asc",

        scroll = "2m",

        size = NumberOfResultsPerPage,

        body = doc

    )

lastRowDateOffset最后一次跑步的日期在哪里


查看完整回答
反对 回复 2023-05-09
?
ITMISS

TA贡献1871条经验 获得超8个赞

我想您正在寻找的是一个流式传输/更改 API,@Val 在这里对此进行了很好的描述,还有一个开放的功能请求。

与此同时,您不能真正依赖sizefrom参数——您可能会进行冗余查询并在重复项到达您的数据仓库之前对其进行处理。

另一种选择是在这方面跳过 ES 并直接流式传输到仓库吗?我的意思是,在给定时间之前拍摄一次 ES 快照(这样您就可以保留历史数据),将其提供给仓库,然后直接从您获取数据的地方流式传输到仓库。


附录

AFAIK 默认排序是按插入日期。但是没有内部_insertTime或类似的东西。你可以使用游标——它被称为滚动,这是一个 py实现。但这是从“最新”文档到“第一个”文档,反之亦然。所以它会给你所有现有的文档,但我不太确定你滚动时新添加的文档。然后你想再次运行滚动,这是次优的。

您还可以预先排序您的索引,当结合滚动时,它应该非常适合您的用例。


查看完整回答
反对 回复 2023-05-09
  • 2 回答
  • 0 关注
  • 119 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信