Spring WebFlux로 Chatting service 만들기

WebFlux란?

비동기 non-blocking 메세지 처리가 가능하며, 반응형 서비스를 만들 때 유용하다.

HTTP와 다르게 연결성이기 때문에    한번 연결한 이후에는 따로 요청을 보내지 않아도 서버에서 응답을 보내면 데이터가 들어온다.

netty서버에서 실행이 가능하며, SSE 프로토콜을 사용한다.

 

Non-blocking vs Async

non-blocking: 작업이 완료되지 않더라도 다른 작업을 처리할 수 있음

ex) 클라이언트가 서버로 메세지 전송 후 응답을 받지 않더라도 다음 작업을 진행할 수 있다.

 

async: 작업이 완료되어야 다음 작업으로 넘어가지만, 작업을 처리하는 동안 다른 적업이 실행될 수 있음
ex) 메세지 전송 후 응답을 받고 다음 작업으로 넘어간다.

 

Flux와 Mono의 차이

Flux는 0~N개의 데이터를 전달하므로 return 값이 여러 개 일 때 사용한다. ex) 채팅 목록 불러오기.

Mono는 0~1개의 데이터를 전달하므로 return 값이 1개 일 때 사용한다.  ex) POST 요청을 보냈을 때

 

Chat Class

package com.example.chatapp.domain;

import lombok.Data;
import org.bson.types.ObjectId;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

import java.time.LocalDateTime;

@Data
@Document(collection = "chat")
public class Chat {
    @Id
    private ObjectId id;
    private int senderId;
    private ObjectId roomId;
    private String msg;
    private boolean isRead = false;
    private LocalDateTime createdAt;
}

@Data : lombok 라이브러리로 getter, setter, constructor, equals, toString 등을 사용할 수 있다.

@Document : MongoDB의 chat collection으로 등록 

@Id : id를 PK로 지정, MongoDB Id값은 ObjectId라는 타입을 가진다. 

 

Room Class

package com.example.chatapp.domain;

import lombok.Data;
import org.bson.types.ObjectId;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

import java.time.LocalDateTime;


@Data
@Document(collection = "room")
public class Room {
    @Id
    private ObjectId _id;
    private String title;
    private int postId;
    private boolean isSold = false;
    private User[] users = new User[2];
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;
}

 

User Class

package com.example.chatapp.domain;

import lombok.Data;

@Data
public class User {
    private int userId;
    private boolean isAttendance = true;
}

 

ChatService

package com.example.chatapp.Service;

import com.example.chatapp.domain.Chat;
import com.example.chatapp.domain.Room;
import com.mongodb.client.result.UpdateResult;
import lombok.AllArgsConstructor;
import org.bson.types.ObjectId;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.LocalDateTime;


@Service
@AllArgsConstructor
public class ChatService {

    private final ReactiveMongoTemplate mongoTemplate;

    /**채팅 내역 불러오기*/
    public Flux<Chat> findChatsByRoomId(ObjectId roomId){
        return mongoTemplate.find(
                Query.query(Criteria. where("roomId").is(roomId))
                        .with(Sort.by(Sort.Direction.ASC, "createdAt")), Chat.class);
    }

    /**채팅 전송*/
    public Mono<Chat> createChat(Chat chat){
        return mongoTemplate.insert(chat, "chat");
    }

    /**채팅방 마지막 채팅 시간 업데이트*/
    public Mono<UpdateResult> updateTime(ObjectId roomId){
        Query query = new Query(Criteria.where("_id").is(roomId));
        Update update = Update.update("updatedAt", LocalDateTime.now());

        return mongoTemplate.updateFirst(query, update, Room.class);
    }

    /**메세지 읽음 표시*/
    public Mono<UpdateResult> updateIsRead(ObjectId roomId, Integer userId) {
        Query query = new Query(Criteria.where("roomId").is(roomId)
                                    .and("senderId").ne(userId));
        Update update = Update.update("isRead", true);

        return mongoTemplate.updateMulti(query, update, Chat.class);
    }
}

 

MongoTemplate 사용법:  https://www.baeldung.com/spring-data-mongodb-tutorial

Spring MongoDB: https://docs.spring.io/spring-data/mongodb/docs/current/reference/html/#mongo.reactive.template

MongoDB manual: https://www.mongodb.com/docs/manual/tutorial/getting-started/

RoomService

package com.example.chatapp.Service;

import com.example.chatapp.domain.Room;
import com.mongodb.client.result.UpdateResult;
import lombok.AllArgsConstructor;
import org.bson.types.ObjectId;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
@AllArgsConstructor
public class RoomService {

    private final ReactiveMongoTemplate mongoTemplate;


    /**채팅방 목록 마지막 대화 순서대로 불러오기*/
    public Flux<Room> findListByUserId(Integer userId){
        return mongoTemplate.find(
                Query.query(Criteria. where("users.userId").is(userId)
                                .and("users.isAttendance").is(true))
                        .with(Sort.by(Sort.Direction.DESC, "updatedAt")), Room.class);
    }

    /**채팅방 생성*/
    public Mono<Room> createChatRoom(Room room){
        return mongoTemplate.save(room, "room");
    }

    /**채팅방 나가기*/
    public Mono<UpdateResult> updateIsAttendance(ObjectId roomId, Integer userId){
        Query query = new Query(Criteria.where("_id").is(roomId)
                .and("users.userId").in(userId));

        Update update = new Update();
        update.set("users.$.isAttendance", false);

        return mongoTemplate.updateFirst(query, update, Room.class);
    }

    /**게시글 판매완료*/
    public Mono<UpdateResult> updateIsSold(Integer postId) {
        Query query = new Query(Criteria.where("postId").is(postId));
        Update update = Update.update("isSold", true);

        return mongoTemplate.updateMulti(query, update, Room.class);
    }

    /**게시글 제목 변경*/
    public Mono<UpdateResult> updateTitle(Integer postId, String title){
        Query query = new Query(Criteria.where("postId").is(postId));
        Update update = Update.update("title", title);

        return mongoTemplate.updateMulti(query, update, Room.class);
    }
}

ChatController

package com.example.chatapp.controller;

import com.example.chatapp.Service.ChatService;
import com.example.chatapp.domain.Chat;
import com.mongodb.client.result.UpdateResult;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.bson.types.ObjectId;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.LocalDateTime;


@RequiredArgsConstructor
@RestController
@Slf4j
@RequestMapping("/chat-service/chat")
public class ChatController {
    private final ChatService chatService;

    //GET Method

    /**채팅목록 불러오기*/
    @CrossOrigin
    @GetMapping(value = "/list/room/{roomId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Chat> findByRoomId(@PathVariable ObjectId roomId){
        log.info("request: load chat list");
        return chatService.findChatsByRoomId(roomId).subscribeOn(Schedulers.boundedElastic());
    }

    //POST Method

    /**메세지 전송*/
    @CrossOrigin
    @PostMapping("/")
    public Mono<Chat> sendMsg(@RequestBody Chat chat){
        log.info("request: send message");
        chat.setCreatedAt(LocalDateTime.now());
        return chatService.createChat(chat);
    }

    //PUT Method

    /**메세지 읽음 요청*/
    @CrossOrigin
    @PutMapping(value = "/read/room/{roomId}/user/{userId}")
    public Mono<UpdateResult> updateIsRead(@PathVariable ObjectId roomId, @PathVariable Integer userId){
        log.info("request: read message");
        return chatService.updateIsRead(roomId, userId);
    }

    /**채팅 마지막 채팅 날짜 업데이트 요청*/
    @CrossOrigin
    @PutMapping(value = "/update/room/{roomId}")
    public Mono<UpdateResult> updateUpdatedAt(@PathVariable ObjectId roomId){
        log.info("request: update updatedAt");
        return chatService.updateTime(roomId);

    }
}

 

RoomController

package com.example.chatapp.controller;

import com.example.chatapp.Service.RoomService;
import com.example.chatapp.domain.Room;
import com.mongodb.client.result.UpdateResult;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.bson.types.ObjectId;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.LocalDateTime;



@RequiredArgsConstructor
@RestController
@Slf4j
@RequestMapping("/chat-service/room")
public class RoomController {

    private final RoomService roomService;

    //GET Method

    /**채팅방 목록 불러오기*/
    @CrossOrigin
    @GetMapping(value = "/list/user/{userId}")
    public Flux<Room> findListByUserId(@PathVariable Integer userId){
        //데이터를 계속 보내줘야 되면 "chat"처럼 스레드 사용 고려
        log.info("request: load room list");
        return roomService.findListByUserId(userId).subscribeOn(Schedulers.boundedElastic());
    }

    //POST Method

    /**채팅방 생성*/
    @CrossOrigin
    @PostMapping(value = "/new")
    public Mono<Room> createRoom(@RequestBody Room room){
        log.info("request: create room");
        room.setCreatedAt(LocalDateTime.now());
        room.setUpdatedAt(LocalDateTime.now());
        return roomService.createChatRoom(room);
    }

    //PUT Method

    /**채팅방 나가기*/  //ObjectId는 Object("")를 빼고 쌍따옴표 안의 값만 전송
    @CrossOrigin
    @PutMapping(value = "/exit/room/{roomId}/user/{userId}")
    public Mono<UpdateResult> completedTransaction(@PathVariable ObjectId roomId, @PathVariable Integer userId){
        log.info("request: exit room");
        return roomService.updateIsAttendance(roomId, userId);
    }

    /**채팅방 해당 장터글 거래완료*/
    @CrossOrigin
    @PutMapping(value = "/sold-out/post/{postId}")
    public Mono<UpdateResult> completedTransaction(@PathVariable Integer postId){
        log.info("request: sold-out");
        return roomService.updateIsSold(postId);
    }

    /**채팅방 제목 업데이트(장터글 제목 변경 시)*/
    @CrossOrigin
    @PutMapping(value = "/update-title/post/{postId}/title/{title}")
    public Mono<UpdateResult> updateTitle(@PathVariable Integer postId, @PathVariable String title){
        log.info("request: update title");
        return roomService.updateTitle(postId, title);
    }
}

 

MongoConfig

package com.example.chatapp.config;

import lombok.AllArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.convert.DbRefResolver;
import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver;
import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
import org.springframework.data.mongodb.core.mapping.MongoMappingContext;

@Configuration
@AllArgsConstructor
public class MongoConfig {

    MongoDatabaseFactory mongoDbFactory;
    MongoMappingContext mongoMappingContext;

    @Bean
    public MappingMongoConverter mappingMongoConverter() {

        DbRefResolver dbRefResolver = new DefaultDbRefResolver(mongoDbFactory);
        MappingMongoConverter converter = new MappingMongoConverter(dbRefResolver, mongoMappingContext);
        converter.setTypeMapper(new DefaultMongoTypeMapper(null));

        return converter;
    }
}

mongoDB 문서와 java객체 간의 맵핑 및 변환을 위해 MappingMongoConverter 등록

 

 

코딩 중 나왔던 에러

1. connet ENCONNREFUSED 127.0.0.1:27017   

해결: https://www.mongodb.com/community/forums/t/connect-econnrefused-127-0-0-1-27017-in-mongodb-compass/166773

 

Connect ECONNREFUSED 127.0.0.1:27017 in Mongodb Compass

I restarted my computer to try to update it, and before restarting, I was also working on another project. Now, my project with a node.js backend is giving me this error: reason: TopologyDescription { type: 'Single', setName: null, maxSetVersion: null, max

www.mongodb.com

 

2. Tailable Error 

원인: https://dalsacoo-log.tistory.com/entry/Spring-Data-MongoDB-Tailable-Cursors

 

Spring Data MongoDB Tailable Cursors (MongoDB 테일러 커서)

Introduction Spring Data MongoDB의 맞춤식 커서를 활용해 MongoDB를 infinite(무한한) data stream으로 사용하는 방법에 대해서 알아보겠습니다. Tailable Cursors 쿼리를 실행할 때, db driver는 커서를 열어 일치하는

dalsacoo-log.tistory.com

해결: https://jackjeong.tistory.com/176

 

[MongoDB] Ch6 - 특수 인덱스와 컬렉션 유형

6장에서는 아래와 같은 내용을 알아보자 큐 같은 데이터를 위한 제한 컬렉션 캐시를 위한 TTL 인덱스 단순 문자열 검색을 위한 전문 인덱스 2D 구현 및 구면 기하학을 위한 공간 정보 인덱스 대용

jackjeong.tistory.com

 

 

+ MongoDB 설치

https://www.mongodb.com/docs/manual/installation/

 

Install MongoDB — MongoDB Manual

Docs Home → MongoDB Manual MongoDB is available in two server editions: Community and Enterprise.MongoDB AtlasMongoDB Atlas is a hosted MongoDB service option in the cloud which requires no installation overhead and offers a free tier to get started.This

www.mongodb.com

 

'개발 > Spring' 카테고리의 다른 글

[JSP] Cookie와 Session  (0) 2023.04.18
REST Docs와 Swagger ui  (0) 2023.04.11
Java MVC 패턴을 이용한 간단한 미니 쿠팡 만들기  (0) 2023.04.04
DAO, DTO, VO  (0) 2023.03.16
Eureka Server & Spring Cloud Gateway  (0) 2023.02.26