전체 데이터 전처리 파일 저장
import os, glob, pandas as pd
raw_dir = './data/raw_data/train'
prep_dir = './data/preprocessed/train'
os.makedirs(prep_dir, exist_ok=True)
for src in glob.glob(os.path.join(raw_dir, '*_chg.csv')):
base = os.path.basename(src)
dst = os.path.join(prep_dir, base)
if os.path.exists(dst):
continue # 이미 만들어둔 건 건너뜀
df_raw = pd.read_csv(src)
### --- 여기서 removeConstant / handleMissingValue 등
### 이전에 했던 전처리 함수 호출 ---------------
df1 = removeConstant(df_raw, 1)
df1 = handleMissingValue(df1)
# voltage + temperature 컬럼만 추출
cols = df1.columns[18:226] # 기존 인덱스 규칙 그대로
df_train = df1[cols]
df_train.to_csv(dst, index=False)
print('✅ saved:', dst)
# =========================================================
# MTadGAN 학습 루프를 함수화
# =========================================================
def train_mtadgan(X_train,
epochs=30,
batch_size=64,
n_critics=5,
latent_dim=20):
"""
• X_train : (num_win, WIN_SIZE, FEAT_DIM) 형태의 학습 윈도우
• epochs, batch_size, n_critics : 하이퍼파라미터
• latent_dim : 기존 코드에서 20으로 사용
학습 완료 후 내부에서 가중치 .save_weights() 3종 저장
"""
# -- ❶ 모델·옵티마이저 꺼내오기 (이미 빌드돼 있다고 가정) --
global encoder, generator, critic_x, critic_z
global critic_x_model, critic_z_model, encoder_generator_model
global optimizer_cx, optimizer_cz, optimizer_gen
fake = np.ones((batch_size, 1), dtype=np.float32)
valid = -np.ones((batch_size, 1), dtype=np.float32)
delta = np.ones((batch_size, 1), dtype=np.float32)
for epoch in range(1, epochs + 1):
# ❷ 셔플 & 미니배치
X_shuffle = np.copy(X_train)
np.random.shuffle(X_shuffle)
g_losses, cx_losses, cz_losses = [], [], []
mb_size = batch_size * n_critics
n_mb = X_shuffle.shape[0] // mb_size
for i in range(n_mb):
minibatch = X_shuffle[i*mb_size:(i+1)*mb_size]
# --- critic 학습 ---
critic_x.trainable = critic_z.trainable = True
generator.trainable = encoder.trainable = False
for j in range(n_critics):
x = minibatch[j*batch_size:(j+1)*batch_size]
z = np.random.normal(size=(batch_size, latent_dim, 1))
cx_losses.append(critic_x_train_on_batch(
x, z, valid, fake, delta, optimizer_cx))
cz_losses.append(critic_z_train_on_batch(
x, z, valid, fake, delta, optimizer_cz))
# --- encoder·generator 학습 ---
critic_x.trainable = critic_z.trainable = False
generator.trainable = encoder.trainable = True
g_losses.append(enc_gen_train_on_batch(
x, z, valid, optimizer_gen))
# epoch 평균 출력
print(f'Epoch {epoch}/{epochs} | '
f'Dx {np.mean(cx_losses):.3f} | '
f'Dz {np.mean(cz_losses):.3f} | '
f'G {np.mean(g_losses):.3f}')
# ❸ 학습 완료 → 가중치 저장
ckpt_dir = os.path.join(os.getcwd(), 'checkpoints')
os.makedirs(ckpt_dir, exist_ok=True)
critic_x_model.save_weights(os.path.join(ckpt_dir, 'critic_x_model.weights.h5'))
critic_z_model.save_weights(os.path.join(ckpt_dir, 'critic_z_model.weights.h5'))
encoder_generator_model.save_weights(
os.path.join(ckpt_dir, 'encoder_generator_model.weights.h5'))
import os, glob, math, numpy as np, pandas as pd
from tqdm import tqdm
from sklearn.decomposition import PCA
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import MinMaxScaler
# -------------------------------
# 0) 전역 하이퍼파라미터
# -------------------------------
WIN_SIZE = 10
FEAT_DIM = 3 # PCA 차원
EPOCHS = 30
BATCH_SIZE = 64
N_CRITICS = 5
# -------------------------------
# 1) 학습 데이터 로드 & 전처리
# -------------------------------
train_dir = './data/preprocessed/train' # ✅
train_files = sorted(glob.glob(os.path.join(train_dir, '*_chg.csv')))
print(f'✔️ 학습 파일 {len(train_files)}개 로딩')
df_train = pd.concat([pd.read_csv(f) for f in train_files], ignore_index=True)
# diff/smooth → PCA fit → scaler fit
df_proc = diff_smooth_df(df_train, lags_n=0, diffs_n=0, smooth_n=0)
pca = PCA(n_components=FEAT_DIM).fit(df_proc)
X_pca = pca.transform(df_proc)
scaler = MinMaxScaler(feature_range=(-1, 1)).fit(
SimpleImputer().fit_transform(X_pca))
X_norm = scaler.transform(SimpleImputer().fit_transform(X_pca))
X_win, _, _, _ = rolling_window_sequences(
X_norm, np.arange(len(X_norm)),
window_size=WIN_SIZE, target_size=1,
step_size=1, target_column=0)
X_win = X_win.reshape((-1, WIN_SIZE, FEAT_DIM))
print('학습 윈도우 shape:', X_win.shape)
# -------------------------------
# 2) MTadGAN 학습
# -------------------------------
train_mtadgan(X_win, epochs=EPOCHS,
batch_size=BATCH_SIZE, n_critics=N_CRITICS)
# (↑ 위 함수는 기존 training loop를 함수화한 것이라고 가정)
# 학습 후 모델 가중치 저장됨
# -------------------------------
# 3) 테스트 루프
# -------------------------------
test_dir = './data/preprocessed/test'
signal_files = sorted(glob.glob(os.path.join(test_dir, 'Test*_dchg.csv')))
label_files = {os.path.basename(f).replace('.csv',''): f
for f in glob.glob(os.path.join(test_dir, 'Test*_Label.csv'))}
records = [] # 결과 테이블을 여기에 저장
for sig_path in tqdm(signal_files, desc='테스트 파일별 평가'):
base = os.path.basename(sig_path).replace('.csv','')
lbl_path = label_files.get(base)
if lbl_path is None:
print(f'❌ {base} 라벨 파일 없음 → 스킵')
continue
# (1) 테스트 전처리
df_sig = pd.read_csv(sig_path)
df_proc = diff_smooth_df(df_sig, 0, 0, 0)
X_pca = pca.transform(df_proc) # 학습한 PCA 재사용
X_norm = scaler.transform(SimpleImputer().fit_transform(X_pca))
X_win, _, X_idx, _ = rolling_window_sequences(
X_norm, np.arange(len(X_norm)),
WIN_SIZE, 1, 1, 0)
# (2) 예측
y_hat, critic = predict(X_win)
# (3) 이상 점수
anom = Anomaly()
scores, true_idx, _, _ = anom.score_anomalies(
X_win, y_hat, critic, X_idx, comb="mult")
# (4) 간단 threshold (μ+3σ)
thr = np.mean(scores) + 3*np.std(scores)
pred_bin = (scores > thr).astype(int)
# (5) 정답 라벨 로드 & 메트릭
gt = pd.read_csv(lbl_path)['label'].values[:len(pred_bin)]
tp = np.sum((pred_bin==1)&(gt==1))
fp = np.sum((pred_bin==1)&(gt==0))
fn = np.sum((pred_bin==0)&(gt==1))
tn = np.sum((pred_bin==0)&(gt==0))
prec = tp/(tp+fp+1e-8); rec = tp/(tp+fn+1e-8)
f1 = 2*prec*rec/(prec+rec+1e-8)
acc = (tp+tn)/len(gt)
records.append({'file':base,
'Accuracy':round(acc,4),
'Precision':round(prec,4),
'Recall':round(rec,4),
'F1':round(f1,4),
'thr':round(thr,4)})
# -------------------------------
# 4) 결과 출력
# -------------------------------
df_results = pd.DataFrame(records).sort_values('file').reset_index(drop=True)
print('\\n📊 전체 테스트 결과')
print(df_results.to_string(index=False))
단계 | 내용 |
---|---|
① 학습 데이터 병합 | 1000~1050_chg.csv 모두 읽어 하나의 DataFrame으로 통합 |
② 전처리 | diff_smooth_df → PCA.fit → scaling.fit → windowing |
③ MTadGAN 학습 | 30 epoch, critic 5:1 비율, weights 저장 |
④ 테스트 루프 | 모든 Test CSV에 대해 • PCA.transform & scaler.transform 재사용 • predict → score_anomalies → thresholding |
⑤ 성능 테이블 | Accuracy / Precision / Recall / F1을 df_results 로 출력 |
# ---------- 하이퍼파라미터 ----------
WIN_SIZE, FEAT_DIM = 10, 3 # window 길이 / PCA 차원
EPOCHS, BATCH_SIZE, N_CRITICS = 10, 32, 2
class RandomWeightedAverage(Layer):
def __init__(self, batch_size):
"""initialize Layer
Args:
batch_size: 64
"""
super().__init__()
self.batch_size = batch_size
def call(self, inputs, **kwargs):
"""calculate random weighted average
Args:
inputs[0] x: original input
inputs[1] x_: predicted input
"""
batch = tf.shape(inputs[0])[0]
alpha = K.random_uniform((self.batch_size, 1, 1))
return (alpha * inputs[0]) + ((1-alpha) * inputs[1])
print("RandomWeightedAverage() Class defined, Done! ")
지표 | 정상(0) | 이상(1) | 전체 |
---|---|---|---|
정확도 | – | – | 1.00 (99.9%) |
정밀도 | 1.00 | 0.00 | – |
재현율 | 1.00 | 0.00 | – |
F1 점수 | 1.00 | 0.00 | – |
지원 수 | 325,588 | 236 | 325,824 |
⇒ 따라서 기존에 있는 걸 사용하는게 적합한데,….
이 문제를 해결할 것인가 말것인가를 논의해봐야함.
djEJg