728x90
[Java] CompletableFuture를 활용한 비동기 프로그래밍
1. 기존 Future의 한계
// ExecutorService + Future 사용 예시
ExecutorService exec = Executors.newFixedThreadPool(4);
Future<String> future = exec.submit(() -> {
simulate(); // 작업 수행
return "서버 응답 데이터";
});
String result = future.get(); // 블로킹: 호출 스레드가 여기서 멈춰 대기
// Future 작업이 완료 된 후 이후 로직 실행
- 블로킹 호출: Future.get() 호출 시 작업이 끝날 때까지 스레드가 멈춥니다.
- 콜백 구성 번거로움: 작업 완료 후 후속 로직을 연결하려면 또 다른 스레드를 직접 생성하거나 수동으로 관리해야 합니다.
- 예외 처리 미흡: 복수의 Future 조합, 예외 흐름 추적이 어렵습니다.
2. CompletableFuture 등장 배경
- 함수형 프로그래밍: Java 8의 람다·스트림 스타일을 비동기 영역에도 적용
- 비블로킹 콜백: thenApply, thenAccept 등으로 결과가 준비될 때 자동 실행
- 연쇄 & 합성: thenCompose, thenCombine으로 복잡한 비동기 워크플로우 선언적 구성
- 예외 전파 & 처리: exceptionally, handle 등으로 흐름 내에서 예외 복구
- 경량 스레드풀: 기본 ForkJoinPool.commonPool() 사용, 필요시 커스텀 Executor 지정 가능
3. CompletableFuture 주요 메서드 정리
- supplyAsync(Supplier<U>)
- 비동기 작업을 스레드풀에 제출하고 결과를 반환하는 CompletableFuture<U>를 생성합니다.
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
// I/O 작업 예시
simulateIo();
return "데이터";
});
- supplyAsync(Supplier<U>, Executor)
- 지정한 Executor를 사용하여 비동기 작업을 실행합니다.
ExecutorService pool = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(
() -> fetchFromDB(), pool
);
- runAsync(Runnable)
- 반환값 없이 비동기 작업을 실행하며, CompletableFuture<Void>를 반환합니다.
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> {
// 리턴값 없는 작업 예시
cleanUpTempFiles();
});
- thenApply(Function<? super T,? extends U>)
- 이전 단계 결과를 받아 변환 후 새 CompletableFuture<U>를 반환합니다.
ExecutorService exec = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf1 = CompletableFuture
.supplyAsync(() -> {
simulate();
return "데이터";
}, exec);
// 이전 CompletableFuture의 반환 값으로 새로운 CompletableFuture 객체 생성
CompletableFuture<String> cf2 = cf1.thenApply(data -> data.toUpperCase());
- thenAccept(Consumer<? super T>)
- 이전 단계 결과를 소비(처리)하며, 후속 CompletableFuture<Void>를 반환합니다.
ExecutorService exec = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf1 = CompletableFuture
.supplyAsync(() -> {
simulate();
return "데이터";
}, exec);
// 이전 CompletableFuture의 반환 값으로 소비(처리) 후 새로운 CompletableFuture<Void> 반환
CompletableFuture<Void> cf2 = cf1.thenAccept(data -> System.out.println("결과: " + data));
- thenRun(Runnable)
- 이전 단계의 결과와 관계없이 후속 작업을 실행합니다.
- 즉, 후속 작업이 이전 단계의 결과 값이 필요 없을 경우 사용합니다.
ExecutorService exec = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf1 = CompletableFuture
.supplyAsync(() -> {
simulate();
return "데이터";
}, exec);
// 이전 CompletableFuture과 결과 값과 상관없이 후속 작업 실행 후 새로운 CompletableFuture<Void> 반환
CompletableFuture<Void> cf2 = cf1.thenRun(() -> System.out.println("후속 작업"));
- thenCompose(Function<? super T,CompletableFuture<U>>)
- 이전 단계들을 바탕으로 새로운 비동기 작업을 실행(flatMap 스타일).
- 즉, 두 개의 비동기 작업 결과를 합성하여 새로운 CompletableFuture 객체를 생성
ExecutorService exec = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf1 = CompletableFuture
.supplyAsync(() -> {
simulate();
System.out.println("선행 작업 완료(1)");
return "데이터1";
}, exec);
CompletableFuture<String> cf2 = CompletableFuture
.supplyAsync(() -> {
simulate();
System.out.println("선행 작업 완료(2)");
return "데이터2";
}, exec);
// cf1, cf2의 작업 결과를 합성하여 새로운 CompletableFuture 객체 생성
CompletableFuture<String> cfCombined = cf1.thenCombine(cf2, (s1, s2) -> s1 + s2);
- applyToEither(CompletionStage<? extends T>, Function<? super T,? extends R>)
- 주어진 두 작업 중 먼저 완료된 결과를 사용합니다.
- 아직 작업이 끝나지 않은 Future와는 상관없이 작업이 끝난 Future 기준으로 순차적 비동기 흐름을 구성할 수 있습니다.
ExecutorService exec = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf1 = CompletableFuture
.supplyAsync(() -> {
simulate();
System.out.println("선행 작업 완료(1)");
return "데이터1";
}, exec);
CompletableFuture<String> cf2 = CompletableFuture
.supplyAsync(() -> {
simulate();
System.out.println("선행 작업 완료(2)");
return "데이터2";
}, exec);
// cf1, cf2 중 작업이 먼저 끝난 결과 값을 사용
CompletableFuture<String> cf3 = cf1.applyToEither(cf2, winner -> winner);
// cf1 이 작업이 끝나고 cf2가 작업이 끝나지 않았더라도
// cf1으로 비동기 흐름을 구성할 수 있다.
- exceptionally(Function<Throwable,? extends T>)
- 비동기 처리 중 예외 발생 시 처리 할 수 있습니다.
ExecutorService exec = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf1 = CompletableFuture
.supplyAsync(() -> {
simulate();
return "데이터";
}, exec);
CompletableFuture<String> cf2 = cf1.thenApply(data -> data.toUpperCase());
// 비동기 흐름(cf1, cf2) 중 에러가 발생 시 캐치하여 처리할 수 있다.
CompletableFuture<String> cf3 = cf2.exceptionally(ex -> {
System.out.println("예외 발생");
return "예외 발생";
});
- handle(BiFunction<? super T,Throwable,? extends U>)
- 성공 또는 실패 모두를 처리해 최종 결과를 반환합니다.
ExecutorService exec = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf1 = CompletableFuture
.supplyAsync(() -> {
simulate();
return "데이터";
}, exec);
CompletableFuture<String> cf2 = cf1.thenApply(data -> {
throw new RuntimeException();
});
// res : 결과 값, ex : 예외 클래스
CompletableFuture<String> handleCf3 = cf2.handle((res, ex) -> {
if (ex != null) return "오류 복구값";
return res + "_processed";
});
- whenComplete(BiConsumer<? super T,? super Throwable>)
- 작업 완료 후 부수작업(로깅 등)을 수행합니다.
cf1.whenComplete((res, ex) -> {
if (ex != null) log.error("실패", ex);
else log.info("완료: {}", res);
});
- allOf(CompletableFuture<?>...)
- 여러 CompletableFuture가 모두 완료될 때까지 기다립니다.
CompletableFuture<Void> all = CompletableFuture.allOf(cf1, cf2);
// cf1, cf2가 모두 완료 되면 실행 아래 콜백 함수 호출된다.
all.thenRun(() -> System.out.println("모두 완료"));
- anyOf(CompletableFuture<?>...)
- 여러 작업 중 하나라도 완료되면 즉시 반환합니다.
CompletableFuture<Object> first = CompletableFuture.anyOf(cf1, cf2);
// cf1, cf2 중 작업이 먼저 완료된 것이 있으면 아래 콜백 함수 호출된다.
first.thenAccept(System.out::println);
- join() / get()
- 작업이 완료될 때까지 블로킹하여 결과를 반환합니다.
- join()은 CompletionException으로 예외를 래핑합니다.
String result1 = cf1.join(); // 블로킹, 예외는 CompletionException으로 래핑
String result2 = cf1.get(); // 블로킹, checked 예외 처리 필요
- complete(T value) / completeExceptionally(Throwable ex)
- 외부에서 수동으로 작업을 완료하거나 예외 상태로 설정할 수 있습니다.
// 예: 외부 시그널에 따라 강제 완료
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
simulateLongRunningTask();
return "원본 결과";
});
// 타임아웃 감지 후 강제 완료
boolean completed = cf.complete("타임아웃 기본값");
System.out.println(completed); // true
// 이미 완료된 경우 추가 complete는 무시
boolean again = cf.complete("다시 시도");
System.out.println(again); // false
3. CompletableFuture 주요 메서드 및 예시
3-1. 비동기 작업 생성
// ─────────────────────────────────────────────────
// 1) ExecutorService + Future 사용 예시
// ─────────────────────────────────────────────────
ExecutorService exec = Executors.newFixedThreadPool(4);
Future<String> future = exec.submit(() -> {
simulateIo();
return "서버 응답 데이터";
});
String data = future.get(); // ◀ 블로킹 발생
System.out.println("Future 결과: " + data);
// ❗️ 이후 작업은 future 가 완료 될 때까지 대기 후 처리된다.
// ─────────────────────────────────────────────────
// 2) CompletableFuture 사용 예시
// ─────────────────────────────────────────────────
CompletableFuture<String> cf = CompletableFuture
.supplyAsync(() -> {
simulateIo(); // 비동기 작업 제출
return "서버 응답 데이터";
}, exec);
// 비블로킹 콜백 등록 → get()/join() 없이 처리 가능
cf.thenAccept(result -> System.out.println("CF 결과: " + result));
// ❗️ 이후 작업이 실행되다가 비동기 작업이 완료되면 콜백 함수가 호출된다.
3-2. 콜백 등록 & 결과 변환
// ─────────────────────────────────────────────────
// 1) Future + 별도 스레드에서 콜백 처리
// ─────────────────────────────────────────────────
ExecutorService exec2 = Executors.newSingleThreadExecutor();
Future<String> future2 = exec2.submit(() -> fetchData());
// 콜백을 위해 또 다른 스레드에 등록해야 함
exec2.submit(() -> {
try {
String fetched = future2.get(); // ◀ 블로킹
String upper = fetched.toUpperCase();
System.out.println("Future 콜백 변환: " + upper);
} catch (Exception e) {
e.printStackTrace();
}
});
// ─────────────────────────────────────────────────
// 2) CompletableFuture 콜백 & 변환 기능
// ─────────────────────────────────────────────────
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> fetchData());
// thenApply: 결과 변환 → 새로운 CF 반환
cf2.thenApply(data -> data.toUpperCase())
.thenAccept(upper -> System.out.println("CF 변환 후 콜백: " + upper))
.exceptionally(ex -> {
System.err.println("에러: " + ex.getMessage());
return null;
});
- Future는 변환·소비 로직을 수동으로 스레드에 등록해야 하며 중간에 또 블로킹이 발생합니다.
- CompletableFuture는 thenApply, thenAccept로 선언적 체이닝 및 예외 처리를 제공합니다.
3-3. 연쇄(Composition) & 합성(Combination)
// thenCompose: 순차적 비동기 연쇄 (flatMap 스타일)
CompletableFuture<String> cfChain = CompletableFuture
.supplyAsync(() -> fetchUserId())
.thenCompose(id -> CompletableFuture
.supplyAsync(() -> fetchUserProfile(id))
);
cfChain.thenAccept(profile -> log.info("프로필: {}", profile));
// thenCombine: 병렬 독립 실행 후 결과 합성
CompletableFuture<Integer> cfA = CompletableFuture.supplyAsync(() -> fetchPriceA());
CompletableFuture<Integer> cfB = CompletableFuture.supplyAsync(() -> fetchPriceB());
CompletableFuture<Integer> cfTotal =
cfA.thenCombine(cfB, (a, b) -> a + b);
cfTotal.thenAccept(total -> log.info("총 합계: {}", total));
// applyToEither: 먼저 완료된 쪽 사용
CompletableFuture<String> cfX = CompletableFuture.supplyAsync(() -> taskX());
CompletableFuture<String> cfY = CompletableFuture.supplyAsync(() -> taskY());
cfX.applyToEither(cfY, winner -> "우승 결과: " + winner)
.thenAccept(System.out::println);
3-4. 예외 처리
cf2.exceptionally(ex -> {
log.error("실패:", ex);
return "기본값"; // 예외 발생 시 대체값
})
.thenAccept(v -> log.info("결과: {}", v));
cf2.handle((res, ex) -> {
if (ex != null) {
log.warn("에러 복구:", ex);
return "복구값";
}
return res + "_processed";
});
3-5. 다중 Future 조합
CompletableFuture<String> fx = CompletableFuture.supplyAsync(() -> taskX());
CompletableFuture<String> fy = CompletableFuture.supplyAsync(() -> taskY());
// allOf: 모두 완료 후 실행
CompletableFuture<Void> all = CompletableFuture.allOf(fx, fy);
all.thenRun(() -> log.info("모든 작업 완료"));
// anyOf: 하나라도 완료되면 실행
CompletableFuture<Object> first = CompletableFuture.anyOf(fx, fy);
first.thenAccept(win -> log.info("가장 빠른 결과: {}", win));
4. Spring에서 @Async와 함께 사용하기 - 예시
4-1. 설정 및 빈 등록
// Application 클래스에 @EnableAsync 추가
@SpringBootApplication
@EnableAsync // 비동기 기능 활성화
public class DemoApplication { … }
// Async 전용 스레드풀 등록
@Configuration
public class AsyncConfig {
@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(4); // CPU 바운드: core = CPU + 1
exec.setMaxPoolSize(16); // I/O 바운드: core = 2*CPU + 1
exec.setQueueCapacity(50);
exec.setThreadNamePrefix("Async-");
exec.initialize();
return exec;
}
}
4-2. @Async 메서드 구현
@Service
public class EmailService {
@Async("taskExecutor")
public CompletableFuture<Boolean> sendEmail(String to, String subject, String body) {
try {
mailSender.send(createMessage(to, subject, body));
return CompletableFuture.completedFuture(true);
} catch (MailException e) {
return CompletableFuture.failedFuture(e);
}
}
}
4-3. 실행 예시
@RestController
@RequiredArgsConstructor
public class UserController {
private final EmailService emailService;
@PostMapping("/signup")
public ResponseEntity<String> signup(@RequestBody UserDto dto) {
userRepo.save(dto.toEntity());
emailService.sendEmail(dto.getEmail(), "Welcome!", "가입을 축하드립니다!")
.thenAccept(ok -> log.info("이메일 발송 성공 여부: {}", ok))
.exceptionally(ex -> {
// 예외 발생 시 로깅·알림
log.error("이메일 발송 실패:", ex);
return null;
});
return ResponseEntity.ok("회원 가입 및 이메일 발송 요청 완료");
}
}
728x90
'Backend > Java' 카테고리의 다른 글
[Java] RestTemplate 대신 WebClient를 선택하는 이유 (3) | 2025.05.25 |
---|---|
[Java] 파일 입출력의 진화: 전통 I/O에서 현대적 비동기·리액티브 방식까지 (0) | 2025.05.24 |
[Java] ☕ Executor 프레임 워크 (1) | 2025.04.27 |
[Java] 🚦 자바 메모리 가시성(Java Memory Visibility) (1) | 2025.04.20 |
[Java] ☕ Thread 정리: 메서드, 생명주기, 제어 방법 (0) | 2025.04.07 |