멀티스레드와 동시성

Ch09. 생산자 소비자 문제 - BlockingQueue

webmaster 2024. 8. 27. 19:29
728x90

BlockingQueue

BoundedQueue를 스레드 관점에서 보면 큐가 특정 조건이 만족될 때까지 스레드의 작업을 차단(blocking)한다.

  • 데이터 추가 차단: 큐가 가득 차면 데이터 추가 작업( put() )을 시도하는 스레드는 공간이 생길 때까지 차단된다.
  • 데이터 획득 차단: 큐가 비어 있으면 획득 작업( take() )을 시도하는 스레드는 큐에 데이터가 들어올 때까지 차단된다.

그래서 스레드 관점에서 이 큐에 이름을 지어보면 BlockingQueue라는 이름이 적절하다. 자바는 생산자 소비자 문제, 또는 한정된 버퍼라고 불리는 문제를 해결하기 위해 java.util.concurrent.BlockingQueue라는 인터페이스와 구현체들을 제공한다.

package java.util.concurrent;
public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);
    boolean offer(E e);
    void put(E e) throws InterruptedException;
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    E take() throws InterruptedException;
    E poll(long timeout, TimeUnit unit) throws InterruptedException;
    boolean remove(Object o);
	//...
}
  • 주요 메서드만 정리했다.
  • 데이터 추가 메서드: add() , offer() , put() , offer(타임아웃)
  • 데이터 획득 메서드: take() , poll(타임아웃) , remove(..)
  • Queue를 상속받는다. 큐를 상속받았기 때문에 추가로 큐의 기능들도 사용할 있다.

BlockingQueue 인터페이스의 대표적인 구현체

  • ArrayBlockingQueue : 배열 기반으로 구현되어 있고, 버퍼의 크기가 고정되어 있다.
  • LinkedBlockingQueue : 링크 기반으로 구현되어 있고, 버퍼의 크기를 고정할 수도, 또는 무한하게 사용할 수도 있다.

참고: Deque 용 동시성 자료 구조인 BlockingDeque 도 있다. 동시성 자료 구조들은 뒤에서 다시 한번 설명한.

 

ArrayBlockingQueue 실행 코드

  • BlockingQueue.put(data) : 앞서 설명한 BoundedQueueV5.put()과 같은 기능을 제공한다.
  • BlockingQueue.take() : 앞서 설명한 BoundedQueueV5.take()와 같은 기능을 제공한다.

ArrayBlockingQueuqe.put() 코드

ArrayBlockingQueuqe.put() 구현 코드

  • 앞서 우리가 구현한 BoundedQueueV5와 비슷하게 구현되어 있다. ArrayBlockingQueue는 내부에서 ReentrantLock을 사용한다.
  • 그리고 생산자 전용 대기실과 소비자 전용 대기실이 있다. 만약 버퍼가 가득 차면 생산자 스레드는 생산자 전용 대기실에서 대기( await() )한다. 생산자 스레드가 생산을 완료하면 소비자 전용 대기실에 signal()로 신호를 전달한다.

ArrayBlockingQueuqe.take() 코드

ArrayBlockingQueuqe.take() 구현 코드

"생산자 먼저 실행" 실행 결과

  • 실행 결과는 앞서 만든 BoundedQueueV5와 같기 때문에 생산자 먼저 실행만 출력했다.
  • BlockingQueue의 구현체가 내부에서 모든 로그를 출력하지는 않기 때문에 로그의 양은 줄어들었다. 실제 기능은 BoundedQueueV5와 같다고 생각하면 된다.
  • 결과를 보면 모든 소비자는 자료를 소비했고, 큐에 데이터도 비어있는 것을 확인할 있다. 모든 스레드도 종료되었다.

BlockingQueue - 기능 설명

실무에서 멀티스레드를 사용할 때는 응답성이 중요하다. 예를 들어서 대기 상태에 있어도, 고객이 중지 요청을 하거나, 또는 너무 오래 대기한 경우 포기하고 빠져나갈 수 있는 방법이 필요하다. 생산자가 무언가 데이터를 생산하는데, 버퍼가 빠지지 않아서 너무 오래 대기해야 한다면, 무한정 기다리는 것보다는 작업을 포기하고, 고객분께는 "죄송합니다. 현재 시스템에 문제가 있습니다. 나중에 다시 시도해 주세요."라고 하는 것이 더 나은 선택일 것이다.

 

예를 들어서 생산자는 서버에 상품을 주문하는 고객일 있다. 고객이 상품을 주문하면, 고객의 요청을 생산자 스레드가 받아서 중간에 있는 큐에 넣어준다고 가정하자. 소비자 스레드는 큐에서 주문 요청을 꺼내서 주문을 처리하는 스레드이다. 만약에 선착순 할인 이벤트가 크게 성공해서 갑자기 주문이 폭주하면 주문을 만드는 생산자 스레드는 매우 바쁘게 주문을 큐에 넣게 된다. 큐의 한계가 1000개라고 가정하자. 생산자 스레드는 순간적으로 1000개가 넘는 주문을 큐에 담았다. 소비자 스레드는 한 번에 겨우 10개 정도의 주문만 처리할 수 있다. 이 상황에서 생산자 스레드는 계속 생산을 시도한다. 결국 소비가 생 산을 따라가지 못하고, 큐가 가득 차게 된다.

 

이런 상황이 되면 수많은 생산자 스레드는 큐 앞에서 대기하게 된다. 결국 고객도 응답을 받지 못하고 무한 대기하게 된다. 고객 입장에서 무작정 무한 대기하고 결과도 알 수 없는 상황이 가장 나쁜 상황일 것이다. 이렇게 생산자 스레드가 큐에 데이터를 추가할 때 큐가 가득 찬 경우, 또는 큐에 데이터를 추가하기 위해 너무 오래 대기한 경우에는 데이터 추가를 포기하고, 고객에게 주문 폭주로 너무 많은 사용자가 몰려서 요청을 처리할 수 없다거나, 또는 나중에 다시 시도해 달라고 하는 것이 더 나은 선택일 것이다.

 

큐가 가득 찼을 때 생각할 수 있는 선택지는 4가지가 있다.

  • 예외를 던진다.( 예외를 받아서 처리한다.)
  • 대기하지 않는다. (즉시 false 를 반환한다.)
  • 대기한다.
  • 특정 시간 만큼만 대기한다.

BlockingQueue 다양한 기능 - 공식 API 문서

Operation Throws Exception Special Value Blocks Times Out
Insert(추가) add(e) offer(e) put(e) offer(e, time(unit))
Remove(제거) remove() poll() take() poll(time(unit))
Examine(관찰) element() peek() not applicable not applicable
  • Throws Exception - 대기시 예외
    • add(e): 지정된 요소를 큐에 추가하며, 큐가 가득 차면 "IllegalStateException" 예외를 던진다.
    • remove(): 큐에서 요소를 제거하며 반환한다. 큐가 비어 있으면 "NoSuchElementException" 예외를 던진다.
    • element(): 큐의 머리 요소를 반환하지만, 요소를 큐에서 제거하지 않는다. 큐가 비어 있으면 "NoSuchElementException" 예외를 던진다.
  • Special Value - 대기시 즉시 반환
    • offer(e): 지정된 요소를 큐에 추가하려고 시도하며, 큐가 가득 차면 false 를 반환한다.
    • poll(): 큐에서 요소를 제거하고 반환한다. 큐가 비어 있으면 null 을 반환한다.
    • peek(): 큐의 머리 요소를 반환하지만, 요소를 큐에서 제거하지 않는다. 큐가 비어 있으면 null 을 반환한다.
  • Blocks - 대기
    • put(e): 지정된 요소를 큐에 추가할 때까지 대기한다. 큐가 가득 차면 공간이 생길 때까지 대기한다.
    • take(): 큐에서 요소를 제거하고 반환한다. 큐가 비어 있으면 요소가 준비될 때까지 대기한다.
    • Examine (관찰): 해당 사항 없음.
  • Times Out - 시간 대기
    • offer(e, time, unit): 지정된 요소를 큐에 추가하려고 시도하며, 지정된 시간 동안 큐가 비워지기를 기다리다가 시간이 초과되면 false 를 반환한다.
    • poll(time, unit): 큐에서 요소를 제거하고 반환한다. 큐에 요소가 없다면 지정된 시간 동안 요소가 준비되기를 기다리다가 시간이 초과되면 null 을 반환한다.
    • Examine (관찰): 해당 사항 없음.

BlockingQueue(즉시 반환)

실행 코드 - 즉시 반환

 

실행 코드 - Main
실행 결과

  • offer(data)는 성공하면 true를 반환하고, 버퍼가 가득 차면 즉시 false를 반환한다.
  • poll() 버퍼에 데이터가 없으면 즉시 null을 반환한다.
  • 버퍼가 가득 차있는 경우 데이터를 추가하지 않고 즉시 false를 반환한다.
  • 버퍼에 데이터가 없는 경우 대기하지 않고 null을 반환한다.

BlockingQueue(시간 대기)

실행 코드 - 시간 대기
실행 코드 - Main
실행 결과

  • offer(data, 시간)는 성공하면 true를 반환하고, 버퍼가 가득 차서 스레드가 대기해야 하는 상황이면, 지정한 시간까지 대기한다. 대기 시간을 지나면 false를 반환한다.
    • 여기서는 확인을 목적으로 1 나노초( NANOSECONDS )로 설정했다.
  • poll(시간) 버퍼에 데이터가 없어서 스레드가 대기해야 하는 상황이면, 지정한 시간까지 대기한다. 대기 시간을 지나면 null을 반환한다.
    • 여기서는 2( SECONDS ) 설정했다.
  • 생산을 담당하는 offer(data, 1 나노초) 메서드는 버퍼가 가득 경우1 나노초만큼 대기한 다음에 false 반한다.
  • 소비를 담당하는 poll(2) 메서드는 버퍼가 경우 2 만큼 대기한 다음에 null을 반환한다.
    • 여기서 consumer3 버퍼를 2초간 대기하다가 2 후에 null 반환받는다.

BlockingQueue(예외)

실행 코드 - 예외
실행 코드 - Main
실행 결과

  • add(data)는 성공하면 true를 반환하고, 버퍼가 가득 차면 즉시 예외가 발생한다.
    • java.lang.IllegalStateException: Queue full
  • remove()는 버퍼에 데이터가 없으면, 즉시 예외가 발생한다.
    • java.util.NoSuchElementException
  • 생산을 담당하는 add(data) 메서드는 버퍼가 가득 찬 경우 IllegalStateException 이 발생한다. 오류 메시지는 Queue full이다.
  • 소비를 담당하는 remove() 메서드는 버퍼가 경우 NoSuchElementException 발생한다.
728x90