Golang如何处理parquet文件
这篇文章主要介绍“Golang如何处理parquet文件”,在日常操作中,相信很多人在Golang如何处理parquet文件问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Golang如何处理parquet文件”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
前言
Parquet是Apache基金会支持的项目,是面向列存储二进制文件格式。支持不同类型的压缩方式,广泛用于数据科学和大数据环境,如Hadoop生态。
创建结构体
首先创建struct,用于表示要处理的数据:
type user struct { ID string `parquet:"name=id, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` FirstName string `parquet:"name=firstname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` LastName string `parquet:"name=lastname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` Email string `parquet:"name=email, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` Phone string `parquet:"name=phone, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` Blog string `parquet:"name=blog, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` Username string `parquet:"name=username, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` Score float64 `parquet:"name=score, type=DOUBLE"` CreatedAt time.Time //wont be saved in the parquet file}
这里要提醒的是tag,用于说明struct中每个字段在生成parquet过程中如何被处理。
parquet-go包可以处理parquet数据,更多的tag可以参考其官网。
生成parquet文件
下面现给出生成parquet文件的代码,然后分别进行说明:
package mainimport ( "fmt" "log" "time" "github.com/bxcodec/faker/v3" "github.com/xitongsys/parquet-go-source/local" "github.com/xitongsys/parquet-go/parquet" "github.com/xitongsys/parquet-go/reader" "github.com/xitongsys/parquet-go/writer")type user struct { ID string `parquet:"name=id, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` FirstName string `parquet:"name=firstname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` LastName string `parquet:"name=lastname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` Email string `parquet:"name=email, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` Phone string `parquet:"name=phone, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` Blog string `parquet:"name=blog, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` Username string `parquet:"name=username, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` Score float64 `parquet:"name=score, type=DOUBLE"` CreatedAt time.Time //wont be saved in the parquet file}const recordNumber = 10000func main() { var data []*user //create fake data for i := 0; i < recordNumber; i++ { u := &user{ ID: faker.UUIDDigit(), FirstName: faker.FirstName(), LastName: faker.LastName(), Email: faker.Email(), Phone: faker.Phonenumber(), Blog: faker.URL(), Username: faker.Username(), Score: float64(i), CreatedAt: time.Now(), } data = append(data, u) } err := generateParquet(data) if err != nil { log.Fatal(err) }}func generateParquet(data []*user) error { log.Println("generating parquet file") fw, err := local.NewLocalFileWriter("output.parquet") if err != nil { return err } //parameters: writer, type of struct, size pw, err := writer.NewParquetWriter(fw, new(user), int64(len(data))) if err != nil { return err } //compression type pw.CompressionType = parquet.CompressionCodec_GZIP defer fw.Close() for _, d := range data { if err = pw.Write(d); err != nil { return err } } if err = pw.WriteStop(); err != nil { return err } return nil}
定义结构体上面已经说明,但需要提醒的是类型与文档保持一致:
Primitive Type | Go Type |
---|---|
BOOLEAN | bool |
INT32 | int32 |
INT64 | int64 |
INT96(deprecated) | string |
FLOAT | float32 |
DOUBLE | float64 |
BYTE_ARRAY | string |
FIXED_LEN_BYTE_ARRAY | string |
接着就是使用faker包生成模拟数据。然后调用err := generateParquet(data)
方法。该方法大概逻辑为:
首先准备输出文件,然后基于本地输出文件构造pw,用于写parquet数据:
fw, err := local.NewLocalFileWriter("output.parquet") if err != nil { return err } //parameters: writer, type of struct, size pw, err := writer.NewParquetWriter(fw, new(user), int64(len(data))) if err != nil { return err } //compression type pw.CompressionType = parquet.CompressionCodec_GZIP defer fw.Close()
然后设置压缩类型,并通过defer操作确保关闭文件。下面开始写数据:
for _, d := range data { if err = pw.Write(d); err != nil { return err } } if err = pw.WriteStop(); err != nil { return err } return nil
循环写数据,最后调用pw.WriteStop()
停止写。 成功写文件后,下面介绍如何读取parquet文件。
读取parquet文件
首先介绍如何一次性读取文件,主要用于读取较小的文件:
func readParquet() ([]*user, error) { fr, err := local.NewLocalFileReader("output.parquet") if err != nil { return nil, err } pr, err := reader.NewParquetReader(fr, new(user), recordNumber) if err != nil { return nil, err } u := make([]*user, recordNumber) if err = pr.Read(&u); err != nil { return nil, err } pr.ReadStop() fr.Close() return u, nil}
大概流程如下:首先定义本地文件,然后构造pr用于读取parquet文件:
fr, err := local.NewLocalFileReader("output.parquet") if err != nil { return nil, err } pr, err := reader.NewParquetReader(fr, new(user), recordNumber) if err != nil { return nil, err }
然后定义目标内容容器u,一次性读取数据:
u := make([]*user, recordNumber) if err = pr.Read(&u); err != nil { return nil, err } pr.ReadStop() fr.Close()
但一次性大量记录加载至内存可能有问题。这是官方文档提示:
If the parquet file is very big (even the size of parquet file is small, the uncompressed size may be very large), please don’t read all rows at one time, which may induce the OOM. You can read a small portion of the data at a time like a stream-oriented file.
大意是不要一次读取文件至内存,可能造成OOM。实际应用中应该分页读取,下面通过代码进行说明:
func readPartialParquet(pageSize, page int) ([]*user, error) {fr, err := local.NewLocalFileReader("output.parquet")if err != nil {return nil, err}defer func() {_ = fr.Close()}()pr, err := reader.NewParquetReader(fr, new(user), int64(pageSize))if err != nil {return nil, err}defer pr.ReadStop()//num := pr.GetNumRows()pr.SkipRows(int64(pageSize * page))u := make([]*user, pageSize)if err = pr.Read(&u); err != nil {return nil, err}return u, nil}
与上面函数差异不大,首先函数包括两个参数,用于指定页大小和页数,关键代码是跳过一定记录:
pr.SkipRows(int64(pageSize * page))
根据这个方法可以获得总行数,pr.GetNumRows()
,然后结合页大小计算总页数,最后循环可以实现分页查询。
计算列平均值
既然使用了Parquet列存储格式,下面演示下如何计算Score列的平均值。
func calcScoreAVG() (float64, error) { fr, err := local.NewLocalFileReader("output.parquet") if err != nil { return 0.0, err } pr, err := reader.NewParquetColumnReader(fr, recordNumber) if err != nil { return 0.0, err } num := int(pr.GetNumRows()) data, _, _, err := pr.ReadColumnByPath("parquet_go_root\u0001score", num) if err != nil { return 0.0, err } var result float64 for _, i := range data { result += i.(float64) } return (result / float64(num)), nil}
首先打开文件,然后调用pr.GetNumRows()方法获取总行数。然后基于路径指定列,其中parquet_go_root
为根路径,因为前面使用字节数组,这里分割符变为\u0001,完整路径为:parquet_go_root\u0001score
。
到此,关于“Golang如何处理parquet文件”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341