我必须使用 goelastic 库从即将到来的脉冲星中大量插入数据。但我有一个问题。首先,pulsar 每部分批量发送 1000 个数据。然后当我插入松紧带时,有时会出现问题。附上这个问题。此问题会导致数据丢失。谢谢解答... ERROR: circuit_breaking_exception: [parent] Data too large, data for [indices:data/write/bulk[s]] would be [524374312/500mb], which is larger than the limit of [510027366/486.3mb], real usage: [524323448/500mb], new bytes reserved: [50864/49.6kb], usages [request=0/0b, fielddata=160771183/153.3mb, in_flight_requests=50864/49.6kb, model_inference=0/0b, eql_sequence=0/0b, accounting=6898128/6.5mb]这部分是批量代码。func InsertElastic(y []models.CP, ElasticStruct *config.ElasticStruct) { fmt.Println("------------------") bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Index: enum.IndexName, Client: ElasticStruct.Client, FlushBytes: 10e+6, }) if err != nil { panic(err) } start := time.Now().UTC() for _, x := range y { data, err := json.Marshal(x) if err != nil { panic(err) } err = bi.Add( context.Background(), esutil.BulkIndexerItem{ Action: "index", Body: bytes.NewReader(data), OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { i++ }, OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { if err != nil { log.Printf("ERROR: %s", err) } else { log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason) } }, }, ) if err != nil { log.Fatalf("Unexpected error: %s", err) } x++ } if err := bi.Close(context.Background()); err != nil { log.Fatalf("Unexpected error: %s", err) }
- 1 回答
- 0 关注
- 186 浏览
添加回答
举报
0/150
提交
取消