Querier

Cloud Pub/SubのBigQueryサブスクリプションを使ってBigQueryにユーザーの行動ログをリアルタイムに収集する

2023.02.16に公開 | 2023.02.16に更新

Querier運営

@querier_io@querierinc

「Querier(クエリア)」は社内向け管理画面を圧倒的な速さで、かつビジネスのスケールに合わせて柔軟に構築することができるローコードツールです。

管理画面の構築もWeb上で完結
エンジニアのためのローコードツール

Querierについて詳しく見る

みなさんこんにちは。今回はCloud Pub/SubのBigQueryサブスクリプションを使って、ユーザーの行動ログをリアルタイムに収集する方法をご紹介します。

BigQueryサブスクリプションとは

昨年夏ごろにCloud Pub/Subの機能として追加された機能です。今までBigQueryにデータを収集しようとしたときに、純粋なBigQuery APIのインサートを使うとコストが嵩んでしまう問題があり、それを回避するためにCloud Pub/SubからDataflowを経由してBigQueryへストリーミングする方法がよく利用されてきました。

そこで、今回Cloud Pub/Subの機能としてBigQueryサブスクリプションが追加され、Dataflowを経由することなくBigQueryへデータを投入することが可能になりました。

全体の流れ

クエリアはGoで書かれたGraphQL APIをクライアントへ提供していますので、Cloud Pub/SubへのパブリッシュなどはGoを使って行います。
全体の流れは以下のようになります。

  1. ログのリクエストをAvro形式に変換
  2. 変換されたログをCloud Pub/Subへパブリッシュ
  3. Avro形式のバイナリデータがBigQueryのスキーマにマッピングされてインサートされる(BigQueryサブスクリプション)


BigQueryのセットアップ

今回は以下のスキーマでBigQueryのテーブルを作成します。
BigQueryのデータセット、テーブルの作成については特別なことはしていませんので割愛します。

[
    {
        "name": "id",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "email",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "os_version",
        "type": "STRING",
        "mode": "NULLABLE"
    },
    {
        "name": "os_name",
        "type": "STRING",
        "mode": "NULLABLE"
    },
    {
        "name": "browser_name",
        "type": "STRING",
        "mode": "NULLABLE"
    },
    {
        "name": "browser_version",
        "type": "STRING",
        "mode": "NULLABLE"
    },
    {
        "name": "timestamp",
        "type": "TIMESTAMP",
        "mode": "NULLABLE"
    },
    {
        "name": "params",
        "type": "JSON",
        "mode": "NULLABLE"
    }
]

Pub/Subのセットアップ

スキーマの作成

コンソール内のPub/Subタブから、スキーマを作成します。任意のスキーマIDを指定して、スキーマタイプをAvroとし、以下のAvroスキーマで作成します。

{
  "type": "record",
  "name": "ClientEvent",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "email",
      "type": "string"
    },
    {
      "name": "browser_name",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "browser_version",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "os_name",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "os_version",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-micros"
      }
    },
    {
      "name": "params",
      "type": [
        {
          "type": "string",
          "sqlType": "JSON"
        },
        "null"
      ]
    }
  ]
}


トピックの作成

次に、スキーマに関連させる形でトピックを作成します。ここで「メッセージエンコードの選択」を「バイナリ」にします。


サブスクリプションの編集

最後に作成されたサブスクリプションを編集します。デフォルトでは配信タイプが「pull」となっているので、「BigQueryへの書き込み」に変更します。
先程作成したBigQueryのデータセットとテーブルを指定し、「トピックスキーマを使用する」「不明な項目を削除する」にチェックを入れて更新をします。

作成したトピックへパブリッシュする

まずはBigQueryのテーブルに沿う形で構造体を定義し、Avro形式のバイナリに変換する関数を作っていきます。
Avro形式への変換はgoavroというライブラリを使って行っています。

package main

import (
	"encoding/json"
	"time"

	"github.com/linkedin/goavro/v2"
)

type ClientEvent struct {
	ID             string
	Email          string
	OSName         *string
	OSVersion      *string
	BrowserName    *string
	BrowserVersion *string
	Params         map[string]any
	Timestamp      time.Time
}


func (e *ClientEvent) AvroBinary() ([]byte, error) {
	var schema = `{
  "type": "record",
  "name": "ClientEvent",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "email",
      "type": "string"
    },
    {
      "name": "browser_name",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "browser_version",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "os_name",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "os_version",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-micros"
      }
    },
    {
      "name": "params",
      "type": [
        {
          "type": "string",
          "sqlType": "JSON"
        },
        "null"
      ]
    }
  ]
}`

	c, err := goavro.NewCodec(schema)
	if err != nil {
		return nil, err
	}

	n := map[string]any{
		"id":        e.ID,
		"email":     e.Email,
		"timestamp": e.Timestamp,
	}

	if e.Params != nil {
		paramsBytes, err := json.Marshal(e.Params)
		if err != nil {
			return nil, err
		}

		n["params"] = map[string]any{"string": string(paramsBytes)}
	}
	if e.BrowserName != nil {
		n["browser_name"] = map[string]any{"string": *e.BrowserName}
	}
	if e.BrowserVersion != nil {
		n["browser_version"] = map[string]any{"string": *e.BrowserVersion}
	}
	if e.OSName != nil {
		n["os_name"] = map[string]any{"string": *e.OSName}
	}
	if e.OSVersion != nil {
		n["os_version"] = map[string]any{"string": *e.OSVersion}
	}

	return c.BinaryFromNative(nil, n)
}

最後に実際にパブリッシュしていきます。

package main

import (
	"context"
	"log"

	"cloud.google.com/go/pubsub"
)

func main() {
	var ce *ClientEvent
	// 取得したパラメータをClientEventに詰める

	b, err := ce.AvroBinary()
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()
	client := pubsub.NewClient(ctx, "project-id")
	topic := client.Topic("topic-name")
	defer topic.Stop()

	topic.Publish(ctx, &pubsub.Message{
		Data: b,
	})
}


最後に

今回はCloud Pub/SubのBigQueryサブスクリプションを使ってBigQueryにユーザーの行動ログを収集する方法を解説していきました。以前までのDataflowなどを使った方法よりもシンプルに実装できるので、ぜひ試してみてください。

また、クエリアはBigQueryや様々なデータベース、APIと連携して、社内向けの管理画面やツールを構築できるローコードツールの『Querier』を開発しています。無料トライアルも実施しておりますので、もし気になる方がいればサイトを覗いてみてください。

「Querier(クエリア)」は社内向け管理画面を圧倒的な速さで、かつビジネスのスケールに合わせて柔軟に構築することができるローコードツールです。

最新の記事

【告知】値の参照時の仕様変更のお知らせ

このたび2024年11月11日に値の参照に関する仕様変更を予定しておりますので詳細について報告いたします。

more

管理画面の構築もWeb上で完結
エンジニアのためのローコードツール

Querierについて詳しく見る