【告知】値の参照時の仕様変更のお知らせ
このたび2024年11月11日に値の参照に関する仕様変更を予定しておりますので詳細について報告いたします。
more
2023.02.16に公開 | 2023.02.16に更新
Querier運営
「Querier(クエリア)」は社内向け管理画面を圧倒的な速さで、かつビジネスのスケールに合わせて柔軟に構築することができるローコードツールです。
管理画面の構築もWeb上で完結
エンジニアのためのローコードツール
みなさんこんにちは。今回はCloud Pub/SubのBigQueryサブスクリプションを使って、ユーザーの行動ログをリアルタイムに収集する方法をご紹介します。
昨年夏ごろにCloud Pub/Subの機能として追加された機能です。今までBigQueryにデータを収集しようとしたときに、純粋なBigQuery APIのインサートを使うとコストが嵩んでしまう問題があり、それを回避するためにCloud Pub/SubからDataflowを経由してBigQueryへストリーミングする方法がよく利用されてきました。
そこで、今回Cloud Pub/Subの機能としてBigQueryサブスクリプションが追加され、Dataflowを経由することなくBigQueryへデータを投入することが可能になりました。
クエリアはGoで書かれたGraphQL APIをクライアントへ提供していますので、Cloud Pub/SubへのパブリッシュなどはGoを使って行います。
全体の流れは以下のようになります。
今回は以下のスキーマでBigQueryのテーブルを作成します。
BigQueryのデータセット、テーブルの作成については特別なことはしていませんので割愛します。
[
{
"name": "id",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "email",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "os_version",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "os_name",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "browser_name",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "browser_version",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "timestamp",
"type": "TIMESTAMP",
"mode": "NULLABLE"
},
{
"name": "params",
"type": "JSON",
"mode": "NULLABLE"
}
]
コンソール内のPub/Subタブから、スキーマを作成します。任意のスキーマIDを指定して、スキーマタイプをAvroとし、以下のAvroスキーマで作成します。
{
"type": "record",
"name": "ClientEvent",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "email",
"type": "string"
},
{
"name": "browser_name",
"type": [
"null",
"string"
]
},
{
"name": "browser_version",
"type": [
"null",
"string"
]
},
{
"name": "os_name",
"type": [
"null",
"string"
]
},
{
"name": "os_version",
"type": [
"null",
"string"
]
},
{
"name": "timestamp",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
},
{
"name": "params",
"type": [
{
"type": "string",
"sqlType": "JSON"
},
"null"
]
}
]
}
次に、スキーマに関連させる形でトピックを作成します。ここで「メッセージエンコードの選択」を「バイナリ」にします。
最後に作成されたサブスクリプションを編集します。デフォルトでは配信タイプが「pull」となっているので、「BigQueryへの書き込み」に変更します。
先程作成したBigQueryのデータセットとテーブルを指定し、「トピックスキーマを使用する」「不明な項目を削除する」にチェックを入れて更新をします。
まずはBigQueryのテーブルに沿う形で構造体を定義し、Avro形式のバイナリに変換する関数を作っていきます。
Avro形式への変換はgoavroというライブラリを使って行っています。
package main
import (
"encoding/json"
"time"
"github.com/linkedin/goavro/v2"
)
type ClientEvent struct {
ID string
Email string
OSName *string
OSVersion *string
BrowserName *string
BrowserVersion *string
Params map[string]any
Timestamp time.Time
}
func (e *ClientEvent) AvroBinary() ([]byte, error) {
var schema = `{
"type": "record",
"name": "ClientEvent",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "email",
"type": "string"
},
{
"name": "browser_name",
"type": [
"null",
"string"
]
},
{
"name": "browser_version",
"type": [
"null",
"string"
]
},
{
"name": "os_name",
"type": [
"null",
"string"
]
},
{
"name": "os_version",
"type": [
"null",
"string"
]
},
{
"name": "timestamp",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
},
{
"name": "params",
"type": [
{
"type": "string",
"sqlType": "JSON"
},
"null"
]
}
]
}`
c, err := goavro.NewCodec(schema)
if err != nil {
return nil, err
}
n := map[string]any{
"id": e.ID,
"email": e.Email,
"timestamp": e.Timestamp,
}
if e.Params != nil {
paramsBytes, err := json.Marshal(e.Params)
if err != nil {
return nil, err
}
n["params"] = map[string]any{"string": string(paramsBytes)}
}
if e.BrowserName != nil {
n["browser_name"] = map[string]any{"string": *e.BrowserName}
}
if e.BrowserVersion != nil {
n["browser_version"] = map[string]any{"string": *e.BrowserVersion}
}
if e.OSName != nil {
n["os_name"] = map[string]any{"string": *e.OSName}
}
if e.OSVersion != nil {
n["os_version"] = map[string]any{"string": *e.OSVersion}
}
return c.BinaryFromNative(nil, n)
}
最後に実際にパブリッシュしていきます。
package main
import (
"context"
"log"
"cloud.google.com/go/pubsub"
)
func main() {
var ce *ClientEvent
// 取得したパラメータをClientEventに詰める
b, err := ce.AvroBinary()
if err != nil {
log.Fatal(err)
}
ctx := context.Background()
client := pubsub.NewClient(ctx, "project-id")
topic := client.Topic("topic-name")
defer topic.Stop()
topic.Publish(ctx, &pubsub.Message{
Data: b,
})
}
今回はCloud Pub/SubのBigQueryサブスクリプションを使ってBigQueryにユーザーの行動ログを収集する方法を解説していきました。以前までのDataflowなどを使った方法よりもシンプルに実装できるので、ぜひ試してみてください。
また、クエリアはBigQueryや様々なデータベース、APIと連携して、社内向けの管理画面やツールを構築できるローコードツールの『Querier』を開発しています。無料トライアルも実施しておりますので、もし気になる方がいればサイトを覗いてみてください。
Querier運営
「Querier(クエリア)」は社内向け管理画面を圧倒的な速さで、かつビジネスのスケールに合わせて柔軟に構築することができるローコードツールです。
このたび2024年11月11日に値の参照に関する仕様変更を予定しておりますので詳細について報告いたします。
more
データフローの通知設定機能・監査ログへのパラメータが追加されましたのでご紹介します。
more
データフローのアクションに永続化などに利用できるローカルストレージ機能を追加しました。
more
日本を健康に。多彩なフィットネスブランドを展開中。スタジオ付きの「JOYFIT」、24時間型の「JOYFIT24」、ヨガスタジオ「JOYFIT YOGA」、パーソナルジム「JOYFIT+」、家族向けの「FIT365」など、多彩なブランド展開で全国を席巻しているスポーツ事業。
more
管理画面の構築もWeb上で完結
エンジニアのためのローコードツール