まとめ rupturesライブラリを使って時系列の変化点を自動検出する。 複数の検出アルゴリズム(Pelt・Binseg・Dynpなど)を比較し、精度と速度のトレードオフを確認する。 検出結果を可視化して、構造変化のタイミングを特定する。 Truong, Charles, Laurent Oudre, and Nicolas Vayatis. “ruptures: change point detection in Python.” arXiv preprint arXiv:1801.00826 (2018). (arXiv )
ruptures
# 公式ドキュメント: ruptures documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# seaborn内部のpandas使用箇所でFutureWarningが出ていたので表示しないようにする
import warnings
from datetime import datetime , timedelta
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import ruptures as rpt
import seaborn as sns
warnings . simplefilter ( action = "ignore" , category = FutureWarning )
# generate signal
n_samples , dim , sigma = 1000 , 3 , 4
n_bkps = 4 # number of breakpoints
signal , bkps = rpt . pw_constant ( n_samples , dim , n_bkps , noise_std = sigma )
# detection
algo = rpt . Pelt ( model = "rbf" ) . fit ( signal )
result = algo . predict ( pen = 10 )
# display
rpt . display ( signal , bkps , result )
plt . show ()
さまざまなデータでの比較
# 変化点検出+可視化のコード
# 1
2
3
4
5
6
7
8
9
def vizualize_change_point (
signals , true_change_points = [], method = "rbf" , pen = 0.5 , min_size = 3
):
algo = rpt . Pelt ( model = method , min_size = min_size ) . fit ( signals )
result = algo . predict ( pen = pen )
rpt . display ( signals , true_change_points , result )
plt . suptitle ( f "method= { method } , penalty= { pen } , " , fontsize = 20 )
plt . tight_layout ()
plt . show ()
サンプルデータの作成
# 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
X = []
dates = [ datetime ( 2021 , 1 , 1 ) + timedelta ( days = i ) for i in range ( 1000 )]
for i , di in enumerate ( dates ):
value = i // 100
X . append (
[
np . sin ( di . weekday () * 0.001 ) * 2
+ np . random . rand () * 0.1
+ value * np . cos ( i ),
np . sin ( di . day * 0.01 ) * 3 + np . random . rand () * 0.1 + value * np . cos ( i ),
]
)
data = pd . DataFrame ( X )
data . columns = [ f "f { c } " for c in data . columns ]
data . index = dates
plt . figure ( figsize = ( 12 , 3 ))
sns . lineplot ( data = data )
<Axes: >
ペナルティの違いによる結果の違い
# 1
2
for pen in np . linspace ( 0.1 , 1 , 5 ):
vizualize_change_point ( data . values , method = "rbf" , pen = pen )
動的計画法を使った変化点の検出
# 動的計画法を使う場合、変化点の数(n_bkps)をあらかじめ指定する必要がある。
1
2
3
4
5
6
7
8
9
10
11
12
13
def vizualize_change_point_dynp (
signals , true_change_points = [], method = "rbf" , min_size = 3 , n_bkps = 3
):
algo = rpt . Dynp ( model = method , min_size = min_size ) . fit ( signals )
result = algo . predict ( n_bkps = n_bkps )
rpt . display ( signals , true_change_points , result )
plt . suptitle ( f "method= { method } , penalty= { pen } , " , fontsize = 20 )
plt . tight_layout ()
plt . show ()
vizualize_change_point_dynp (
data . values , true_change_points = [], method = "rbf" , min_size = 3 , n_bkps = 8
)
人工データでの実験2
# 移動平均は大体いつもおなじだけれど、分散がある時点で変化するデータです。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
X2 = []
dates2 = [ datetime ( 2021 , 1 , 1 ) + timedelta ( days = i ) for i in range ( 1000 )]
for i , di in enumerate ( dates2 ):
if i == 550 :
X2 . append (
[
np . sin ( di . weekday () * 0.001 ) * 2 + np . random . rand () * 0.1 ,
np . sin ( di . day * 0.01 ) * 3 + np . random . rand () * 0.1 ,
]
)
elif 500 < i < 600 :
X2 . append (
[
np . sin ( di . weekday () * 0.001 ) * 2
+ np . random . rand () * 0.1
+ np . random . randint ( - 2 , 2 ) * 0.1 ,
np . sin ( di . day * 0.01 ) * 3 + np . random . rand () * 0.1 ,
]
)
else :
X2 . append (
[
np . sin ( di . weekday () * 0.001 ) * 2 + np . random . rand () * 0.1 ,
np . sin ( di . day * 0.01 ) * 3 + np . random . rand () * 0.1 ,
]
)
data2 = pd . DataFrame ( X2 )
data2 . columns = [ f "f { c } " for c in data2 . columns ]
data2 . index = dates2
plt . figure ( figsize = ( 12 , 3 ))
sns . lineplot ( data = data2 )
plt . ylim ( 0 , 1 )
(0.0, 1.0)
1
2
3
# 注意点:値の絶対値や損失関数によって指定すべきpenaltyの値は異なる
vizualize_change_point ( data2 . values , method = "rbf" , pen = 1.5 , min_size = 50 )
vizualize_change_point ( data2 . values , method = "l2" , pen = 1.5 , min_size = 50 )
1
2
3
data2 [ "var" ] = data2 [ "f0" ] . rolling ( window = 10 ) . var () . fillna ( 0 )
vizualize_change_point ( data2 . values , method = "rbf" , pen = 1.5 , min_size = 50 )
vizualize_change_point ( data2 . values , method = "l2" , pen = 0.1 , min_size = 100 )
1
vizualize_change_point_dynp ( data2 . values , min_size = 90 , n_bkps = 2 )
実データでの実験
# Yahoo financeのDDOGのデータ を使って変化点を見てみようと思います。
1
2
3
4
5
6
7
8
9
10
ddog = pd . read_csv ( "./DDOG.csv" )
ddog . index = ddog [ "Date" ]
ddog . drop ([ "Date" ], axis = 1 , inplace = True )
ddog [ "Volume" ] = 100 * ddog [ "Volume" ] / ddog [ "Volume" ] . median ()
ddog = ddog [[ "Open" , "Close" , "Volume" ]]
plt . figure ( figsize = ( 12 , 4 ))
sns . lineplot ( data = ddog )
plt . xticks ( rotation = 90 )
plt . show ()
1
vizualize_change_point ( ddog . values , method = "rbf" , pen = 0.5 , min_size = 40 )