为了账号安全,请及时绑定邮箱和手机立即绑定

在 Golang 中使用 BigQuery Write API

在 Golang 中使用 BigQuery Write API

Go
FFIVE 2022-10-17 17:11:51
我正在尝试使用新的Bigquery Storage API从 Golang 进行流式插入。根据此页面,我了解到此 API 取代了旧的流式插入 bigquery API。但是,文档中的示例都没有显示如何实际插入行。为了创建 AppendRowsRequest,我得到了以下结果:&storagepb.AppendRowsRequest{    WriteStream: resp.Name,    Rows: &storagepb.AppendRowsRequest_ProtoRows{        ProtoRows: &storagepb.AppendRowsRequest_ProtoData{            WriterSchema: nil, // protobuf schema??            Rows: &storagepb.ProtoRows{                SerializedRows: [][]byte{}, // serialized protocol buffer data??            },        },    },}我应该在上面的 SerializedRows 字段中输入什么数据?上面的storagepb.ProtoRows结构记录在这里。不幸的是,给出的只是协议缓冲区主概述页面的链接。谁能给我一个使用新的 Bigquery Storage API 将行从 Golang 流式传输到 bigquery 的示例?
查看完整描述

2 回答

?
波斯汪

TA贡献1811条经验 获得超4个赞

在上述答案的帮助下,我找到了一个工作示例,该示例可在 github 上找到: https ://github.com/alexflint/bigquery-storage-api-example

主要代码是:

const (

    project = "myproject"

    dataset = "mydataset"

    table   = "mytable"

    trace   = "bigquery-writeclient-example" // identifies this client for bigquery debugging

)


// the data we will stream to bigquery

var rows = []*Row{

    {Name: "John Doe", Age: 104},

    {Name: "Jane Doe", Age: 69},

    {Name: "Adam Smith", Age: 33},

}


func main() {

    ctx := context.Background()


    // create the bigquery client

    client, err := storage.NewBigQueryWriteClient(ctx)

    if err != nil {

        log.Fatal(err)

    }

    defer client.Close()


    // create the write stream

    // a COMMITTED write stream inserts data immediately into bigquery

    resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{

        Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),

        WriteStream: &storagepb.WriteStream{

            Type: storagepb.WriteStream_COMMITTED,

        },

    })

    if err != nil {

        log.Fatal("CreateWriteStream: ", err)

    }


    // get the stream by calling AppendRows

    stream, err := client.AppendRows(ctx)

    if err != nil {

        log.Fatal("AppendRows: ", err)

    }


    // get the protobuf descriptor for our row type

    var row Row

    descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())

    if err != nil {

        log.Fatal("NormalizeDescriptor: ", err)

    }


    // serialize the rows

    var opts proto.MarshalOptions

    var data [][]byte

    for _, row := range rows {

        buf, err := opts.Marshal(row)

        if err != nil {

            log.Fatal("protobuf.Marshal: ", err)

        }

        data = append(data, buf)

    }


    // send the rows to bigquery

    err = stream.Send(&storagepb.AppendRowsRequest{

        WriteStream: resp.Name,

        TraceId:     trace, // identifies this client

        Rows: &storagepb.AppendRowsRequest_ProtoRows{

            ProtoRows: &storagepb.AppendRowsRequest_ProtoData{

                // protocol buffer schema

                WriterSchema: &storagepb.ProtoSchema{

                    ProtoDescriptor: descriptor,

                },

                // protocol buffer data

                Rows: &storagepb.ProtoRows{

                    SerializedRows: data, // serialized protocol buffer data

                },

            },

        },

    })

    if err != nil {

        log.Fatal("AppendRows.Send: ", err)

    }


    // get the response, which will tell us whether it worked

    _, err = stream.Recv()

    if err != nil {

        log.Fatal("AppendRows.Recv: ", err)

    }


    log.Println("done")

}

上面“Row”结构的协议缓冲区定义是:


syntax = "proto3";


package tutorial;


option go_package = ".;main";


message Row {

    string Name = 1;

    int32 Age = 2;

}

您需要首先使用与协议缓冲区对应的模式创建一个 bigquery 数据集和表。请参阅上面链接的存储库中的自述文件以了解如何执行此操作。


运行上面的代码后,数据在 bigquery 中显示如下:


$ bq query 'select * from mydataset.mytable'

Waiting on bqjob_r1b39442e5474a885_0000017df21f629e_1 ... (0s) Current status: DONE   

+------------+-----+

|    name    | age |

+------------+-----+

| John Doe   | 104 |

| Jane Doe   |  69 |

| Adam Smith |  33 |

+------------+-----+

感谢大家的帮助!


查看完整回答
反对 回复 2022-10-17
?
慕斯709654

TA贡献1840条经验 获得超5个赞

我找到了一些关于将流写入表的文档[1] [2],但我不确定这是否是您要查找的内容。请记住, storage/apiv1beta2 当前处于 beta 状态,因此可能尚未实现或缺少有关它的文档。如果我附加的文档对您没有帮助,我们可以打开一个公共问题跟踪器来正确记录或实施行流。



查看完整回答
反对 回复 2022-10-17
  • 2 回答
  • 0 关注
  • 147 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信