コンシューマ駆動契約テストに入門してみた〜状態管理編〜

タム

2024.09.14

35

こんにちは、タムです。

前回の投稿からだいぶ間が空いてしまいました。

今回はコンシューマ駆動契約テストシリーズの第3回目の投稿です。

最初の投稿で課題として上げていた、プロバイダの状態の管理について

ほんの触りですがようやく実装できたため紹介させていただきます。


システムは以前からサンプルとして使用しているものを今回も使います。

フロントエンドはSSRのNextJSです。

バックエンドはPythonのChalice APIです。

今の所この2つのみでシステムが完結しているため、

コンシューマ駆動契約の用語で言うならフロントエンド=コンシューマ、バックエンド=プロバイダ

ということになります。


さて、前回までは契約としてpingリクエストを送ったらpongレスポンスが返ってくるみたいな、

いわゆるHello World的な振る舞いしか契約として実装していませんでしたが、

このシステムはユーザが短い投稿を送れて、以前送った投稿を表示できる、

というX(Twitter)のようなイメージのWebアプリです。

そこで、今回は投稿を送る、及び送った投稿の一覧を取得するAPIに対して契約を作成してみます。

振る舞いとしてはこれ以上無いほどシンプルですが、契約を作る(検証する)にあたっては

考慮しなければいけない問題がいくつかあります。

認可の問題

トークンの有効期限問題

まずは認可の問題です。

このシステムではユーザが投稿リクエストを送る権限があるかどうかと

ユーザの特定をアクセストークンを使って行っています。

認証・認可にはAuth0を使っており、ユーザがログインしたタイミングで取得した

アクセストークンをバックエンドに送信しています。


Pactの契約を作る際、リクエスト内容と対応するレスポンスを定義しますが、

リクエスト内容のAuthorizationヘッダにAuth0から取得したアクセストークンを書き込んだとしても、

アクセストークンには有効期限があります。

有効期限が切れた途端に失敗しかしなくなるテストコードになってしまいます。

対処法として以下の2パターンを考えました。

投稿APIのプロバイダ側の契約テストで必要になるアクセストークンの問題の対処法:

  1. リクエストヘッダをバックエンド側で上書きする
    1. pactmanの場合はpact_verifier.verifyの第3引数に辞書形式で追加のリクエストヘッダを指定できるようです。
    2. ただしこのやり方だと逆に常に認証が成功してしまい、認証失敗するケースをテストできなくなってしまいます。
  2. 認証コンポーネントをモック化する
    1. モック化する場合、認証のロジックを別で担保するのかどうかを検討する必要があります。
    2. 実行中のAPIの内部状態を変更する必要があります。

どちらのパターンにおいてもデメリットがありました。

(1のリクエストヘッダ上書きの方は、ケースごとに上書きするヘッダを指定できたらいいのにな・・・と思います)


次に対処法2のデメリットである、実行中のAPIの内部状態を変更する方法について検討しました。


django(公式ドキュメントの例)の場合

pactmanのドキュメントでは、djangoの場合のlive_server pytest fixtureを使った例が書いてあります。

自分はdjangoは全くの未経験ですが、軽く調べたところによるとlive_serverはテストとは異なるスレッドで実行されるようです。

今回のように内部状態を変えたい場合はどうやって対応するんだろうと思ってもう少し調べてみたら、

transactional_db fixtureに依存することで、DBの内容を操作できるようになっているようです。

(その内部実装がどうなっているかまでは追えていません・・・)


Chalice(今回のプロジェクト)の場合

テスト用のClientがあるようです。

こちらは先程のlive_serverとは異なり、実際にサーバを起動するのではなく、

あくまでクライアントのモックとしてリクエストに対するレスポンスを返してくれるみたいです。

なので同じプロセスになるので、モンキーパッチなどで外部からデータを書き換えるのは簡単そうです。

しかし、pactmanの公式がlive_serverを例として出していることからも想像できるように、

pactmanは別プロセスとして実際に動くAPIを前提とした実装になっています。


pact_verifierの実装を少し深堀したところ、単純に内部でrequestsの関数を呼び出しているだけでした。

したがって、もしChalice提供のテスト用Clientを使うのであれば、

requestsのget関数を呼び出した際にテスト用Clientのgetが呼ばれる、みたいに

うまく噛み合うようなパッチを手探りで作っていかなければなりません。


自分は少しだけ試してみましたが無理そうだったのでテスト用Clientは使わないことにしました。

そうなると、テスト用Clientではなく実際にchalice localでサーバを起動するやり方になります。

そうすると、別プロセスで動いているChaliceプログラムの内部状態を変えなければいけないという

難しい問題に直面しなければならず、こちらもかなり厄介な実装になりそうだったので、

アクセストークン問題の対処法2は断念し、1で対応する方針に切り替えました。


対処法1のデメリットとしては認証失敗ケースのテストができないというのがありますが、

今回は別で(手動テストなどで)担保するので問題ないとしました。

(というより、それしか現実的な方法がないのですが)

アクセストークンの取得方法

さて、少し話が本題とそれてしまいますが、

リクエストヘッダを上書きするとして、そのアクセストークンはどうやって取得すればいいでしょうか?

何を言っているんだと思われるかもしれませんが、今回プロデューサ側のテストを考えています。

つまり、アクセストークンは送られてくるわけではなく、プロデューサ自身の側で生成しなければいけません。

今回は投稿権限を持ったテスト用のユーザをAuth0側に実際に作成し、

(画面はないので)OAuth2.0のリソースオーナーパスワードフローを使って直接トークンを取得する形にしました。

パスワードフローを使うにあたって色々と設定などが必要だったのですが、そのあたりの話は完全に脱線してしまうのでここでは省略します。

pactmanのextra_provider_headersが上書きされる問題

アクセストークンが無事に取得できたとき、また別の壁が立ちはだかりました。

pact_verifier.verifyの第3引数に設定するextra_provider_headersは、(なんと)pactに記載の内容で上書きされてしまいます

つまりどういうことかというと、コンシューマ側で契約を定義しますが、その際にAuthorizationヘッダに"Bearer hoge"と設定していたとすると、

extra_provider_headersを指定していたとしても"Bearer hoge"の方が勝ってしまうということです。

更に悪いことに、extra_provider_headers@propertyで「保護されている」ため、パッチするのも困難になってしまっています。

自分の場合は仕方なく、契約にはAuthorizationヘッダを含めないようにしました。

実際と異なる契約になってしまうため、できればやりたくなかったのですが。

自分の理解が及んでいないだけかもしれませんが、契約側よりプロバイダ側の注入の方が勝つようにしてほしいです・・・

pactmanの今後の改修に期待したいです。


ようやくアクセストークンを正しく設定できるようになったところで、別のエラーになりました。

ユーザが自身の投稿の一覧を取得するAPIの検証結果です:

Failed: body size 0 is smaller than minimum size 1 at body


ただここまでくれば勝ったも同然ですね。

今はユーザの投稿を1件も保存していない状態なので、想定通りのエラーです。

DBの問題

接続先DBの検討

さてようやくDBについてどうするかを考えるタイミングが来ました。

APIは別プロセスでchalice localで動いているので、現状DBもモックではなく本物(といってもDynamoDB localですが)に繋がっています。

まずDBにモックを使わないのは妥当かどうかですが、このDBは内部システムとみなせるため、実物を使用すべしという結論になります。

このあたりは以前の投稿をご参照ください。

次に、DBのコンテナについて考えます。

現状はローカル開発環境用のDBにchaliceが接続してしまっていますが、ローカル開発環境用とは切り離したいです。

そこで単純にテスト用のdockerコンテナを別途作成し、chaliceのstageに紐づく環境変数を(.chalice/config.jsonで)切り分けることで、

ローカル開発環境chaliceとテスト用chaliceで別々のDBコンテナを見に行くようにしました。

CI/CD環境をどうするか

上に述べたような試行錯誤の末ようやくローカルで投稿APIのプロバイダ側のテストが通るようになりました。

CodeBuild環境でもテストを通したいのですが、今のままだと通りません。

それは、ローカルではpythonコンテナ上でテストを実行していますが、CodeBuild環境では現状localhostで直接テストを実行しているためです。

もし現状のままlocalhostで実行するのであれば、DBコンテナに接続するためには

DBコンテナをポートフォワーディングし、対象ポートに接続する必要があります。

接続先は環境変数で切り替えればいいのでそこまで難易度は高くありませんが、

今後も開発を進めていく中でローカル環境とCodeBuild環境の環境差異が原因で

テストが通らなくなることがありそうです。

手元の環境でないと中々デバッグもし辛いですし、できればこの際ローカルと同じように

CodeBuild環境でもDocker上でテストを実行するようにしたいです。

CodeBuildでNoRegionError

CodeBuild環境でもDocker上でテストを実行するようにしたところ、

conftest.pyのboto3をインスタンス化している箇所でNoRegionErrorが発生しました。

これはCodeBuildのAWSプロファイルがコンテナに引き継がれないためです。

CodeBuildの場合、認証情報が環境変数に設定されているわけでも~/.aws/配下に保存されているわけでもないようです。

したがってCodeBuild環境の認証情報をDockerに引き継ぐ方法はありません。

コンテナからawsコマンドを実行するには、コンテナ用のIAMロールを作る必要があります。

AWS公式にやり方が書いてあります。

ローカルだと通るがcodebuildだとDynamoDB-localへの接続がタイムアウトする

curlならIP、ホスト名指定共にpythonコンテナからDBコンテナへの接続が成功するのですが、

なぜかpynamodb(botocore)経由だとタイムアウトしてしまう現象に相当悩まされました。

様々な試行錯誤の末、最終的に最小限の構成で新規プロジェクトをローカルに立ち上げたところ、

単純にdataディレクトリのパーミッションを変更していなかったことが原因と気づきました。

プロバイダ側のテストが不安定に失敗することがある

投稿APIに対してリクエストを投げた際に、以下のようなエラーが発生しテストが頻繁に失敗するようになりました。

E           requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=8008): Max retries exceeded with url: /auth0/microposts (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x727b9434f610>: Failed to establish a new connection: [Errno 111] Connection refused'))


DB更新などを行った際にchaliceの監視対象が更新したと判定され、ホットリロードによりchaliceが再起動するようです。

そして再起動している間にリクエストが来ると、応答できずにエラーになってしまうものと思われます。

そのためchaliceの実行オプションに--no-autoreloadを追加し、リロードされないようにしたところ失敗しなくなりました。

プロバイダの状態管理

ユーザの投稿一覧を取得するAPIの契約で、ユーザの投稿が存在する場合としない場合の

両方のケースについてプロバイダ側で契約を満たしているか確認する必要があります。

このように前提としているプロバイダ側の状態がある場合、どのようにコントロールしたらいいでしょうか?


Pactではもちろんこのようなケースに対応できます。

pactmanの場合はpact_verifier.verifyの第二引数にprovider_state関数を渡してあげることで対応できます。

provider_state関数の第一引数のnameに、契約に規定されたproviderStates[].nameが渡ってくるので、

そのラベルで分岐することで必要な状態を作成することが可能です。


例えば、今回の例だとユーザの投稿が存在する場合には"ユーザの投稿が存在する場合"という文字列が渡ってくるので、

provider_state関数の中でproviderStates[].name"ユーザの投稿が存在する場合"だった場合には事前に投稿APIをPOSTすることで

投稿が存在する状態を作っています。

def provider_state(token):
    def _provider_state(name, **params):
        if name == "ユーザの投稿が存在する場合":
            requests.post(f"{CHALICE_URL}/auth0/microposts", json={"content": "hello, world"}, headers={'Authorization': f"Bearer {token}"})
    return _provider_state


なお、今回はprovider_state関数がtokenを受け取って関数を返す関数になっていますが、

これは投稿APIをPOSTする際にtoken fixtureで生成したトークンを渡す必要があるためです。

今回行った修正

コンシューマ側

プロダクションコード

app/api/microposts/service.ts

import axios from "axios";

export default function MicropostService(baseUrl: string) {
    const ping = async (): Promise<{message: string}> => {
        const res = await fetch(`${baseUrl}/ping`, {
            headers: {
              "Content-Type": "application/json",
            }
        });
        return res.json();
    }

    const getAll = async (accessToken: string|undefined): Promise<Array<{content: string, postedAt: string}>> => {
        const response = await axios.get(`${baseUrl}/auth0/microposts`, {
            headers: {
              "Authorization": `Bearer ${accessToken}`,
              "Content-Type": "application/json",
            }
        });
        return response.data;
    }

    const post = async (accessToken: string|undefined, reqJson: any) => {
        await axios.post(`${baseUrl}/auth0/microposts`, reqJson, {
            headers: {
              "Authorization": `Bearer ${accessToken}`,
              "Content-Type": "application/json",
            },
        });
    }

    return {ping, getAll, post}
}


テストコード

tests/api.pact.spec.ts

import MicropostService from "@/app/api/microposts/service";
import { MatchersV3, PactV3 } from "@pact-foundation/pact";
import path from "path";

const {eachLike, string, regex, timestamp} = MatchersV3;

const provider = new PactV3({
    dir: path.resolve(process.cwd(), 'pacts'),
    consumer: 'auth0-next-custom',
    provider: 'google-login-back'
});

describe('GET /ping', () => {
    it('200レスポンス(pong)を返すこと', async () => {
        provider.addInteraction({
            states: [{description: 'pongレスポンスに成功する場合'}],
            uponReceiving: 'pingリクエストを送信する',
            withRequest: {
                method: 'GET',
                path: '/ping',
            },
            willRespondWith: {
                status: 200,
                headers: {'Content-Type': 'application/json'},
                body: MatchersV3.equal({message: 'pong'}),
            }
        });

        await provider.executeTest(async (mockserver) => {
            const service = MicropostService(mockserver.url);
            const response = await service.ping();
            expect(response.message).toStrictEqual('pong');
        })
    })
});

describe('GET /auth0/microposts', () => {
    it('micropostsを返すこと', async () => {
        provider.addInteraction({
            states: [{description: 'ユーザの投稿が存在する場合'}],
            uponReceiving: 'ユーザの全ての投稿をリクエストする',
            withRequest: {
                method: 'GET',
                path: '/auth0/microposts',
                // 実際はAuthorizationヘッダにBearerトークンを含める必要がある
            },
            willRespondWith: {
                status: 200,
                headers: {'Content-Type': 'application/json'},
                body: eachLike({
                    content: string('Hello, World.'),
                    postedAt: timestamp('YYYY-MM-DD HH:mm:ss', '2024-08-21 23:01:01'),
                })
            }
        });

        await provider.executeTest(async (mockserver) => {
            const service = MicropostService(mockserver.url);
            const response = await service.getAll('hoge');
            expect(JSON.stringify(response)).toStrictEqual(
                JSON.stringify([{content: 'Hello, World.', postedAt: '2024-08-21 23:01:01'}])
            );
        })
    });

    it('空配列を返すこと', async () => {
        provider.addInteraction({
            states: [{description: 'ユーザの投稿が存在しない場合'}],
            uponReceiving: 'ユーザの全ての投稿をリクエストする',
            withRequest: {
                method: 'GET',
                path: '/auth0/microposts',
                // 実際はAuthorizationヘッダにBearerトークンを含める必要がある
            },
            willRespondWith: {
                status: 200,
                headers: {'Content-Type': 'application/json'},
                body: [],
            }
        });

        await provider.executeTest(async (mockserver) => {
            const service = MicropostService(mockserver.url);
            const response = await service.getAll('hoge');
            expect(response).toStrictEqual([]);
        })
    })
})

describe('POST /auth0/microposts', () => {
    it('正常レスポンスを返すこと', async () => {
        provider.addInteraction({
            states: [{description: 'リクエスト内容がcontentを含んでいる場合'}],
            uponReceiving: 'ユーザの投稿を投げる',
            withRequest: {
                method: 'POST',
                path: '/auth0/microposts',
                body: {
                    content: string('hello'),
                }
                // 実際はAuthorizationヘッダにBearerトークンを含める必要がある
            },
            willRespondWith: {
                status: 201,
                headers: {'Content-Type': 'application/json'},
                body: null,
            }
        });

        await provider.executeTest(async (mockserver) => {
            const service = MicropostService(mockserver.url);
            await expect(service.post('hoge', {content: 'world'})).resolves.not.toThrow();
        })
    });


    it('500レスポンスを返すこと', async () => {
        provider.addInteraction({
            states: [{description: 'リクエスト内容がcontentを含んでいない場合'}],
            uponReceiving: 'ユーザの投稿を投げる',
            withRequest: {
                method: 'POST',
                path: '/auth0/microposts',
                body: {
                    greeting: string('hello'),
                }
                // 実際はAuthorizationヘッダにBearerトークンを含める必要がある
            },
            willRespondWith: {
                status: 500,
                headers: {'Content-Type': 'application/json'},
            }
        });

        await provider.executeTest(async (mockserver) => {
            const service = MicropostService(mockserver.url);
            await expect(service.post('hoge', {greeting: 'world'})).rejects.toThrow('Request failed with status code 500');
        })
    })
})


プロバイダ側

テストコード

tests/conftest.py

import pytest
import subprocess
import time
import os
from datetime import datetime
import requests
import boto3

ssm = boto3.client("ssm")

AUTH0_CLIENT_ID = ssm.get_parameter(Name="/auth0/client_id")["Parameter"]["Value"]
AUTH0_CLIENT_SECRET = ssm.get_parameter(Name="/auth0/client_secret")["Parameter"]["Value"]
AUTH0_TEST_USER_NAME = ssm.get_parameter(Name="/auth0/test-user01/name")["Parameter"]["Value"]
AUTH0_TEST_USER_PASSWORD = ssm.get_parameter(Name="/auth0/test-user01/password")["Parameter"]["Value"]

STAGE = os.getenv('STAGE')

DB_ENDPOINT = "http://dynamo-test:8000"


@pytest.fixture()
def refresh_table():
    sub_env = os.environ.copy()
    sub_env["DB_ENDPOINT"] = DB_ENDPOINT
    subprocess.run(args=["python", "refresh_table.py"], env=sub_env)
    yield


@pytest.fixture(scope="session", autouse=True)
def start_chalice_local():
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    filename = f"/var/log/{timestamp}.log"

    print(f"stage: {STAGE}")

    with open(filename, "w") as f:
        process = subprocess.Popen(
            ["chalice", "local", "--host=0.0.0.0", f"--port={os.getenv('TEST_PORT')}", f"--stage={STAGE}", "--no-autoreload"],
            stdout=f,
            stderr=subprocess.STDOUT,
            text=True,
        )

    # サービスが完全に起動するまで待機
    time.sleep(5)
    yield
    process.terminate()
    process.wait()

    with open(filename, "r") as f:
        print("\nchalice output:\n", f.read())


@pytest.fixture(scope="session")
def token():
    data = {
        "client_id": AUTH0_CLIENT_ID,
        "client_secret": AUTH0_CLIENT_SECRET,
        "audience": "my-custom-api",
        "username": AUTH0_TEST_USER_NAME,
        "password": AUTH0_TEST_USER_PASSWORD,
        "grant_type": "password",
    }

    res = requests.post("https://dev-XXXXXXXXXXX.us.auth0.com/oauth/token", data)

    yield res.json()["access_token"]


@pytest.fixture(scope="session", autouse=True)
def create_table():
    sub_env = os.environ.copy()
    sub_env["DB_ENDPOINT"] = DB_ENDPOINT
    subprocess.run(args=["python", "create_table.py"], env=sub_env)
    yield


tests/test_pacts.py

import os
import requests


CHALICE_URL = f"http://localhost:{os.getenv('TEST_PORT')}"


def provider_state(token):
    def _provider_state(name, **params):
        if name == "ユーザの投稿が存在する場合":
            requests.post(f"{CHALICE_URL}/auth0/microposts", json={"content": "hello, world"}, headers={'Authorization': f"Bearer {token}"})
    return _provider_state


def test_pacts(pact_verifier, token, refresh_table):
    pact_verifier.verify(CHALICE_URL, provider_state(token), extra_provider_headers={'Authorization': f"Bearer {token}"})


さいごに

今回は状態の管理が必要なコンシューマ駆動契約の実装をしてみました。

一見簡単そうでしたが実際にやってみるとかなりハマりポイントが存在するなと思いました。

扱いづらさの一番の原因はテストとは別のプロセスでAPIサーバやDBサーバを起動する必要が

あったためではないかなと思います。

とはいえコンシューマ・プロバイダの相互作用に関するテストなのである程度致し方ないかなと。


今回実装してみてコンシューマ駆動契約もそれなりにコストがかかることがわかったので、

実際のPJで導入するかどうかはコストに見合ったメリットがあるかどうかを慎重に検討した上で判断すべきだと思いました。

この記事をシェアする