스레드 풀
병렬 작업 처리가 많아지면 스레드 개수가 증가되고 그에 따른 스레드 생성과 스케줄링으로 인해 CPU가 바빠져 메모리 사용량이 늘어납니다. 따라서 어플리케이션의 성능이 저하됩니다.
갑작스런 병렬작업의 극대화로 인한 스레드 증폭을 막으려면 스레드 풀(Thread Pool)을 사용해야 합니다.
스레드 풀은 작업 처리에 사용되는 스레드를 제한된 개수만큼 정해 놓고 작업 큐(Queue)에 들어오는 작업들을 하나씩 스레드가 맡아 처리합니다.
작업 처리가 끝난 스레드는 다시 작업 큐에서 새로운 작업을 가져와 처리합니다.
자바에서는 스레드 풀을 생성하고 사용할 수 있도록 java.util.concurrent 패키지에서 ExecutorService 인터페이스와 Excutors 클래스를 제공합니다. Executors의 다양한 정적 메소드를 이용해서 ExecutorService 구현 객체를 만들 수 있는데, 이것이 바로 스레드 풀입니다.
스레드 풀 생성 및 종료
스레드 풀 생성
ExecutorService 구현 객체는 Executors 클래스의 다음 두 가지 메소드 중 하나르 이용해서 간편하게 생성할 수 있습니다.
초기 스레드 수는 ExecutorService 객체가 생성될 때 기본적으로 생성되는 스레드 수를 말하고, 코어 스레드 수는 스레드 수가 증가된 후 사용되지 않는 스레드를 스레드 풀에서 제거할 때 최소한 유지해야 할 스레드 수를 말합니다. 최대 스레드 수는 스레드 풀에서 관리하는 최대 스레드의 개수입니다.
newCachedThreadPool() 메소드로 생성된 스레드 풀의 특징은 초기 스레드 수와 코어 스레드 수는 0개이고, 스레드 개수보다 작업 개수가 많으면 새 스레드를 생성시켜 작업을 처리합니다. 이론적으로는 int 값이 가질 수 있는 최대값만큼 스레드가 추가되지만, 운영체제의 성능과 상황에 따라 달라집니다. 1개 이상의 스레드가 추가되었을 경우 60초 동안 추가된 스레드가 아무 작업을 하지 않으면 추가된 스레드를 종료하고 풀에서 제거합니다. 다음은 newCachedThreadPool() 메소드를 호출해서 ExecutorService 객체를 얻는 코드입니다.
1 2 | ExecutorService executorService = Executors.newCachedThreadPool(); | cs |
newFixedThreadPool(int nThreads) 메소드로 생성된 스레드 풀의 초기 스레드 개수는 0개이고, 코어 스레드 수는 nThreads입니다. 스레드 개수보다 작업 개수가 많으면 새 스레드를 생성시키고 작업을 처리합니다. 최대 스레드 개수는 파라미터로 준 nThreads 입니다. 이 스레드 풀은 스레드가 작업을 처리하지 않고 놀고 있더라도 스레드 개수가 줄지 않습니다. 다음은 CPU 코어의 수만큼 최대 스레드를 사용하는 스레드 풀을 생성합니다.
1 2 3 | ExecutorService executorService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() ); | cs |
스레드 풀 종료
스레드 풀의 스레드는 기본적으로 데몬 스레드가 아니기 때문에 main 스레드가 종료되더라도 작업을 처리하기 위해 계속 실행 상태로 남아있습니다. 그래서 main() 메소드가 끝나도 어플리케이션 프로세스는 종료되지 않습니다.
어플리케이션 프로세스를 종료시키려면 스레드 풀을 종료시켜 스레드들이 종료 상태가 되도록 처리해야 합니다. ExecutorService는 종료와 관련해서 다음 세 개의 메소드를 제공하고 있습니다.
남아있는 작업을 마무리하고 스레드 풀을 종료할 때에는 shutdown()을 일반적으로 호출하고, 남아있는 작업과는 상관없이 강제 종료할 때에는 shutdownNow()를 호출합니다.
1 2 3 | executorService.shutdown(); executorService.shutdownNow(); | cs |
작업 생성과 처리 요청
작업 생성
하나의 작업은 Runnable 또는 Callable 구현 클래스로 표현합니다. Runnable과 Callable의 차이점은 작업 처리 완료 후 리턴값이 있느냐 없느냐입니다.
Runnable의 run() 메소드는 리턴값이 없고, Callable의 call() 메소드는 리턴값이 있습니다. call()의 리턴 타입은 implements Callable<T> 에서 지정한 타입입니다. 스레드 풀의 스레드는 작업 큐에서 Runnable 또는 Callable 객체를 가져와 run() 과 call() 메소드를 실행합니다.
작업 처리 요청
작업 처리 요청이란 ExecutorService의 작업 큐에 Runnable 또는 Callable 객체를 넣는 행위를 말합니다. ExecutorService는 작업 처리 요청을 위해 두 가지 메소드를 제공합니다.
execute()와 submit() 메소드의 차이점은 두 가지 입니다.
- execute()는 작업 처리 결과를 받지 못하지만, submit()은 작업 처리 결과를 Future 타입으로 리턴합니다.
- execute()는 작업 처리 중 예외가 발생하면 스레드가 종료되고 해당 스레드는 스레드 풀에서 제거되고, 스레드 풀은 다른 작업 처리를 위해 새로운 스레드를 생성합니다.
- submit()은 작업 처리 중 예외가 발생하면 스레드는 종료되지 않고 다음 작업을 위해 재사용 됩니다.
다음 예제는 Runnable 작업을 정의할 때 Integer.parseInt("숫자")을 넣어서 NumberFormatException이 발생하도록 유도했습니다. 10개의 작업을 execute() 와 submit() 메소드로 각각 처리 요청했을 경우 스레드 풀의 상태를 살펴보도록 합시다. 먼저 execute() 메소드로 작업 처리 요청을 했을 경우입니다.
* ExecuteExam.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | package mT; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; public class ExecuteExam { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(2); for (int i = 0; i < 10; i++) { Runnable runnable = new Runnable() { @Override public void run() { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService; int poolSize = threadPoolExecutor.getPoolSize(); String threadName = Thread.currentThread().getName(); System.out.println("[총 스레드 개수: " + poolSize + "] 작업 스레드 이름: " + threadName); int value = Integer.parseInt("숫자"); } }; executorService.execute(runnable); // executorService.submit(runnable); Thread.sleep(10); } executorService.shutdown(); } } | cs |
스레드 풀의 스레드 최대 개수는 변함이 없지만, 실행 스레드의 이름을 보면 모두 다른 스레드가 작업을 처리하고 있습니다. 이것은 작업 처리 도중 예외가 발생하여 해당 스레드는 제거되고 새 스레드를 생성했기 때문입니다.
이번에는 submit() 메소드로 작업 처리 요청을 한 경우입니다. 25번 라인을 주석 처리하고 26번 라인을 주석 제거해서 실행시키세요. 실행 결과를 보면 예외가 발생하더라도 스레드는 종료되지 않고 계속 재사용되어 다른 작업을 처리하고 있는 것을 볼 수 있습니다.
블로킹 방식의 작업 완료 통보
ExecutorService의 submit() 메소드는 파라미터로 준 Runnable 또는 Callable 작업을 스레드 풀의 작업 큐에 저장하고 즉시 Future 객체를 리턴합니다.
Future 객체는 작업 결과가 아니라 작업이 완료될 때까지 기다렸다가 최종 결과를 얻는데 사용합니다. 그래서 Future는지연 완료(pending Completion) 객체라고 합니다. Future의 get() 메소드를 호출하면 스레드가 작업을 완료할 때까지 블로킹되었다가 작업을 완료하면 처리 결과를 리턴합니다. 이것이 블로킹을 사용하는 작업 완료 통보 방식입니다.
리턴 타입인 V는 submit(Runnable task, V result) 의 두 번째 파라미터인 V 타입이거나 submit(Callable<V> task)의 Callable 타입 파라미터 V 타입입니다. 다음은 세 가지 submit() 메소드 별로 Future의 get() 메소드가 리턴하는 값이 무엇인지 보여줍니다.
Future를 이용한 블로킹 방식의 작업 완료 통보에서 주의할 점은 작업을 처리하는 스레드가 작업을 완료하기 전까지는 get() 메소드가 블로킹되므로 다른 코드를 실행할 수 없습니다. 다른 코드를 하는 스레드가 get() 메소드를 호출하면 작업을 완료하기 전까지 다른 코드를 처리할 수 없게 됩니다. 그렇기 때문에 get() 메소드를 호출하는 스레드는 새로운 스레드이거나 스레드 풀의 또 다른 스레드가 되어야 합니다.
Future 객체는 작업 결과를 얻기 위한 get() 메소드 이외에도 다음과 같은 메소드를 제공합니다.
리턴값이 없는 작업 완료 통보
리턴값이 없는 작업일 경우는 Runnable 객체로 생성하면 됩니다. 다음은 Runnable 객체를 생성하는 방법을 보여줍니다.
1 2 3 4 5 6 7 | Runnable task = new Runnable() { @Override public void run() { // Code } } | cs |
결과값이 없는 작업 처리 요청은 submit(Runnable task) 메소드를 이용하면 됩니다. 결과값이 없음에도 불구하고 다음과 같이 Future 객체를 리턴하는데, 이것은 스레드가 작업 처리를 정상적으로 완료했는지, 아니면 작업 처리 도중에 예외가 발생했는지 확인하기 위해서 입니다.
1 | Future future = executorService.submit(task); | cs |
작업 처리가 정상적으로 완료되었다면 Future의 get() 메소드는 null을 리턴하지만 스레드가 작업 처리 도중 인터럽트되면 InterruptedException을 발생시키고, 작업 처리 도중 예외가 발생하면 ExecutionException을 발생시킵니다. 그래서 다음과 같은 예외 처리 코드가 필요합니다.
1 2 3 4 5 6 7 | try { future.get(); } catch (InterruptedException e) { } catch (ExecutionException e) { } | cs |
다음 예제는 리턴값이 없고 단순히 1부터 10까지의 합을 출력하는 작업을 Runnable 객체로 생성하고 스레드 풀의 스레드가 처리하도록 요청한 것입니다.
* NoResultExam.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | package mT; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class NoResultExam { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() ); System.out.println("[작업 처리 요청]"); Runnable run = new Runnable() { @Override public void run() { int sum = 0; for (int i = 1; i <=10; i++) { sum += i; } System.out.println("[처리 결과] : " + sum); } }; Future future = executorService.submit(run); try { future.get(); System.out.println("[작업 처리 완료]"); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } executorService.shutdown(); } } | cs |
리턴값이 있는 작업 완료 통보
스레드 풀의 스레드가 작업을 완료한 후에 어플리케이션이 처리 결과를 얻어야 된다면 작업 객체를 Callable로 생성하면 됩니다. 다음은 Callable 객체를 생성하는 코드인데, 주의할 점은 제네릭 타입 파라미터 T는 call() 메소드가 리턴하는 타입이 되도록 합니다.
1 2 3 4 5 6 7 8 | Callable<T> task = new Callable<T>() { @Override public T call() throws Exception { // Code return T; } }; | cs |
Callable 작업의 처리 요청은 Runnable 작업과 마친가지로 ExecutorService의 submit() 메소드를 호출하면 됩니다. submit() 메소드는 작업 큐에 Callable 객체를 저장하고 즉시 Future<T>를 리턴합니다. 이때 T는 call() 메소드가 리턴하는 타입입니다.
1 | Future<T> future = executorService.submit(task); | cs |
스레드 풀의 스레드가 Callable 객체의 call() 메소드를 모두 실행하고 T 타입의 값을 리턴하면, Future<T>의 get() 메소드는 블로킹이 해제되고 T 타입의 값을 리턴하게 됩니다.
1 2 3 4 5 6 7 | try { T result = future.get(); } catch (InterruptedException e) { } catch (ExecutionException e) { | cs |
다음 예제는 1부터 10까지의 합을 리턴하는 작업을 Callable 객체로 생성하고, 스레드풀의 스레드가 처리하도록 요청한 것입니다.
* ResultByCallableExam.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | package mT; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ResultByCallableExam { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() ); System.out.println("[작업 처리 요청]"); Callable<Integer> task = new Callable<Integer>() { @Override public Integer call() throws Exception { int sum = 0; for (int i = 1; i <= 10; i++) { sum += i; } return sum; } }; Future<Integer> future = executorService.submit(task); try { int sum = future.get(); System.out.println("[처리 결과] : " + sum); System.out.println("[작업 처리 완료]"); } catch (Exception e) { e.printStackTrace(); } executorService.shutdown(); } } | cs |
작업 처리 결과를 외부 객체에 저장
상황에 따라서 스레드가 작업한 결과를 외부 객체에 저장해야 할 경우도 있습니다. 예를 들어 스레드가 작업 처리를 완료하고 외부 Result 객체에 작업 결과를 저장하면, 어플리케이션이 Result 객체를 사용해서 어떤 작업을 진행할 수 있을 것입니다. 대개 Result 객체는 공유 객체가 되어, 두 개 이상의 스레드 작업을 취합할 목적으로 이용됩니다.
이런 작업을 위해서는 ExecutorService의 submit(Runnable task, V result) 메소드를 사용할 수 있는데, V가 바로 Result 타입이 됩니다. 메소드를 호출하면 즉시 Future<V>가 리턴되는데, Future의 get() 메소드를 호출하면 스레드가 작업을 완료할 때까지 블로킹되었다가 작업을 완료하면 V 타입 객체를 리턴합니다. 리턴된 객체는 submit() 의 두 번째 파라미터로 준 객체와 동일한데, 차이점은 스레드 처리 결과가 내부에 저장되어 있다는 것입니다.
1 2 3 4 5 | Result result = ...; Runnable task = new Task(result); Future<Result> future = executorService.submit(task, result); result = future.get(); | cs |
작업 객체는 Runnable 구현 클래스로 생성하는데 주의할 점은 스레드에서 결과를 저장하기 위해 외부 Result 객체를 사용해야 하므로 생성자를 통해 Result 객체를 주입받도록 해야합니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 | class Task implements Runnable { Result result; Task (Result result) { this.result = result; } @Override public void run() { // Code } } | cs |
다음 예제는 1부터 10까지의 합을 계산하는 두 개의 작업을 스레드풀에 처리 요청하고, 각각의 스레드가 작업 처리를 완료한 후 산출된 값을 외부 Result 객체에 누적하도록 했습니다.
* ResultByRunnableExam.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | package mT; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ResultByRunnableExam { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() ); System.out.println("작업 처리 요청 !"); class Task implements Runnable { Result result; Task(Result result) { this.result = result; } @Override public void run() { int sum = 0; for (int i = 1; i <= 10; i++) { sum += i; } result.addValue(sum); } }; Result result = new Result(); Runnable task1 = new Task(result); Runnable task2 = new Task(result); Future<Result> future1 = executorService.submit(task1, result); Future<Result> future2 = executorService.submit(task2, result); try { result = future1.get(); result = future2.get(); System.out.println("처리 결과: " + result.accumValue); System.out.println("처리 완료"); } catch (Exception e) { e.printStackTrace(); } executorService.shutdown(); } } class Result { int accumValue; synchronized void addValue(int value) { accumValue += value; } } | cs |
작업 완료 순으로 통보
작업 요청 순서대로 작업 처리가 완료되는 것은 아닙니다. 작업의 양과 스케줄링에 따라서 먼저 요청한 작업이 나중에 완료되는 경우도 발생합니다.
스레드 풀에서 작업 처리가 완료된 것만 통보받는 방법이 잇는데, CompletionService를 이용하는 것 입니다. CompletionService는 처리 완료된 작업을 가져오는 poll()과 take() 메소드를 제공합니다.
CompletionService 구현 클래스는 ExecutorCompletionService<V> 입니다. 객체를 생성할 때 생성자 파라미터로 ExecutorService를 제공하면 됩니다.
1 2 3 4 5 6 7 8 | ExecutorService executorService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() ); CompletionService<V> completionService = new ExecutorCompletionService<V>( executorService ); | cs |
poll()과 take() 메소드를 이용해서 처리 완료된 작업의 Future를 얻으려면 CompletionService 의 submit() 메소드로 작업 처리 요청을 해야 합니다.
1 2 3 | completionService.submit(Callable<V> task); completionService.submit(Runnable task, V result); | cs |
다음은 take() 메소드를 호출하여 완료된 Callable 작업이 있을 때까지 블로킹 되었다가 완료된 작업의 Future를 얻고, get() 메소드로 결과값을 얻어내는 코드입니다. while문은 어플리케이션이 종료될 때까지 반복 실행해야 하므로 스레드 풀의 스레드에서 실행하는 것이 좋습니다.
1 2 3 4 5 6 7 8 9 10 11 | executorService.submit(new Runnable() { public void run() { while (true) { try { Future<Integer> future = completionService.take(); int value = future.get(); } catch (Exception e) { break; } } } }); | cs |
take() 메소드가 리턴하는 완료된 작업은 submit() 으로 처리 요청한 작업의 순서가 아님을 명심해야 합니다. 작업의 내용에 따라서 먼저 요청한 작업이 나중에 완료될 수 있기 때문입니다. 더 이상 완료된 작업을 가져올 필요가 없다면 take() 블로킹에서 빠져나와 while 문을 종료해야 합니다. ExecutorService의 shutdownNow()를 호출하면 take() 에서 InterruptedException이 발생하고 catch 절에서 break 가 되어 while문을 종료하게 됩니다.
다음 예제는 3개의 Callable 작업을 처리 요청하고 처리가 완료되는 순으로 작업의 결과값을 출력했습니다.
* CompletionServiceExam.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | package mT; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class CompletionServiceExam { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() ); // CompletionService 생성 CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService); System.out.println("처리 요청"); for (int i = 0; i < 3; i++) { completionService.submit(new Callable<Integer>() { public Integer call() throws Exception { int sum = 0; for (int i = 1; i <= 10; i++) { sum += i; } return sum; } }); } System.out.println("처리 완료된 작업 확인"); executorService.submit(new Runnable() { public void run() { while (true) { try { Future<Integer> future = completionService.take(); int value = future.get(); System.out.println("처리 결과: " + value); } catch (Exception e) {break;} } } }); try { Thread.sleep(3000); } catch (InterruptedException e) {} executorService.shutdownNow(); } } | cs |
콜백 방식의 작업 완료 통보
이번에는 콜백(callback) 방식을 이용해서 작업 완료 통보를 받는 방법에 대해서 알아봅시다.
콜백이란 어플리케이션이 스레드에게 작업 처리를 요청한 후, 스레드가 작업을 완료하면 특정 메소드를 자동 실행하는 기법을 말합니다. 이때 자동 실행되는 메소드를 콜백 메소드(callback Method)라고 합니다. 다음은 블로킹 방식과 콜백 방식을 비교한 그림입니다.
콜백 방식은 작업 처리를 요청한 후 결과를 기다릴 필요 없이 다른 기능을 수행할 수 있습니다. 그 이유는 작업 처리가 완료되면 자동적으로 콜백 메소드가 실행되어 결과를 알 수 있기 때문입니다.
아래는 CompletionHandler를 이용해서 콜백 객체를 생성하는 코드입니다.
1 2 3 4 5 6 7 8 9 10 11 12 | CompletionHandler<V, A> callback = new CompletionHandler<V, A>() { @Override public void completed(V result, A attachment) { } @Override public void failed(Throwable exc, A attachment) { } } | cs |
CompletionHandler는 completed()와 failed() 메소드가 있습니다.
- completed(): 작업을 정상 처리 완료했을 때 호출되는 콜백 메소드입니다.
- failed(): 작업 처리 도중 예외가 발생했을 때 호출되는 콜백 메소드입니다.
다음 예제를 통해 세부적으로 알아봅시다.
* CallbackExam.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | package mT; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CallbackExam { private ExecutorService executorService; public CallbackExam() { executorService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() ); } private CompletionHandler<Integer, Void> callback = new CompletionHandler<Integer, Void>() { @Override public void completed(Integer result, Void attachment) { System.out.println("completed() 실행: " + result); } @Override public void failed(Throwable exc, Void attachment) { System.out.println("failed() 실행: " + exc.toString()); } }; public void doWork(final String x, final String y) { Runnable task = new Runnable() { public void run() { try { int intX = Integer.parseInt(x); int intY = Integer.parseInt(y); int result = intX + intY; callback.completed(result, null); } catch (NumberFormatException e) { callback.failed(e, null); } } }; executorService.submit(task); } public void finish() { executorService.shutdown(); } public static void main(String[] args) { CallbackExam exam = new CallbackExam(); exam.doWork("4", "7"); exam.doWork("4", "자바"); exam.finish(); } } | cs |