Apache Beam 从 Go 中的 PCollection 中选择前 N 行
Apache Beam 是一个开源的分布式数据处理框架,它提供了一种统一的编程模型,可以在不同的批处理和流处理引擎上运行。最近,Apache Beam 的 Go SDK 中新增了一个非常有用的功能——从 PCollection 中选择前 N 行。这个功能对于需要对大型数据集进行采样或者快速预览的场景非常有帮助。在本文中,我们将介绍如何在 Apache Beam 的 Go SDK 中使用这个功能,并展示一些实际的示例代码。让我们开始吧!
问题内容
我有一个 pcollection,我需要从中选择 n 个最大的行。我正在尝试使用 go 创建一个数据流管道并陷入困境。
package main
import (
"context"
"flag"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
type user struct {
name string
age int
}
func printrow(ctx context.context, list user) {
fmt.println(list)
}
func main() {
flag.parse()
beam.init()
ctx := context.background()
p := beam.newpipeline()
s := p.root()
var userlist = []user{
{"bob", 5},
{"adam", 8},
{"john", 3},
{"ben", 1},
{"jose", 1},
{"bryan", 1},
{"kim", 1},
{"tim", 1},
}
initial := beam.createlist(s, userlist)
pc2 := beam.pardo(s, func(row user, emit func(user)) {
emit(row)
}, initial)
beam.pardo0(s, printrow, pc2)
if err := beamx.run(ctx, p); err != nil {
log.exitf(ctx, "failed to execute job: %v", err)
}
}
从上面的代码中,我需要根据 user.age 选择前 5 行 我发现链接顶部包具有相同的功能,但它说它返回单个元素 pcollection。有什么不同?
package main
import (
"context"
"flag"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
func init() {
beam.RegisterFunction(less)
}
type User struct {
Name string
Age int
}
func printRow(ctx context.Context, list User) {
fmt.Println(list)
}
func less(a, b User) bool {
return a.Age < b.Age
}
func main() {
flag.Parse()
beam.Init()
ctx := context.Background()
p := beam.NewPipeline()
s := p.Root()
var userList = []User{
{"Bob", 5},
{"Adam", 8},
{"John", 3},
{"Ben", 1},
{"Jose", 1},
{"Bryan", 1},
{"Kim", 1},
{"Tim", 1},
}
initial := beam.CreateList(s, userList)
best := top.Largest(s, initial, 5, less)
pc2 := beam.ParDo(s, func(row User, emit func(User)) {
emit(row)
}, best)
beam.ParDo0(s, printRow, pc2)
if err := beamx.Run(ctx, p); err != nil {
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}
我像上面一样添加了选择前 5 行的函数,但出现错误 []main.user is not allocate to main.user
我需要与以前相同格式的 pcollection,因为我需要进一步处理。我怀疑这是因为 top.largest 函数返回单个元素 pcollection。关于如何转换格式有什么想法吗?
解决方法
最好的 pcollection 是 []user
所以尝试一下...
pc2 := beam.ParDo(s, func(rows []User, emit func(User)) {
for _, row := range rows {
emit(row)
}
}, best)
以上就是Apache Beam 从 Go 中的 PCollection 中选择前 N 行的详细内容,更多请关注编程网其它相关文章!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341