OngaLog

最近Goが楽しい。

GoからBigQueryを扱うサンプル

Goを使って、AWSなどGCP外の環境からBigQueryにアクセスする必要があるときの処理です。認証にはサービスアカウントのJSONキーファイルを使います。

package bigquery

import (
        "log"
        "reflect"
        "io/ioutil"
        "golang.org/x/oauth2"
        "golang.org/x/oauth2/google"
        "google.golang.org/api/bigquery/v2"
)

// BigQuery coordinates used throughout BqSample. Replace each "***"
// placeholder with your own project ID, dataset ID and table name.
const (
	projectId = "***"
	dataSetId = "***"
	tableName = "***"
)

// TableModel is the sample row type. Its struct tags are read via reflection
// by CreateTableSchema (all three tags) and CreateInsertRow (column):
//   - column:  the BigQuery column name
//   - bq_type: the column type, e.g. STRING, BYTES, INTEGER, FLOAT, BOOLEAN,
//     TIMESTAMP, DATE, TIME, DATETIME
//   - bq_mode: the column mode, e.g. NULLABLE
//
// Field declaration order determines the column order of the generated schema.
type TableModel struct {
        Name    string `column:"name" bq_type:"STRING" bq_mode:"NULLABLE"`
        Address string `column:"address" bq_type:"STRING" bq_mode:"NULLABLE"`
}

// CreateTableSchema builds a BigQuery table schema from the struct tags of
// tableModel, which may be a struct value or a pointer to a struct.
//
// Every exported field carrying a `column` tag becomes one schema field, in
// declaration order: Name comes from `column`, Type from `bq_type` and Mode
// from `bq_mode`. Fields without a `column` tag, and unexported fields, are
// skipped, so the schema matches the row produced by CreateInsertRow.
//
// It panics if tableModel is nil or is not a struct / pointer to a struct.
func CreateTableSchema(tableModel interface{}) bigquery.TableSchema {
	rt := reflect.TypeOf(tableModel)
	// Accept *T as well as T; only the struct type is needed here.
	if rt.Kind() == reflect.Ptr {
		rt = rt.Elem()
	}

	fields := make([]*bigquery.TableFieldSchema, 0, rt.NumField())
	for i := 0; i < rt.NumField(); i++ {
		field := rt.Field(i)
		column := field.Tag.Get("column")
		// A field with no column name would yield an invalid schema entry.
		if column == "" || field.PkgPath != "" {
			continue
		}
		fields = append(fields, &bigquery.TableFieldSchema{
			Mode: field.Tag.Get("bq_mode"),
			Name: column,
			Type: field.Tag.Get("bq_type"),
		})
	}

	return bigquery.TableSchema{Fields: fields}
}

// CreateInsertRow converts tableModel (a struct value or a pointer to a
// struct) into a one-element row slice suitable for
// TableDataInsertAllRequest.Rows.
//
// Each exported field carrying a `column` tag contributes one entry to the
// row's Json map, keyed by the tag value. Fields without a `column` tag, and
// unexported fields (whose values reflection cannot read via Interface), are
// skipped.
//
// It panics if tableModel is nil, a nil pointer, or not a struct.
func CreateInsertRow(tableModel interface{}) []*bigquery.TableDataInsertAllRequestRows {
	rv := reflect.ValueOf(tableModel)
	// Accept *T as well as T.
	if rv.Kind() == reflect.Ptr {
		rv = rv.Elem()
	}
	rt := rv.Type()

	row := &bigquery.TableDataInsertAllRequestRows{
		Json: make(map[string]bigquery.JsonValue, rt.NumField()),
	}
	for i := 0; i < rt.NumField(); i++ {
		field := rt.Field(i)
		column := field.Tag.Get("column")
		if column == "" || field.PkgPath != "" {
			continue
		}
		row.Json[column] = rv.Field(i).Interface()
	}

	// One struct is one row. Appending inside the field loop (as before)
	// repeated the same row once per field, inserting duplicates.
	return []*bigquery.TableDataInsertAllRequestRows{row}
}

func BqSample() {

        pemKeyBytes, err := ioutil.ReadFile(<your file path>)

        if err != nil {
                panic(err)
        }

        t, err := google.JWTConfigFromJSON(pemKeyBytes, "https://www.googleapis.com/auth/bigquery")

        if err != nil {
                panic(err)
        }

        /*
            Make DataSet
        */

        client := t.Client(oauth2.NoContext)
        service, _ := bigquery.New(client)

        datasetList, _ := service.Datasets.List(projectId).All(false).Do()

        for _, dataset := range datasetList.Datasets {
                log.Print(dataset.DatasetReference.DatasetId)
        }

        /*
            DataSet Get
        */

        Dataset, _ := service.Datasets.Get(projectId, datasetId).Do()
        if Dataset != nil {
                log.Print(Dataset.DatasetReference.DatasetId)
        } else {
                log.Print("dataset not exist")
        }

        ds := bigquery.Dataset{
                DatasetReference:&bigquery.DatasetReference{
                        ProjectId:projectId,
                        DatasetId:"test_dataset",
                },
        }

        /*
            Table List
        */

        service.Datasets.Insert(projectId, &ds).Do()
        TableList, _ := service.Tables.List(projectId, dataSetId).Do()

        for _, tableListTables := range TableList.Tables {
                log.Print(tableListTables.TableReference.TableId)
        }

        /*
            Table Get
        */

        tableListTables, _ := service.Tables.Get(
                projectId,
                dataSetId,
                tableName).Do()

        if tableListTables != nil {
                log.Print(tableListTables.TableReference.TableId)
        } else {
                log.Print("table not exist")
        }

        /*
            Make Table 
        */

        schema := CreateTableSchema(TableModel{})

        table := bigquery.Table{
                TableReference: &bigquery.TableReference{
                        DatasetId: dataSetId,
                        ProjectId: projectId,
                        TableId: tableName,
                },
                Schema:&schema,
        }

        tableRes, _ := service.Tables.Insert(
                projectId,
                dataSetId,
                &table).Do()

        log.Print(tableRes)


        /*
            InsertAll 
        */

        res, _ := service.Tabledata.InsertAll(
                projectId,
                dataSetId,
                tableName,
                &bigquery.TableDataInsertAllRequest{
                        Kind: "bigquery#tableDataInsertAllRequest",
                        Rows: CreateInsertRow(TableModel{Name:"name", Address:"address"}),
                }).Do()

        log.Print(res)


        /*
            Query 
        */

        query := "SELECT address FROM [<project-name>:<dataset-name>.<table-name>] LIMIT 1000"

        result, err := service.Jobs.Query(projectId, &bigquery.QueryRequest{
                Kind:  "bigquery#queryRequest",
                Query: query,
        }).Do()

        for _, row := range result.Rows {
                log.Print(row)
                for _, cell := range row.F {
                        log.Print(cell.V)
                }
        }

}

gist

gist:113726894216fa2982a541362028a696 · GitHub