• 이 장의 내용

    • 병렬 스트림으로 데이터를 병렬 처리하기
    • 병렬 스트림의 성능 분석
    • 포크/조인 프레임워크
    • Spliterator로 스트림 데이터 쪼개기
  • 병럴 스트림

    • 병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림

    • 즉, 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리할 수 있도록 할당 가능

    • 컬렉션에서 parallelStream()을 호출하면 병렬 스트림이 생성됨

    • 예시 코드

      // 무한 스트림을 생성, 모든 숫자를 더하는 리듀싱 연산 수행
      public long sequentialSum(long n) {
      	return Stream.iterate(1L, i -> i + 1)
      		.limit(n)
      		.reduce(0L, Long::sum);
      }
      
  • 순차 스트림을 병렬 스트림으로

    병렬 스트림의 간단한 처리과정

    병렬 스트림의 간단한 처리과정

    • 순차 스트림에서 parallel()을 호출하면 기존의 함수형 리듀싱 연산이 병렬로 처리

    • 예시 코드

      // 앞선 예제와 코드에서 달라지는 점은 parallel()부분만 다름
      // 내부적으로는 여러 청크로 분할되어 처리됨(정확하게는 병렬처리 플래그가 true로 변경됨)
      // 반대로 sequential()을 호출하면 병렬 스트림을 순차 스트림으로 바꿀 수 있음
      public long parallelSum(long n) {
      	return Stream.iterate(1L, i -> i + 1)
      		.limit(n)
      		.parallel()
      		.reduce(0L, Long::sum);
      }
      
      // 아래처럼 parallel, sequential이 여러번 사용된 경우 마지막으로 호출한 부분이 적용
      // 예제는 병렬 스트림으로 진행
      stream.parallel()
      	.filter(...)
      	.sequential()
      	.map(...)
      	.parallel()
      	.reduce();
      
  • 병렬 스트림에서 사용하는 스레드 풀 설정

    • 병럴 스트림은 내부적으로 ForkJoinPool을 사용함
    • 기본적으로 ForkJoinPool은 프로세서 수, 즉 Runtime.getRuntim().availableProcessors()가 반환하는 값에 상응하는 스레드를 가짐
    • System.setProperty(”java.util.concurrent.ForkJoinPool.common.parallelism”, “12”)를 통해 전역으로 설정 가능(하나의 병렬 스트림에서만 설정하는 것은 아직 불가능)
    • 기본적으로 프로세서 수와 같게 설정되므로 특별한 경우가 아니라면 커스텀하지 않는 것이 권장됨
  • 스트림 성능 측정

    • **자바 마이크로벤치마크 하니스(JMH)**라는 라이브러리를 이용해 벤치마크 측정 가능

    • JMH를 이용하면 어노테이션을 통해 처리 가능하며, JVM을 대상으로 하는 다른 언어용 벤치마크 구현가능

    • dependency

      implementation "org.openjdk.jmh:jmh-core:1.17.4"
      
    • 예시 코드

      @BenchmarkMode(Mode.averageTime)
      @OutputTimeUnit(TimeUnit.MILLISECONDS)
      @Fork(2, jvmArgs={"-Xms46", "-Xmx4G"})
      public class ParallelStreamBenchmark {
      	
      	private static final long N = 10_000_000L;
      	
      	@Benchmark
      	public long sequentialSum() {
      		return Stream.iterate(1L, i -> i + 1).limit(N).reduce(0L, Long::sum);
      	}
      
      	@TearDown(Level.Invocation)
      	public void tearDown() {
      		System.gc();
      	}
      }
      
  • 더 특화된 메서드 사용

    • rangeClosed 메서드는 사용하면 iterate에 비해 가지는 장점
      • rangeClosed는 기본형 long을 직접 사용하므로 박싱과 언박싱 오버헤드가 사라짐
      • rangeClosed는 쉽게 청크로 분할할 수 있는 숫자 범위를 생성
      • 예를 들어 1~20범위의 숫자를 각각 1~5, 6~10, 11~15, 16~20 범위로 구분 가능
      • 이러한 방식이 특화된 스트림
    • 위 예제를 통해 상황에 따라 어떤 알고리즘을 병렬화하는 것보다 적절한 자료구조를 선택하는 것이 중요할 수 있음
    • 하지만, 병렬 스트림은 항상 좋은 것은 아님 → 분할하고 합치는 과정등의 비용이 저렴하진 않기 때문
  • 병렬 스트림의 올바른 사용법

    • 많은 문제는 공유된 상태를 바꾸는 알고리즘을 사용하기 때문에 발생

    • 예시 코드

      public long sideEffectSum(long n) {
      	Accumulator accumulator = new Accumulator();
      	LongStream.rangeClosed(1, n).forEach(accumulator::add);
      	return accumulator.total;
      }
      
      public class Accumulator {
      	public long total = 0;
      	public void add(long value) { total += value; }
      }
      
    • 예시 코드의 문제

      • 위 코드는 본질적으로 순차 실행할 수 있도록 구현되어 있어 병렬로 실행하면 참사 발생
      • 특히 total을 접근할 때 데이터 레이스 문제 발생 → 다수의 스레드에서 동시에 데이터에 접근하는 문제
      • 하지만, 동기화로 문제를 해결한다고 하면 결국 병렬화 특성이 사라짐
      • 이러한 이유로 병렬 스트림을 올바르게 사용하기 위해서는 공유된 가변 상태를 피해야 하는 사실을 기억해야함
  • 병렬 스트림 효과적으로 사용하기

    • 확신이 서지 않으면 직접 측정

      • 순차 스트림을 병렬 스트림으로 바꾸는 것은 어렵지 않지만 무조건 병렬로 처리하는 것이 옳지는 않음(언제나 병렬 스트림이 빠르진 않음)
      • 또한 병렬 스트림의 수행 과정은 투명하지 않음
      • 따라서 판단이 잘 서지 않는다면 벤치마크로 직접 측정
    • 박싱 주의

      • 자동 박싱과 언박싱은 성능을 크게 저하시킬 수 있는 요소
      • Java 8에서는 박싱 동작을 방지하기 위해 IntStream, LongStream 등을 제공
    • 순차 스트림보다 병렬 스트림 성능이 떨어지는 연산 존재

      • 특히 limit, findFirst 처럼 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하려면 비용이 높음
      • 예를 들어 findAny의 경우 요소의 순서와 관계 없으므로 findFirst보다 성능이 좋음
      • 정렬된 스트림에서 unordered를 호출하면 비정렬 스트림을 얻을 수 있음
      • 스트림에 N개 요소가 존재할 때 요소의 순서가 상관없다면, 비정렬 스트림을 통해 limit을 호출하는 것이 효과적
    • 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려

      • 처리해야 할 요소 수가 N이고 하나의 요소를 처리하는데 드는 비용을 Q라고 할 때 전체 스트림 파이프라인 처리 비용을 N * Q로 예상 가능
      • 즉, Q가 높아진다는 것은 병렬 스트림으로 성능을 개선할 수 있는 가능성이 있음을 의미
    • 소량의 데이터는 병렬 스트림이 도움 되지 않음

      • 소량의 데이터를 처리하는 상황에서는 병렬화 과정에서 생기는 부가 비용을 상쇄할 수 있을 만큼의 이득을 얻지 못하기 때문
    • 스트림을 구성하는 자료구조의 적절성 확인

      • 예를 들어 ArrayList를 LinkedList보다 효율적으로 분할할 수 있음
      • LinkedList를 분할하려면 모든 요소를 탐색해야하지만 ArrayList는 요소를 탐색하지 않고도 분할이 가능
      • 또한 range도 쉽게 분리 가능
      • Spliterator를 구현하면 분리 과정의 제어 가능
    • 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있음

      • 예를 들어 SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분할할 수 있으므로 효과적으로 스트림을 병렬 처리 가능
      • 반면 필터 연산이 있으면 스트림의 길이를 예측할 수 없으므로 효과적으로 병렬 처리할 수 있는지 알 수 없음
    • 최종 연산의 병합 과정 비용을 살펴보자

      • 예를 들면 Collector의 combiner 메서드가 있음
      • 병합 과정의 비용이 비싸다면 병렬 스트림으로 얻은 성능의 이익이 서브스트림의 부분 결과를 합치는 과정에서 상쇄될 수 있음
    • 분해와 관련된 다양한 스트림 소스의 친밀도

      Untitled

  • 포크/조인 프레임워크

    • 포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브테스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계됨
    • 서브테스크를 스레드 풀의 작업자 스레드에 분살 할당하는 ExecutorService 인터페이스 구현
    • Recursive Task 활용
      • 스레드 풀을 이용하려면 RecursiveTask<R>의 서브클래스를 만들어야 함

      • 여기서 R은 병렬화된 테스크가 생성하는 결과 형식, 결과가 없을 때는 RecursiveAction 형식

      • RecursiveTask에는 compute()를 구현해서 활용

      • compute() 메서드는 테스크를 서브테스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브테스크의 결과를 생산할 알고리즘 정의

      • 다음과 같은 형식을 유지함

        Untitled

      • 예시 코드

        • 일반적으로 애플리케이션에서는 둘 이상의 ForkJoinPool을 사용하지 않음
        • ForkJoinPool을 한 번만 인스턴스화해서 정적 필드에 싱글턴으로 저장해서 사용
        // RecursiveTask 상속받아 포크/조인 프레임워크에서 사용할 테스크 생성
        public class ForkJoinSumCalculator extends RecursiveTask<Long> {
            private final long[] numbers; // 더할 숫자 배열
            private final int start; // 이 서브테스크에서 처리할 배열의 초기 위치
            private final int end; // 이 서브테스크에서 처리할 배열의 최종 위치
            public static final long THRESHOLD = 10_000; // 이 값 이하의 서브테스크는 더 이상 분할할 수 없음
        
            // 메인 테스크를 생성할 때 사용할 공개 연산자
            public ForkJoinSumCalculator(long[] numbers) {
                this(numbers, 0, numbers.length);
            }
        
            // 메인 테스크의 서브테크스를 재귀적으로 만들 때 사용할 비공개 생성자
            private ForkJoinSumCalculator(long[] numbers, int start, int end) {
                this.numbers = numbers;
                this.start = start;
                this.end = end;
            }
        
            // RecursiveTask 추상 메서드
            @Override
            protected Long compute() {
                // 이 테스크에서 더할 배열의 길이
                int length = end - start;
                
                // 기준값과 같거나 작으면 순차적으로 결과를 계산
                if(length <= THRESHOLD) {
                    return computeSequentially();
                }
                
                // 배열의 첫 번째 절반을 더하도록 서브테스크를 생성
                ForkJoinSumCalculator leftTask =
                        new ForkJoinSumCalculator(numbers, start, start + length/2);
                // ForkJoinPool 다른 스레드로 새로 생성한 테스크를 비동기로 실행        
                leftTask.fork();
                // 배열의 나머지 절반을 더하도록 서브테스크 생성
                ForkJoinSumCalculator rightTask =
                        new ForkJoinSumCalculator(numbers, start + length/2, end);
                // 두 번째 서브테스크를 동시 실행, 이떄 추가로 분할이 일어날 수 있음
                Long rightResult = rightTask.compute();
                // 첫 번째 서브테스크의 결과를 읽거나 없다면 기다림
                Long leftResult = leftTask.join();
                // 두 테스크의 결과를 조합한 값이 이 테스크의 결과값이 되고 이를 반환
                return leftResult + rightResult;
            }
        
            // 분할할 수 없을 때 서브테스크의 결과를 계산하는 단순한 알고리즘
            private long computeSequentially() {
                long sum = 0;
                for(int i = start; i < end; i++) {
                    sum += numbers[i];
                }
                return sum;
            }
        		// ForkJoinSumCalculator 생성자로 원하는 수의 배열을 넘겨줄 수 있음
            public static long forkJoinSum(long n) {
                long[] numbers = LongStream.range(1, n).toArray();
                ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
                return new ForkJoinPool().invoke(task);
        				// invoke 메서드의 반환값은 ForkJoinSumCalculator에서 정의한 테스크의 결과가 됨
            }
        }
        
      • ForkJoinSumCalculator의 실행

  • 포크/조인 프레임워크 제대로 사용하는 방법

  • 작업 훔치기

  • Spliterator 인터페이스