DDTW(Derivative Dynamic Time Warping)

Timeseries

DDTW(Derivative Dynamic Time Warping)

作成日: 最終更新: 読了時間: 4 分
まとめ
  • Derivative Dynamic Time Warping(DDTW)は DTW の距離計算を「値の差」ではなく「一次微分(傾き)の差」で行い、波形の形状に敏感な整列を実現する。
  • 各時点の中央差分で傾きを近似し、通常の DTW と同じ動的計画法で累積コストとワーピングパスを求める。
  • Python で DDTW を実装し、単なるオフセット差よりも形状差に重みが乗ることを比較例で確認する。

Keogh, Eamonn J., and Michael J. Pazzani. “Derivative Dynamic Time Warping.” Proceedings of the 2001 SIAM International Conference on Data Mining. (PDF)


DDTW とは #

Dynamic Time Warping(DTW)は 2 本の時系列を整列させ、対応付けた点同士の距離を最小化します。ただし値の差をそのまま距離にすると、縦方向のオフセットが大きいケースで「形は近いのに距離が離れてしまう」ことがあります。Derivative Dynamic Time Warping(DDTW)は距離の定義を「傾きの差」に置き換えることで、形状の近さに注目します。

メリット:

  • ベースラインが異なっても、上り坂/下り坂のパターンが似ていれば距離が小さくなる。
  • ノイズが少なければピークや谷といった形状特徴を強調できる。

もちろん絶対値の差が重要な場面(異常検知など)では DTW が適しているので、目的に合わせて使い分けます。


アルゴリズムのポイント #

DDTW は局所距離を「傾きの差」で定義します。

1. 傾き(一次微分)の近似 #

系列 \(X\) の \(i\) 番目の傾きは中央差分で以下のように近似します。

$$ D_X(i) = \frac{(x_i - x_{i-1}) + (x_{i+1} - x_{i-1})/2}{2} $$

端点は近傍の値を利用して同様に計算します。局所距離はこの傾きの差の絶対値(または二乗誤差)とします。

2. 累積コストの計算 #

傾きから作成した距離行列 \(D\) を用い、通常の DTW と同じ遷移式で累積コスト \(C\) を計算します。

$$ C_{i,j} = D_{i,j} + \min{C_{i-1,j},\ C_{i,j-1},\ C_{i-1,j-1}} $$

3. ワーピングパス #

累積コスト行列の右下から逆向きにたどり、最小コスト経路(ワーピングパス)を復元します。距離評価が傾きを基準にしているため、値のオフセットより形状の違いが強調されます。

計算量は DTW と同じ \(O(nm)\)。長い系列では Sakoe–Chiba バンドなどの制約を加えると高速化できます。


Python で DDTW を実装する #

import numpy as np

def derivative_series(x: np.ndarray) -> np.ndarray:
    """中央差分で一次微分を近似する。"""
    x = np.asarray(x, dtype=float)
    if x.size < 3:
        raise ValueError("系列の長さは 3 以上にしてください。")

    d = np.empty_like(x)
    d[0] = ((x[1] - x[0]) + (x[2] - x[0]) / 2.0) / 2.0
    d[-1] = ((x[-1] - x[-2]) + (x[-1] - x[-3]) / 2.0) / 2.0
    for i in range(1, len(x) - 1):
        d[i] = ((x[i] - x[i - 1]) + (x[i + 1] - x[i - 1]) / 2.0) / 2.0
    return d

def ddtw_distance(
    x: np.ndarray,
    y: np.ndarray,
    window: int | None = None,
) -> tuple[float, list[tuple[int, int]]]:
    dx, dy = derivative_series(x), derivative_series(y)
    n, m = len(dx), len(dy)

    if window is None:
        window = max(n, m)
    window = max(window, abs(n - m))

    cost = np.full((n + 1, m + 1), np.inf)
    cost[0, 0] = 0.0

    for i in range(1, n + 1):
        for j in range(max(1, i - window), min(m + 1, i + window + 1)):
            dist = abs(dx[i - 1] - dy[j - 1])
            cost[i, j] = dist + min(cost[i - 1, j], cost[i, j - 1], cost[i - 1, j - 1])

    i, j = n, m
    path: list[tuple[int, int]] = []
    while i > 0 and j > 0:
        path.append((i - 1, j - 1))
        step = np.argmin((cost[i - 1, j], cost[i, j - 1], cost[i - 1, j - 1]))
        if step == 0:
            i -= 1
        elif step == 1:
            j -= 1
        else:
            i -= 1
            j -= 1
    path.reverse()
    return cost[n, m], path

DTW と DDTW を比較する #

python --version  # 例: Python 3.13.0
pip install matplotlib
import matplotlib.pyplot as plt

np.random.seed(0)
t = np.linspace(0, 4 * np.pi, 120)

series_a = np.sin(t)
series_b = np.sin(t) + 0.6
series_c = np.sin(1.2 * t + 0.3)

plt.figure(figsize=(10, 3))
plt.plot(t, series_a, label="Series A", color="black")
plt.plot(t, series_b, label="Series B (offset)", color="tab:green")
plt.plot(t, series_c, label="Series C (shape change)", color="tab:orange")
plt.legend()
plt.title("比較する 3 つの時系列")
plt.tight_layout()
plt.show()

DTW と DDTW を比較する figure

def dtw_distance(x: np.ndarray, y: np.ndarray) -> float:
    n, m = len(x), len(y)
    cost = np.full((n + 1, m + 1), np.inf)
    cost[0, 0] = 0.0
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            dist = abs(x[i - 1] - y[j - 1])
            cost[i, j] = dist + min(cost[i - 1, j], cost[i, j - 1], cost[i - 1, j - 1])
    return cost[n, m]

dtw_ab = dtw_distance(series_a, series_b)
dtw_ac = dtw_distance(series_a, series_c)
ddtw_ab, _ = ddtw_distance(series_a, series_b)
ddtw_ac, _ = ddtw_distance(series_a, series_c)

print(f"DTW (A, B):  {dtw_ab:.3f}")
print(f"DTW (A, C):  {dtw_ac:.3f}")
print(f"DDTW(A, B):  {ddtw_ab:.3f}")
print(f"DDTW(A, C):  {ddtw_ac:.3f}")
DTW (A, B):  72.248
DTW (A, C):  51.158
DDTW(A, B):   0.002
DDTW(A, C):  13.632

DTW では振幅が異なる Series B の距離が大きく、形状が異なる Series C の距離の方が小さくなっています。DDTW では傾きが似ている Series B との距離がほぼゼロに近づき、形状の異なる Series C の距離が大きくなる点が大きな違いです。


ワーピングパスの可視化 #

def ddtw_cost_matrix(x: np.ndarray, y: np.ndarray) -> tuple[np.ndarray, list[tuple[int, int]]]:
    dx, dy = derivative_series(x), derivative_series(y)
    n, m = len(dx), len(dy)
    cost = np.full((n + 1, m + 1), np.inf)
    cost[0, 0] = 0.0

    for i in range(1, n + 1):
        for j in range(1, m + 1):
            dist = abs(dx[i - 1] - dy[j - 1])
            cost[i, j] = dist + min(cost[i - 1, j], cost[i, j - 1], cost[i - 1, j - 1])

    i, j = n, m
    path: list[tuple[int, int]] = []
    while i > 0 and j > 0:
        path.append((i - 1, j - 1))
        step = np.argmin((cost[i - 1, j], cost[i, j - 1], cost[i - 1, j - 1]))
        if step == 0:
            i -= 1
        elif step == 1:
            j -= 1
        else:
            i -= 1
            j -= 1
    path.reverse()
    return cost[1:, 1:], path

cost_ac, path_ac = ddtw_cost_matrix(series_a, series_c)

plt.figure(figsize=(5, 4))
plt.imshow(cost_ac, origin="lower", cmap="viridis")
path_arr = np.array(path_ac)
plt.plot(path_arr[:, 1], path_arr[:, 0], color="white", linewidth=1.5)
plt.title("DDTW cumulative cost (Series A vs Series C)")
plt.xlabel("Series index j")
plt.ylabel("Series index i")
plt.colorbar(shrink=0.8, label="Cumulative cost")
plt.tight_layout()
plt.show()

ワーピングパスの可視化 figure

Series A と Series C の形状が異なるため、パスが水平・垂直方向に大きく移動しながら対応づけていることが分かります。これは形状の違いを埋め合わせようとする DDTW の挙動を示しています。


まとめ #

  • DDTW は傾き(微分)に基づいて距離を測るため、時系列の形状の違いを捉えやすい。
  • 実装は DTW とほぼ同じで、局所距離を傾きの差に置き換えるだけでよい。
  • オフセットに強く形状差に敏感なので、タスクに応じて DTW と使い分けよう。