JS/TIL(Today I Learned)

2025-03-04 <최종프로젝트 D-10>

프린스 알리 2025. 3. 4.

BullMQ 도입 및 사용법

BullMQ는 Redis 기반의 Job Queue 라이브러리로, 비동기 작업을 관리하는 데 적합하다. 네비게이션 서버가 길찾기 요청을 효율적으로 처리하고, 메인 서버가 결과를 안정적으로 받을 수 있도록 BullMQ를 활용하는 방법을 설명하겠다.


1. BullMQ 설치

BullMQ는 bull의 개선된 버전으로, TypeScript 지원과 여러 기능이 강화되었다.

yarn add bullmq
# 또는 npm 사용 시
npm install bullmq

2. 기본적인 개념

BullMQ는 Redis를 이용해 작업을 큐에 추가하고, 이를 비동기적으로 처리한다. 주요 개념은 다음과 같다.

  • Queue: 작업(Job)을 추가하는 역할
  • Worker: 큐에서 작업을 가져와 실행하는 역할
  • Job: 실제 실행할 작업 데이터
  • Events: 작업 진행 상태를 감지하는 기능

3. 메인 서버에서 길찾기 요청을 큐에 추가

메인 서버는 플레이어가 이동할 때 Queue를 이용해 길찾기 요청을 보낸다.

import { Queue } from "bullmq";
import IORedis from "ioredis";

// Redis 연결 설정
const redisConnection = new IORedis(
    host: "192.168.1.100",  // Redis 서버 IP
  port: 6379,
  password: "yourpassword",
  maxRetriesPerRequest: null, // 비동기 작업이 끊기지 않도록 설정
);

// 네비게이션 요청을 위한 큐 생성
const navigationQueue = new Queue("navigationQueue", { connection: redisConnection });

// 플레이어가 이동할 때 길찾기 요청 추가
export async function requestPathFinding(startPos, endPos) {
  const job = await navigationQueue.add("findPath", {
    start: startPos,
    end: endPos
  });

  console.log(`Job ${job.id} added to the queue`);
}

4. 네비게이션 서버에서 경로 계산 처리 (Worker)

네비게이션 서버는 Worker를 사용해 메인 서버에서 들어온 요청을 처리한다.

import { Worker } from "bullmq";
import IORedis from "ioredis";
import { findPath, loadNavMesh } from "./navMeshLoader.js";

// Redis 연결 설정
const redisConnection = new IORedis(
    host: "192.168.1.100",  // Redis 서버 IP
  port: 6379,
  password: "yourpassword",
  maxRetriesPerRequest: null, // 비동기 작업이 끊기지 않도록 설정
);

// NavMesh 로드
const navMesh = townNavMesh;

// Worker 생성 (네비게이션 서버에서 실행)
const navigationWorker = new Worker(
  "navigationQueue",
  async (job) => {
    console.log(`Processing job ${job.id}`);

    const { start, end } = job.data;
    const path = await findPath(navMesh, start, end);

    return path; // 결과 반환
  },
  { connection: redisConnection }
);

5. 메인 서버에서 결과 받아오기

BullMQ는 작업이 완료되면 Job 객체에서 결과를 가져올 수 있다.

import { Job } from "bullmq";

async function getPathResult(jobId) {
  const job = await Job.fromId(navigationQueue, jobId);
  if (job && job.returnvalue) {
    console.log(`Job ${jobId} completed, path:`, job.returnvalue);
    return job.returnvalue;
  }
  return null;
}

예제 흐름

  1. 메인 서버에서 requestPathFinding(start, end) 실행 → Job이 생성됨.
  2. 네비게이션 서버의 Worker가 Job을 처리하고 경로를 계산.
  3. 작업이 끝나면 job.returnvalue로 결과를 반환.
  4. 메인 서버가 getPathResult(jobId)를 통해 결과를 조회.

6. 이벤트 기반으로 Job 상태 모니터링

BullMQ는 QueueEvents를 제공하여 작업의 진행 상태를 감시할 수 있다.

import { QueueEvents } from "bullmq";
import IORedis from "ioredis";

const connection = new IORedis();
const queueEvents = new QueueEvents("navigationQueue", { connection });

queueEvents.on("completed", ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} completed with result:`, returnvalue);
});

queueEvents.on("failed", ({ jobId, failedReason }) => {
  console.error(`Job ${jobId} failed: ${failedReason}`);
});

7. BullMQ Dashboard 활용 (옵션)

BullMQ에는 시각적으로 큐 상태를 확인할 수 있는 대시보드가 있다.

npx bull-board --redis=localhost:6379

브라우저에서 http://localhost:3000으로 접속하면 큐 상태를 확인할 수 있다.


8. 최적화 및 확장

  • Job 제한: 너무 많은 요청이 발생하면 limiter를 설정하여 큐의 부담을 줄일 수 있다.
const navigationQueue = new Queue("navigationQueue", {
  connection,
  limiter: {
    max: 10, // 초당 10개의 작업만 실행
    duration: 1000, // 1초
  },
});

 

  • 다중 Worker 지원: 여러 개의 네비게이션 서버를 두고 분산 처리할 수도 있다.
new Worker("navigationQueue", async (job) => { /* 작업 */ }, { concurrency: 5 });

 

  • Job 우선순위: 중요한 요청을 먼저 처리하려면 priority 옵션을 사용할 수 있다.
await navigationQueue.add("findPath", { start, end }, { priority: 1 });

 

댓글