안녕하세요 알통몬입니다. 공감 및 댓글은 포스팅 하는데 아주아주 큰 힘이 됩니다!! 포스팅 내용이 찾아주신 분들께 도움이 되길 바라며 더 깔끔하고 좋은 포스팅을 만들어 나가겠습니다^^ |
이번 포스팅은 지난 포스팅과 이어집니다.
http://altongmon.tistory.com/237
외부 객체에 작업 처리 결과 저장
: 스레드가 작업한 결과를 외부 객체에 저장할 경우도 생기는데요.
ex) 작업 처리를 완료 후 외부 Result 객체에 작업 결과를 저장하면
애플리케이션이 Result 객체를 사용해서 어떤 작업을 진행할 수 있음.
=> Result 객체는 공유 객체가 되고, 두 개 이상의 스레드 작업을 취할 목적.
이러한 작업을 위해서 ExecutorService의 submit(Runnable r, V Result) 를 사용할 수 있는데,
V가 Result 타입이 됨.
메소드를 호출하면 바로 Future<V>가 리턴, Future의 get()을 호출하면
스레드가 작업을 완료할 때까지 블로킹되었다가 작업을 완료하면 V 타입 객체를 리턴합.
리턴 객체는 submit()의 두 번째 매개값으로 준 객체와 동일한데,
차이점은 스레드 처리 결과가 내부에 저장되어 있다는 것입니다.
예제)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ResultByRunnableExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
System.out.println("작업 처리 요청");
class Task implements Runnable {
Result result;
Task(Result result) {
this.result = result;
}
@Override
public void run() {
int sum = 0;
for(int i=1; i<=10; i++) {
sum += i;
}
result.addValue(sum);
}
}
Result result = new Result();
Runnable task1 = new Task(result);
Runnable task2 = new Task(result);
Future<Result> future1 = executorService.submit(task1, result);
Future<Result> future2 = executorService.submit(task2, result);
try {
result = future1.get();
result = future2.get();
System.out.println("처리 결과 " + result.accumValue);
System.out.println("작업 처리 완료");
} catch (Exception e) {
e.printStackTrace();
System.out.println("실행 예외 발생 " + e.getMessage());
}
executorService.shutdown();
}
}
class Result {
int accumValue;
synchronized void addValue(int value) {
accumValue += value;
}
}
작업 완료 순으로 통보 :
모든 작업이 요청한 순서대로 처리가 되는 것은 아닌데요.
작업의 양, 스레드 스케쥴링에 따라 먼저 요청한 작업이 나중에, 나중에 요청한 작업이 먼저
완료되는 경우도 존재합니다.
여러 작업이 순서대로 처리될 필요가 없고, 처리 결과 또한 순서가 필요없다면 작업 처리가
완료된 것부터 이용하면 됩니다.
그래서 스레드 풀에서는 작업 처리가 완료된 것만 통보하는 방법이 존재합니다.
바로 CompleteService를 이용하는 방법인데요.
CompleteService는 처리 완료된 작업을 가져오는 poll()과 take()를 제공
리턴타입 메소드명(매개변수) 설명
Future<V> poll() 완료된 작업이 있을 경우 Future를 가져오고,
완료된 작업이 없을 경우 곧바로 null 리턴
Future<V> poll(long timeout, TimeUnit unit) 완료된 작업이 있을 경우 Future를 가져오고,
완료된 작업이 없다며는 timeout까지 블로킹
Future<V> take() 완료된 작업이 있을 경우 Future를 가져오고,
완료된 작업이 앖다면 있을 때까지 블로킹
Future<V> submit(Callable<V> c) 스레드 풀에 Callable 작업 처리 요청
Future<V> submit(Callable<V> c, V result) 스레드 풀에 Runnable 작업 처리 요청
ExecutorCompletionService<V>는 CompletionService의 구현 클래스입니다.
객체 생성 시 생성자 매개값으로 ExecutorService를 제공하면 됨.
ExecutorService es = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProccesors()
);
CompletionService<V> cs = new ExecutorCompletionService<V>(
es
);
poll() 과 take()를 이용해 처리 완료된 작업의 Future를 얻으려면 CompletionService의
submit()로 작업 처리 요청을 해야합니다.
cs.submit(Callable<V> c);
cs.submit(Callable<V> c, V result);
아래의 코드는 take()를 호출하면 완료된 Callable 작업이 있을 때까지 블로킹되었다가,
완료된 작업의 Future를 얻고 get()으로 결과를 얻는 코드이비다.
while 문은 애플리케이션이 종료될 때까지 반복 실행해야하기 때문에
스레드 풀의 스레드에서 실행하는 것이 좋아요.
es.submit(new Runnable() {
@Override
public void run() {
while(true) {
try {
Future<Integer> future = completionService.take();
int val = future.get();
} catch(Exception e) {
break;
}
}
}
});
take()가 리턴하는 완료된 작업 : submit()으로 처리를 요청한 작업의 순서가 아님.
=> 작업의 내용에 따라 먼저 요청한 작업이 나중에 완료될 수 있기 때문.
완료된 작업을 더 이상 가져올 필요가 없다면 take() 블로킹에서 나와
while()문을 종료해야함.
catch 절에서 break가 되어 while문 종료.
예제)
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CompletionServiceExample extends Thread {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
CompletionService<Integer> completionService =
new ExecutorCompletionService<Integer>(executorService);
System.out.println("작업 처리 요청");
for(int i=0; i<3; i++) {
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
for(int i=1; i<=10; i++) {
sum += i;
}
return sum;
}
});
}
System.out.println("처리 완료된 작업 확인");
executorService.submit(new Runnable() {
@Override
public void run() {
while(true) {
try {
Future<Integer> future = completionService.take();
int value = future.get();
System.out.println("처리 결과 : " + value);
} catch (Exception e) {
break;
}
}
}
});
try { Thread.sleep(3000); } catch (InterruptedException e) {}
executorService.shutdownNow();
}
}