我正在使用 Apache Prediction IO 开发推荐引擎。在事件服务器之前,我有一个 GO api,用于侦听来自客户和进口商的事件。在客户使用导入器的特定情况下,我收集导入的身份,并将 json 从导入器 api 发送到 GO api。例如,如果用户导入一个包含 45000 个数据的 csv,我会将这 45000 个身份以 json 格式发送到 GO api,例如{"barcodes":[...]}. 预测 IO 事件服务器需要特定形状的数据。type ItemEvent struct { Event string `json:"event"` EntityType string `json:"entityType"` EntityId string `json:"entityId"` Properties map[string][]string `json:"properties"` EventTime time.Time `json:"eventTime"`}type ItemBulkEvent struct { Event string `json:"event"` Barcodes []string `json:"barcodes"` EventTime time.Time `json:"eventTime"`}ItemEvent是我将从 GO Api 发送到事件服务器的最终数据。ItemBulkEvent是我从进口商 api 收到的数据。func HandleItemBulkEvent(w http.ResponseWriter, r *http.Request) { var itemBulk model.ItemBulkEvent err := decode(r,&itemBulk) if err != nil { log.Fatalln("handleitembulkevent -> ",err) util.RespondWithError(w,400,err.Error()) }else { var item model.ItemEvent item.EventTime = itemBulk.EventTime; item.EntityType = "item"; item.Event = itemBulk.Event itemList := make([]model.ItemEvent,0,50) for index, barcode := range itemBulk.Barcodes{ item.EntityId = barcode if (index > 0 && (index % 49) == 0){ itemList = append(itemList, item) go sendBulkItemToEventServer(w,r,itemList)HandleItemBulkEvent是批量更新的处理函数。在这一步中,我应该提到预测 io 的批量上传。通过 rest api 预测 io 事件服务器每个请求需要 50 个事件。所以我创建了一个包含 50 个上限和一个项目的列表。我使用相同的项目,只是每次都更改身份部分(条形码)并添加到列表中。在每 50 个项目中,我使用了一个处理函数,将该列表发送到事件服务器,然后清理列表,依此类推。sendBulkItemToEventServer函数编组传入的项目列表并向预测 io 的事件服务器发出发布请求。在这部分中,当我尝试使用 5000+- 项目时,它做得很好,但是当我尝试使用 45000 项目时,应用程序崩溃并出现以下错误。知道如何解决这个问题吗?
2 回答
慕盖茨4494581
TA贡献1850条经验 获得超11个赞
您的程序中有几个错误。运行时错误是因为您正在检查 if err2
is not nil,但是您正在打印err
,而不是err2
. err
为零,因此运行时错误。
这意味着err2
不是零,所以你应该看到那个错误是什么。
您提到您正在分批发送 50 条消息,但该实现是错误的。您将元素添加到itemList
,然后用 that 启动一个 goroutine itemList
,然后截断它并再次开始填充。这是一场数据竞赛,你的 goroutine 将看到itemList
处理程序正在修改的实例。无需截断,只需itemList
在向 goroutine 提交一个时创建一个新的,这样每个 goroutine 都可以拥有自己的副本。
如果您想继续使用相同的切片,您可以编组切片,然后将 JSON 消息传递给 goroutine 而不是切片。
- 2 回答
- 0 关注
- 106 浏览
添加回答
举报
0/150
提交
取消