본문 바로가기
개발/백엔드

[백엔드]Singleflight 디자인패턴

by 돼지얍 2025. 7. 11.
반응형

최근 회사에서 특정 API에 트래픽이 몰릴 때 서버가 뻗는 현상을 경험했습니다. 캐시를 도입했는데도 캐시가 만료되는 순간 동시에 들어온 수백 개의 요청이 모두 DB를 호출하면서 발생한 문제였죠. 

결국 당장의 극약 처방으로 스케일인아웃으로 해결하긴했지만 백엔드 java 코드 설계상의 개선 필요성은 분명 했습니다.

해당 문제를 개선하기위해 구글링을 하던와중 다른 언어에선 이런 사례를 어떻게 해결할까 , Go 언어의 singleflight 패키지와 이를 활용한 패턴을 발견했습니다.

📌 문제의 시작: 캐시 Stampede 현상

우리 서비스와 일치하는 건 아니지만 이해가 쉬운 예제로 인기 상품 정보를 조회하는 API가 있었습니다:

@RestController
@RequiredArgsConstructor
public class ProductController {
    private final ProductService productService;
    private final RedisTemplate<String, Product> redisTemplate;
    
    @GetMapping("/products/{id}")
    public Product getProduct(@PathVariable Long id) {
        String key = "product:" + id;
        Product cached = redisTemplate.opsForValue().get(key);
        
        if (cached != null) {
            return cached;
        }
        
        // 캐시 미스! DB에서 조회
        Product product = productService.findById(id);  // 약 500ms 소요
        redisTemplate.opsForValue().set(key, product, Duration.ofMinutes(5));
        return product;
    }
}

평소에는 잘 작동했지만, 인기 상품의 캐시가 만료되는 순간 문제가 발생했습니다. 동시에 100개의 요청이 들어오면 모두가 캐시 미스를 겪고, 100개의 DB 쿼리가 동시에 실행되는 거죠. 이를 Cache Stampede 또는 Thundering Herd 문제라고 부른다는 걸 나중에 알게 되었습니다.

 

Go 언어 커뮤니티에서 자주 언급되는 golang.org/x/sync/singleflight 패키지를 발견했습니다. 이 패키지의 핵심 아이디어는 심플했습니다.

"동일한 키에 대한 여러 요청이 동시에 들어오면, 실제 작업은 한 번만 수행하고 그 결과를 모든 요청자에게 공유한다"

 

💡 Singleflight 패턴의 원리 이해하기

패턴의 핵심 원리를 이해하기 위해 먼저 간단한 의사 코드로 정리해봤습니다:

여러 요청이 동시에 도착했을 때:
1. 첫 번째 요청이 실제 작업을 시작
2. 나머지 요청들은 첫 번째 요청이 끝날 때까지 대기
3. 작업이 완료되면 모든 대기 중인 요청에게 동일한 결과 반환

이를 그림으로 표현하면:

기존 방식:
Request 1 → DB Query → Response 1
Request 2 → DB Query → Response 2  
Request 3 → DB Query → Response 3
(3번의 DB 호출)

Singleflight 패턴:
Request 1 ─┐
Request 2 ─┼→ DB Query (1번만) → 모든 요청에 동일한 응답
Request 3 ─┘

🔧 Java/Spring Boot에서 구현하기

Go의 singleflight를 Java로 구현해보기로 했습니다. 가장 빠르게 접근하는 방법으로 CompletableFutureConcurrentHashMap을 활용하는 것이였습니다:

1. Singleflight 구현체 만들기

@Component
public class SingleflightExecutor<K, V> {
    // 현재 진행 중인 작업들을 저장
    private final ConcurrentHashMap<K, CompletableFuture<V>> inFlightRequests = new ConcurrentHashMap<>();
    
    public CompletableFuture<V> execute(K key, Supplier<V> supplier) {
        // 이미 진행 중인 요청이 있는지 확인
        CompletableFuture<V> existingFuture = inFlightRequests.get(key);
        if (existingFuture != null) {
            // 진행 중인 요청에 합류!
            return existingFuture;
        }
        
        // 새로운 작업 시작
        CompletableFuture<V> newFuture = CompletableFuture.supplyAsync(supplier);
        
        // 다른 스레드가 동시에 시작했을 수도 있으니 putIfAbsent 사용
        CompletableFuture<V> actualFuture = inFlightRequests.putIfAbsent(key, newFuture);
        
        if (actualFuture != null) {
            // 다른 스레드가 먼저 시작했네요, 그거 사용
            return actualFuture;
        }
        
        // 작업 완료 후 맵에서 제거
        newFuture.whenComplete((result, error) -> {
            inFlightRequests.remove(key);
        });
        
        return newFuture;
    }
}

처음에는 단순히 HashMap과 synchronized를 사용했는데, 성능이 안 좋아서 ConcurrentHashMap과 putIfAbsent를 활용하는 방식으로 개선했습니다.

2. Service 레이어에 적용하기

@Service
@RequiredArgsConstructor
@Slf4j
public class ProductService {
    private final ProductRepository productRepository;
    private final SingleflightExecutor<Long, Product> singleflightExecutor;
    private final RedisTemplate<String, Product> redisTemplate;
    
    public Product getProduct(Long productId) {
        String cacheKey = "product:" + productId;
        
        // 1. 캐시 확인
        Product cached = redisTemplate.opsForValue().get(cacheKey);
        if (cached != null) {
            log.info("Cache hit for product: {}", productId);
            return cached;
        }
        
        log.info("Cache miss for product: {}", productId);
        
        // 2. Singleflight로 DB 조회
        try {
            return singleflightExecutor.execute(productId, () -> {
                log.info("Executing DB query for product: {}", productId);
                
                // 실제 DB 조회
                Product product = productRepository.findById(productId)
                    .orElseThrow(() -> new ProductNotFoundException(productId));
                
                // 캐시에 저장
                redisTemplate.opsForValue().set(cacheKey, product, Duration.ofMinutes(5));
                
                return product;
            }).get(); // CompletableFuture의 결과를 동기적으로 받기
            
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Failed to get product", e);
        }
    }
}

3. 타임아웃과 에러 처리

실제로 운영에 적용하면서 몇 가지 문제를 발견했습니다:

  • 첫 번째 요청이 실패하면 대기 중인 모든 요청이 실패
  • 첫 번째 요청이 너무 오래 걸리면 모든 요청이 대기

그래서 개선된 버전을 만들었습니다:

@Component
@Slf4j
public class ImprovedSingleflightExecutor<K, V> {
    private final ConcurrentHashMap<K, Flight<V>> inFlightRequests = new ConcurrentHashMap<>();
    private final ExecutorService executor = Executors.newCachedThreadPool();
    
    @Value("${singleflight.timeout:5000}")
    private long timeoutMillis;
    
    // Flight 클래스: 요청 정보를 담는 wrapper
    private static class Flight<V> {
        final CompletableFuture<V> future;
        final long startTime;
        final AtomicInteger waitingCount = new AtomicInteger(1);
        
        Flight(CompletableFuture<V> future) {
            this.future = future;
            this.startTime = System.currentTimeMillis();
        }
    }
    
    public V execute(K key, Supplier<V> supplier) throws Exception {
        while (true) {
            Flight<V> existing = inFlightRequests.get(key);
            
            if (existing != null) {
                // 타임아웃 체크
                if (System.currentTimeMillis() - existing.startTime > timeoutMillis) {
                    log.warn("In-flight request timeout for key: {}", key);
                    inFlightRequests.remove(key, existing);
                    // 다시 시도
                    continue;
                }
                
                existing.waitingCount.incrementAndGet();
                log.info("Joining existing request for key: {}, waiting count: {}", 
                        key, existing.waitingCount.get());
                
                try {
                    return existing.future.get(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    log.error("Timeout waiting for in-flight request: {}", key);
                    throw new RuntimeException("Request timeout", e);
                } finally {
                    existing.waitingCount.decrementAndGet();
                }
            }
            
            // 새로운 요청 시작
            CompletableFuture<V> future = CompletableFuture.supplyAsync(supplier, executor);
            Flight<V> newFlight = new Flight<>(future);
            
            Flight<V> actualFlight = inFlightRequests.putIfAbsent(key, newFlight);
            if (actualFlight != null) {
                // 경쟁 상태: 다른 스레드가 먼저 시작함
                continue;
            }
            
            // 정리 작업 등록
            future.whenComplete((result, error) -> {
                inFlightRequests.remove(key);
                if (error != null) {
                    log.error("Request failed for key: {}", key, error);
                } else {
                    log.info("Request completed for key: {}, served {} waiting requests", 
                            key, newFlight.waitingCount.get());
                }
            });
            
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }
}

📊 성능 측정 - 테스트 코드

실제로 효과가 있는지 확인하기 위해 JUnit과 병렬 처리를 활용한 테스트 코드를 작성했습니다:

1. 테스트 환경 설정

먼저 테스트를 위한 Mock 서비스를 만들었습니다:

@SpringBootTest
@Slf4j
public class SingleflightPerformanceTest {
    
    @MockBean
    private ProductRepository productRepository;
    
    @Autowired
    private ProductService productService;
    
    @Autowired
    private SingleflightExecutor<Long, Product> singleflightExecutor;
    
    private final AtomicInteger dbCallCount = new AtomicInteger(0);
    private final AtomicInteger errorCount = new AtomicInteger(0);
    
    @BeforeEach
    void setUp() {
        // DB 호출을 시뮬레이션하는 Mock 설정
        when(productRepository.findById(anyLong())).thenAnswer(invocation -> {
            dbCallCount.incrementAndGet();
            Thread.sleep(500); // DB 쿼리 시간 시뮬레이션
            
            Long productId = invocation.getArgument(0);
            return Optional.of(Product.builder()
                .id(productId)
                .name("Product " + productId)
                .price(1000L * productId)
                .build());
        });
        
        // 카운터 초기화
        dbCallCount.set(0);
        errorCount.set(0);
    }
}

2. Singleflight 없이 동시 요청 테스트

@Test
@DisplayName("Singleflight 없이 100개 동시 요청 시 DB 호출 횟수 측정")
void testWithoutSingleflight() throws InterruptedException {
    // given
    int concurrentRequests = 100;
    CountDownLatch startLatch = new CountDownLatch(1);
    CountDownLatch endLatch = new CountDownLatch(concurrentRequests);
    ExecutorService executor = Executors.newFixedThreadPool(concurrentRequests);
    
    List<Long> responseTimes = Collections.synchronizedList(new ArrayList<>());
    
    // when - 100개의 동시 요청 시뮬레이션
    for (int i = 0; i < concurrentRequests; i++) {
        executor.submit(() -> {
            try {
                startLatch.await(); // 모든 스레드가 동시에 시작
                
                long startTime = System.currentTimeMillis();
                productRepository.findById(1L).orElseThrow();
                long endTime = System.currentTimeMillis();
                
                responseTimes.add(endTime - startTime);
                
            } catch (Exception e) {
                errorCount.incrementAndGet();
                log.error("Request failed", e);
            } finally {
                endLatch.countDown();
            }
        });
    }
    
    // 모든 요청 동시 시작
    startLatch.countDown();
    
    // 모든 요청 완료 대기
    boolean completed = endLatch.await(10, TimeUnit.SECONDS);
    
    // then
    assertTrue(completed, "모든 요청이 10초 내에 완료되어야 함");
    
    log.info("=== Without Singleflight ===");
    log.info("Total DB calls: {}", dbCallCount.get());
    log.info("Error count: {}", errorCount.get());
    log.info("Average response time: {}ms", 
        responseTimes.stream().mapToLong(Long::longValue).average().orElse(0));
    log.info("Max response time: {}ms", 
        responseTimes.stream().mapToLong(Long::longValue).max().orElse(0));
    
    // 검증: 동시 요청 수만큼 DB 호출이 발생해야 함
    assertEquals(concurrentRequests, dbCallCount.get());
}

3. Singleflight 적용 후 테스트

@Test
@DisplayName("Singleflight 적용 시 100개 동시 요청에서 DB는 1번만 호출")
void testWithSingleflight() throws InterruptedException {
    // given
    int concurrentRequests = 100;
    CountDownLatch startLatch = new CountDownLatch(1);
    CountDownLatch endLatch = new CountDownLatch(concurrentRequests);
    ExecutorService executor = Executors.newFixedThreadPool(concurrentRequests);
    
    List<Long> responseTimes = Collections.synchronizedList(new ArrayList<>());
    List<Product> results = Collections.synchronizedList(new ArrayList<>());
    
    // when - Singleflight를 통한 요청
    for (int i = 0; i < concurrentRequests; i++) {
        executor.submit(() -> {
            try {
                startLatch.await();
                
                long startTime = System.currentTimeMillis();
                
                // Singleflight 실행
                Product product = singleflightExecutor.execute(1L, () -> 
                    productRepository.findById(1L).orElseThrow()
                ).get();
                
                long endTime = System.currentTimeMillis();
                
                results.add(product);
                responseTimes.add(endTime - startTime);
                
            } catch (Exception e) {
                errorCount.incrementAndGet();
                log.error("Request failed", e);
            } finally {
                endLatch.countDown();
            }
        });
    }
    
    startLatch.countDown();
    boolean completed = endLatch.await(10, TimeUnit.SECONDS);
    
    // then
    assertTrue(completed);
    
    log.info("=== With Singleflight ===");
    log.info("Total DB calls: {}", dbCallCount.get());
    log.info("Error count: {}", errorCount.get());
    log.info("Average response time: {}ms", 
        responseTimes.stream().mapToLong(Long::longValue).average().orElse(0));
    log.info("Results count: {}", results.size());
    
    // 핵심 검증: DB는 1번만 호출되어야 함!
    assertEquals(1, dbCallCount.get(), "DB는 한 번만 호출되어야 함");
    assertEquals(concurrentRequests, results.size(), "모든 요청이 결과를 받아야 함");
    
    // 모든 결과가 동일한지 확인
    Product firstProduct = results.get(0);
    assertTrue(results.stream().allMatch(p -> p.equals(firstProduct)), 
        "모든 요청이 동일한 결과를 받아야 함");
}

결과

Without Singleflight:
  - Total time: 2547ms
  - DB calls: 100
  - Avg response time: 512.34ms
  - Errors: 0

With Singleflight:
  - Total time: 508ms
  - DB calls: 1
  - Avg response time: 503.21ms
  - Errors: 0

모든상황을 가정한 테스트이긴 하지만 운영 서비스와 비슷한 상황에서 성능개선 자체는 명확했습니다.

🤔 실제 적용하면서 배운 점들

모든 상황에 적합하지는 않다

  • 동일한 요청이 짧은 시간에 여러 번 들어오는 경우
  • 작업 비용이 높은 경우 (DB 쿼리, 외부 API 호출 등)
  • 결과를 캐싱할 수 있는 경우

하지만 이런 경우엔 적합하지 않았습니다:

  • 요청마다 다른 결과가 필요한 경우 (예: 사용자별 맞춤 추천)
  • 실시간성이 매우 중요한 경우

다른 디자인패턴들

1. Request Collapsing (Netflix)

Netflix의 Hystrix에서 사용하는 패턴으로, 시간 창(time window) 내의 요청들을 모아서 배치로 처리.

2. Adaptive Concurrency Limits (Google)

동적으로 동시 실행 가능한 요청 수를 조절.

3. Circuit Breaker with Fallback

실패가 반복되면 회로를 차단하고 fallback 응답을 제공(spring java에서 외부 api 장애대응시 가장 많이 보이는 방법)

🏁 마무리

 Singleflight 패턴을 발견하고 추가적인 TMI 소감을 정리해보자면:

  1. 서비스 응급처치후 핵심 문제를 정확히 해결하는 것이 때로는 중요
  2. 다른 생태계의 솔루션도 참고할 가치Go의 패턴을 Java로 구현하면서 많이 배웠음
  3. 테스트없이는 개선도 없다 - 실제 효과를 수치로 확인하는 것의 중요성
반응형

'개발 > 백엔드' 카테고리의 다른 글

Bean scope  (0) 2023.03.17