为了账号安全,请及时绑定邮箱和手机立即绑定

Go elasticsearch从传入的脉冲星插入批量数据

Go elasticsearch从传入的脉冲星插入批量数据

Go
汪汪一只猫 2023-02-14 18:19:54
我必须使用 goelastic 库从即将到来的脉冲星中大量插入数据。但我有一个问题。首先,pulsar 每部分批量发送 1000 个数据。然后当我插入松紧带时,有时会出现问题。附上这个问题。此问题会导致数据丢失。谢谢解答... ERROR: circuit_breaking_exception: [parent] Data too large, data for [indices:data/write/bulk[s]] would be [524374312/500mb], which is larger than the limit of [510027366/486.3mb], real usage: [524323448/500mb], new bytes reserved: [50864/49.6kb], usages [request=0/0b, fielddata=160771183/153.3mb, in_flight_requests=50864/49.6kb, model_inference=0/0b, eql_sequence=0/0b, accounting=6898128/6.5mb]这部分是批量代码。func InsertElastic(y []models.CP, ElasticStruct *config.ElasticStruct) {    fmt.Println("------------------")    bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{        Index:      enum.IndexName,        Client:     ElasticStruct.Client,        FlushBytes: 10e+6,    })    if err != nil {        panic(err)    }    start := time.Now().UTC()    for _, x := range y {        data, err := json.Marshal(x)        if err != nil {            panic(err)        }        err = bi.Add(            context.Background(),            esutil.BulkIndexerItem{                Action: "index",                Body: bytes.NewReader(data),                OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {                    i++                },                OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {                    if err != nil {                        log.Printf("ERROR: %s", err)                    } else {                        log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)                    }                },            },        )        if err != nil {            log.Fatalf("Unexpected error: %s", err)        }        x++    }    if err := bi.Close(context.Background()); err != nil {        log.Fatalf("Unexpected error: %s", err)    }    
查看完整描述

1 回答

?
紫衣仙女

TA贡献1839条经验 获得超15个赞

Tldr;

您的节点上似乎没有足够的 JVM 堆。

您正在使用断路器以避免 Elasticsearch 内存不足 (OOM)。

解决方案

  • 增加 JVM 内存,您会在此处找到一些文档来调整节点大小。

  • 较小的批量请求


查看完整回答
反对 回复 2023-02-14
  • 1 回答
  • 0 关注
  • 186 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信