PyMCのトラブル対処記録(1)

「岩波データサイエンス Vol.1 [特集] ベイズ推論とMCMCのフリーソフト」という本を借りたので読んでみると、PyMCというライブラリを使えばPythonでもできると書かれていたので、サンプルコードがBUGSやRで書かれている例題を含めて全てPyMCでやってみることにした。
BUGSやRのコードを読み解きながら、p.39〜p.46辺りの、時系列データに状態空間モデルのトレンドモデルや、観測モデルに対数正規分布を用いたモデルを当てはめてパラメーター推定する例題まで順調に進められたが、p.47〜p.48の「観測値が二項分布のモデル」でPyMCのトラブルに遭遇した。その回避方法がわかったので、ここに記す。

まず成功例として、p.45辺りの、観測モデルに対数正規分布を用いた例を紹介する。
なお、今回はなるべくこの本のサンプルコードと比較できるように、サンプルコードに近い形で書くことを目指している為、PyMCを上手く使う書き方にはしていない。例えば、PyMCではモデル作成時にループを使うことが推奨されていないようである。そもそも筆者はPyMCを使い始めたばかりで、ではどう書くのが良いのかがわからない。

import matplotlib.pyplot as plt
# 年輪データ
# https://github.com/iwanami-datascience/vol1/blob/master/ito/nenrin.data より借用

nenrin_data = [
    4.71, 7.7 , 7.97, 8.35, 5.7 , 7.33, 3.1 , 4.98, 3.75, 3.35, 1.84, 3.28, 2.77, 2.72, 2.54,
    3.23, 2.45, 1.9 , 2.56, 2.12, 1.78, 3.18, 2.64, 1.86, 1.69, 0.81, 1.02, 1.4 , 1.31, 1.57]
index = range(1960, 1990)

plt.plot(index, nenrin_data, color='k')
plt.ylabel('Tree-ring width')
plt.show()

こういう、京都市内で採取された杉の木の年輪幅の推移のデータがあり、これに次のような状態空間モデルを当てはめる。

αtが年輪幅で、1行目はαtのシステムモデルとしてトレンドモデルを仮定するもの、2行目は1行目を変形したもの、3行目は観測モデルである。 このαtを次のコードで推定する。

●コード
import pymc as pm
import pandas as pd

N = nenrin_data.shape[0]

with pm.Model() as model:
    # 事前分布
    alpha = [pm.Uniform('alpha1', 0, 10), pm.Uniform('alpha2', 0, 10)]
    sigma = [pm.Uniform('sigma1', 0, 10), pm.Uniform('sigma2', 0, 10)]  # σ_ε, σ_η
             
    # システムモデル
    for i in range(2, N):
        alpha.append(pm.Normal(f'alpha{i+1}', mu=2 * alpha[i-1] - alpha[i-2], sigma=sigma[1]))

    # 観測モデル
    y = []
    for i in range(N):
        y.append(pm.LogNormal(f'y{i+1}', mu=alpha[i], sigma=sigma[0], observed=nenrin_data['width'].iloc[i]))

with model:
    trace = pm.sample(10000, progressbar=False)
●結果
# 結果の可視化
# alphaはyの対数に対応しているのでexp(alpha)に変換している
import numpy as np

plt.plot(index, nenrin_data, color='k', label='observed')

# 事後分布の平均
alpha_mean = [trace['posterior'][f'alpha{i+1}'].mean() for i in range(N)]
alpha_mean_exp = np.exp(alpha_mean)
plt.plot(index, alpha_mean_exp, color='b', linewidth=2, label='posterior')

# 95%ベイズ信用区間
alpha_025 = [np.percentile(trace['posterior'][f'alpha{i+1}'], 2.5) for i in range(N)]
alpha_975 = [np.percentile(trace['posterior'][f'alpha{i+1}'], 97.5) for i in range(N)]
alpha_025_exp = np.exp(alpha_025)
alpha_975_exp = np.exp(alpha_975)
plt.fill_between(index, alpha_025_exp, alpha_975_exp, color='cyan', alpha=0.15, label='95% credible interval')

plt.ylabel('Tree-ring width')
plt.legend()
plt.show()

この結果は本に記載されている図とほぼ同じである。

次に、1つ目の問題が発生した、p.47〜p.48の「観測値が二項分布のモデル」の例である。

ある林にいる鳥の数をカウントするとする。 鳥の個体を発見できる確率をp、1回の観測につき5回(i=1〜5)繰り返しカウントする、それを50回(t=1〜50)観測するとする。 実際には観測できない真の個体数Ntは期待値λtのポアソン分布に従うとする。
t回目の観測におけるi回目のカウントでの発見個体数をNtiobs とすると、観測モデルは次のようになる。

λtは対数スケールで、分散σ2の正規分布に従ってランダムウォークするとすると、システムモデルは次のようになる。

●データ
import numpy as np

# 模擬データ作成
np.random.seed(5)

Nt = 50
Nr = 5
p = 0.6
sigma = 0.1

# log λ_t
log_lambda = [np.log(30)]
for i in range(1, Nt):
    log_lambda.append(log_lambda[i-1] + np.random.normal(0, sigma))

# 真の個体数
N_true = [np.random.poisson(np.exp(log_lambda[i])) for i in range(Nt)] 

# 発見個体数
Nobs = np.array([np.random.binomial(N_true[t], p, Nr) for t in range(50)])

# 可視化
import matplotlib.pyplot as plt

plt.plot(range(1, Nt+1), N_true, color='k', linewidth=0.5, label='true')
plt.scatter([[t+1]*5 for t in range(50)], Nobs, marker='.', alpha=0.5, label='observed')

plt.xlabel('time')
plt.title('Bird count')
plt.legend(loc='upper center')
plt.show()

このデータから真の個体数を推定するとする。
上の成功例と同じ要領で、次のようなコードを実行すると、CompileErrorが発生した。

●コード
# 発見個体数からのパラメーター推定
import pymc as pm

# CompileError対策
# import pytensor
# pytensor.config.gcc__cxxflags += ' -fbracket-depth=384'

Nt = 50  # Nt >= 32だとCompileErrorになる
Nr = 5

p = None
log_lambda = None
sigma = None
N = None

with pm.Model() as model:
    # 事前分布
    p = pm.Uniform('p', 0, 1)
    log_lambda = [pm.Normal('loglambda[1]', 0, 100)]  # sigma=100は書籍だと1e-4だが、それだと結果がかなりずれる
    sigma = pm.Uniform('sigma', 0, 100)

    # システムモデル
    for t in range(1, Nt):
        log_lambda.append(pm.Normal(f'loglambda[{t+1}]', mu=log_lambda[t-1], sigma=sigma))

    # 観測モデル
    l = [pm.Deterministic(f'lambda[{t+1}]', log_lambda[t].exp()) for t in range(Nt)]
    N = [pm.Poisson(f'N[{t+1}]', mu=l[t]) for t in range(Nt)]
    Y = [pm.Binomial(f'Nobs[{t+1}]', n=N[t], p=p, observed=Nobs[t]) for t in range(Nt)]

# 初期値を与えないとSamplingErrorになる
initvals = {f'N[{t+1}]': 100 for t in range(Nt)}

with model:
    trace = pm.sample(10000, initvals=initvals)
●結果
CompileError: Compilation failed (return status=1):

/Users/ynomura/.pytensor/compiledir_macOS-14.5-arm64-arm-64bit-arm-3.10.14-64/tmp7dc45x_s/mod.cpp:26982:32: fatal error: bracket nesting level exceeded maximum of 256
        if (!PyErr_Occurred()) {
                               ^
/Users/ynomura/.pytensor/compiledir_macOS-14.5-arm64-arm-64bit-arm-3.10.14-64/tmp7dc45x_s/mod.cpp:26982:32: note: use -fbracket-depth=N to increase maximum nesting level

HINT: Use a linker other than the C linker to print the inputs' shapes and strides.
HINT: Re-running with most PyTensor optimizations disabled could provide a back-trace showing when this node was created. This can be done by setting the PyTensor flag 'optimizer=fast_compile'. If that does not work, PyTensor optimizations can be disabled with 'optimizer=None'.
HINT: Use the PyTensor flag `exception_verbosity=high` for a debug print-out and storage map footprint of this Apply node.

筆者の環境は、macOS 14.5 + python 3.10.14 + PyMC 5.15.0 + PyTensor 2.20.0 だった。PyMCはpipでインストールしたものである。
コードのコメントにも書いたが、色々試した結果、 Nt = 31 なら発生しないことがわかったので、Pythonのコードが間違っている訳ではなさそうだった。

Webで検索すると、このCompileErrorが発生したと書かれているページがいくつか見つかったが、具体的な解決方法が見つけられなかった。
PyMCのサイトにAnacondaやMiniforgeを使ってインストールすることを推奨すると書かれているので、Miniforgeを使ったインストールも試したが、結果は同じだった。その環境は python 3.12.3 + PyMC 5.15.1 + PyTensor 2.22.1だった。

上のエラーメッセージから、Cコンパイラの{}のネストが最大ネスト数の256を超えたというエラーが出ており、コンパイラの引数に-fbracket-depth=Nを加えると最大ネスト数が変えられると書いてある。色々調べた結果、次の2行(上のコードではコメントアウトしている)を加えると、このCompileErrorを回避できた。

import pytensor
pytensor.config.gcc__cxxflags += ' -fbracket-depth=384'
●結果
import matplotlib.pyplot as plt
import numpy as np

x = range(1, Nt+1)
plt.plot(x, N_true[:Nt], color='k', linewidth=0.5, label='true')
plt.scatter([[t+1]*5 for t in range(50)], Nobs, marker='.', alpha=0.5, label='observed')

lambda_mean = [trace['posterior'][f'lambda[{t+1}]'].mean() for t in range(Nt)]
plt.plot(x, lambda_mean, color='b', linewidth=2, label='posterior lambda')

# 95%ベイズ信用区間
lambda_025 = [np.percentile(trace['posterior'][f'lambda[{t+1}]'], 2.5)  for t in range(Nt)]
lambda_975 = [np.percentile(trace['posterior'][f'lambda[{t+1}]'], 97.5)  for t in range(Nt)]
plt.fill_between(x, lambda_025, lambda_975, color='cyan', alpha=0.15, label='95% credible interval')

plt.xlabel('time')
plt.title('Bird count')
plt.legend()
plt.show()

平日、子供を毎朝7:30までに起こす為に、「めざましテレビ」の7:30〜7:59の間に2回ある「めざましじゃんけん」をさせている。勝ったら20pt、引き分けなら10pt、負けたら5ptで、週内100ptでクリアである。
5日×2回/日=10回の点数の期待値は10*(20+10+5)/3 > 116なので、まあまあの確率でクリアできそうであるが、今週は負けまくって、月〜金で無事に10回やり切ったのに、10回やった中では初めてクリアできなかった。

そこで、10回でクリアできる確率はどれくらいだろうかと思って考え始めたら、場合の数を数え上げる以外の方法が思い付かなかった。

クリアできない方が少ないので、クリアできない方を数え上げる。

4勝以上だと全敗でもクリアなので、0
3勝だと7敗のみなので、10C3 = 120
2勝だと3分以下なので、10C2 * (8C3 + 8C2 + 8C1 + 8C0) = 4185
1勝だと6分以下=2敗以下以外なので、10C1 * (29 - 9C2 - 9C1 - 9C0) = 4660
0勝だと9分以下=0敗以外なので、210 - 1 = 1023
全ての場合の数は310 = 59049なので、クリアできない確率は (120 + 4185 + 4660 + 1023) / 59049 = 9988 / 59049 = 0.169 、クリアできる確率はその逆で0.831である。

こんな計算方法しか無かったっけ?

カルマンフィルターの例題として、何らかの物理的な物体があり、内部状態として位置xと速度vがあり、それぞれがノイズの影響を受け、観測できるのはxに観測誤差を加えたzのみとし、位置xを一定の目標値に制御したいとする。

次のコードのPhysicalModelを制御対象とする。xを取得するメソッドがあるが、制御する際に観測できるのはzだけであり、xの本当の値は観測できないとする。

import numpy as np
import matplotlib.pyplot as plt

class PhysicalModel(object):
    x_std = 1.0  # 位置xのノイズの標準偏差
    v_std = 0.5  # 速度vのノイズの標準偏差
    z_std = 2.0  # xの観測誤差の標準偏差

    def __init__(self, timesteps, init_x, init_v, random_seed=1):
        np.random.seed(random_seed)
        self.x = np.zeros(timesteps)  # 位置x(便宜上過去の値を保存)
        self.v = np.zeros(timesteps)  # 速度v
        self.a = np.zeros(timesteps)  # 加速度a
        self.z = np.zeros(timesteps)  # xの観測値z

        # t=0
        self.t = 0  # 時刻
        self.x[0] = init_x
        self.v[0] = init_v
        self.z[0] = self.x[0]

    # 時刻tを1つ進める
    def step(self):
        self.t += 1
        self.x[t] = self.x[t-1] + 0.99 * self.v[t-1] + 1/2 * self.a[t-1] + self.x_std * np.random.randn(1)[0]
        self.v[t] =               0.98 * self.v[t-1] +       self.a[t-1] + self.v_std * np.random.randn(1)[0]
        self.z[t] = self.x[t] + self.z_std * np.random.randn(1)[0]
        return self.z[t]  # 1つ進めた時刻の観測値を返す

    # 時刻tの加速度を入力する
    def set_accel(self, a):
        self.a[t] = min(10, max(-10, a))  # 実際には -10 <= a <= 10 に制限されるとする

    # t=offset以降のxの目標値からの平均二乗誤差
    def MSE(self, mean, offset=0):
        return sum((self.x[offset:] - x_target) ** 2) / self.x[offset:].shape[0]

    # グラフ表示用
    def get_x_history(self):
        return self.x[:t+1]
    
    def get_v_history(self):
        return self.v[:t+1]
    
    def get_a_history(self):
        return self.a[:t+1]
    
    def get_z_history(self):
        return self.z[:t+1]

まず、カルマンフィルターを用いず、観測値zを使って制御してみる。
xの初期値は20、目標値は5とする。
制御入力は、ノイズやvの減衰を無視すると
xt+1 = xt + vt + (1/2) at
なので
at = 2 (xt+1 - (xt + vt))
であり、次の時刻(t+1)で目標値に到達させるなら
at = 2 (x_target - (xt + vt))
だが、観測値zから推測したx,vはノイズを含むので、その影響を小さくする為、入力するaは上の式の半分とする。(実際、上の式の2を色々変えてみても1.0前後くらいが良さそうだった。時刻(t+2)で目標値に到達させるべく、
xt+2 = xt + 2vt + 2 atから
at = (1/2) (x_target - (xt + 2vt))
とするのも有効そうだったが、今回は単純な例で試す。)

●コード
timesteps = 50
init_x = 20
x_target = 5
model = PhysicalModel(timesteps, init_x=init_x, init_v=0)

estimated_v = [0]  # vの推定値、グラフ表示用に保存する
z_prev = init_x  # t=0の観測値
a = 0  # t=0の加速度

for t in range(1, timesteps):
    z = model.step()

    # x,vの推定
    # z = z_prev + v_prev + a/2
    # v = v_prev + a
    # -> v = z - z_prev - a/2 + a
    x = z
    v = z - z_prev + a/2
    estimated_v.append(v)

    # 制御入力
    a = 1.0 * (x_target - (x + v))
    model.set_accel(a)

    z_prev = z

print("MSE =", model.MSE(x_target, offset=10))

fig = plt.figure(figsize=(10,4))
ax1 = fig.gca()
ax2 = ax1.twinx()
ax1.plot(model.get_z_history(), label='observed')
ax1.plot(model.get_x_history(), color='r', linewidth=2, label='true_x')
ax2.plot(model.get_v_history(), 'y', alpha=0.5, label='true_v')
ax2.plot(estimated_v, 'y', linestyle='--', label='estimated_v')
ax1.grid(axis='y')
ax1.set_ylim((-2, 22))
ax1.set_xlabel('time')
ax1.set_ylabel('x')
ax2.set_ylabel('v')
ax1.legend(loc='upper right', bbox_to_anchor=(1, 0.7))
ax2.legend(loc='upper right')
ax1.set_title("Controlling x to 5")
plt.show()

# 別の制御方式との比較用
true_x_1 = model.get_x_history()
●実行結果
MSE = 5.654622082289254

赤線が制御対象のxである。一応、xが5辺りに制御されている。

次に、カルマンフィルターを用いてx,vを推定しながら制御してみる。
制御対象を状態空間モデルで表すと、このようになる。
\left\{
\begin{array}{rcll}
X_{t+1} & = & F X_t + u_t + w_t & (w_t \sim N(0, Q))\\
Z_t & = & H X_t + V_t & (V_t \sim N(0, R))\\
\end{array}
\right.
上の式が状態方程式、下の式が観測方程式と呼ばれるものである。
ここでのXは位置xと速度vを含む状態ベクトル、
X = \left( \begin{array}{c} x\\ v\\ \end{array} \right)
であり、時刻tからt+1の間にシステム行列Fによる状態遷移と、制御入力uと、平均0、分散Qの多変量正規分布に従うノイズwが加わるとする。
Zは状態Xの観測値で、Xが観測行列Hにより変換され、平均0、分散Rの多変量正規分布に従うノイズVが加わるとする。(このVは小文字のvで表記されることが多いが、今回の例では速度のvと紛らわしいので、ここでは大文字で記す)
制御入力uは加速度aにより、vの減衰を無視すると、ノイズを除いた部分は
xt+1 = xt + vt + (1/2) at
vt+1 = vt + at
となるので、
F = \left( \begin{array}{cc} 1 & 1\\ 0 & 1\\ \end{array} \right)
u = \left( \begin{array}{c} \frac{1}{2}a\\ a\\ \end{array} \right)
となる。
状態はxしか観測できないので、観測行列Hは[1 0]となる。
Q,Rは最小二乗法で推定する方法があるらしいが、今回は経験的にこれくらいだと知っているものとして、それぞれ[[1, 0], [0, 0.1]]と[5]とする。

次のコードはpykalmanを用いた例である。

●コード
# pykalmanのカルマンフィルターを使う例
from pykalman import KalmanFilter

model = PhysicalModel(timesteps, init_x=init_x, init_v=0)

kf = KalmanFilter(
    transition_matrices=[[1, 1], [0, 1]],
    observation_matrices=[1, 0],
    transition_covariance=[[1, 0], [0, 0.1]],
    observation_covariance=[5],
#     initial_state_mean=[init_x, 0],  # 今回の使い方では関係無かった
#     initial_state_covariance=[[0, 0], [0, 0]]  # 何にしても変わらない
)

filtered_state = np.zeros((timesteps, 2))  # x,vの推定値、グラフ表示用に保存する
filtered_state[0] = [init_x, 0]
filtered_state_cov = np.zeros((timesteps, 2, 2))  # x,vの推定値の分散

a = 0  # t=0の加速度

for t in range(1, timesteps):
    z = model.step()

    # x,vの推定
    filtered_state[t], filtered_state_cov[t] = kf.filter_update(
        filtered_state[t-1],
        filtered_state_cov[t-1],
        z,
        transition_offset=np.array([1/2 * a, a])
    )
    x, v = filtered_state[t]

    # 制御入力
    a = 1.0 * (x_target - (x + v))
    model.set_accel(a)
    
print("MSE =", model.MSE(x_target, offset=10))

# 安定確認
plt.figure(figsize=(8,2))
plt.plot(filtered_state_cov[:,0,0], label='filtered_state_cov_x')
plt.plot(filtered_state_cov[:,1,1], label='filtered_state_cov_v')
plt.legend()
plt.show()

fig = plt.figure(figsize=(10,4))
ax1 = fig.gca()
ax2 = ax1.twinx()
ax1.plot(model.get_z_history(), label='observed')
ax1.plot(filtered_state[:,0], linestyle='--',  label='filtered_x')
ax1.plot(true_x_1, color='r', alpha=0.3, label='true_x(prev)')
ax1.plot(model.get_x_history(), color='r', linewidth=2, label='true_x')
ax2.plot(model.get_v_history(), 'y', alpha=0.5, label='true_v')
ax2.plot(filtered_state[:,1], 'y', linestyle='--', label='filtered_v')
ax1.grid(axis='y')
ax1.set_ylim((-2, 22))
ax1.set_xlabel('time')
ax1.set_ylabel('x')
ax2.set_ylabel('v')
ax1.legend(loc='upper right', bbox_to_anchor=(1, 0.7))
ax2.legend(loc='upper right')
ax1.set_title("Controlling x to 5")
plt.show()
●実行結果
MSE = 3.238811619518345

濃い赤線が制御対象のx、青線が観測値、オレンジの点線がxの推測値である。全体的には推定値が観測値より本当のxに近いように見える。薄い赤線が上のカルマンフィルターを使わない例のxであり、何となく、今回の方がよりx=5の近くに制御されているように見える。また、黄色の実線が本当のv、黄色の点線が推測したvであり、観測していないvがそれにりに正しく推測され、本当のvも0付近で安定している。
出力のt≧10のMSEで見ても、誤差が小さくなっている。