我正在使用 Apache Beam Go SDK 并且很难以正确的格式获取PCollection以按键进行分组/组合。我在 PCollection 的字符串中每个键有多个记录,如下所示:Bob, catBob, dogCarla, catCarla, bunnyDoug, horse我想使用GroupByKey和CombinePerKey,这样我就可以像这样汇总每个人的宠物:Bob, [cat, dog]Carla, [cat, bunny]Doug, [horse]如何将 PCollection<string> 转换为 PCollection<KV<string, string>>?他们在这里提到了类似的东西,但不包括聚合字符串值的代码。我可以使用 ParDo 获取字符串键和字符串值,如下所示,但我不知道如何转换为 GroupPerKey 输入所需的 KV<string, string> 或 CoGBK<string, string> 格式。pcolOut := beam.ParDo(s, func(line string) (string, string) { cleanString := strings.TrimSpace(line) openingChar := "," iStart := strings.Index(cleanString, openingChar) key := cleanString[0:iStart] value := cleanString[iStart+1:] // How to convert to PCollection<KV<string, string>> before returning? return key, value}, pcolIn)groupedKV := beam.GroupByKey(s, pcolOut) 它失败并出现以下错误。有什么建议么?panic: inserting ParDo in scope root creating new DoFn in scope root binding fn main.main.func2 binding params [{Value string} {Value string}] to input CoGBK<string,string>values of CoGBK<string,string> cannot bind to {Value string}
1 回答
汪汪一只猫
TA贡献1898条经验 获得超8个赞
要映射到 KV,您可以应用 MapElements 并使用 into() 来设置 KV 类型,并在 via() 逻辑中创建一个新KV.of(myKey, myValue)的 ,例如,要获取一个KV<String,String>,请使用以下内容:
PCollection<KV<String, String>> kvPairs = linkpages.apply(MapElements.into(
TypeDescriptors.kvs(
TypeDescriptors.strings(),
TypeDescriptors.strings()))
.via(
linkpage -> KV.of(dataFile, linkpage)));
- 1 回答
- 0 关注
- 108 浏览
添加回答
举报
0/150
提交
取消