멀티스레드와 동시성

Ch12. 스레드 풀과 Executor 프레임워크 - Future

webmaster 2024. 10. 6. 00:37
728x90

Runnable과 Callable 비교

Runnable

package java.lang;
public interface Runnable {
	void run();
}
  • Runnable의 run() 은 반환 타입이 void이다. 따라서 값을 반환할 수 없다.
  • 예외가 선언되어 있지 않다. 따라서 해당 인터페이스를 구현하는 모든 메서드는 체크 예외를 던질 수 없다.
  • 참고로 자식은 부모의 예외 범위를 넘어설 없다. 부모에 예외가 선언되어 있지 않으므로 예외를 던질 없다.
  • 물론 런타임(비체크) 예외는 제외다.

Callable

package java.util.concurrent;
public interface Callable<V> {
	V call() throws Exception;
}
  • java.util.concurrent 에서 제공되는 기능이다.
  • Callable의 call() 은 반환 타입이 제네릭 V이다. 따라서 값을 반환할 수 있다.
  • throws Exception 예외가 선언되어 있다. 따라서 해당 인터페이스를 구현하는 모든 메서드는 체크 예외인 Exception과 그 하위 예외를 모두 던질 수 있다.

Callable과 Future 사용

실행 코드 - Future와 Callable 사용
실행 결과
Executors.newFixedThreadPool

  • java.util.concurrent.Executors 가 제공하는 newFixedThreadPool(size)을 사용하면 편리하게 ExecutorService를 생성할 수 있다.
  • MyCallable
    • 숫자를 반환하므로 반환할 제네릭 타입을 <Integer>로 선언했다.
    • 구현은 Runnable 코드와 비슷한데, 유일한 차이는 결과를 필드에 담아두는 것이 아니라, 결과를 반환한다는 이다. 따라서 결과를 보관할 별도의 필드를 만들지 않아도 된다.
  • ExecutorService 가 제공하는 submit()을 통해 Callable을 작업으로 전달할 수 있다.
  • MyCallable 인스턴스가 블로킹 큐에 전달되고, 스레드 풀의 스레드 하나가 작업을 실행할 것이다. 이때 작업의 처리 결과는 직접 반환되는 것이 아니라 Future라는 특별한 인터페이스를 통해 반환된다.
  • future.get()을 호출하면 MyCallable의 call() 이 반환한 결과를 받을 수 있다.

참고로 Future.get() 은 InterruptedException , ExecutionException 체크 예외를 던진다. 여기서는 잡지 말고 간단하게 밖으로 던지자.

 

Executor 프레임워크의 강점
요청 스레드가 결과를 받아야 하는 상황이라면, Callable을 사용한 방식은 Runnable을 사용하는 방식보다 훨씬 편리하다. 코드만 보면 복잡한 멀티스레드를 사용한다는 느낌보다는, 단순한 싱글 스레드 방식으로 개발한다는 느낌이 들 것이다. 이 과정에서 내가 스레드를 생성하거나, join()으로 스레드를 제어하거나 한 코드는 전혀 없다. 심지어 Thread라는 코드도 없다. 단순하게 ExecutorService에 필요한 작업을 요청하고 결과를 받아서 쓰면 된다! 복잡한 멀티스레드를 매우 편리하게 사용할 수 있는 것이 바로 Executor 프레임워크의 큰 강점이다.

 

하지만 편리한 것은 편리한 것이고, 기반 원리를 제대로 이해해야 문제없이 사용할 수 있다. 여기서 잘 생각해 보면 한 가지 애매한 상황이 있다. future.get()을 호출하는 요청 스레드(main)는 future.get()을 호출했을 때 2가지 상황으로 나뉘게 된다.

  • MyCallable 작업을 처리하는 스레드 풀의 스레드가 작업을 완료했다.
  • MyCallable 작업을 처리하는 스레드 풀의 스레드가 아직 작업을 완료하지 못했다.

future.get()을 호출했을 스레드 풀의 스레드가 작업을 완료했다면 반환받을 결과가 있을 것이다. 그런데 아직 작업을 처리 중이라면 어떻게 될까? 이런 의문도 것이다. 결과를 바로 반환하지 않고, 불편하게 Future라는 객체를 대신 반환할까? 부분을 제대로 이해해야 한다.

Future 분석

Future<Integer> future = es.submit(new MyCallable());
  • submit()의 호출로 MyCallable의 인스턴스를 전달한다.
  • 이때 submit() 은 MyCallable.call() 이 반환하는 무작위 숫자 대신에 Future를 반환한다.
  • 생각해 보면 MyCallable 이 즉시 실행되어서 즉시 결과를 반환하는 것은 불가능하다. 왜냐하면 MyCallable 은 즉시 실행되는 것이 아니다. 스레드 풀의 스레드가 미래의 어떤 시점에 이 코드를 대신 실행해야 한다.
  • MyCallable.call() 메서드는 호출 스레드가 실행하는 것도 아니고, 스레드 풀의 다른 스레드가 실행하기 때 문에 언제 실행이 완료되어서 결과를 반환할지 알 수 없다.
  • 따라서 결과를 즉시 받는 것은 불가능하다. 이런 이유로 es.submit() 은 MyCallable의 결과를 반환하는 대 신에 MyCallable의 결과를 나중에 받을 수 있는 Futurue라는 객체를 대신 제공한다.

정리하면 Future는 전달한 작업의 미래이다. 이 객체를 통해 전달한 작업의 미래 결과를 받을 수 있다.

Future 확인 로그
실행 결과

실행 결과 분석

Future 확인 로그 - 초기 상태

  • MyCallable 인스턴스를 편의상 taskA라고 하겠다.
  • 편의상 스레드풀에 스레드가 1개 있다고 가정하겠다.
Future<Integer> future = es.submit(new MyCallable());

Future 확인 로그1 - 실행 결과 분석

  • submit()을 호출해서 ExecutorService에 taskA를 전달한다.

Future 확인 로그1 - 실행 결과 분석(future 생성)

  • 요청 스레드는 es.submit(taskA)를 호출하고 있는 중이다.
  • ExecutorService는 전달한 taskA의 미래 결과를 알 수 있는 Future 객체를 생성한다.
    • Future는 인터페이스이다. 이때 생성되는 실제 구현체는 FutureTask이다.
  • 그리고 생성한 Future 객체 안에 taskA 인스턴스를 보관한다.
  • Future 내부에 taskA 작업의 완료 여부와, 작업의 결과 값을 가진다.

Future 확인 로그1 - 실행 결과 분석(future 반환)

  • submit()을 호출한 경우 Future 가 만들어지고, 전달한 작업인 taskA 가 바로 블로킹 큐에 담기는 것이 아니라, 그림처럼 taskA를 감싸고 있는 Future 가 대신 블로킹 큐에 담긴다.

Future 확인 로그2 - 실행 결과 분석(요청 스레드 코드 실행)

  • 생성한 Future를 즉시 반환하기 때문에 요청 스레드는 대기하지 않고, 자유롭게 본인의 다음 코드를 호출할 수 있다.
  • 이것은 마치 Thread.start()를 호출한 것과 비슷하다. Thread.start()를 호출하면 스레드의 코드가 별도의 스레드에서 실행된다. 요청 스레드는 대기하지 않고, 즉시 다음 코드를 호출할 있다.

Future 확인 로그3 - 실행 결과 분석(Thread 작업 시작)
Future 확인 로그3 - 실행 결과 분석(Thread 작업 시작)

  • 큐에 들어있는 Future[taskA]를 꺼내서 스레드 풀의 스레드 1이 작업을 시작한다.
  • 참고로 Future의 구현체인 FutureTask는 Runnable 인터페이스도 함께 구현하고 있다.
  • 스레드 1은 FutureTask의 run() 메서드를 수행한다.
  • 그리고 run() 메서드가 taskA의 call() 메서드를 호출하고 그 결과를 받아서 처리한다.
    • FutureTask.run() -> MyCallable.call()

Future 확인 로그3 - 실행 결과 분석(Main Thread Blocking)
Future 확인 로그3 - 실행 결과 분석(Main Thread Blocking)

  • 스레드 1
    • 스레드 1은 taskA의 작업을 아직 처리 중이다. 아직 완료하지는 않았다.
  • 요청 스레드
    • 요청 스레드는 Future 인스턴스의 참조를 가지고 있다.
    • 그리고 언제든지 본인이 필요할 때 Future.get()을 호출해서 taskA 작업의 미래 결과를 받을 수 있다.
    • 요청 스레드는 작업의 결과가 필요해서 future.get()을 호출한다.
      • Future에는 완료 상태가 있다. taskA의 작업이 완료되면 Future의 상태도 완료로 변경된다.
      • 그런데 여기서 taskA의 작업이 아직 완료되지 않았다. 따라서 Future 도 완료 상태가 아니다.
    • 요청 스레드가 future.get()을 호출하면 Future 가 완료 상태가 될 때까지 대기한다. 이때 요청 스레드의 상태는 "RUNNABLE" -> "WAITING"이 된다.

future.get()을 호출했을 때

  • Future가 완료 상태: Future 가 완료 상태면 Future에 결과도 포함되어 있다. 이 경우 요청 스레드는 대기하 지 않고, 값을 즉시 반환받을 수 있다.
  • Future가 완료 상태가 아님: taskA 가 아직 수행되지 않았거나 또는 수행 중이라는 뜻이다. 이때는 어쩔 수 없이 요청 스레드가 결과를 받기 위해 대기해야 한다. 요청 스레드가 마치 락을 얻을 때처럼, 결과를 얻기 위해 대기한 다. 이처럼 스레드가 어떤 결과를 얻기 위해 대기하는 것을 블로킹(Blocking)이라 한다.

참고: 블로킹 메서드
Thread.join() , Future.get()과 같은 메서드는 스레드가 작업을 바로 수행하지 않고, 다른 작업이 완료될 때까지 기다리게 하는 메서드이다. 이러한 메서드를 호출하면 호출한 스레드는 지정된 작업이 완료될 때까지 블록(대기) 되어 다른 작업을 수행할 없다.

 

Future 확인 로그4 - 실행 결과 분석(Future 작업 완료)
Future 확인 로그4 - 실행 결과 분석(Future 작업 완료)

  • 요청 스레드
    • 대기(WAITING) 상태로 future.get()을 호출하고 대기 중이다.
  • 스레드 1
    1. taskA 작업을 완료한다.
    2. Future에 taskA의 반환 결과를 담는다. 
    3. Future의 상태를 완료로 변경한다.
    4. 요청 스레드를 깨운다. 요청 스레드는 "WAITING" -> "RUNNABLE" 상태로 변한다.

Future 확인 로그5 - 실행 결과 분석(요청 스레드 RUNNABLE)
Future 확인 로그5 - 실행 결과 분석(요청 스레드 RUNNABLE)

  • 요청 스레드
    • 요청 스레드는 "RUNNABLE" 상태가 되었다. 그리고 완료 상태의 Future에서 결과를 반환받는다. 참고로 taskA의 결과가 Future에 담겨있다.
  • 스레드 1
    • 작업을 마친 스레드 1은 스레드 풀로 반환된다. "RUNNABLE" -> "WAITING"

Future 확인 로그6 - 실행 결과 분석(Future 인스턴스 로그)
Future 확인 로그6 - 실행 결과 분석(Future 인스턴스 로그)

  • Future의 인스턴스인 FutureTask를 보면 "Completed normally"로 정상 완료된 것을 확인할 수 있다.

정리

Future<Integer> future = es.submit(new MyCallable());
  • Future는 작업의 미래 결과를 받을 수 있는 객체이다.
  • submit() 호출 시 future는 즉시 반환된다. 덕분에 요청 스레드는 블로킹되지 않고, 필요한 작업을 할 수 있다.
Integer result = future.get();

 

  • 작업의 결과가 필요하면 Future.get()을 호출하면 된다.
  • Future가 완료 상태: Future 가 완료 상태면 Future에 결과도 포함되어 있다. 이 경우 요청 스레드는 대기하 않고, 값을 즉시 반환받을 있다.
  • Future가 완료 상태가 아님: 작업이 아직 수행되지 않았거나 또는 수행 중이라는 뜻이다. 이때는 어쩔 수 없이 요청 스레드가 결과를 받기 위해 블로킹 상태로 대기해야 한다.

Future 활용

순수 스레드로 작성한 SumTask

실행 코드
실행 결과

  • 이미 앞서 처리해보았던 문제여서 크게 어려움은 없을 것이다.

Callable과 ExecutorService를 사용한 SumTask

실행 코드
실행 결과

  • ExecutorService와 Callable을 사용한 덕분에, 이전 코드보다 훨씬 직관적이고 깔끔하게 코드를 작성할 수 있다.
  • 특히 작업의 결과를 반환하고, 요청 스레드에서 그 결과를 바로 받아서 처리하는 부분이 매우 직관적이다. 코드만 보면 마치 멀티스레드를 사용하지 않고, 단일 스레드 상황에서 일반적인 메서드를 호출하고 결과를 받는 것처럼 느껴진다.
  • 그리고 스레드를 생성하고, Thread.join()과 같은 스레드를 관리하는 코드도 모두 제거할 수 있다.
  • 추가로 Callable.call() throws InterruptedException과 같은 체크 예외도 던질 있다.

Future 이유

Future 없이 결과를 직접 반환하는 코드(가정)

Integer sum1 = es.submit(task1); // 여기서 블로킹
Integer sum2 = es.submit(task2); // 여기서 블로킹
  • 실제로 동작하는 코드는 아니다. 이런 코드는 없다.

Future 없이 결과를 직접 반환 하는 코드(가정)

  • 먼저 ExecutorService 가 Future 없이 결과를 직접 반환한다고 가정해 보자.
  • 요청 스레드는 task1을 ExecutorService에 요청하고 결과를 기다린다.
    • 작업 스레드가 작업을 수행하는데 2초가 걸린다.
    • 요청 스레드는 결과를 받을 때까지 2초간 대기한다.
    • 요청 스레드는 2초 후에 결과를 받고 다음 라인을 수행한다.
    • 요청 스레드는 task2를 ExecutorService에 요청하고 결과를 기다린다.
    • 작업 스레드가 작업을 수행하는데 2초가 걸린다.
    • 요청 스레드는 결과를 받을 때까지 2초간 대기한다.
    • 결과를 받고 요청 스레드가 다음 라인을 수행한다.

Future를 사용하지 않는 경우 결과적으로 task1의 결과를 기다린 다음에 task2를 요청한다. 따라서 4초의 시간이 걸렸다. 이것은 마치 단일 스레드가 작업을 것과 비슷한 결과이다!

Future 반환하는 코드

Future<Integer> future1 = es.submit(task1); // 여기는 블로킹 아님 
Future<Integer> future2 = es.submit(task2); // 여기는 블로킹 아님

Integer sum1 = future1.get(); // 여기서 블로킹
Integer sum2 = future2.get(); // 여기서 블로킹

Future를 반환 하는 코드 - 1

  • 요청 스레드는 task1을 ExecutorService에 요청한다.
  • 요청 스레드는 즉시 Future를 반환받는다.
    • 작업 스레드 1은 task1을 수행한다.
  • 요청 스레드는 task2를 ExecutorService에 요청한다.
    • 요청 스레드는 즉시 Future를 반환받는다.
    • 작업 스레드 2는 taksk2를 수행한다.

요청 스레드는 task1 , task2를 동시에 요청할 있다. 따라서 작업은 동시에 수행된다.

Future를 반환 하는 코드 - 2

  • 이후에 요청 스레드는 future1.get()을 호출하며 대기한다.
    • 작업 스레드 1이 작업을 진행하는 약 2초간 대기하고 결과를 받는다.
  • 이후에 요청 스레드는 future2.get()을 호출하며 즉시 결과를 받는다.
    • 작업 스레드 2는 이미 2초간 작업을 완료했다. 따라서 future2.get() 거의 즉시 결과를 반환한다.

Future를 잘못 사용한 예

앞서 설명한 문제 상황과 같은 원리로 Future를 호출하자마자 바로 get()을호출해도 문제가 있다.

Future 잘못 활용한 예 1

Future<Integer> future1 = es.submit(task1); // non-blocking 
Integer sum1 = future1.get(); // blocking, 2초 대기

Future<Integer> future2 = es.submit(task2); // non-blocking 
Integer sum2 = future2.get(); // blocking, 2초 대기
  • 요청 스레드가 작업을 하나 요청하고 그 결과를 기다린다. 그리고 그다음에 다시 다음 요청을 전달하고 결과를 기다린다.
  • 총 4초의 시간이 걸린다.

Future를 잘못 활용한 예 2

Integer sum1 = es.submit(task1).get(); // get()에서 블로킹 
Integer sum2 = es.submit(task2).get(); // get()에서 블로킹
  • Future를 잘못 활용한 예 1과 똑같은 코드이다. 대신에 submit()을 호출하고 그 결과를 변수에 담지 않고 바로 연결해서 get()을 호출한다.
  • 총 4초의 시간이 걸린다

실제 Future를 잘못 사용한 실행 코드
실행 결과

  • 총 4초의 시간이 걸린다.

정리

  • Future라는 개념이 없다면 결과를 받을 때까지 요청 스레드는 아무 일도 못하고 대기해야 한다. 따라서 다른 작업을 동시에 수행할 수도 없다.
  • Future라는 개념 덕분에 요청 스레드는 대기하지 않고, 다른 작업을 수행할 수 있다. 예를 들어서 다른 작업을 더 요청할 수 있다. 그리고 모든 작업 요청이 끝난 다음에, 본인이 필요할 때 Future.get()을 호출해서 최종 결과를 받을 수 있다.
  • Future를 사용하는 경우 결과적으로 task1 , task2를 동시에 요청할 수 있다. 두 작업을 바로 요청했기 때문에 작업을 동시에 제대로 수행할 수 있다.

Future는 요청 스레드를 블로킹(대기) 상태로 만들지 않고, 필요한 요청을 모두 수행할 있게 해준다. 필요한 모든 요청을 다음에 Future.get()을 통해 블로킹 상태로 대기하며 결과를 받으면 된다. 이런 이유로 ExecutorService는 결과를 직접 반환하지 않고, Future를 반환한다.

Future 정리

 public interface Future<V> {
     boolean cancel(boolean mayInterruptIfRunning);
     boolean isCancelled();
     boolean isDone();
     V get() throws InterruptedException, ExecutionException;
     V get(long timeout, TimeUnit unit)
         throws InterruptedException, ExecutionException, TimeoutException;
     enum State {
         RUNNING,
         SUCCESS,
		FAILED,
		CANCELLED
    }
     default State state() {}
 }

 

주요 메서드

boolean cancel(boolean mayInterruptIfRunning)

  • 기능: 아직 완료되지 않은 작업을 취소한다.
  • 매개변수: mayInterruptIfRunning
    • cancel(true) : Future를 취소 상태로 변경한다. 이때 작업이 실행 중이라면 Thread.interrupt()를 호출해서 작업을 중단한다.
    • cancel(false) : Future를 취소 상태로 변경한다. 단 이미 실행 중인 작업을 중단하지는 않는다.
  • 반환값: 작업이 성공적으로 취소된 경우 true , 이미 완료되었거나 취소할 수 없는 경우 false
  • 설명: 작업이 실행 중이 아니거나 아직 시작되지 않았으면 취소하고, 실행 중인 작업의 경우 mayInterruptIfRunning이 true이면 중단을 시도한다.
  • 참고: 취소 상태의 Future에 Future.get()을 호출하면 CancellationException 런타임 예외가 발생한다.

boolean isCancelled()

  • 기능: 작업이 취소되었는지 여부를 확인한다.
  • 반환값: 작업이 취소된 경우 true , 그렇지 않은 경우 false
  • 설명: 이 메서드는 작업이 cancel() 메서드에 의해 취소된 경우에 true를 반환한다.

boolean isDone()

  • 기능: 작업이 완료되었는지 여부를 확인한다.
  • 반환값: 작업이 완료된 경우 true , 그렇지 않은 경우 false
  • 설명: 작업이 정상적으로 완료되었거나, 취소되었거나, 예외가 발생하여 종료된 경우에 true를 반환한다.

State state()

  • 기능: Future의 상태를 반환한다. 자바 19부터 지원한다.
    • RUNNING : 작업 실행 중
    • SUCCESS : 성공 완료
    • FAILED : 실패 완료
    • CANCELLED : 취소 완료

V get()

  • 기능: 작업이 완료될 때까지 대기하고, 완료되면 결과를 반환한다.
  • 반환값: 작업의 결과
  • 예외
    • InterruptedException : 대기 중에 현재 스레드가 인터럽트 된 경우 발생
    • ExecutionException : 작업 계산 중에 예외가 발생한 경우 발생
  • 설명: 작업이 완료될 때까지 get()을 호출한 현재 스레드를 대기(블록킹)한다. 작업이 완료되면 결과를 반환한다.

V get(long timeout, TimeUnit unit)

  • 기능: get()과 같은데, 시간 초과되면 예외를 발생시킨다.
  • 매개변수:
    • timeout : 대기할 최대 시간
    • unit: timeout 매개변수의 시간 단위 지정
  • 반환값: 작업의 결과
  • 예외:
    • InterruptedException : 대기 중에 현재 스레드가 인터럽트 된 경우 발생
    • ExecutionException : 계산 중에 예외가 발생한 경우 발생
    • TimeoutException : 주어진 시간 내에 작업이 완료되지 않은 경우 발생
  • 설명: 지정된 시간 동안 결과를 기다린다. 시간이 초과되면 TimeoutException을 발생시킨다.

Future 취소

Future Cancel 예제 실행 코드

  • 매개변수 mayInterruptIfRunning를 변경하면서 어떻게 작동하는지 차이를 확인해보자.
  • cancel(true) : Future를 취소 상태로 변경한다. 이때 작업이 실행 중이라면 Thread.interrupt()를 호출해서 작업을 중단한다.
  • cancel(false) : Future 취소 상태로 변경한다. 이미 실행 중인 작업을 중단하지는 않는다.

mayInterruptIfRunning = true 실행 결과

mayInterruptIfRunning = true

  • cancel(true)를 호출했다.
  • mayInterruptIfRunning=true 사용하면 실행 중인 작업에 인터럽트가 발생해서 실행중인 작업을 중지 시도한다.
  • 이후 Future.get() 호출하면 CancellationException 런타임 예외가 발생한다.

mayInterruptIfRunning = false 실행 결과

mayInterruptIfRunning = false

  • cancel(false)를 호출했다.
  • mayInterruptIfRunning=false 사용하면 실행중인 작업은 그냥 둔다. (인터럽트를 걸지 않는다.)
  • 실행중인 작업은 그냥 두더라도 cancel() 호출했기 때문에 Future "CANCEL" 상태가 된다.
  • 이후 Future.get()을 호출하면 CancellationException 런타임 예외가 발생한다

Future 예외

Future 에러 - 실행 코드
Future 에러 - 실행 결과

  • 요청 스레드: es.submit(new ExCallable())을 호출해서 작업을 전달한다.
  • 작업 스레드: ExCallable을 실행하는데, IllegalStateException 예외가 발생한다.
    • 작업 스레드는 Future에 발생한 예외를 담아둔다. 참고로 예외도 객체이다. 잡아서 필드에 보관할 수 있다.
    • 예외가 발생했으므로 Future의 상태는 "FAILED"가 된다.
  • 요청 스레드: 결과를 얻기 위해 future.get()을 호출한다.
    • Future의 상태가 "FAILED"면 ExecutionException 예외를 던진다.
    • 이 예외는 내부에 앞서 Future에 저장해 둔 IllegalStateException을 포함하고 있다.
    • e.getCause()을 호출하면 작업에서 발생한 원본 예외를 받을 수 있다.

Future.get() 작업의 결과 값을 받을 수도 있고, 예외를 받을 수도 있다. 마치 싱글 스레드 상황에서 일반적인 메서드를 호출하는 같다. Executor 프레임워크가 얼마나 설계되어 있는지 있는 부분이다.

728x90