이 장의 내용
병럴 스트림
병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림
즉, 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리할 수 있도록 할당 가능
컬렉션에서 parallelStream()
을 호출하면 병렬 스트림이 생성됨
예시 코드
// 무한 스트림을 생성, 모든 숫자를 더하는 리듀싱 연산 수행
public long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.reduce(0L, Long::sum);
}
순차 스트림을 병렬 스트림으로
병렬 스트림의 간단한 처리과정
순차 스트림에서 parallel()
을 호출하면 기존의 함수형 리듀싱 연산이 병렬로 처리
예시 코드
// 앞선 예제와 코드에서 달라지는 점은 parallel()부분만 다름
// 내부적으로는 여러 청크로 분할되어 처리됨(정확하게는 병렬처리 플래그가 true로 변경됨)
// 반대로 sequential()을 호출하면 병렬 스트림을 순차 스트림으로 바꿀 수 있음
public long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
}
// 아래처럼 parallel, sequential이 여러번 사용된 경우 마지막으로 호출한 부분이 적용
// 예제는 병렬 스트림으로 진행
stream.parallel()
.filter(...)
.sequential()
.map(...)
.parallel()
.reduce();
병렬 스트림에서 사용하는 스레드 풀 설정
Runtime.getRuntim().availableProcessors()
가 반환하는 값에 상응하는 스레드를 가짐System.setProperty(”java.util.concurrent.ForkJoinPool.common.parallelism”, “12”)
를 통해 전역으로 설정 가능(하나의 병렬 스트림에서만 설정하는 것은 아직 불가능)스트림 성능 측정
**자바 마이크로벤치마크 하니스(JMH)**라는 라이브러리를 이용해 벤치마크 측정 가능
JMH를 이용하면 어노테이션을 통해 처리 가능하며, JVM을 대상으로 하는 다른 언어용 벤치마크 구현가능
dependency
implementation "org.openjdk.jmh:jmh-core:1.17.4"
예시 코드
@BenchmarkMode(Mode.averageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(2, jvmArgs={"-Xms46", "-Xmx4G"})
public class ParallelStreamBenchmark {
private static final long N = 10_000_000L;
@Benchmark
public long sequentialSum() {
return Stream.iterate(1L, i -> i + 1).limit(N).reduce(0L, Long::sum);
}
@TearDown(Level.Invocation)
public void tearDown() {
System.gc();
}
}
더 특화된 메서드 사용
rangeClosed
메서드는 사용하면 iterate
에 비해 가지는 장점
병렬 스트림의 올바른 사용법
많은 문제는 공유된 상태를 바꾸는 알고리즘을 사용하기 때문에 발생
예시 코드
public long sideEffectSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).forEach(accumulator::add);
return accumulator.total;
}
public class Accumulator {
public long total = 0;
public void add(long value) { total += value; }
}
예시 코드의 문제
병렬 스트림 효과적으로 사용하기
확신이 서지 않으면 직접 측정
박싱 주의
순차 스트림보다 병렬 스트림 성능이 떨어지는 연산 존재
스트림에서 수행하는 전체 파이프라인 연산 비용을 고려
소량의 데이터는 병렬 스트림이 도움 되지 않음
스트림을 구성하는 자료구조의 적절성 확인
스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있음
최종 연산의 병합 과정 비용을 살펴보자
combiner
메서드가 있음분해와 관련된 다양한 스트림 소스의 친밀도
포크/조인 프레임워크
스레드 풀을 이용하려면 RecursiveTask<R>
의 서브클래스를 만들어야 함
여기서 R은 병렬화된 테스크가 생성하는 결과 형식, 결과가 없을 때는 RecursiveAction 형식
RecursiveTask에는 compute()
를 구현해서 활용
compute()
메서드는 테스크를 서브테스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브테스크의 결과를 생산할 알고리즘 정의
다음과 같은 형식을 유지함
예시 코드
// RecursiveTask 상속받아 포크/조인 프레임워크에서 사용할 테스크 생성
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
private final long[] numbers; // 더할 숫자 배열
private final int start; // 이 서브테스크에서 처리할 배열의 초기 위치
private final int end; // 이 서브테스크에서 처리할 배열의 최종 위치
public static final long THRESHOLD = 10_000; // 이 값 이하의 서브테스크는 더 이상 분할할 수 없음
// 메인 테스크를 생성할 때 사용할 공개 연산자
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
// 메인 테스크의 서브테크스를 재귀적으로 만들 때 사용할 비공개 생성자
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
// RecursiveTask 추상 메서드
@Override
protected Long compute() {
// 이 테스크에서 더할 배열의 길이
int length = end - start;
// 기준값과 같거나 작으면 순차적으로 결과를 계산
if(length <= THRESHOLD) {
return computeSequentially();
}
// 배열의 첫 번째 절반을 더하도록 서브테스크를 생성
ForkJoinSumCalculator leftTask =
new ForkJoinSumCalculator(numbers, start, start + length/2);
// ForkJoinPool 다른 스레드로 새로 생성한 테스크를 비동기로 실행
leftTask.fork();
// 배열의 나머지 절반을 더하도록 서브테스크 생성
ForkJoinSumCalculator rightTask =
new ForkJoinSumCalculator(numbers, start + length/2, end);
// 두 번째 서브테스크를 동시 실행, 이떄 추가로 분할이 일어날 수 있음
Long rightResult = rightTask.compute();
// 첫 번째 서브테스크의 결과를 읽거나 없다면 기다림
Long leftResult = leftTask.join();
// 두 테스크의 결과를 조합한 값이 이 테스크의 결과값이 되고 이를 반환
return leftResult + rightResult;
}
// 분할할 수 없을 때 서브테스크의 결과를 계산하는 단순한 알고리즘
private long computeSequentially() {
long sum = 0;
for(int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
// ForkJoinSumCalculator 생성자로 원하는 수의 배열을 넘겨줄 수 있음
public static long forkJoinSum(long n) {
long[] numbers = LongStream.range(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
// invoke 메서드의 반환값은 ForkJoinSumCalculator에서 정의한 테스크의 결과가 됨
}
}
ForkJoinSumCalculator의 실행
compute
메서드를 실행하며 작업 수행compute
메서드는 병렬로 실행할 만큼 테스크가 작아졌는지 확인하며, 테스크의 크기가 크다고 판단하면 숫자 배열을 반으로 분할해서 두 개의 새로운 ForkJoinSumCalculator로 할당포크/조인 프레임워크 제대로 사용하는 방법
compute, fork
메서드를 직접 호출할 수 있음invoke
사용작업 훔치기
Spliterator 인터페이스