CODING ECHO

インターネット広告屋でエンジニアをやっている人のブログ

Cloud Machine Learning Engineを使用した機械学習のシステムアーキテクチャ

Google Cloud PlatformのCloud Machine Learning Engineを使用した機械学習のシステムアーキテクチャについて考える機会があったので紹介したいと思います。

前提条件

  • Google Cloud Platformを使用する
  • オンライン予測が短時間に集中するが頻度は高くない (1予測バースト/day)
  • 使っていないときはシステムを停止させてお金をかけたくない
  • 学習用データは月1回程度増えていく
  • 専属で運用する人がいない
  • オンライン予測するためにREST APIを使いたい
  • 学習用データは人が入力する
  • 学習用データの入力からREST APIの提供までを自動化したい

システム概要

f:id:ytanak:20190117203839p:plain
図1. システムアーキテクチャ

図1はシステム全体の学習パイプラインです。学習用データを人間が入力したあと、REST APIが公開されるまで自動で実行します。

学習して精度の向上があれば新しいモデルに差し替える方法もありますが、 ここでは学習データが追加されるたびに新しいバージョンに更新しています。

処理の流れ

全体の流れは大まかに分けると以下のようになります。

  1. 学習用データの入力
  2. データの結合
  3. 学習
  4. 学習モデルを新しいバージョンにアップデート

以下で細かく見ていきます。

1. 学習用データの入力

前面にバリデーション用のWebサービスをおきます。人間が入力するデータには誤りが発生するので、正しいフォーマットで入力されるようにバリデーション用のWebサービスを入力の入り口とします。

2. データの結合

Cloud StorageにアップロードされたらCloud Functionsでトリガーします。

学習用データが増えることに対応するために、オリジナルのデータと結合されたデータの2つをCloud Storageに別々のバケットとして保存します。

データの結合処理は以下の例のようになります。

main.py

from google.cloud import storage
from google.cloud.storage import Blob


def f(event, context):
    storage_client = storage.Client()
    bucket_name = 'my-origin-bucket'
    bucket = storage_client.get_bucket(event['bucket'])
    blobs = bucket.list_blobs()

    # バケットから学習用CSVデータを取得して結合する
    with open('/tmp/' + "merged.csv", "w") as out_file:
        for blob in blobs:
            print(blob.name)
            with open('/tmp/' + blob.name, "wb") as file_obj:
                blob.download_to_file(file_obj)

            # 最終行に改行文字が無ければ入れたい
            with open('/tmp/' + blob.name) as f:
                for line in f:
                    if line[-1] == '\n':
                        out_file.write(line)
                    else:
                        out_file.write(line + '\n')

    # 結合されたCSVをトレーニング用のバケットに保存する
    store_bucket_name = 'my-bucket'
    store_bucket = storage_client.get_bucket(store_bucket_name)
    blob = Blob("data.csv", store_bucket)
    with open('/tmp/' + "merged.csv", "rb") as f: # working directoryには保存できなので/tmpに保存する
        blob.upload_from_file(file_obj=f)

requirements.txt

google-cloud-storage>=1.13.2,<1.14.0

3. 学習

同じように結合されたデータがCloud FunctionsでトリガーされCloud Machine Learning Engineで学習が実行されます。Cloud Machine Learning Engineのジョブ実行には専用のSDKは提供されていないようなので、 Python Client Library のREST APIを使ってジョブを実行します。

学習処理は以下のようになります。

main.py

import logging
from datetime import datetime

from googleapiclient import discovery
from googleapiclient import errors


def f(event, context):
    try:
        response = train()
        logging.debug(response)
    except errors.HttpError as err:
        logging.error('There was an error creating the training job. Check the details:')
        logging.error(err._get_reason())

def train():
    training_inputs = {'scaleTier': 'BASIC',
                       'packageUris': ['gs://my-ml-bucket/training_package/trainer-0.1.tar.gz'],
                       'pythonModule': 'xgboost_trainer.training',
                       'region': 'us-central1',
                       'jobDir': 'gs://my-ml-bucket/xgboost_job_dir',
                       'runtimeVersion': '1.12',
                       'pythonVersion': '2.7'}

    now =  datetime.now().strftime("%Y%m%d_%H%M%S")
    job_spec = {'jobId': 'grammar_' + now, 'trainingInput': training_inputs}

    project_name = 'my-project-123'
    project_id = 'projects/{}'.format(project_name)
    cloudml = discovery.build('ml', 'v1')
    request = cloudml.projects().jobs().create(body=job_spec, parent=project_id)
    return request.execute()

学習用スクリプト

事前に学習用のスクリプトをPython packageとしてCloud Storageにアップロードしておく必要があります。上記の例では 'packageUris': ['gs://my-ml-bucket/training_package/trainer-0.1.tar.gz'] でパッケージを指定しています。今回は紹介していませんが、この部分も学習用スクリプトが更新されたら自動的にパッケージを作成してアップロードできる仕組みを用意しておくとよいでしょう。

Pythonのパッケージを作成するには

xgboost_trainer/training.py
setup.py

のようなディレクトリ構成を作成したあと

$ python setup.py sdist

を実行します。パッケージは sdist/ に作成されます。詳しくは setup.py について調べてみてください。

学習済みモデルの保存

また、学習用スクリプトで学習した学習済みモデルは自動では保存されません。自前でCloud Storageなどに保存する必要があります。

以下の例では学習済みモデルの my_model.bst をCloud Storageにアップロードしています。

# -*- coding: utf-8 -*-

import os
import subprocess
import sys

import pandas as pd


gcs_model_path = os.path.join('gs://', 'my-bucket', 'data.csv')
subprocess.check_call(['gsutil', 'cp', gcs_model_path, 'data.csv'], stderr=sys.stdout)

df = pd.read_csv('data.csv', sep='\t', encoding='utf-8')

# train something...

# 保存した学習済みモデルをCloud Storageにアップロード
model_filename = 'my_model.bst'
gcs_model_path = os.path.join('gs://', 'my-ml-model-bucket', model_filename)
subprocess.check_call(['gsutil', 'cp', model_filename, gcs_model_path], stderr=sys.stdout)

4. 学習モデルを新しいバージョンにアップデート

学習が終わるとCloud Storageにモデルが保存されます。これをCloud FunctionsでトリガーしてCloud Machine Learning Engineに新しいバージョンとしてデプロイします。デプロイ時に新しいバージョンをデフォルトのバージョンにしておきます。こうすることで、REST APIで特定のバージョンを指定しない限り、常に新しい学習済みモデルを使用して予測ができるようになります。

main.py

import logging
import time
from datetime import datetime

from googleapiclient import discovery
from googleapiclient import errors


def f(event, context):
    try:
        response = deploy()
        logging.debug(response)
    except errors.HttpError as err:
        logging.error('There was an error creating the training job. Check the details:')
        logging.error(err._get_reason())

def deploy():
    project_name = 'my-project-123'
    model_name = 'my-xgboost-model'


    # TODO もしCloud  Machine Learning Engineにモデルが無ければ先に作成するようにしたい

    version_name = 'v' + datetime.now().strftime("%Y%m%d_%H%M%S")
    body = {
        'name': version_name,
        'deploymentUri': 'gs://my-ml-model-bucket',
        'runtimeVersion': '1.12',
        'framework': 'XGBOOST',
        'pythonVersion': '2.7',
    }

    # 新しいバージョンをデプロイする
    cloudml = discovery.build('ml', 'v1')
    parent = 'projects/{}/models/{}'.format(project_name, model_name)
    request = cloudml.projects().models().versions().create(body=body, parent=parent)
    request.execute()

    # バージョンのデプロイが完了するのを待つ
    while True:
        time.sleep(5)
        name = 'projects/{}/models/{}/versions/{}'.format(project_name, model_name, version_name)
        request = cloudml.projects().models().versions().get(name=name)
        response = request.execute()
        logging.info(response)

        if response['state'] == 'READY':
            break

    # 新しいバージョンをデフォルトのバージョンにする
    name = 'projects/{}/models/{}/versions/{}'.format(project_name, model_name, version_name)
    request = cloudml.projects().models().versions().setDefault(name=name)
    return request.execute()

デプロイしたバージョンが if response['state'] == 'READY': でデプロイが完了するまで待っていますが、これには1分ほどかかります。Cloud Functionsのデフォルトの処理のタイムアウト時間が60秒なので120秒ぐらいに変更しておく必要があります。

また、モデルのバージョンの名前には v201901017_092356 のように v + 日時 を使うと管理しやすいです。

これでREST APIとして利用できる準備ができました。あとはApp Engineや外部からCloud Machine Learning Engineのオンライン予測APIにリクエストすれば予測結果が返ってきます。

今後問題となりそうなところ

前提条件の"専属で運用する人がいない"と言うところがまだ解決できていません。ドキュメントの整備はもちろんですが、Google Cloud Platformを構築する時に手間がかかります。この部分もコード化してGoogle Cloud Platform上で全てのサービスをコマンド一発で立ち上げられるようにできると良さそうです。これにはTerraformを使うと良いかな?と思っています。