# =========================================================
# Recod.ai/LUC - Scientific Image Forgery Detection
# Deep-dive EDA (robust, English plots, no seaborn dependency)
# - Paths auto-detect on Kaggle
# - Safe image/mask IO (.png/.npy), union of multi-masks
# - Component-level analytics (area/box/aspect/centroid/border)
# - Global forgery heatmap (where forgeries tend to appear)
# - Mask-vs-background color stats
# - Resolution/size correlations & numeric corr matrix
# - Optional texture: entropy & autocorr side-peak (sampled)
# =========================================================
# ----------------------------
# 0) Imports & config
# ----------------------------
import os, re, math, random, warnings
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (9, 6)
plt.rcParams["axes.grid"] = True
from PIL import Image
try:
import cv2
except Exception:
cv2 = None
try:
from skimage.measure import label as sk_label, regionprops
except Exception:
sk_label = None
regionprops = None
# reproducibility
SEED = 42
random.seed(SEED); np.random.seed(SEED)
# sampling caps for heavy ops
SAMPLE_MAX_SLOW = 800
SAMPLE_MAX_TEXTURE = 200
# ----------------------------
# 1) Locate competition paths
# ----------------------------
def find_comp_root():
    """Locate the competition dataset root directory.

    On Kaggle, returns the first directory under /kaggle/input whose name
    contains the competition slug; otherwise falls back to a local path.
    """
    kaggle_base = Path("/kaggle/input")
    if kaggle_base.exists():
        for cand in kaggle_base.iterdir():
            if cand.is_dir() and "recodai-luc-scientific-image-forgery-detection" in cand.name:
                return cand
    return Path("./recodai-luc-scientific-image-forgery-detection")
# Resolve the dataset root and the five expected sub-directories.
COMP_ROOT = find_comp_root()
DIR_TRAIN_IMG = COMP_ROOT / "train_images"
DIR_TRAIN_MASK = COMP_ROOT / "train_masks"
DIR_SUP_IMG = COMP_ROOT / "supplemental_images"
DIR_SUP_MASK = COMP_ROOT / "supplemental_masks"
DIR_TEST_IMG = COMP_ROOT / "test_images"
print("[PATH] COMP_ROOT:", COMP_ROOT)
# Report which of the expected directories are actually mounted.
for p in [DIR_TRAIN_IMG, DIR_TRAIN_MASK, DIR_SUP_IMG, DIR_SUP_MASK, DIR_TEST_IMG]:
    print(" -", p, ":", "OK" if p.exists() else "Missing")
# ----------------------------
# 2) IO helpers (robust)
# ----------------------------
# Accepted file extensions: images for listing, masks for loading.
IMG_EXTS = {".png", ".jpg", ".jpeg", ".tif", ".tiff", ".bmp"}
MASK_EXTS = {".png", ".npy"}
def get_case_id(path: Path):
    """Return the numeric id embedded in a filename stem, or the stem itself.

    "img_123.png" -> 123; stems without digits are returned unchanged.
    """
    digits = re.search(r"\d+", path.stem)
    if digits is None:
        return path.stem
    return int(digits.group())
def read_image(path: Path):
    """Load an image file as an RGB numpy array, shape (H, W, 3), dtype uint8."""
    pil_img = Image.open(path).convert("RGB")
    return np.asarray(pil_img)
def read_mask_file(path: Path):
    """Read one mask file and binarize it to a (H, W) uint8 array in {0, 1}.

    Supports .npy arrays (a leading (1,H,W) or trailing (H,W,1) singleton
    channel axis is squeezed) and regular image files read as grayscale.
    """
    if path.suffix.lower() == ".npy":
        arr = np.load(path)
        # squeeze (1, H, W) or (H, W, 1) down to (H, W)
        if arr.ndim == 3 and arr.shape[0] == 1:
            arr = arr[0]
        if arr.ndim == 3 and arr.shape[-1] == 1:
            arr = arr[..., 0]
        return (arr > 0).astype(np.uint8)
    if cv2 is not None:
        arr = cv2.imread(str(path), cv2.IMREAD_UNCHANGED)
        if arr is None:
            # cv2 could not decode this file; fall back to PIL grayscale
            arr = np.array(Image.open(path).convert("L"))
        elif arr.ndim == 3:
            arr = cv2.cvtColor(arr, cv2.COLOR_BGR2GRAY)
    else:
        arr = np.array(Image.open(path).convert("L"))
    return (arr > 0).astype(np.uint8)
def list_mask_files(case_id, mask_dir: Path):
    """Collect all mask files in *mask_dir* belonging to *case_id*.

    Matches "<id>.*", "<id>_*.*", plus any stem containing the id as a whole
    word; restricted to MASK_EXTS, de-duplicated, sorted by filename.
    """
    if not mask_dir.exists():
        return []
    candidates = list(mask_dir.glob(f"{case_id}.*")) + list(mask_dir.glob(f"{case_id}_*.*"))
    word = re.compile(rf"\b{case_id}\b")
    candidates += [p for p in mask_dir.glob("*.*")
                   if p.suffix.lower() in MASK_EXTS and word.search(p.stem)]
    # keep only mask extensions; dedupe by full path string, stable name order
    unique = {str(p): p for p in candidates if p.suffix.lower() in MASK_EXTS}
    return sorted(unique.values(), key=lambda p: p.name)
def read_union_mask(case_id, target_hw=None):
    """Union all train/supplemental masks of *case_id* into one binary mask.

    Returns (mask, n_files): mask is (H, W) uint8 in {0, 1}, or (None, 0)
    when no mask file exists. Masks are nearest-neighbor resized to
    *target_hw* = (H, W) when given, so unions align with the image.
    """
    files = list_mask_files(case_id, DIR_TRAIN_MASK) + list_mask_files(case_id, DIR_SUP_MASK)
    if not files:
        return None, 0
    union = None
    for f in files:
        m = read_mask_file(f)
        if target_hw is not None and (m.shape[0], m.shape[1]) != tuple(target_hw):
            h, w = target_hw
            if cv2 is not None:
                m = cv2.resize(m, (w, h), interpolation=cv2.INTER_NEAREST)
            else:
                m = np.array(Image.fromarray(m).resize((w, h), resample=Image.NEAREST))
        m = (m > 0).astype(np.uint8)
        union = m if union is None else np.maximum(union, m)
    return union, len(files)
# ----------------------------
# 3) Metadata table
# ----------------------------
def collect_images(root: Path, split_name: str):
    """List images under *root* as records {case_id, img_path, split}.

    Files are sorted by (stem length, stem) for a stable numeric-ish order.
    Returns [] when the directory does not exist.
    """
    if not root.exists():
        return []
    found = [p for ext in IMG_EXTS for p in root.glob(f"*{ext}")]
    found.sort(key=lambda p: (len(p.stem), p.stem))
    return [{"case_id": get_case_id(p), "img_path": p, "split": split_name}
            for p in found]
# Build the image-level metadata table across all three splits.
records = []
records += collect_images(DIR_TRAIN_IMG, "train")
records += collect_images(DIR_SUP_IMG, "supplemental")
records += collect_images(DIR_TEST_IMG, "test")
meta = pd.DataFrame.from_records(records)
print("[META] images:", len(meta))
if len(meta) == 0:
    raise RuntimeError("No images found. Check dataset mount.")
def probe_fast(path: Path):
    """Probe width/height/file size (MiB) of one image.

    Returns a Series with NaNs when the file cannot be read.
    """
    try:
        with Image.open(path) as im:
            w, h = im.convert("RGB").size
        mb = path.stat().st_size / (1024 ** 2)
        return pd.Series({"width": w, "height": h, "filesize_mb": mb})
    except Exception:
        return pd.Series({"width": np.nan, "height": np.nan, "filesize_mb": np.nan})
# Attach width/height/filesize columns to the metadata table.
meta = pd.concat([meta, meta["img_path"].apply(probe_fast)], axis=1)
# basic mask stats per image
def mask_row_stats(row):
    """Per-image mask statistics: mask file count, pixel coverage, #components.

    Test-split rows (and any read/parse failure) yield all-zero stats.
    """
    def _zero():
        return pd.Series({"mask_count": 0, "coverage": 0.0, "n_comp": 0})

    if row["split"] == "test":
        return _zero()
    try:
        H, W = int(row["height"]), int(row["width"])
        union, n_files = read_union_mask(row["case_id"], target_hw=(H, W))
        if union is None:
            return _zero()
        fg = (union > 0)
        area = int(fg.sum())
        coverage = float(area) / float(max(1, H * W))
        # count connected components with the best available backend
        if cv2 is not None:
            n_labels, _ = cv2.connectedComponents(fg.astype(np.uint8), connectivity=8)
            n_comp = max(0, n_labels - 1)
        elif sk_label is not None:
            n_comp = sk_label(union, connectivity=2).max()
        else:
            n_comp = int(area > 0)
        return pd.Series({"mask_count": n_files, "coverage": coverage, "n_comp": n_comp})
    except Exception:
        return _zero()
# Merge mask statistics into meta and derive simple labels/ratios.
mask_stats = meta.apply(mask_row_stats, axis=1)
meta = pd.concat([meta, mask_stats], axis=1)
meta["is_forged"] = (meta["mask_count"] > 0).astype(int)  # any mask file => forged
meta["aspect"] = meta["width"] / meta["height"]
print(meta.head())
# ----------------------------
# 4) Plot helpers (English labels)
# ----------------------------
def hist1(series, title, bins=30, xlabel=None):
    """Plot a histogram of a numeric series; skip (with a message) when empty."""
    vals = pd.to_numeric(series, errors="coerce").dropna()
    if vals.empty:
        print("[SKIP] empty for", title)
        return
    plt.figure(figsize=(8, 5))
    plt.hist(vals, bins=bins)
    plt.title(title)
    plt.xlabel(xlabel or series.name)
    plt.ylabel("count")
    plt.tight_layout()
    plt.show()
def scatter_xy(x, y, title, xlabel, ylabel, s=14, alpha=0.7):
    """Scatter-plot two series, dropping rows where either value is non-numeric."""
    xs = pd.to_numeric(x, errors="coerce")
    ys = pd.to_numeric(y, errors="coerce")
    keep = xs.notna() & ys.notna()
    if not keep.any():
        print("[SKIP] empty for", title)
        return
    plt.figure(figsize=(7, 6))
    plt.scatter(xs[keep], ys[keep], s=s, alpha=alpha)
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.tight_layout()
    plt.show()
def corr_heatmap(df, title="Correlation (numeric)"):
    """Draw a correlation-matrix heatmap over the numeric columns of *df*."""
    num = df.select_dtypes(include=[np.number]).copy()
    if num.shape[1] < 2:
        print("[SKIP] not enough numeric columns for corr")
        return
    corr = num.corr(numeric_only=True)
    labels = corr.columns
    plt.figure(figsize=(7, 6))
    img = plt.imshow(corr, cmap="coolwarm", vmin=-1, vmax=1)
    plt.colorbar(img, fraction=0.046, pad=0.04)
    plt.xticks(range(len(labels)), labels, rotation=90)
    plt.yticks(range(len(labels)), labels)
    plt.title(title)
    plt.tight_layout()
    plt.show()
# ----------------------------
# 5) Component-level analytics
# ----------------------------
def component_props(binary_mask):
    """Return list of per-component dicts:
    area_px, bbox_w, bbox_h, aspect, cx, cy (0~1), touches_border(bool), ecc(optional)
    """
    res = []
    if binary_mask is None or binary_mask.max() == 0:
        return res
    m = (binary_mask > 0).astype(np.uint8)
    H, W = m.shape[:2]
    if regionprops is not None:
        # Preferred backend: skimage provides bbox, centroid and eccentricity.
        lab = sk_label(m, connectivity=2)
        for rp in regionprops(lab):
            area = int(rp.area)
            minr, minc, maxr, maxc = rp.bbox
            bw, bh = (maxc - minc), (maxr - minr)
            aspect = float(bw) / float(max(1, bh))
            cy, cx = rp.centroid
            # skimage bbox is half-open, so maxr == H / maxc == W means the
            # component reaches the bottom/right image edge.
            touches = (minr == 0) or (minc == 0) or (maxr == H) or (maxc == W)
            ecc = float(getattr(rp, "eccentricity", np.nan))
            res.append({
                "area_px": area,
                "bbox_w": bw, "bbox_h": bh, "aspect": aspect,
                "cx": cx / W, "cy": cy / H,
                "touches_border": int(touches),
                "eccentricity": ecc
            })
    elif cv2 is not None:
        # Fallback backend: OpenCV connected components (no eccentricity).
        num, labels, stats, centroids = cv2.connectedComponentsWithStats(m, connectivity=8)
        for comp in range(1, num):  # label 0 is the background
            x, y, w, h, area = stats[comp]
            cx, cy = centroids[comp]
            aspect = float(w) / float(max(1, h))
            touches = (x == 0) or (y == 0) or (x + w == W) or (y + h == H)
            # eccentricity approx from moments (fallback)
            ecc = np.nan
            res.append({
                "area_px": int(area),
                "bbox_w": int(w), "bbox_h": int(h), "aspect": aspect,
                "cx": float(cx) / W, "cy": float(cy) / H,
                "touches_border": int(touches),
                "eccentricity": float(ecc)
            })
    else:
        # Last resort (no cv2/skimage): treat the whole foreground as one component.
        ys, xs = np.where(m > 0)
        if len(xs) > 0:
            res.append({
                "area_px": int(len(xs)),
                "bbox_w": int(xs.max() - xs.min() + 1),
                "bbox_h": int(ys.max() - ys.min() + 1),
                "aspect": float((xs.max() - xs.min() + 1)) / float(max(1, ys.max() - ys.min() + 1)),
                "cx": float(xs.mean()) / m.shape[1],
                "cy": float(ys.mean()) / m.shape[0],
                "touches_border": int((xs.min()==0) or (ys.min()==0) or (xs.max()==m.shape[1]-1) or (ys.max()==m.shape[0]-1)),
                "eccentricity": np.nan
            })
    return res
# per-image -> per-component dataframe (sampled to keep runtime bounded)
non_test_ids = meta.query("split!='test' and mask_count>0")["case_id"].tolist()
comp_sample_ids = random.sample(non_test_ids, k=min(len(non_test_ids), SAMPLE_MAX_SLOW))
comp_rows = []
for cid in comp_sample_ids:
    # need image size to align masks exactly
    ipath = meta.loc[meta["case_id"] == cid, "img_path"].iloc[0]
    H, W = np.array(Image.open(ipath).convert("RGB")).shape[:2]
    union, _ = read_union_mask(cid, target_hw=(H, W))
    for d in component_props(union):
        d["case_id"] = cid
        d["area_pct"] = d["area_px"] / float(max(1, H*W))  # normalize by image area
        comp_rows.append(d)
comp_df = pd.DataFrame(comp_rows)
print("[COMP] components sampled:", len(comp_df))
# quick plots for component properties
if len(comp_df):
    hist1(comp_df["area_pct"], "Component area ratio", bins=40, xlabel="area / image")
    hist1(comp_df["aspect"], "Component bbox aspect", bins=40, xlabel="w / h")
    hist1(comp_df["touches_border"], "Component touches border (0/1)", bins=3, xlabel="0/1")
    # centroid scatter (normalized)
    scatter_xy(comp_df["cx"], comp_df["cy"], "Component centroid distribution", "cx (0~1)", "cy (0~1)")
else:
    print("[INFO] no components to analyze")
# ----------------------------
# 6) Global forgery heatmap (where forgeries tend to appear)
# ----------------------------
def aggregate_forgery_heatmap(case_ids, grid=64):
    """Accumulate the binary masks of *case_ids* onto a grid x grid canvas.

    Returns (acc, total): acc counts per grid cell how many sampled cases
    have forgery there; total is the number of cases that contributed.
    """
    acc = np.zeros((grid, grid), dtype=np.float32)
    total = 0
    for cid in case_ids:
        ipath = meta.loc[meta["case_id"] == cid, "img_path"].iloc[0]
        img = read_image(ipath)
        H, W = img.shape[:2]
        union, _ = read_union_mask(cid, target_hw=(H, W))
        if union is None or union.max() == 0:
            continue
        # resize union mask to fixed grid and accumulate
        if cv2 is not None:
            m_small = cv2.resize((union>0).astype(np.uint8), (grid, grid), interpolation=cv2.INTER_NEAREST)
        else:
            m_small = np.array(Image.fromarray((union>0).astype(np.uint8)).resize((grid, grid), resample=Image.NEAREST))
        acc += m_small.astype(np.float32)
        total += 1
    return acc, total
# Aggregate sampled masks into one spatial map and plot it.
heatmap, hm_n = aggregate_forgery_heatmap(comp_sample_ids, grid=64)
if hm_n > 0:
    plt.figure(figsize=(6,5))
    plt.imshow(heatmap / heatmap.max(), cmap="magma")  # normalize to [0, 1]
    plt.title(f"Global forgery heatmap (n={hm_n})")
    plt.axis("off"); plt.tight_layout(); plt.show()
else:
    print("[INFO] heatmap skipped (no masks)")
# ----------------------------
# 7) Mask vs. background color statistics
# ----------------------------
def mask_color_stats(cid):
    """Mean/std of RGB values (scaled to [0,1]) inside vs. outside the mask.

    Returns a flat dict of per-channel statistics for one case, or None when
    the case has no mask, or the mask is empty or covers the whole image.
    """
    ipath = meta.loc[meta["case_id"] == cid, "img_path"].iloc[0]
    img = read_image(ipath).astype(np.float32) / 255.0
    H, W = img.shape[:2]
    union, _ = read_union_mask(cid, target_hw=(H, W))
    if union is None or union.max() == 0:
        return None
    inside = (union > 0)
    fg = img[inside]
    bg = img[~inside]
    if len(fg) == 0 or len(bg) == 0:
        return None
    stats = {"case_id": cid}
    # key order matches the original column order: fg means, bg means, fg stds, bg stds
    for i, ch in enumerate("rgb"):
        stats[f"fg_mean_{ch}"] = fg[:, i].mean()
    for i, ch in enumerate("rgb"):
        stats[f"bg_mean_{ch}"] = bg[:, i].mean()
    for i, ch in enumerate("rgb"):
        stats[f"fg_std_{ch}"] = fg[:, i].std()
    for i, ch in enumerate("rgb"):
        stats[f"bg_std_{ch}"] = bg[:, i].std()
    return stats
# Sample forged cases and compare mask vs background color statistics.
color_rows = []
color_ids = random.sample(non_test_ids, k=min(len(non_test_ids), SAMPLE_MAX_SLOW))
for cid in color_ids:
    d = mask_color_stats(cid)
    if d is not None:
        color_rows.append(d)
color_df = pd.DataFrame(color_rows)
print("[COLOR] samples:", len(color_df))
if len(color_df):
    # distribution of foreground - background mean difference per channel
    for ch in ["r","g","b"]:
        diff = color_df[f"fg_mean_{ch}"] - color_df[f"bg_mean_{ch}"]
        hist1(diff, f"Mean difference (mask - background) [{ch.upper()}]", bins=40, xlabel="mean_fg - mean_bg")
else:
    print("[INFO] color diff skipped (no data)")
# ----------------------------
# 8) Resolution/size relations + numeric correlation
# ----------------------------
# Resolution / file-size relations and a numeric correlation matrix.
hist1(meta["aspect"], "Image aspect ratio (w/h)", bins=40, xlabel="aspect")
scatter_xy(meta["width"], meta["filesize_mb"], "Resolution vs File size", "width (px)", "file size (MiB)")
scatter_xy(meta["width"]*meta["height"], meta["filesize_mb"], "Pixels vs File size", "#pixels", "file size (MiB)")
# corr matrix on selected numeric columns
corr_cols = ["width","height","filesize_mb","coverage","n_comp","aspect"]
corr_heatmap(meta[corr_cols], title="Correlation (selected numeric)")
# ----------------------------
# 9) Texture: local entropy & autocorr side-peak (sampled)
# ----------------------------
def local_entropy(gray, win=9):
    """Mean Shannon entropy of win x win patches over a 2x-downsampled image.

    The image is downsampled by 2 to bound the O(H*W*win^2) loop. The mean
    is taken over the patches actually computed: the previous version
    averaged a zero-initialized full-size array, so the never-written window
    borders biased the mean toward zero. Returns NaN when the downsampled
    image is smaller than the window (no patch fits).
    """
    pad = win // 2
    g = gray.astype(np.uint8)
    # downsample to keep runtime bounded
    new_size = (g.shape[1] // 2, g.shape[0] // 2)
    if cv2 is not None:
        small = cv2.resize(g, new_size, interpolation=cv2.INTER_AREA)
    else:
        small = np.array(Image.fromarray(g).resize(new_size))
    H, W = small.shape
    if H < win or W < win:
        return float("nan")
    ents = []
    for y in range(pad, H - pad):
        for x in range(pad, W - pad):
            patch = small[y - pad:y + pad + 1, x - pad:x + pad + 1]
            # histogram over the uint8 patch; drop zero bins before log
            counts = np.bincount(patch.ravel(), minlength=1)
            ps = counts[counts > 0].astype(np.float32)
            ps /= ps.sum()
            ents.append(float(-(ps * np.log2(ps)).sum()))
    return float(np.mean(ents))
def autocorr_sidepeak(gray):
    """Ratio of the strongest off-center autocorrelation peak to the zero-lag peak.

    Circular autocorrelation via |FFT|^2 -> IFFT; values near 1 indicate a
    strongly periodic image. Returns NaN when no finite side peak exists or
    the zero-lag value is zero.
    """
    spectrum = np.fft.fft2(gray.astype(np.float32))
    power = np.abs(spectrum) ** 2
    ac = np.fft.fftshift(np.fft.ifft2(power).real)
    cy, cx = ac.shape[0] // 2, ac.shape[1] // 2
    zero_lag = ac[cy, cx]
    # suppress an 11x11 window around zero lag before searching side peaks
    rad = 5
    masked = ac.copy()
    masked[cy - rad:cy + rad + 1, cx - rad:cx + rad + 1] = -np.inf
    side = np.max(masked)
    if not np.isfinite(side) or zero_lag == 0:
        return np.nan
    return float(side / zero_lag)
# Sample images and compute texture descriptors (entropy, periodicity).
tex_rows = []
tex_ids = random.sample(meta.index.tolist(), k=min(len(meta), SAMPLE_MAX_TEXTURE))
for i in tex_ids:
    try:
        img = read_image(meta.loc[i,"img_path"])
        # ITU-R BT.601 luma weights for RGB -> grayscale
        gray = (0.299*img[...,0] + 0.587*img[...,1] + 0.114*img[...,2]).astype(np.float32)
        ent = local_entropy(gray, win=9) if cv2 is not None else np.nan
        acp = autocorr_sidepeak(gray)
        tex_rows.append({"case_id": meta.loc[i,"case_id"], "entropy_mean": ent, "autocorr_side_peak": acp})
    except Exception:
        # texture stats are best-effort; skip unreadable images
        continue
tex_df = pd.DataFrame(tex_rows)
print("[TEXTURE] samples:", len(tex_df))
if len(tex_df):
    hist1(tex_df["entropy_mean"], "Local entropy (mean)", bins=30, xlabel="entropy")
    hist1(tex_df["autocorr_side_peak"], "Autocorr side-peak ratio", bins=30, xlabel="peak2 / center")
else:
    print("[INFO] texture plots skipped")
# ----------------------------
# 10) Summary prints
# ----------------------------
def pct(x):
    """Format a fraction as a percentage string, e.g. 0.5 -> '50.00%'."""
    return "{:.2f}%".format(float(x) * 100.0)
# Headline counts and summary statistics printed at the end of the run.
train_n = (meta["split"]=="train").sum()
sup_n = (meta["split"]=="supplemental").sum()
test_n = (meta["split"]=="test").sum()
non_test = meta["split"]!="test"
# forged = non-test images that have at least one mask file
forged_n = int((non_test).sum() - (meta.loc[non_test, "mask_count"]==0).sum())
auth_n = int((non_test).sum() - forged_n)
print("\n================ SUMMARY ================")
print(f"Train: {train_n:,} | Supplemental: {sup_n:,} | Test: {test_n:,}")
print(f"Forged images: {forged_n:,} | Authentic images: {auth_n:,}")
print(f"Median coverage (non-test): {meta.loc[non_test,'coverage'].median():.5f}")
print(f"Mean #components (non-test, with mask): {meta.loc[(non_test)&(meta['mask_count']>0),'n_comp'].mean():.3f}")
if len(comp_df):
    print(f"Component area (median pct): {np.median(comp_df['area_pct']):.5f}")
    print(f"Touching border ratio: {comp_df['touches_border'].mean():.3f}")
print("=========================================")
# =========================================================
# Recod.ai/LUC - Scientific Image Forgery Detection
# Deep-dive EDA (robust, English plots, no seaborn dependency)
# - Paths auto-detect on Kaggle
# - Safe image/mask IO (.png/.npy), union of multi-masks
# - Component-level analytics (area/box/aspect/centroid/border)
# - Global forgery heatmap (where forgeries tend to appear)
# - Mask-vs-background color stats
# - Resolution/size correlations & numeric corr matrix
# - Optional texture: entropy & autocorr side-peak (sampled)
# =========================================================
# ----------------------------
# 0) Imports & config
# ----------------------------
import os, re, math, random, warnings
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (9, 6)
plt.rcParams["axes.grid"] = True
from PIL import Image
try:
import cv2
except Exception:
cv2 = None
try:
from skimage.measure import label as sk_label, regionprops
except Exception:
sk_label = None
regionprops = None
# reproducibility
SEED = 42
random.seed(SEED); np.random.seed(SEED)
# sampling caps for heavy ops
SAMPLE_MAX_SLOW = 800
SAMPLE_MAX_TEXTURE = 200
# ----------------------------
# 1) Locate competition paths
# ----------------------------
def find_comp_root():
    """Locate the competition dataset root directory.

    On Kaggle, returns the first directory under /kaggle/input whose name
    contains the competition slug; otherwise falls back to a local path.
    """
    kaggle_base = Path("/kaggle/input")
    if kaggle_base.exists():
        for cand in kaggle_base.iterdir():
            if cand.is_dir() and "recodai-luc-scientific-image-forgery-detection" in cand.name:
                return cand
    return Path("./recodai-luc-scientific-image-forgery-detection")
# Resolve the dataset root and the five expected sub-directories.
COMP_ROOT = find_comp_root()
DIR_TRAIN_IMG = COMP_ROOT / "train_images"
DIR_TRAIN_MASK = COMP_ROOT / "train_masks"
DIR_SUP_IMG = COMP_ROOT / "supplemental_images"
DIR_SUP_MASK = COMP_ROOT / "supplemental_masks"
DIR_TEST_IMG = COMP_ROOT / "test_images"
print("[PATH] COMP_ROOT:", COMP_ROOT)
# Report which of the expected directories are actually mounted.
for p in [DIR_TRAIN_IMG, DIR_TRAIN_MASK, DIR_SUP_IMG, DIR_SUP_MASK, DIR_TEST_IMG]:
    print(" -", p, ":", "OK" if p.exists() else "Missing")
# ----------------------------
# 2) IO helpers (robust)
# ----------------------------
# Accepted file extensions: images for listing, masks for loading.
IMG_EXTS = {".png", ".jpg", ".jpeg", ".tif", ".tiff", ".bmp"}
MASK_EXTS = {".png", ".npy"}
def get_case_id(path: Path):
    """Return the numeric id embedded in a filename stem, or the stem itself.

    "img_123.png" -> 123; stems without digits are returned unchanged.
    """
    digits = re.search(r"\d+", path.stem)
    if digits is None:
        return path.stem
    return int(digits.group())
def read_image(path: Path):
    """Load an image file as an RGB numpy array, shape (H, W, 3), dtype uint8."""
    pil_img = Image.open(path).convert("RGB")
    return np.asarray(pil_img)
def read_mask_file(path: Path):
    """Read one mask file and binarize it to a (H, W) uint8 array in {0, 1}.

    Supports .npy arrays (a leading (1,H,W) or trailing (H,W,1) singleton
    channel axis is squeezed) and regular image files read as grayscale.
    """
    if path.suffix.lower() == ".npy":
        arr = np.load(path)
        # squeeze (1, H, W) or (H, W, 1) down to (H, W)
        if arr.ndim == 3 and arr.shape[0] == 1:
            arr = arr[0]
        if arr.ndim == 3 and arr.shape[-1] == 1:
            arr = arr[..., 0]
        return (arr > 0).astype(np.uint8)
    if cv2 is not None:
        arr = cv2.imread(str(path), cv2.IMREAD_UNCHANGED)
        if arr is None:
            # cv2 could not decode this file; fall back to PIL grayscale
            arr = np.array(Image.open(path).convert("L"))
        elif arr.ndim == 3:
            arr = cv2.cvtColor(arr, cv2.COLOR_BGR2GRAY)
    else:
        arr = np.array(Image.open(path).convert("L"))
    return (arr > 0).astype(np.uint8)
def list_mask_files(case_id, mask_dir: Path):
    """Collect all mask files in *mask_dir* belonging to *case_id*.

    Matches "<id>.*", "<id>_*.*", plus any stem containing the id as a whole
    word; restricted to MASK_EXTS, de-duplicated, sorted by filename.
    """
    if not mask_dir.exists():
        return []
    candidates = list(mask_dir.glob(f"{case_id}.*")) + list(mask_dir.glob(f"{case_id}_*.*"))
    word = re.compile(rf"\b{case_id}\b")
    candidates += [p for p in mask_dir.glob("*.*")
                   if p.suffix.lower() in MASK_EXTS and word.search(p.stem)]
    # keep only mask extensions; dedupe by full path string, stable name order
    unique = {str(p): p for p in candidates if p.suffix.lower() in MASK_EXTS}
    return sorted(unique.values(), key=lambda p: p.name)
def read_union_mask(case_id, target_hw=None):
    """Union all train/supplemental masks of *case_id* into one binary mask.

    Returns (mask, n_files): mask is (H, W) uint8 in {0, 1}, or (None, 0)
    when no mask file exists. Masks are nearest-neighbor resized to
    *target_hw* = (H, W) when given, so unions align with the image.
    """
    files = list_mask_files(case_id, DIR_TRAIN_MASK) + list_mask_files(case_id, DIR_SUP_MASK)
    if not files:
        return None, 0
    union = None
    for f in files:
        m = read_mask_file(f)
        if target_hw is not None and (m.shape[0], m.shape[1]) != tuple(target_hw):
            h, w = target_hw
            if cv2 is not None:
                m = cv2.resize(m, (w, h), interpolation=cv2.INTER_NEAREST)
            else:
                m = np.array(Image.fromarray(m).resize((w, h), resample=Image.NEAREST))
        m = (m > 0).astype(np.uint8)
        union = m if union is None else np.maximum(union, m)
    return union, len(files)
# ----------------------------
# 3) Metadata table
# ----------------------------
def collect_images(root: Path, split_name: str):
    """List images under *root* as records {case_id, img_path, split}.

    Files are sorted by (stem length, stem) for a stable numeric-ish order.
    Returns [] when the directory does not exist.
    """
    if not root.exists():
        return []
    found = [p for ext in IMG_EXTS for p in root.glob(f"*{ext}")]
    found.sort(key=lambda p: (len(p.stem), p.stem))
    return [{"case_id": get_case_id(p), "img_path": p, "split": split_name}
            for p in found]
# Build the image-level metadata table across all three splits.
records = []
records += collect_images(DIR_TRAIN_IMG, "train")
records += collect_images(DIR_SUP_IMG, "supplemental")
records += collect_images(DIR_TEST_IMG, "test")
meta = pd.DataFrame.from_records(records)
print("[META] images:", len(meta))
if len(meta) == 0:
    raise RuntimeError("No images found. Check dataset mount.")
def probe_fast(path: Path):
    """Probe width/height/file size (MiB) of one image.

    Returns a Series with NaNs when the file cannot be read.
    """
    try:
        with Image.open(path) as im:
            w, h = im.convert("RGB").size
        mb = path.stat().st_size / (1024 ** 2)
        return pd.Series({"width": w, "height": h, "filesize_mb": mb})
    except Exception:
        return pd.Series({"width": np.nan, "height": np.nan, "filesize_mb": np.nan})
# Attach width/height/filesize columns to the metadata table.
meta = pd.concat([meta, meta["img_path"].apply(probe_fast)], axis=1)
# basic mask stats per image
def mask_row_stats(row):
    """Per-image mask statistics: mask file count, pixel coverage, #components.

    Test-split rows (and any read/parse failure) yield all-zero stats.
    """
    def _zero():
        return pd.Series({"mask_count": 0, "coverage": 0.0, "n_comp": 0})

    if row["split"] == "test":
        return _zero()
    try:
        H, W = int(row["height"]), int(row["width"])
        union, n_files = read_union_mask(row["case_id"], target_hw=(H, W))
        if union is None:
            return _zero()
        fg = (union > 0)
        area = int(fg.sum())
        coverage = float(area) / float(max(1, H * W))
        # count connected components with the best available backend
        if cv2 is not None:
            n_labels, _ = cv2.connectedComponents(fg.astype(np.uint8), connectivity=8)
            n_comp = max(0, n_labels - 1)
        elif sk_label is not None:
            n_comp = sk_label(union, connectivity=2).max()
        else:
            n_comp = int(area > 0)
        return pd.Series({"mask_count": n_files, "coverage": coverage, "n_comp": n_comp})
    except Exception:
        return _zero()
# Merge mask statistics into meta and derive simple labels/ratios.
mask_stats = meta.apply(mask_row_stats, axis=1)
meta = pd.concat([meta, mask_stats], axis=1)
meta["is_forged"] = (meta["mask_count"] > 0).astype(int)  # any mask file => forged
meta["aspect"] = meta["width"] / meta["height"]
print(meta.head())
# ----------------------------
# 4) Plot helpers (English labels)
# ----------------------------
def hist1(series, title, bins=30, xlabel=None):
    """Plot a histogram of a numeric series; skip (with a message) when empty."""
    vals = pd.to_numeric(series, errors="coerce").dropna()
    if vals.empty:
        print("[SKIP] empty for", title)
        return
    plt.figure(figsize=(8, 5))
    plt.hist(vals, bins=bins)
    plt.title(title)
    plt.xlabel(xlabel or series.name)
    plt.ylabel("count")
    plt.tight_layout()
    plt.show()
def scatter_xy(x, y, title, xlabel, ylabel, s=14, alpha=0.7):
    """Scatter-plot two series, dropping rows where either value is non-numeric."""
    xs = pd.to_numeric(x, errors="coerce")
    ys = pd.to_numeric(y, errors="coerce")
    keep = xs.notna() & ys.notna()
    if not keep.any():
        print("[SKIP] empty for", title)
        return
    plt.figure(figsize=(7, 6))
    plt.scatter(xs[keep], ys[keep], s=s, alpha=alpha)
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.tight_layout()
    plt.show()
def corr_heatmap(df, title="Correlation (numeric)"):
    """Draw a correlation-matrix heatmap over the numeric columns of *df*."""
    num = df.select_dtypes(include=[np.number]).copy()
    if num.shape[1] < 2:
        print("[SKIP] not enough numeric columns for corr")
        return
    corr = num.corr(numeric_only=True)
    labels = corr.columns
    plt.figure(figsize=(7, 6))
    img = plt.imshow(corr, cmap="coolwarm", vmin=-1, vmax=1)
    plt.colorbar(img, fraction=0.046, pad=0.04)
    plt.xticks(range(len(labels)), labels, rotation=90)
    plt.yticks(range(len(labels)), labels)
    plt.title(title)
    plt.tight_layout()
    plt.show()
# ----------------------------
# 5) Component-level analytics
# ----------------------------
def component_props(binary_mask):
    """Return list of per-component dicts:
    area_px, bbox_w, bbox_h, aspect, cx, cy (0~1), touches_border(bool), ecc(optional)
    """
    res = []
    if binary_mask is None or binary_mask.max() == 0:
        return res
    m = (binary_mask > 0).astype(np.uint8)
    H, W = m.shape[:2]
    if regionprops is not None:
        # Preferred backend: skimage provides bbox, centroid and eccentricity.
        lab = sk_label(m, connectivity=2)
        for rp in regionprops(lab):
            area = int(rp.area)
            minr, minc, maxr, maxc = rp.bbox
            bw, bh = (maxc - minc), (maxr - minr)
            aspect = float(bw) / float(max(1, bh))
            cy, cx = rp.centroid
            # skimage bbox is half-open, so maxr == H / maxc == W means the
            # component reaches the bottom/right image edge.
            touches = (minr == 0) or (minc == 0) or (maxr == H) or (maxc == W)
            ecc = float(getattr(rp, "eccentricity", np.nan))
            res.append({
                "area_px": area,
                "bbox_w": bw, "bbox_h": bh, "aspect": aspect,
                "cx": cx / W, "cy": cy / H,
                "touches_border": int(touches),
                "eccentricity": ecc
            })
    elif cv2 is not None:
        # Fallback backend: OpenCV connected components (no eccentricity).
        num, labels, stats, centroids = cv2.connectedComponentsWithStats(m, connectivity=8)
        for comp in range(1, num):  # label 0 is the background
            x, y, w, h, area = stats[comp]
            cx, cy = centroids[comp]
            aspect = float(w) / float(max(1, h))
            touches = (x == 0) or (y == 0) or (x + w == W) or (y + h == H)
            # eccentricity approx from moments (fallback)
            ecc = np.nan
            res.append({
                "area_px": int(area),
                "bbox_w": int(w), "bbox_h": int(h), "aspect": aspect,
                "cx": float(cx) / W, "cy": float(cy) / H,
                "touches_border": int(touches),
                "eccentricity": float(ecc)
            })
    else:
        # Last resort (no cv2/skimage): treat the whole foreground as one component.
        ys, xs = np.where(m > 0)
        if len(xs) > 0:
            res.append({
                "area_px": int(len(xs)),
                "bbox_w": int(xs.max() - xs.min() + 1),
                "bbox_h": int(ys.max() - ys.min() + 1),
                "aspect": float((xs.max() - xs.min() + 1)) / float(max(1, ys.max() - ys.min() + 1)),
                "cx": float(xs.mean()) / m.shape[1],
                "cy": float(ys.mean()) / m.shape[0],
                "touches_border": int((xs.min()==0) or (ys.min()==0) or (xs.max()==m.shape[1]-1) or (ys.max()==m.shape[0]-1)),
                "eccentricity": np.nan
            })
    return res
# per-image -> per-component dataframe (sampled to keep runtime bounded)
non_test_ids = meta.query("split!='test' and mask_count>0")["case_id"].tolist()
comp_sample_ids = random.sample(non_test_ids, k=min(len(non_test_ids), SAMPLE_MAX_SLOW))
comp_rows = []
for cid in comp_sample_ids:
    # need image size to align masks exactly
    ipath = meta.loc[meta["case_id"] == cid, "img_path"].iloc[0]
    H, W = np.array(Image.open(ipath).convert("RGB")).shape[:2]
    union, _ = read_union_mask(cid, target_hw=(H, W))
    for d in component_props(union):
        d["case_id"] = cid
        d["area_pct"] = d["area_px"] / float(max(1, H*W))  # normalize by image area
        comp_rows.append(d)
comp_df = pd.DataFrame(comp_rows)
print("[COMP] components sampled:", len(comp_df))
# quick plots for component properties
if len(comp_df):
    hist1(comp_df["area_pct"], "Component area ratio", bins=40, xlabel="area / image")
    hist1(comp_df["aspect"], "Component bbox aspect", bins=40, xlabel="w / h")
    hist1(comp_df["touches_border"], "Component touches border (0/1)", bins=3, xlabel="0/1")
    # centroid scatter (normalized)
    scatter_xy(comp_df["cx"], comp_df["cy"], "Component centroid distribution", "cx (0~1)", "cy (0~1)")
else:
    print("[INFO] no components to analyze")
# ----------------------------
# 6) Global forgery heatmap (where forgeries tend to appear)
# ----------------------------
def aggregate_forgery_heatmap(case_ids, grid=64):
    """Accumulate the binary masks of *case_ids* onto a grid x grid canvas.

    Returns (acc, total): acc counts per grid cell how many sampled cases
    have forgery there; total is the number of cases that contributed.
    """
    acc = np.zeros((grid, grid), dtype=np.float32)
    total = 0
    for cid in case_ids:
        ipath = meta.loc[meta["case_id"] == cid, "img_path"].iloc[0]
        img = read_image(ipath)
        H, W = img.shape[:2]
        union, _ = read_union_mask(cid, target_hw=(H, W))
        if union is None or union.max() == 0:
            continue
        # resize union mask to fixed grid and accumulate
        if cv2 is not None:
            m_small = cv2.resize((union>0).astype(np.uint8), (grid, grid), interpolation=cv2.INTER_NEAREST)
        else:
            m_small = np.array(Image.fromarray((union>0).astype(np.uint8)).resize((grid, grid), resample=Image.NEAREST))
        acc += m_small.astype(np.float32)
        total += 1
    return acc, total
# Aggregate sampled masks into one spatial map and plot it.
heatmap, hm_n = aggregate_forgery_heatmap(comp_sample_ids, grid=64)
if hm_n > 0:
    plt.figure(figsize=(6,5))
    plt.imshow(heatmap / heatmap.max(), cmap="magma")  # normalize to [0, 1]
    plt.title(f"Global forgery heatmap (n={hm_n})")
    plt.axis("off"); plt.tight_layout(); plt.show()
else:
    print("[INFO] heatmap skipped (no masks)")
# ----------------------------
# 7) Mask vs. background color statistics
# ----------------------------
def mask_color_stats(cid):
    """Mean/std of RGB values (scaled to [0,1]) inside vs. outside the mask.

    Returns a flat dict of per-channel statistics for one case, or None when
    the case has no mask, or the mask is empty or covers the whole image.
    """
    ipath = meta.loc[meta["case_id"] == cid, "img_path"].iloc[0]
    img = read_image(ipath).astype(np.float32) / 255.0
    H, W = img.shape[:2]
    union, _ = read_union_mask(cid, target_hw=(H, W))
    if union is None or union.max() == 0:
        return None
    inside = (union > 0)
    fg = img[inside]
    bg = img[~inside]
    if len(fg) == 0 or len(bg) == 0:
        return None
    stats = {"case_id": cid}
    # key order matches the original column order: fg means, bg means, fg stds, bg stds
    for i, ch in enumerate("rgb"):
        stats[f"fg_mean_{ch}"] = fg[:, i].mean()
    for i, ch in enumerate("rgb"):
        stats[f"bg_mean_{ch}"] = bg[:, i].mean()
    for i, ch in enumerate("rgb"):
        stats[f"fg_std_{ch}"] = fg[:, i].std()
    for i, ch in enumerate("rgb"):
        stats[f"bg_std_{ch}"] = bg[:, i].std()
    return stats
# Sample forged cases and compare mask vs background color statistics.
color_rows = []
color_ids = random.sample(non_test_ids, k=min(len(non_test_ids), SAMPLE_MAX_SLOW))
for cid in color_ids:
    d = mask_color_stats(cid)
    if d is not None:
        color_rows.append(d)
color_df = pd.DataFrame(color_rows)
print("[COLOR] samples:", len(color_df))
if len(color_df):
    # distribution of foreground - background mean difference per channel
    for ch in ["r","g","b"]:
        diff = color_df[f"fg_mean_{ch}"] - color_df[f"bg_mean_{ch}"]
        hist1(diff, f"Mean difference (mask - background) [{ch.upper()}]", bins=40, xlabel="mean_fg - mean_bg")
else:
    print("[INFO] color diff skipped (no data)")
# ----------------------------
# 8) Resolution/size relations + numeric correlation
# ----------------------------
# Resolution / file-size relations and a numeric correlation matrix.
hist1(meta["aspect"], "Image aspect ratio (w/h)", bins=40, xlabel="aspect")
scatter_xy(meta["width"], meta["filesize_mb"], "Resolution vs File size", "width (px)", "file size (MiB)")
scatter_xy(meta["width"]*meta["height"], meta["filesize_mb"], "Pixels vs File size", "#pixels", "file size (MiB)")
# corr matrix on selected numeric columns
corr_cols = ["width","height","filesize_mb","coverage","n_comp","aspect"]
corr_heatmap(meta[corr_cols], title="Correlation (selected numeric)")
# ----------------------------
# 9) Texture: local entropy & autocorr side-peak (sampled)
# ----------------------------
def local_entropy(gray, win=9):
    """Mean Shannon entropy of win x win patches over a 2x-downsampled image.

    The image is downsampled by 2 to bound the O(H*W*win^2) loop. The mean
    is taken over the patches actually computed: the previous version
    averaged a zero-initialized full-size array, so the never-written window
    borders biased the mean toward zero. Returns NaN when the downsampled
    image is smaller than the window (no patch fits).
    """
    pad = win // 2
    g = gray.astype(np.uint8)
    # downsample to keep runtime bounded
    new_size = (g.shape[1] // 2, g.shape[0] // 2)
    if cv2 is not None:
        small = cv2.resize(g, new_size, interpolation=cv2.INTER_AREA)
    else:
        small = np.array(Image.fromarray(g).resize(new_size))
    H, W = small.shape
    if H < win or W < win:
        return float("nan")
    ents = []
    for y in range(pad, H - pad):
        for x in range(pad, W - pad):
            patch = small[y - pad:y + pad + 1, x - pad:x + pad + 1]
            # histogram over the uint8 patch; drop zero bins before log
            counts = np.bincount(patch.ravel(), minlength=1)
            ps = counts[counts > 0].astype(np.float32)
            ps /= ps.sum()
            ents.append(float(-(ps * np.log2(ps)).sum()))
    return float(np.mean(ents))
def autocorr_sidepeak(gray):
    """Ratio of the strongest off-center autocorrelation peak to the zero-lag peak.

    Circular autocorrelation via |FFT|^2 -> IFFT; values near 1 indicate a
    strongly periodic image. Returns NaN when no finite side peak exists or
    the zero-lag value is zero.
    """
    spectrum = np.fft.fft2(gray.astype(np.float32))
    power = np.abs(spectrum) ** 2
    ac = np.fft.fftshift(np.fft.ifft2(power).real)
    cy, cx = ac.shape[0] // 2, ac.shape[1] // 2
    zero_lag = ac[cy, cx]
    # suppress an 11x11 window around zero lag before searching side peaks
    rad = 5
    masked = ac.copy()
    masked[cy - rad:cy + rad + 1, cx - rad:cx + rad + 1] = -np.inf
    side = np.max(masked)
    if not np.isfinite(side) or zero_lag == 0:
        return np.nan
    return float(side / zero_lag)
# Sample images and compute texture descriptors (entropy, periodicity).
tex_rows = []
tex_ids = random.sample(meta.index.tolist(), k=min(len(meta), SAMPLE_MAX_TEXTURE))
for i in tex_ids:
    try:
        img = read_image(meta.loc[i,"img_path"])
        # ITU-R BT.601 luma weights for RGB -> grayscale
        gray = (0.299*img[...,0] + 0.587*img[...,1] + 0.114*img[...,2]).astype(np.float32)
        ent = local_entropy(gray, win=9) if cv2 is not None else np.nan
        acp = autocorr_sidepeak(gray)
        tex_rows.append({"case_id": meta.loc[i,"case_id"], "entropy_mean": ent, "autocorr_side_peak": acp})
    except Exception:
        # texture stats are best-effort; skip unreadable images
        continue
tex_df = pd.DataFrame(tex_rows)
print("[TEXTURE] samples:", len(tex_df))
if len(tex_df):
    hist1(tex_df["entropy_mean"], "Local entropy (mean)", bins=30, xlabel="entropy")
    hist1(tex_df["autocorr_side_peak"], "Autocorr side-peak ratio", bins=30, xlabel="peak2 / center")
else:
    print("[INFO] texture plots skipped")
# ----------------------------
# 10) Summary prints
# ----------------------------
def pct(x):
    """Format a fraction as a percentage string, e.g. 0.5 -> '50.00%'."""
    return "{:.2f}%".format(float(x) * 100.0)
# Headline counts and summary statistics printed at the end of the run.
train_n = (meta["split"]=="train").sum()
sup_n = (meta["split"]=="supplemental").sum()
test_n = (meta["split"]=="test").sum()
non_test = meta["split"]!="test"
# forged = non-test images that have at least one mask file
forged_n = int((non_test).sum() - (meta.loc[non_test, "mask_count"]==0).sum())
auth_n = int((non_test).sum() - forged_n)
print("\n================ SUMMARY ================")
print(f"Train: {train_n:,} | Supplemental: {sup_n:,} | Test: {test_n:,}")
print(f"Forged images: {forged_n:,} | Authentic images: {auth_n:,}")
print(f"Median coverage (non-test): {meta.loc[non_test,'coverage'].median():.5f}")
print(f"Mean #components (non-test, with mask): {meta.loc[(non_test)&(meta['mask_count']>0),'n_comp'].mean():.3f}")
if len(comp_df):
    print(f"Component area (median pct): {np.median(comp_df['area_pct']):.5f}")
    print(f"Touching border ratio: {comp_df['touches_border'].mean():.3f}")
print("=========================================")

# --- Notes (originally Korean, mis-encoded in this copy; restored as English comments
#     so the file remains valid Python) ---
# What does the plotted graph show?
# For each forgery blob (connected component) inside an image, a histogram of
# the percentage of the whole image it occupies (area / image).
# Key terms
# Observations
# Interpretation