Querier

Cloud Pub/SubのBigQueryサブスクリプションを使ってBigQueryにユーザーの行動ログをリアルタイムに収集する

2023.02.16に公開 | 2023.02.16に更新

Querier運営

@querier_io@querierinc

「Querier(クエリア)」は社内向け管理画面を圧倒的な速さで、かつビジネスのスケールに合わせて柔軟に構築することができるローコードツールです。

管理画面の構築もWeb上で完結
エンジニアのためのローコードツール

Querierについて詳しく見る

みなさんこんにちは。今回はCloud Pub/SubのBigQueryサブスクリプションを使って、ユーザーの行動ログをリアルタイムに収集する方法をご紹介します。

BigQueryサブスクリプションとは

昨年夏ごろにCloud Pub/Subの機能として追加された機能です。今までBigQueryにデータを収集しようとしたときに、純粋なBigQuery APIのインサートを使うとコストが嵩んでしまう問題があり、それを回避するためにCloud Pub/SubからDataflowを経由してBigQueryへストリーミングする方法がよく利用されてきました。

そこで、今回Cloud Pub/Subの機能としてBigQueryサブスクリプションが追加され、Dataflowを経由することなくBigQueryへデータを投入することが可能になりました。

全体の流れ

クエリアはGoで書かれたGraphQL APIをクライアントへ提供していますので、Cloud Pub/SubへのパブリッシュなどはGoを使って行います。
全体の流れは以下のようになります。

  1. ログのリクエストをAvro形式に変換
  2. 変換されたログをCloud Pub/Subへパブリッシュ
  3. Avro形式のバイナリデータがBigQueryのスキーマにマッピングされてインサートされる(BigQueryサブスクリプション)


BigQueryのセットアップ

今回は以下のスキーマでBigQueryのテーブルを作成します。
BigQueryのデータセット、テーブルの作成については特別なことはしていませんので割愛します。

[
    {
        "name": "id",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "email",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "os_version",
        "type": "STRING",
        "mode": "NULLABLE"
    },
    {
        "name": "os_name",
        "type": "STRING",
        "mode": "NULLABLE"
    },
    {
        "name": "browser_name",
        "type": "STRING",
        "mode": "NULLABLE"
    },
    {
        "name": "browser_version",
        "type": "STRING",
        "mode": "NULLABLE"
    },
    {
        "name": "timestamp",
        "type": "TIMESTAMP",
        "mode": "NULLABLE"
    },
    {
        "name": "params",
        "type": "JSON",
        "mode": "NULLABLE"
    }
]

Pub/Subのセットアップ

スキーマの作成

コンソール内のPub/Subタブから、スキーマを作成します。任意のスキーマIDを指定して、スキーマタイプをAvroとし、以下のAvroスキーマで作成します。

{
  "type": "record",
  "name": "ClientEvent",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "email",
      "type": "string"
    },
    {
      "name": "browser_name",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "browser_version",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "os_name",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "os_version",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-micros"
      }
    },
    {
      "name": "params",
      "type": [
        {
          "type": "string",
          "sqlType": "JSON"
        },
        "null"
      ]
    }
  ]
}


トピックの作成

次に、スキーマに関連させる形でトピックを作成します。ここで「メッセージエンコードの選択」を「バイナリ」にします。


サブスクリプションの編集

最後に作成されたサブスクリプションを編集します。デフォルトでは配信タイプが「pull」となっているので、「BigQueryへの書き込み」に変更します。
先程作成したBigQueryのデータセットとテーブルを指定し、「トピックスキーマを使用する」「不明な項目を削除する」にチェックを入れて更新をします。

作成したトピックへパブリッシュする

まずはBigQueryのテーブルに沿う形で構造体を定義し、Avro形式のバイナリに変換する関数を作っていきます。
Avro形式への変換はgoavroというライブラリを使って行っています。

package main

import (
	"encoding/json"
	"time"

	"github.com/linkedin/goavro/v2"
)

type ClientEvent struct {
	ID             string
	Email          string
	OSName         *string
	OSVersion      *string
	BrowserName    *string
	BrowserVersion *string
	Params         map[string]any
	Timestamp      time.Time
}


func (e *ClientEvent) AvroBinary() ([]byte, error) {
	var schema = `{
  "type": "record",
  "name": "ClientEvent",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "email",
      "type": "string"
    },
    {
      "name": "browser_name",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "browser_version",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "os_name",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "os_version",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-micros"
      }
    },
    {
      "name": "params",
      "type": [
        {
          "type": "string",
          "sqlType": "JSON"
        },
        "null"
      ]
    }
  ]
}`

	c, err := goavro.NewCodec(schema)
	if err != nil {
		return nil, err
	}

	n := map[string]any{
		"id":        e.ID,
		"email":     e.Email,
		"timestamp": e.Timestamp,
	}

	if e.Params != nil {
		paramsBytes, err := json.Marshal(e.Params)
		if err != nil {
			return nil, err
		}

		n["params"] = map[string]any{"string": string(paramsBytes)}
	}
	if e.BrowserName != nil {
		n["browser_name"] = map[string]any{"string": *e.BrowserName}
	}
	if e.BrowserVersion != nil {
		n["browser_version"] = map[string]any{"string": *e.BrowserVersion}
	}
	if e.OSName != nil {
		n["os_name"] = map[string]any{"string": *e.OSName}
	}
	if e.OSVersion != nil {
		n["os_version"] = map[string]any{"string": *e.OSVersion}
	}

	return c.BinaryFromNative(nil, n)
}

最後に実際にパブリッシュしていきます。

package main

import (
	"context"
	"log"

	"cloud.google.com/go/pubsub"
)

func main() {
	var ce *ClientEvent
	// 取得したパラメータをClientEventに詰める

	b, err := ce.AvroBinary()
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()
	client := pubsub.NewClient(ctx, "project-id")
	topic := client.Topic("topic-name")
	defer topic.Stop()

	topic.Publish(ctx, &pubsub.Message{
		Data: b,
	})
}


最後に

今回はCloud Pub/SubのBigQueryサブスクリプションを使ってBigQueryにユーザーの行動ログを収集する方法を解説していきました。以前までのDataflowなどを使った方法よりもシンプルに実装できるので、ぜひ試してみてください。

また、クエリアはBigQueryや様々なデータベース、APIと連携して、社内向けの管理画面やツールを構築できるローコードツールの『Querier』を開発しています。無料トライアルも実施しておりますので、もし気になる方がいればサイトを覗いてみてください。

「Querier(クエリア)」は社内向け管理画面を圧倒的な速さで、かつビジネスのスケールに合わせて柔軟に構築することができるローコードツールです。

最新の記事

2〜3ヶ月と見積もっていた開発期間を、クエリアを導入することでわずか1週間に短縮できました

2012年5月創業のフルカイテン株式会社。 「在庫をフル回転させる」をコンセプトに、機械学習を駆使したSaaS『FULL KAITEN』を提供し、在庫問題の解決に取り組む。

more

管理画面の構築もWeb上で完結
エンジニアのためのローコードツール

Querierについて詳しく見る