DB 없이 인메모리에서 실제 센서와 유사하게 동작하는 시뮬레이터를 구현했습니다. 실제 센서는 노이즈가 있고, 정상 범위로 회귀하는 성질이 있습니다. 이를 Gaussian Drift 알고리즘으로 모델링했습니다.

1. 센서 설정 (SENSOR_CONFIG)

4개 센서(pH, 탁도, 유량, 수온)각 정상/경고/위험 임계값을 정의합니다.

SENSOR_CONFIG = {
    SensorType.PH: {
        "unit": "pH",
        "range": (4.0, 11.0),    # 시뮬레이션 생성 가능 범위
        "normal": (6.5, 8.5),   # 정상 범위
        "warning": (6.0, 9.0),  # 경고 범위
    },
    SensorType.TURBIDITY: {
        "unit": "NTU",
        "range": (0.0, 15.0),
        "normal": (0.0, 5.0),
        "warning": (0.0, 10.0),
    },
    SensorType.FLOW: {
        "unit": "m³/h",
        "range": (10.0, 200.0),
        "normal": (50.0, 150.0),
        "warning": (30.0, 180.0),
    },
    SensorType.TEMP: {
        "unit": "°C",
        "range": (5.0, 35.0),
        "normal": (15.0, 25.0),
        "warning": (10.0, 30.0),
    },
}

상태 분류 로직

def _classify(value: float, config: dict) -> SensorStatus:
    n_lo, n_hi = config["normal"]
    w_lo, w_hi = config["warning"]

    if n_lo <= value <= n_hi:
        return SensorStatus.NORMAL
    elif w_lo <= value <= w_hi:
        return SensorStatus.WARNING
    else:
        return SensorStatus.DANGER
조건 상태
normal_min <= value <= normal_max NORMAL
warning_min <= value <= warning_max WARNING
범위 밖 DANGER

2. Gaussian Drift 알고리즘

실제 센서는 이전 값에서 조금씩 변하며, 장기적으로 정상 범위로 돌아옵니다. 이를 Gaussian 분포로 모델링합니다.

def _generate_value(config: dict, prev_value: float | None = None) -> float:
    r_lo, r_hi = config["range"]
    n_lo, n_hi = config["normal"]

    if prev_value is None:
        # 최초 실행: 정상 범위 내에서 샘플링
        base = random.uniform(n_lo, n_hi)
        noise = random.gauss(0, (n_hi - n_lo) * 0.3)
        return round(max(r_lo, min(r_hi, base + noise)), 2)

    # 이전 값에서 drift
    drift = random.gauss(0, (n_hi - n_lo) * 0.15)  # 로컬 노이즈
    center = (n_lo + n_hi) / 2
    pull = (center - prev_value) * 0.05              # 중심 복구력
    new_val = prev_value + drift + pull

    return round(max(r_lo, min(r_hi, new_val)), 2)  # 범위 제한

알고리즘 구성 요소

요소 수식 역할
초기값 base + N(0, σ²) 정상 범위 내에서 시작
σ (표준편차) (n_hi - n_lo) × 0.3 정상 범위의 30%
Drift 노이즈 N(0, σ²) (n_hi - n_lo) × 0.15 — 새 습값마다 조금씩 즈투름
Center Pull (center - prev) × 0.05 정상 중심으로 5% 복원력
Boundary Clamp max(r_lo, min(r_hi, ...)) 생성 범위 박으로 나가지 못하게 강제 제한

직관적 이해

실제 pH 센서의 거동 예:

시간  값     상태
 t=0   7.2   NORMAL
 t=1   7.35  NORMAL  (↑ drift)
 t=2   7.21  NORMAL  (↓ drift + center pull)
 t=3   7.08  NORMAL  (↓ drift)
 t=4   6.89  NORMAL  (↓ drift 이어집)
 t=5   6.72  WARNING (임계값 6.5 근접)
 t=6   6.98  WARNING (center pull로 회복 중)
 t=7   7.15  NORMAL  (회복)

3. SensorSimulator 클래스

class SensorSimulator:
    def __init__(self) -> None:
        self._current: dict[SensorType, float] = {}
        # deque: 자동으로 최대 100개만 유지
        self._history: dict[SensorType, deque] = {
            st: deque(maxlen=100) for st in SensorType
        }
        self._tick()  # 초기 데이터 생성

    def _tick(self) -> None:
        """4개 센서 모두 새 데이터 생성."""
        now = datetime.now(timezone.utc)
        for sensor_type, config in SENSOR_CONFIG.items():
            prev = self._current.get(sensor_type)
            value = _generate_value(config, prev)
            status = _classify(value, config)
            self._current[sensor_type] = value
            self._history[sensor_type].append({
                "value": value,
                "status": status,
                "timestamp": now,
            })

상태 관리 전략

설계 결정 이유
Singleton 인스턴스 simulator = SensorSimulator() — 앱 전체에서 하나의 상태 공유
deque(maxlen=100) 최근 100개만 자동 유지, 오래된 데이터 자동 제거
UTC 타임스탬프 모든 데이터에 UTC 기준 시간 기록
매 요청마다 tick get_all_sensors() 호출 시 새 데이터 생성 → 3초 폴링으로 실시간성 확보