Exchange의 topic(토픽)

AMQP_URL='amqp'

# SCHOOL.NOTIFICATIONS exchange 이름
EXCHANGE_SCHOOL_NOTIFICATION='SCHOOL.NOTIFICATIONS'
# 공지사항 이벤트 routing key
SCHOOL_NOTIFICATIONS_TOPIC_NOTICE='notice'
# 기숙사 공지사항 routing key
SCHOOL_NOTIFICATIONS_TOPIC_NOTICE_DORMITORY='notice.dormitory'
# 셔틀 시간표 변경 routing key
SCHOOL_NOTIFICATIONS_TOPIC_SHUTTLE_SCHEDULE='shuttlebus'

만약 새로운 공지사항에 대한 이벤트를 소비하고 싶다면

const assertQueue = await this.ch.assertQueue(
  "", //
  { autoDelete: true, durable: true },
);

await this.ch.bindQueue(
  assertQueue.queue,
  "SCHOOL.NOTIFICATIONS",
  "notice",
);

this.ch.consume(assertQueue.queue, async (msg) => {
  const content = msg?.content?.toString();
  if (!content) {
    console.log("Consumer cancelled by server");
  } else {
    await callback(content);
  }
});

node.js 코드

export class AmqpService {
  public static EXCHANGE_NAME = "SCHOOL.NOTIFICATIONS";
  private ch: Channel;

  public async publishEvent(
    routeKey: keyof typeof EVENT_TOPIC,
    content: string,
  ) {
    this.ch.publish(
      AmqpService.EXCHANGE_NAME, //
      EVENT_TOPIC[routeKey],
      Buffer.from(content),
    );
  }

  public async subscribeEvent(
    routeKey: keyof typeof EVENT_TOPIC,
    // eslint-disable-next-line @typescript-eslint/no-unsafe-function-type
    callback: Function,
  ) {
    // queue 생성
    const assertQueue = await this.ch.assertQueue(
      "", // 임의의 큐 (랜덤 문자열 Queue 생성으로 소비자가 없으면 자동으로 제거되는 큐를 생성한다.)
      { autoDelete: true, durable: true },
    );

    // queue 바인딩
    await this.ch.bindQueue(
      assertQueue.queue,
      AmqpService.EXCHANGE_NAME,
      EVENT_TOPIC[routeKey],
    );

    this.ch.consume(assertQueue.queue, async (msg) => {
      const content = msg?.content?.toString();
      if (!content) {
        console.log("Consumer cancelled by server");
      } else {
        await callback(content);
      }
    });
  }

  async onModuleInit(): Promise<void> {
    this.ch = await this.ch.assertExchange(
      AmqpService.EXCHANGE_NAME, //
      "topic",
      // { durable: true }
    );
    // exchange 생성 (있다면 있는것으로 조회)
    await this.ch.assertExchange(
      AmqpService.EXCHANGE_NAME, //
      "topic",
      // { durable: true }
    );
  }
}

export const EVENT_TOPIC = {
  NOTICE: process.env.SCHOOL_NOTIFICATIONS_TOPIC_NOTICE ?? "",
  NOTICE_DORMITORY:
    process.env.SCHOOL_NOTIFICATIONS_TOPIC_NOTICE_DORMITORY ?? "",
  SHUTTLE_SCHEDULE:
    process.env.SCHOOL_NOTIFICATIONS_TOPIC_SHUTTLE_SCHEDULE ?? "",
} as const;

https://www.rabbitmq.com/tutorials

이벤트 구독, 발행 서비스 개발에 대한 고찰 (이벤트 중앙 집중)

RabbitMQ 를 이용한 비동기 이벤트 처리

그림, 코드로 이해하는 이벤트 발행, 구독 로직