時間序列數據隨處可見:網站每分鐘的訪問量、傳感器讀數、股票價格、人流計數、服務器 CPU 使用率,都是典型場景。
多數時候這類數據遵循某種規律。異常檢測的目標就是找到規律被打破的那些時刻。
![]()
什么是時間序列數據中的異常?
異常指的是與正常行為產生明顯偏離的數據點或數據序列。舉幾個例子:凌晨 3 點網站流量突然飆升;傳感器因設備故障出現讀數驟降;已關門的商店內人流量異常激增。
為什么時間序列異常檢測很困難
時間序列數據天然包含趨勢(緩慢的上升或下降)、季節性(日級或周級的周期模式)以及噪聲(隨機波動)。這三個成分疊加在一起,讓"正常"本身就在不斷變化。
一個值的高低本身不構成異常,它是否異常取決于出現的時間點。中午有 1000 個訪客是正常的,凌晨 3 點有 1000 個訪客就不正常了。
![]()
學習"正常"的樣子
在檢測異常之前,系統需要先建立對正常行為的認知——預期的數值范圍、長期趨勢走向以及重復出現的季節性模式。
不同的數據特征對應不同的檢測策略。
![]()
方法 1:統計閾值法(Z-Score)
最簡單的做法,假設數據服從正態分布。
import numpy as np
def z_score_anomaly(data, threshold=3):
mean = np.mean(data)
std = np.std(data)
z_scores = (data - mean) / std
anomalies = np.abs(z_scores) > threshold
return anomalies
適用場景:沒有趨勢的平穩數據。
方法 2:移動平均 + 殘差
適用于帶有平滑趨勢的數據。
import pandas as pd
def moving_average_anomaly(series, window=10, threshold=2):
rolling_mean = series.rolling(window).mean()
residual = series - rolling_mean
std = residual.std()
return abs(residual) > threshold * std
它的優勢在于,每個數據點比較的是自身的局部上下文而非全局均值。
方法 3:季節性分解
周期性模式明顯的數據最適合用這個方法。
from statsmodels.tsa.seasonal import seasonal_decompose
def seasonal_anomaly(series, period=24):
result = seasonal_decompose(series, model='additive', period=period)
residual = result.resid
threshold = 3 * residual.std()
return abs(residual) > threshold
季節性分解把原始序列拆成趨勢、季節性和殘差三個分量,異常通常藏在殘差里。
方法 4:機器學習(Isolation Forest)
不依賴任何分布假設,直接隔離罕見模式。
from sklearn.ensemble import IsolationForest
def isolation_forest_anomaly(data, contamination=0.02):
model = IsolationForest(contamination=contamination)
preds = model.fit_predict(data.reshape(-1, 1))
return preds == -1
適用場景:模式未知、數據不規則,或者多變量時間序列。
方法 5:深度學習(自編碼器)
自編碼器學習重建正常序列,重建誤差高的部分即為異常。
import numpy as np
def reconstruction_error(original, reconstructed):
return np.mean((original - reconstructed) ** 2)
適合處理模式復雜、維度較多、存在長期依賴關系的時間序列。
示例:人流量分析
import pandas as pd
import numpy as np
from statsmodels.tsa.seasonal import seasonal_decompose
# 生成商店人流量數據(1 周,每小時)
hours = pd.date_range('2024-01-01', periods=24*7, freq='H')
hour_of_day = hours.hour
# 正常:上午 9 點到晚上 9 點繁忙,夜間安靜
base = 100 + 80 * ((hour_of_day >= 9) & (hour_of_day <= 21))
traffic = pd.Series(base + np.random.normal(0, 10, len(hours)), index=hours)
# 注入異常
traffic.iloc[15] = 200 # 凌晨 3 點飆升(攝像頭問題)
traffic.iloc[75] = 5 # 營業時間下降(故障)
# 檢測
result = seasonal_decompose(traffic, model='additive', period=24)
residual = result.resid
anomalies = abs(residual) > 3 * residual.std()
print(f"Detected {anomalies.sum()} anomalies")
減少誤報
誤報是異常檢測在生產環境中最常見的痛點。三種思路可以緩解。
調整靈敏度:控制標記比例:
model = IsolationForest(contamination=0.02) # 僅標記 2%
要求持續性:只有連續多個點都表現異常時才觸發告警:
# 僅當異常持續 3 個及以上連續點時才標記
consecutive_count >= 3
集成投票:多種方法同時判斷,取多數一致的結果:
# 投票:如果 2 個及以上方法一致則標記
votes = method1 + method2 + method3
anomalies = votes >= 2
總結
異常檢測的核心不在于找出"奇怪的數字",而在于理解每個時間點上什么才算正常。先對數據做可視化探索,從移動平均或季節性分解入手;如果數據模式復雜,引入 Isolation Forest;生產系統中建議組合多種方法以降低誤判。
異常檢測要做的,是識別那些偏離了時間、趨勢和行為規律的數據點。
https://avoid.overfit.cn/post/a6de4ac94dd64768a768593e39b6c7cb
by Bhargavi Guddati
特別聲明:以上內容(如有圖片或視頻亦包括在內)為自媒體平臺“網易號”用戶上傳并發布,本平臺僅提供信息存儲服務。
Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.