Node.js

[Node.js] RabbitMQ

hyun_ji 2021. 7. 25. 19:22
반응형
SMALL

목표 : RabbitMQ를 이용하여, 데이터를 임시로 저장하고 확인해봅시다.

 

 

 

RabbitMQ는 디비 또는 데이터 처리에 문제가 생겼을때 데이터를 임시로 처리해줄때 많이 사용합니다.

예를 들면, 채팅방에 한명은 실시간 접속 상태지만 한명은 접속 상태가 아닐때,

중간에 들어오면 채팅 내용이 보여야하는데 데이터가 느리게 들어오면 실시간 통신에 문제가 됩니다.

그래서 실시간 채팅은 항시, RabbitMQ 혹은 Redis 처럼 캐싱기능이 있는 저장소에 담을 필요가 있습니다.

 

우선적으로 RabbitMQ 서버를 설치하여, 실행시켜야합니다.

https://www.rabbitmq.com/#getstarted

 

완료되었으면 디렉토리를 생성하여 기본 노드 서버를 세팅합시다.

app.js

const express = require("express");
const app = express();
const cors = require("cors");
const dotenv = require('dotenv');
const { PORT  } = process.env;
dotenv.config();
const http_server = require('http')
.createServer(app)
.listen(PORT || 8081, () => {
  console.log('server on');
});

app.use(cors());
app.use(express.json());
app.use(express.urlencoded({ extended: false }));

db.sequelize
.authenticate()
.then(async () => {
  try{
    console.log('db connect ok');
    await db.sequelize.sync({force : false});
  }catch(err){
    console.log('err');
  }  
})
.catch(err => {
    console.log('db' + err);
});

RabbitMQ를 사용하기 위해 모듈을 설치합니다.

npm i amqplib

프로젝트에 필요한 다른 모듈들도 설치합니다.

package.json

{
  "main": "app.js",
  "version": "0.0.1",
  "name": "message",
  "dependencies": {
    "amqplib": "^0.8.0",
    "cors": "^2.8.5",
    "dotenv": "^10.0.0",
    "express": "^4.17.1",
    "mysql2": "^2.2.5",
    "nodemon": "^2.0.12",
    "sequelize": "^6.6.5"
  },
  "scripts": {
    "start": "nodemon app.js"
  }
}

RabbitMQ를 설치했으니, rabbitmq의 객체 클래스 정의 파일을 작성합니다.

 

rabbitmq.js

const amqp = require("amqplib");

class RabbitmqWrapper {
  constructor(url, queueName, options) {
    // 객체 초기화
    this._url = url;
    this._queueName = queueName;
    this._options = options || {};

    // public
    this.channel = undefined;
    this.queue = undefined;
  }

  // 커넥트 생성하고 채널 연결
  async setup() {
    const connect = await amqp.connect(this._url); //mysqlconnect
    const channel = await connect.createChannel(); //mysql-database
    this.channel = channel;
  }

  // 채널에다가 queue 만들어주기 queue는 메세지를 수신 받을 수 있는 이름
  async assertQueue() {
    const queue = await this.channel.assertQueue(this._queueName, {
      durable: false, // false는 볼 때까지 보관, true는 일정시간이 지나면 사라짐
    });
    this.queue = queue;
  }

  // queue에 데이터보내기
  async sendToQueue(msg) {
    const sending = await this.channel.sendToQueue(
      this._queueName,
      this.encode(msg),
      {
        persistent: true,
      }
    );
    return sending;
  }

    // queue에 있는 데이터 가져오기
  async recvFromQueue() {
    const message = await this.channel.get(this._queueName, {});
    if (message) {
      this.channel.ack(message);
      console.log(message.content);
      console.log(message.content.toString())
      return message.content.toString();
    } else {
      return null;
    }
  }

  // 문자를 Buffer로 바꿈
  encode(doc) {
    return Buffer.from(JSON.stringify(doc));
  }

  // 메세지보내기
  async send_message(msg) {
    await this.setup(); //레빗엠큐 연결
    await this.assertQueue(); //큐생성
    await this.sendToQueue(msg); //생성큐메세지전달
  }

  // 메세지 가져오기
  async recv_message() {
    await this.setup();
    return await this.recvFromQueue();
  }
}

module.exports = RabbitmqWrapper;

위에서 작성한 rabbitmq의 객체 클래스 정의 파일을 사용하기 위해 파일을 생성하고 api를 선언합니다.

간단히 메세지를 주고받는 로직만 작성해보도록 하겠습니다.

 

rabbitmq-api.js

const Rabbitmq = require("./rabbitmq");
const url = "amqp://localhost"; //rabbitmq url
const queue = "web_msg"; //임시 queue이름이고 필요한 상황에 맞게 이름 따로 지정해줘야 한다.
module.exports = {
  send_message: async (req, res) => {
    try {
      let { msg } = req.body;
      const conn = new Rabbitmq(url, queue);

      await conn.send_message(msg);
      res.status(200).json({ result: true });
    } catch (error) {
      console.log(error);
    }
  },
  recv_message: async (req, res) => {
    try {
      const conn = new Rabbitmq(url, queue);
      const msg = await conn.recv_message();
      res.status(200).json({ result: msg });
    } catch (error) {}
  },
};

작성한 api를 app.js에 불러와 라우트 주소을 지정해주고 postman으로 잘 동작하는지 확인해봅시다.

 

app.js

const mq = require("./rabbitmq-api");

app.post("/send_msg", mq.send_message);
app.get("/get_msg", mq.recv_message);

postman

잘작동하네요.

 

참고로 문자를 Buffuer로 바꿔주는 로직이 아래였다면,

 // 문자를 Buffer로 바꿈
  encode(doc) {
    return Buffer.from(JSON.stringify(doc));
  }

Buffer를 문자를 바꿔주는 로직은 아래의 message.content.toString();에 해당합니다.

  async recvFromQueue() {
    const message = await this.channel.get(this._queueName, {});
    if (message) {
      this.channel.ack(message);
      console.log(message.content);
      console.log(message.content.toString())
      return message.content.toString();
    } else {
      return null;
    }
  }

console을 통해 변환되는 것을 확인할 수 있습니다.

 

http://localhost:15672/

로 RabbitMQ 관리페이지에 접속할 수 있습니다. 아이디 비밀번호는 guest입니다.

선언했었던 queue의 이름인 "web_msg"가 목록에 추가되어있는게 보일겁니다.

클릭하면 

이러한 페이지가 나올건데요.

send_msg로 메세지를 추가적으로 몇개 넣어놓고 확인해보면

Total 개수를 보면 몇개 쌓여있는지 확인할 수 있습니다.

밑에 Get Messages를 펼쳐보면 Get Messages버튼이 있습니다.

Messages 옵션에서 볼 메세지 개수를 지정해주고

Get Messages버튼을 클릭해보면 쌓여있는 메세지들을 확인할 수 있어요.

get_msg를 사용하여 메세지들을 확인하면

총 메세지들이 사라진걸 확인할 수 있습니다.

 

이렇게 임시적으로 데이터를 저장하고 확인해보는 시간을 가졌습니다.

 

추가적으로, 디비나 데이터에 문제가 생겨서 RabbitMQ에 저장하는 과정을 눈으로 확인해봐요.

rabbitmq-api.js에 이러한 코드를 추가해주세요.

const {chatting} = require("./model/chatting");
const db = require('./model'); 

module.exports = {
test : async (req, res) => {
    try{
      let { msg } = req.body;
      const rows = await db.chatting.create({
        chatting : msg
      })
      return res.status(200).json({result : "dbtrue"});  //디비가 멀쩡할때의 결과
    }catch(err){
      if(err.name.includes('SequelizeDatabaseError')) {
      let {msg} = req.body
      const conn = new Rabbitmq(url, queue);    
      await conn.send_message(msg);
      res.status(200).json({ result: "mqtrue" }); //디비에 문제있을때의 결과
      }
    }
  }
};

시퀄라이즈를 이용해서 생성한 chatting 테이블을 불러왔습니다.

테이블은 아래처럼 작성했습니다.

chatting colum의 데이터타입 STRING을 글자 수 제한을 1글자로 지정합니다. 그러면 1글자 넘어갔을때 디비에 에러가 생기겠죠.

chatting.js

module.exports = (sequelize, DataTypes) => {
  const chatting = sequelize.define(
    "chatting",
    {
      idx: {
        type:DataTypes.INTEGER,
        primaryKey: true,
        allowNull: false,
        autoIncrement : true
      },
      chatting: {
        type:DataTypes.STRING(1),
        allowNull: false,
      },
    },
    {
      freezeTableName: true,
      timestamps: true,
      comment: '채팅로그',
    }
    );
  return chatting;
};

이제 app.js에 라우트 주소를 지정해주고 포스트맨으로 확인합니다.

app.post("/test", mq.test);

디비에 문제있을때의 결과인 mqtrue가 결과값으로 나왔습니다.

이렇게 디비에 문제가 생기면 RabbitMQ에 데이터가 들어가는 로직이 완성되었습니다.

 

전체 코드는 여기서 확인하세요↓

https://github.com/fkwsur/rabbit-mq-format

반응형
LIST