본문 바로가기
Backend/Java

[Java] CompletableFuture를 활용한 비동기 프로그래밍

by 제이동 개발자 2025. 5. 5.
728x90

[Java] CompletableFuture를 활용한 비동기 프로그래밍

1. 기존 Future의 한계

// ExecutorService + Future 사용 예시
ExecutorService exec = Executors.newFixedThreadPool(4);
Future<String> future = exec.submit(() -> {
    simulate();             // 작업 수행
    return "서버 응답 데이터";
});
String result = future.get(); // 블로킹: 호출 스레드가 여기서 멈춰 대기
// Future 작업이 완료 된 후 이후 로직 실행

 

  • 블로킹 호출: Future.get() 호출 시 작업이 끝날 때까지 스레드가 멈춥니다.
  • 콜백 구성 번거로움: 작업 완료 후 후속 로직을 연결하려면 또 다른 스레드를 직접 생성하거나 수동으로 관리해야 합니다.
  • 예외 처리 미흡: 복수의 Future 조합, 예외 흐름 추적이 어렵습니다.

 

 

2. CompletableFuture 등장 배경

  • 함수형 프로그래밍: Java 8의 람다·스트림 스타일을 비동기 영역에도 적용
  • 비블로킹 콜백: thenApply, thenAccept 등으로 결과가 준비될 때 자동 실행
  • 연쇄 & 합성: thenCompose, thenCombine으로 복잡한 비동기 워크플로우 선언적 구성
  • 예외 전파 & 처리: exceptionally, handle 등으로 흐름 내에서 예외 복구
  • 경량 스레드풀: 기본 ForkJoinPool.commonPool() 사용, 필요시 커스텀 Executor 지정 가능

 

 

3. CompletableFuture 주요 메서드 정리

  • supplyAsync(Supplier<U>)
    • 비동기 작업을 스레드풀에 제출하고 결과를 반환하는 CompletableFuture<U>를 생성합니다.
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
    // I/O 작업 예시
    simulateIo();
    return "데이터";
});

 

  • supplyAsync(Supplier<U>, Executor)
    • 지정한 Executor를 사용하여 비동기 작업을 실행합니다.
ExecutorService pool = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(
    () -> fetchFromDB(), pool  
);

 

  • runAsync(Runnable)
    • 반환값 없이 비동기 작업을 실행하며, CompletableFuture<Void>를 반환합니다.
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> {
    // 리턴값 없는 작업 예시
    cleanUpTempFiles();
});

 

  • thenApply(Function<? super T,? extends U>)
    • 이전 단계 결과를 받아 변환 후 새 CompletableFuture<U>를 반환합니다.
ExecutorService exec = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf1 = CompletableFuture
    .supplyAsync(() -> {
      simulate();
      return "데이터";
    }, exec);

// 이전 CompletableFuture의 반환 값으로 새로운 CompletableFuture 객체 생성
CompletableFuture<String> cf2 = cf1.thenApply(data -> data.toUpperCase());

 

  • thenAccept(Consumer<? super T>)
    • 이전 단계 결과를 소비(처리)하며, 후속 CompletableFuture<Void>를 반환합니다.
ExecutorService exec = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf1 = CompletableFuture
    .supplyAsync(() -> {
      simulate();
      return "데이터";
    }, exec);

// 이전 CompletableFuture의 반환 값으로 소비(처리) 후 새로운 CompletableFuture<Void> 반환
CompletableFuture<Void> cf2 = cf1.thenAccept(data -> System.out.println("결과: " + data));

 

 

  • thenRun(Runnable)
    • 이전 단계의 결과와 관계없이 후속 작업을 실행합니다.
    • 즉, 후속 작업이 이전 단계의 결과 값이 필요 없을 경우 사용합니다.
ExecutorService exec = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf1 = CompletableFuture
    .supplyAsync(() -> {
      simulate();
      return "데이터";
    }, exec);

// 이전 CompletableFuture과 결과 값과 상관없이 후속 작업 실행 후 새로운 CompletableFuture<Void> 반환
CompletableFuture<Void> cf2 = cf1.thenRun(() -> System.out.println("후속 작업"));

 

  • thenCompose(Function<? super T,CompletableFuture<U>>)
    • 이전 단계들을 바탕으로 새로운 비동기 작업을 실행(flatMap 스타일).
    • 즉, 두 개의 비동기 작업 결과를 합성하여 새로운 CompletableFuture 객체를 생성
ExecutorService exec = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf1 = CompletableFuture
    .supplyAsync(() -> {
      simulate();
      System.out.println("선행 작업 완료(1)");
      return "데이터1";
    }, exec);
CompletableFuture<String> cf2 = CompletableFuture
    .supplyAsync(() -> {
      simulate();
      System.out.println("선행 작업 완료(2)");
      return "데이터2";
    }, exec);

// cf1, cf2의 작업 결과를 합성하여 새로운 CompletableFuture 객체 생성
CompletableFuture<String> cfCombined = cf1.thenCombine(cf2, (s1, s2) -> s1 + s2);

 

  • applyToEither(CompletionStage<? extends T>, Function<? super T,? extends R>)
    • 주어진 두 작업 중 먼저 완료된 결과를 사용합니다.
    • 아직 작업이 끝나지 않은 Future와는 상관없이 작업이 끝난 Future 기준으로 순차적 비동기 흐름을 구성할 수 있습니다.
ExecutorService exec = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf1 = CompletableFuture
    .supplyAsync(() -> {
      simulate();
      System.out.println("선행 작업 완료(1)");
      return "데이터1";
    }, exec);
CompletableFuture<String> cf2 = CompletableFuture
    .supplyAsync(() -> {
      simulate();
      System.out.println("선행 작업 완료(2)");
      return "데이터2";
    }, exec);

// cf1, cf2 중 작업이 먼저 끝난 결과 값을 사용
CompletableFuture<String> cf3 = cf1.applyToEither(cf2, winner -> winner);
// cf1 이 작업이 끝나고 cf2가 작업이 끝나지 않았더라도
// cf1으로 비동기 흐름을 구성할 수 있다.

 

  • exceptionally(Function<Throwable,? extends T>)
    • 비동기 처리 중 예외 발생 시 처리 할 수 있습니다.
ExecutorService exec = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf1 = CompletableFuture
    .supplyAsync(() -> {
      simulate();
      return "데이터";
    }, exec);

CompletableFuture<String> cf2 = cf1.thenApply(data -> data.toUpperCase());

// 비동기 흐름(cf1, cf2) 중 에러가 발생 시 캐치하여 처리할 수 있다.
CompletableFuture<String> cf3 = cf2.exceptionally(ex -> {
  System.out.println("예외 발생");
  return "예외 발생";
});

 

  • handle(BiFunction<? super T,Throwable,? extends U>)
    • 성공 또는 실패 모두를 처리해 최종 결과를 반환합니다.
ExecutorService exec = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf1 = CompletableFuture
    .supplyAsync(() -> {
      simulate();
      return "데이터";
    }, exec);

CompletableFuture<String> cf2 = cf1.thenApply(data -> {
  throw new RuntimeException();
});

// res : 결과 값, ex : 예외 클래스
CompletableFuture<String> handleCf3 = cf2.handle((res, ex) -> {
  if (ex != null) return "오류 복구값";
  return res + "_processed";
});

 

  • whenComplete(BiConsumer<? super T,? super Throwable>)
    • 작업 완료 후 부수작업(로깅 등)을 수행합니다.
cf1.whenComplete((res, ex) -> {
    if (ex != null) log.error("실패", ex);
    else log.info("완료: {}", res);
});

 

  • allOf(CompletableFuture<?>...)
    • 여러 CompletableFuture가 모두 완료될 때까지 기다립니다.
CompletableFuture<Void> all = CompletableFuture.allOf(cf1, cf2);
// cf1, cf2가 모두 완료 되면 실행 아래 콜백 함수 호출된다.
all.thenRun(() -> System.out.println("모두 완료"));

 

  • anyOf(CompletableFuture<?>...)
    • 여러 작업 중 하나라도 완료되면 즉시 반환합니다.
CompletableFuture<Object> first = CompletableFuture.anyOf(cf1, cf2);
// cf1, cf2 중 작업이 먼저 완료된 것이 있으면 아래 콜백 함수 호출된다.
first.thenAccept(System.out::println);

 

  • join() / get()
    • 작업이 완료될 때까지 블로킹하여 결과를 반환합니다.
    • join()은 CompletionException으로 예외를 래핑합니다.
String result1 = cf1.join(); // 블로킹, 예외는 CompletionException으로 래핑
String result2 = cf1.get();  // 블로킹, checked 예외 처리 필요

 

  • complete(T value) / completeExceptionally(Throwable ex)
    • 외부에서 수동으로 작업을 완료하거나 예외 상태로 설정할 수 있습니다.
// 예: 외부 시그널에 따라 강제 완료
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
    simulateLongRunningTask();
    return "원본 결과";
});
// 타임아웃 감지 후 강제 완료
boolean completed = cf.complete("타임아웃 기본값");
System.out.println(completed); // true

// 이미 완료된 경우 추가 complete는 무시
boolean again = cf.complete("다시 시도");
System.out.println(again);     // false

 

 

3. CompletableFuture 주요 메서드 및 예시

3-1. 비동기 작업 생성

// ─────────────────────────────────────────────────
// 1) ExecutorService + Future 사용 예시
// ─────────────────────────────────────────────────
ExecutorService exec = Executors.newFixedThreadPool(4);
Future<String> future = exec.submit(() -> {
    simulateIo();
    return "서버 응답 데이터";
});
String data = future.get();             // ◀ 블로킹 발생
System.out.println("Future 결과: " + data);
// ❗️ 이후 작업은 future 가 완료 될 때까지 대기 후 처리된다.


// ─────────────────────────────────────────────────
// 2) CompletableFuture 사용 예시
// ─────────────────────────────────────────────────
CompletableFuture<String> cf = CompletableFuture
    .supplyAsync(() -> {
        simulateIo();                   // 비동기 작업 제출
        return "서버 응답 데이터";
    }, exec);                           

// 비블로킹 콜백 등록 → get()/join() 없이 처리 가능
cf.thenAccept(result -> System.out.println("CF 결과: " + result));
// ❗️ 이후 작업이 실행되다가 비동기 작업이 완료되면 콜백 함수가 호출된다.

 

3-2. 콜백 등록 & 결과 변환

// ─────────────────────────────────────────────────
// 1) Future + 별도 스레드에서 콜백 처리
// ─────────────────────────────────────────────────
ExecutorService exec2 = Executors.newSingleThreadExecutor();
Future<String> future2 = exec2.submit(() -> fetchData());

// 콜백을 위해 또 다른 스레드에 등록해야 함
exec2.submit(() -> {
    try {
        String fetched = future2.get(); // ◀ 블로킹
        String upper   = fetched.toUpperCase();
        System.out.println("Future 콜백 변환: " + upper);
    } catch (Exception e) {
        e.printStackTrace();
    }
});

// ─────────────────────────────────────────────────
// 2) CompletableFuture 콜백 & 변환 기능
// ─────────────────────────────────────────────────
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> fetchData());

// thenApply: 결과 변환 → 새로운 CF 반환
cf2.thenApply(data -> data.toUpperCase())   
   .thenAccept(upper -> System.out.println("CF 변환 후 콜백: " + upper))
   .exceptionally(ex -> {                   
       System.err.println("에러: " + ex.getMessage());
       return null;
   });
  • Future는 변환·소비 로직을 수동으로 스레드에 등록해야 하며 중간에 또 블로킹이 발생합니다.
  • CompletableFuturethenApply, thenAccept선언적 체이닝 및 예외 처리를 제공합니다.

3-3. 연쇄(Composition) & 합성(Combination)

// thenCompose: 순차적 비동기 연쇄 (flatMap 스타일)
CompletableFuture<String> cfChain = CompletableFuture
    .supplyAsync(() -> fetchUserId())
    .thenCompose(id -> CompletableFuture
        .supplyAsync(() -> fetchUserProfile(id))
    );
cfChain.thenAccept(profile -> log.info("프로필: {}", profile));

// thenCombine: 병렬 독립 실행 후 결과 합성
CompletableFuture<Integer> cfA = CompletableFuture.supplyAsync(() -> fetchPriceA());
CompletableFuture<Integer> cfB = CompletableFuture.supplyAsync(() -> fetchPriceB());

CompletableFuture<Integer> cfTotal =
    cfA.thenCombine(cfB, (a, b) -> a + b);
cfTotal.thenAccept(total -> log.info("총 합계: {}", total));

// applyToEither: 먼저 완료된 쪽 사용
CompletableFuture<String> cfX = CompletableFuture.supplyAsync(() -> taskX());
CompletableFuture<String> cfY = CompletableFuture.supplyAsync(() -> taskY());

cfX.applyToEither(cfY, winner -> "우승 결과: " + winner)
   .thenAccept(System.out::println);

 

 

3-4. 예외 처리

cf2.exceptionally(ex -> {
    log.error("실패:", ex);
    return "기본값";   // 예외 발생 시 대체값
})
.thenAccept(v -> log.info("결과: {}", v));

cf2.handle((res, ex) -> {
    if (ex != null) {
        log.warn("에러 복구:", ex);
        return "복구값";
    }
    return res + "_processed";
});

 

3-5. 다중 Future 조합

CompletableFuture<String> fx = CompletableFuture.supplyAsync(() -> taskX());
CompletableFuture<String> fy = CompletableFuture.supplyAsync(() -> taskY());

// allOf: 모두 완료 후 실행
CompletableFuture<Void> all = CompletableFuture.allOf(fx, fy);
all.thenRun(() -> log.info("모든 작업 완료"));

// anyOf: 하나라도 완료되면 실행
CompletableFuture<Object> first = CompletableFuture.anyOf(fx, fy);
first.thenAccept(win -> log.info("가장 빠른 결과: {}", win));

 

 

4. Spring에서 @Async와 함께 사용하기 - 예시

4-1. 설정 및 빈 등록

// Application 클래스에 @EnableAsync 추가
@SpringBootApplication
@EnableAsync  // 비동기 기능 활성화
public class DemoApplication { … }

// Async 전용 스레드풀 등록
@Configuration
public class AsyncConfig {
    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
        exec.setCorePoolSize(4);       // CPU 바운드: core = CPU + 1
        exec.setMaxPoolSize(16);       // I/O 바운드: core = 2*CPU + 1
        exec.setQueueCapacity(50);
        exec.setThreadNamePrefix("Async-");
        exec.initialize();
        return exec;
    }
}

 

4-2. @Async 메서드 구현

@Service
public class EmailService {
    @Async("taskExecutor")
    public CompletableFuture<Boolean> sendEmail(String to, String subject, String body) {
        try {
            mailSender.send(createMessage(to, subject, body));
            return CompletableFuture.completedFuture(true);
        } catch (MailException e) {
            return CompletableFuture.failedFuture(e);
        }
    }
}

 

4-3. 실행 예시

@RestController
@RequiredArgsConstructor
public class UserController {
    private final EmailService emailService;

    @PostMapping("/signup")
    public ResponseEntity<String> signup(@RequestBody UserDto dto) {
        userRepo.save(dto.toEntity());

        emailService.sendEmail(dto.getEmail(), "Welcome!", "가입을 축하드립니다!")
            .thenAccept(ok -> log.info("이메일 발송 성공 여부: {}", ok))
            .exceptionally(ex -> {
                // 예외 발생 시 로깅·알림
                log.error("이메일 발송 실패:", ex);
                return null;
            });

        return ResponseEntity.ok("회원 가입 및 이메일 발송 요청 완료");
    }
}

 

 

728x90