DB 없이 인메모리에서 실제 센서와 유사하게 동작하는 시뮬레이터를 구현했습니다. 실제 센서는 노이즈가 있고, 정상 범위로 회귀하는 성질이 있습니다. 이를 Gaussian Drift 알고리즘으로 모델링했습니다.
4개 센서(pH, 탁도, 유량, 수온)각 정상/경고/위험 임계값을 정의합니다.
SENSOR_CONFIG = {
SensorType.PH: {
"unit": "pH",
"range": (4.0, 11.0), # 시뮬레이션 생성 가능 범위
"normal": (6.5, 8.5), # 정상 범위
"warning": (6.0, 9.0), # 경고 범위
},
SensorType.TURBIDITY: {
"unit": "NTU",
"range": (0.0, 15.0),
"normal": (0.0, 5.0),
"warning": (0.0, 10.0),
},
SensorType.FLOW: {
"unit": "m³/h",
"range": (10.0, 200.0),
"normal": (50.0, 150.0),
"warning": (30.0, 180.0),
},
SensorType.TEMP: {
"unit": "°C",
"range": (5.0, 35.0),
"normal": (15.0, 25.0),
"warning": (10.0, 30.0),
},
}
def _classify(value: float, config: dict) -> SensorStatus:
n_lo, n_hi = config["normal"]
w_lo, w_hi = config["warning"]
if n_lo <= value <= n_hi:
return SensorStatus.NORMAL
elif w_lo <= value <= w_hi:
return SensorStatus.WARNING
else:
return SensorStatus.DANGER
| 조건 | 상태 |
|---|---|
normal_min <= value <= normal_max |
NORMAL |
warning_min <= value <= warning_max |
WARNING |
| 범위 밖 | DANGER |
실제 센서는 이전 값에서 조금씩 변하며, 장기적으로 정상 범위로 돌아옵니다. 이를 Gaussian 분포로 모델링합니다.
def _generate_value(config: dict, prev_value: float | None = None) -> float:
r_lo, r_hi = config["range"]
n_lo, n_hi = config["normal"]
if prev_value is None:
# 최초 실행: 정상 범위 내에서 샘플링
base = random.uniform(n_lo, n_hi)
noise = random.gauss(0, (n_hi - n_lo) * 0.3)
return round(max(r_lo, min(r_hi, base + noise)), 2)
# 이전 값에서 drift
drift = random.gauss(0, (n_hi - n_lo) * 0.15) # 로컬 노이즈
center = (n_lo + n_hi) / 2
pull = (center - prev_value) * 0.05 # 중심 복구력
new_val = prev_value + drift + pull
return round(max(r_lo, min(r_hi, new_val)), 2) # 범위 제한
| 요소 | 수식 | 역할 |
|---|---|---|
| 초기값 | base + N(0, σ²) |
정상 범위 내에서 시작 |
| σ (표준편차) | (n_hi - n_lo) × 0.3 |
정상 범위의 30% |
| Drift 노이즈 | N(0, σ²) |
(n_hi - n_lo) × 0.15 — 새 습값마다 조금씩 즈투름 |
| Center Pull | (center - prev) × 0.05 |
정상 중심으로 5% 복원력 |
| Boundary Clamp | max(r_lo, min(r_hi, ...)) |
생성 범위 박으로 나가지 못하게 강제 제한 |
실제 pH 센서의 거동 예:
시간 값 상태
t=0 7.2 NORMAL
t=1 7.35 NORMAL (↑ drift)
t=2 7.21 NORMAL (↓ drift + center pull)
t=3 7.08 NORMAL (↓ drift)
t=4 6.89 NORMAL (↓ drift 이어집)
t=5 6.72 WARNING (임계값 6.5 근접)
t=6 6.98 WARNING (center pull로 회복 중)
t=7 7.15 NORMAL (회복)
class SensorSimulator:
def __init__(self) -> None:
self._current: dict[SensorType, float] = {}
# deque: 자동으로 최대 100개만 유지
self._history: dict[SensorType, deque] = {
st: deque(maxlen=100) for st in SensorType
}
self._tick() # 초기 데이터 생성
def _tick(self) -> None:
"""4개 센서 모두 새 데이터 생성."""
now = datetime.now(timezone.utc)
for sensor_type, config in SENSOR_CONFIG.items():
prev = self._current.get(sensor_type)
value = _generate_value(config, prev)
status = _classify(value, config)
self._current[sensor_type] = value
self._history[sensor_type].append({
"value": value,
"status": status,
"timestamp": now,
})
| 설계 결정 | 이유 |
|---|---|
| Singleton 인스턴스 | simulator = SensorSimulator() — 앱 전체에서 하나의 상태 공유 |
| deque(maxlen=100) | 최근 100개만 자동 유지, 오래된 데이터 자동 제거 |
| UTC 타임스탬프 | 모든 데이터에 UTC 기준 시간 기록 |
| 매 요청마다 tick | get_all_sensors() 호출 시 새 데이터 생성 → 3초 폴링으로 실시간성 확보 |