멀티스레드와 동시성

Ch12. 스레드 풀과 Executor 프레임워크 - Executor 프레임워크와 ExecutorService

webmaster 2024. 9. 25. 23:16
728x90

Executor 프레임워크의 주요 구성 요소

Executor 인터페이스

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
  • 가장 단순한 작업 실행 인터페이스로, execute(Runnable command) 메서드 하나를 가지고 있다.

ExecutorService 인터페이스의 주요 메서드

public interface ExecutorService extends Executor, AutoCloseable {
     <T> Future<T> submit(Callable<T> task);
     
     @Override
     default void close(){
     	...
     }
	 
     ... 
}
  • Executor 인터페이스를 확장해서 작업 제출과 제어 기능을 추가로 제공한다.
  • 주요 메서드로는 submit() , close()가 있다.
  • 더 많은 기능이 있지만 나머지 기능들은 뒤에서 알아보자.
  • Executor 프레임워크를 사용할 때는 대부분 이 인터페이스를 사용한다.
  • ExecutorService 인터페이스의 기본 구현체는 ThreadPoolExecutor이다.

로그 출력 유틸리티 만들기

로그 출력 유틸리티

  • pool : 스레드 풀에서 관리되는 스레드의 숫자
  • active : 작업을 수행하는 스레드의 숫자
  • queuedTasks : 큐에 대기 중인 작업의 숫자
  • completedTask : 완료된 작업의 숫자
  • printState() 메서드에 ThreadPoolExecutor 구현체가 넘어오면 우리가 구성한 로그를 출력하고, 그렇지 않은 경우에는 인스턴스 자체를 출력한다.

참고로 ExecutorService 인터페이스는 getPoolSize() , getActiveCount() 같은 자세한 기능은 제공하지 않는다. 이 기능은 ExecutorService의 대표 구현체인 ThreadPoolExecutor를 사용해야 한다.

ExecutorService 살펴보기

실행 코드 - 1초간 대기하는 스레드 작업

  • Runnable 인터페이스를 구현한다. 1초의 작업이 걸리는 간단한 작업으로 가정하자.

실행 코드 - ExecutorService 실행하는 코드
실행 결과

  • ExecutorService의 가장 대표하는 구현체는 ThreadPoolExecutor이다.

ThreadPoolExecutor

 

  • ThreadPoolExecutor 구성요소
    • 스레드 풀: 스레드를 관리한다.
    • BlockingQueue : 작업을 보관한다. 생산자 소비자 문제를 해결하기 위해 단순한 큐가 아니라, BlockingQueue를 사용한다.
  • 생산자가 es.execute(new RunnableTask("taskA")) 를 호출하면, RunnableTask("taskA") 인스턴스가 BlockingQueue에 보관된다.
  • 생산자: es.execute(작업)를 호출하면 내부에서 BlockingQueue에 작업을 보관한다. main 스레드가 생산자가 된다.
  • 소비자: 스레드 풀에 있는 스레드가 소비자이다. 이후에 소비자 중에 하나가 BlockingQueue에 들어있는 작업을 받아서 처리한다.

ThreadPoolExecutor의 생성자는 다음 속성을 사용한다.

  • corePoolSize : 스레드 풀에서 관리되는 기본 스레드의 수
  • maximumPoolSize : 스레드 풀에서 관리되는 최대 스레드 수
  • keepAliveTime , TimeUnit unit : 기본 스레드 수를 초과해서 만들어진 스레드가 생존할 수 있는 대기 시간이다. 이 시간 동안 처리할 작업이 없다면 초과 스레드는 제거된다.
  • BlockingQueue workQueue : 작업을 보관할 블로킹 큐
new ThreadPoolExecutor(2,2,0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
  • 최대 스레드 수와 keepAliveTime , TimeUnit unit에 대한 부분은 뒤에서 따로 설명하겠다.
  • 여기서는 corePoolSize=2 , maximumPoolSize=2를 사용해서 기본 스레드와 최대 스레드 수를 맞추었다.
  • 따라서 풀에서 관리되는 스레드는 2개로 고정된다. keepAliveTime , TimeUnit unit는 0으로 설정했는데, 이 부분은 뒤에서 설명한다.
  • 작업을 보관할 블로킹 큐의 구현체로 LinkedBlockingQueue를 사용했다. 참고로 이 블로킹 큐는 작업을 무한대로 저장할 수 있다.

ExecutorService 실행 흐름 분석 - 초기 상태
출력 로그

  • ThreadPoolExecutor를 생성한 시점에 스레드 풀에 스레드를 미리 만들어두지는 않는다.

ExecutorService 실행 흐름 분석 - 1

  • main 스레드가 es.execute("taskA ~ taskD")를 호출한다.
    • 참고로 당연한 이야기지만 main 스레드는 작업을 전달하고 기다리지 않는다. 전달한 작업은 다른 스레드가 실행할 것이다. main 스레드는 작업을 큐에 보관까지만 하고 바로 다음 코드를 수행한다. taskA~D 요청이 블로킹 큐에 들어온다.
  • 최초의 작업이 들어오면 이때 작업을 처리하기 위해 스레드를 만든다.
    • 참고로 스레드 풀에 스레드를 미리 만들어두지는 않는다.
  • 작업이 들어올 때마다 corePoolSize의 크기까지 스레드를 만든다.
    • 예를 들어서 최초 작업인 taskA가 들어오는 시점에 스레드 1을 생성하고, 다음 작업인 taskB 가 들어오 는 시점에 스레드 2를 생성한다.
    • 이런 방식으로 corePoolSize에 지정한 수만큼 스레드를 스레드 풀에 만든다. 여기서는 2를 설정했으므로 2개까지 만든다.
    • corePoolSize까지 스레드가 생성되고 나면, 이후에는 스레드를 생성하지 않고 앞서 만든 스레드를 재사용한다.

ExecutorService 실행 흐름 분석 - 2
출력 로그

  • 스레드 풀에 관리되는 스레드가 2개이므로 (pool=2)
  • 작업을 수행 중인 스레드가 2개이므로 (active=2)
  • 큐에 대기 중인 작업이 2개이므로 (queuedTasks=2)
  • 완료된 작업은 없으므로 (completedTasks=0)

ExecutorService 실행 흐름 분석 - 3

  • 작업이 완료되면 스레드 풀에 스레드를 반납한다. 스레드를 반납하면 스레드는 대기("WAITING") 상태로 스레드 풀에 대기한다.
    • 참고로 실제 반납 되는 게 아니라, 스레드의 상태가 변경된다고 이해하면 된다.

ExecutorService 실행 흐름 분석 - 4

  • 반납된 스레드는 재사용된다.

ExecutorService 실행 흐름 분석 - 5

  • taskC , taskD의 작업을 처리하기 위해 스레드 풀에서 스레드를 꺼내 재사용한다.

ExecutorService 실행 흐름 분석 - 6
출력 로그

  • 작업이 완료되면 스레드는 다시 스레드 풀에서 대기한다.

ExecutorService 실행 흐름 분석 - 7
출력 로그

  • close()를 호출하면 ThreadPoolExecutor 가 종료된다. 이때 스레드 풀에 대기하는 스레드도 함께 제거된다.

참고: close()는 자바 19부터 지원되는 메서드이다. 만약 19 미만 버전을 사용한다면 shutdown()을 호출하자.

 

Runable의 불편함

Runnable 인터페이스의 불편함

public interface Runnable {
     void run();
}
  • 반환 값이 없다: run() 메서드는 반환 값을 가지지 않는다. 따라서 실행 결과를 얻기 위해서는 별도의 메커니즘을 사용해야 한다. 쉽게 이야기해서 스레드의 실행 결과를 직접 받을 수 없다. 앞에서 공부한 SumTask의 예를 생각해 보자. 스레드가 실행한 결과를 멤버 변수에 넣어두고, join() 등을 사용해서 스레드가 종료되길 기다린 다음에 멤버 변수를 통해 값을 받아야 한다.
  • 예외 처리: run() 메서드는 체크 예외(checked exception)를 던질 수 없다. 체크 예외의 처리는 메서드 내부에서 처리해야 한다.

실행 코드 - Runnable의 불편함
실행 결과

  • run(): 0 ~ 9 사이의 무작위 값을 조회한다. 작업에 2초가 걸린다고 가정한다.
  • 무작위 값이므로 숫자의 결과는 다를 수 있다.
  • 프로그램이 시작되면 Thread-1 이라는 별도의 스레드를 하나 만든다.
  • Thread-1 수행하는 MyRunnable 무작위 값을 하나 구한 다음에 value 필드에 보관한다.
  • 클라이언트인 main 스레드가 별도의 스레드에서 만든 무작위 값을 얻어오려면 Thread-1 스레드가 종료될 때까지 기다려야 한다. 그래서 main 스레드는 join() 호출해서 대기한다.
  • 이후에 main 스레드에서 MyRunnable 인스턴스의 value 필드를 통해 최종 무작위 값을 획득한다.

별도의 스레드에서 만든 무작위 값 하나를 받아오는 과정이 이렇게 복잡하다. 작업 스레드( Thread-1 )는 값을 어딘가에 보관해두어야 하고, 요청 스레드( main )는 작업 스레드의 작업이 끝날 때까지 join()을 호출해서 대기한 다음에, 어딘가에 보관된 값을 찾아서 꺼내야 한다. 작업 스레드는 간단히 값을 return을 통해 반환하고, 요청 스레드는 그 반환 값을 바로 받을 수 있다면 코드가 훨씬 간결해질 것이다.
이런 문제를 해결하기 위해 Executor 프레임워크는 Callable과 Future라는 인터페이스를 도입했다

728x90