초기 receiver는 timer가 한 번 돌 때마다 recvfrom()을 1번만 호출하는 구조였습니다. 이 방식은 UDP datagram이 짧은 시간에 많이 들어올 때 처리량이 부족해서, 한 프레임을 구성하는 chunk를 끝까지 모으기 전에 다음 datagram들이 밀리는 문제가 있었습니다. 특히 프레임 하나가 수백 개 chunk로 분할되던 구간에서는 received_datagrams는 계속 증가하지만 assembled_payloads는 0인 상태가 반복됐습니다.
이를 해결하기 위해 receiver를 non-blocking 소켓으로 바꾸고, timer callback 안에서 소켓이 빌 때까지 while True로 datagram을 연속해서 읽도록 수정했습니다. 즉, “한 tick에 1개 처리”에서 “한 tick에 가능한 만큼 모두 drain”하는 구조로 바꿨습니다. 추가로 chunk-level 로그와 datagram 카운터를 넣어서, UDP가 아예 안 오는지, 오는데 조립이 안 되는지를 바로 구분할 수 있게 했습니다. 또 UdpChunkAssembler의 stale timeout도 늘려서 느린 환경에서 partial frame이 중간에 버려지지 않도록 했습니다.
초기 sender는 원본에 가까운 큰 payload를 전송해서, 프레임 하나가 매우 많은 chunk로 분할됐습니다. 이 때문에 receiver가 프레임 조립을 완료하기 어려웠습니다. 이를 줄이기 위해 sender 쪽 전송량을 세 방향으로 낮췄습니다.
첫째, send_hz를 낮춰 초당 전송 프레임 수 자체를 줄였습니다. 둘째, resize_scale=0.5를 적용해 RGB와 Depth 이미지를 절반 해상도로 축소한 뒤 전송하도록 바꿨습니다. 예를 들어 640x480 입력이면 대략 320x240 크기로 줄여 보내게 됩니다. 셋째, 축소된 이미지에 맞게 camera_info도 함께 스케일링해서 수신 측 좌표 계산이 깨지지 않도록 했습니다.
이 경량화 결과, 이전에는 한 프레임이 250개 이상 chunk로 쪼개지던 상황에서, 경량화 후에는 약 8 chunk 수준까지 줄어든 구간이 확인됐습니다. 이 변화가 receiver의 조립 성공으로 직접 이어졌고, 결국 chair_detection_json 발행 복구의 핵심 개선점이 됐습니다.
def on_timer(self):
try:
datagram, _ = self.recv_sock.recvfrom(65_535)
except socket.timeout:
return
except Exception as exc:
self.get_logger().warn(f"UDP receive error: {exc}")
return
try:
payload = self.assembler.push(datagram)
if payload is None:
return
packet = parse_frame_payload(payload)
if should_skip_packet(packet, self.args.max_frame_age_sec, self.args.max_rgb_depth_skew_sec):
return
result = self.detect_single_chair(packet)
if result["detection"] is None:
return
msg = String()
msg.data = json.dumps(result)
self.publisher.publish(msg)
except Exception as exc:
self.get_logger().warn(f"Detection pipeline error: {exc}")