스트림으로 데이터 컬렉션 관련 동작을 손쉽게 병렬 처리할 수 있다.
병렬 스트림
병렬 스트림은 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다.
ParallelStream을 호출하여 병렬 스트림을 생성할 수 있다.
순차 스트림을 병렬 스트림으로 변환하기 (ParallelStream 적용)
@Test
@Description("1~n까지 숫자의 합을 구하기(직렬 & 병렬)")
void parallelStreamTest7_1() throws Exception{
// given
int n = 100;
Long sequentialResult = Stream.iterate(1L, i->i+1)
.limit(n)
.reduce(0L, Long::sum);
// when
Long parallelResult = Stream.iterate(1L, i->i+1)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
// then
System.out.println(sequentialResult);
assertEquals(sequentialResult, parallelResult);
}
반대로 sequential 메서드로 병렬을 직렬화 할 수 있다. (만약 parallel(), sequential() 메소드가 같이 존재한다면, 가장 마지막에 있는 메소드가 최종적으로 적용됨)
스트림의 parallel 메서드는 내부적으로 ForkJoinPool을 사용하는데 ForkJoinPool의 프로세서 수는 Runtime.getRuntime().availableProcessors() 에 상응하는 스레드를 갖음으로 기본값을 그대로 사용하는 걸 권장한다.
성능 측정
JMH 자바 마이크로벤치마크하니스 라이브러리를 이용해서 성능체크를 할 수 있다.
Gradle로 JMH를 적용해보자
1) 라이브러리 추가 및 jmh 설정값 세팅
plugins {
...
id "me.champeau.gradle.jmh" version "0.5.2"
}
...
dependencies {
...
jmh group: 'org.openjdk.jmh', name: 'jmh-core', version: '1.25.2'
jmh group: 'org.openjdk.jmh', name: 'jmh-generator-annprocess', version: '1.25.2'
}
jmh {
fork = 1
iterations = 1
warmupIterations = 1
duplicateClassesStrategy = 'warn' //복제 중 중복 파일 존재시 warn 경고 처리
}
jmh 설정을 통해 jmh 빌드 시간을 단축 시킬 수 있다.
2) 성능 체크할 클래스 파일들을 jmh 폴더 아래로 이동
프로젝트 구조 : src > jmh > java > DemoApplication
@SpringBootApplication
public class DemoApplication {
@Benchmark
public void testBenchmark(){
}
public static void main(String[] args) throws RunnerException{
Options options = new OptionsBuilder()
.include(DemoApplication.class.getSimpleName())
.forks(1)
.build();
new Runner(options).run();
}
}
※ github.com/melix/jmh-gradle-plugin 참고
1부터 n까지 총합 구하기 순차적 처리 vs 병렬적 처리 성능 비교
package com.example.demo;
import org.openjdk.jmh.annotations.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@BenchmarkMode(Mode.AverageTime) //벤치마크 대상 메서드를 실행하는데 걸리는 평균 시간 측정
@OutputTimeUnit(TimeUnit.MILLISECONDS)//밀리초 단위로 출력
@Fork(value = 2, jvmArgs = {"-Xms4G", "-Xmx4G"})//4Gb의 힙공간을 제공한 환경에서 2번 벤치마크를 수행한 결과의 신뢰성 확보
@State(Scope.Benchmark) // 테스트 argument의 상태를 지정할 수 있음 (scope.benchmark : 동일한 테스트 내 모든 스레드에서 동일한 인스턴스 공유)
public class ParallelStreamBenchmark {
private static final long N = 10_000_000L;
@Benchmark //밴치마크 대상 메서드
public long sequentialSum(){
return Stream.iterate(1L, i -> i+1).limit(N)
.reduce(0L, Long::sum);
}
@Benchmark
public long interativeSum(){
long result = 0;
for (long i = 0L; i < N; i++) {
result += i;
}
return result;
}
@Benchmark
public long parallelSum(){
return Stream.iterate(1L, i -> i+1).limit(N)
.parallel()
.reduce(0L, Long::sum);
}
@TearDown(Level.Invocation)//매번 밴치마크를 실행한 다음에는 가비지 컬렉터 동작 시도
public void tearDown(){
System.gc();
}
}
[결과]
Benchmark | Mode | Cnt | Score | Error | Units |
ParallelStreamBenchmark.interativeSum | avgt | 4.143 | ms/op | ||
ParallelStreamBenchmark.parallelSum | avgt | 525.138 | ms/op | ||
ParallelStreamBenchmark.sequentialSum | avgt | 140.287 | ms/op |
일반 스트림보다 기본형 특화 스트림(IntStream, LongStream, DoubleStream)을 사용하는 것이 성능에 더 좋다.
> 박싱, 언박싱을 할 경우 성능 저하 발생
항상 병렬처리가 성능이 좋은 것은 아니다.
> 반복 작업의 경우 병렬로 작업을 독립시키기 어려움으로 성능 저하 발생 (함수의 입력값이 i+1로 iterate 연산을 청크로 분할하기 어려움)
> iterate 연산으로 전체 숫자 리스트가 준비되지 않는 경우 병렬 처리시 오히려 성능이 나빠짐
성능을 저하시킬 수 있는 부분 제거 후 병렬처리
@Benchmark
public long parallelRangedSum(){
return LongStream.range(1, N)
.parallel()
.reduce(0L, Long::sum);
}
[결과]
Benchmark | Mode | Cnt | Score | Error | Units |
ParallelStreamBenchmark.parallelRangedSum | avgt | 7.462 | ms/op |
이전 결과에 비해 속도가 많이 개선되었지만, 병렬화를 이용하려면 스트림을 재귀적으로 분할하고 서브스트림을 서로 다른 스레드에서 리듀싱 연산을 수행하고 결과들을 하나로 합쳐야 됨으로 병렬화 자체의 비용이 존재함을 잊지 말아야 한다.
병렬 스트림의 올바른 사용법
1. 공유되는 가변 상태의 변수 사용은 피해야 한다.
2. 성능에 대한 확신이 안들면 무조건 직접 측정해야 한다.
> 벤치마크로 성능 체크 필요
3. 박싱을 주의해야 한다.
>기본형 특화 스트림 이용
4. 순차보다 병렬 스트림이 성능을 저하시키는 연산이 존재한다.
> 순서에 의존하는 연산 사용을 자제 (findAny, unordered 호출 후 limit 등으로 변경)
5. 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려해야 한다.
> 처리해야 할 요소 수(n), 하나의 요소를 처리하는 데 드는 비용(Q)을 고려하여 Q가 높다면 병렬처리시 성능 개선 가능성이 높음
6. 소량의 데이터에서는 병렬 스트림이 도움 되지 않는다.
7. 스트림을 구성하는 자료구조가 적절한지 확인하라.
8. 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라진다.
> 만약 필터 연산이 있다면, 스트림의 길이를 예측할 수 없음으로 효과적으로 병렬처리할 수 없음
9. 최종 연산 병합 과정의 비용을 확인해야 한다.
포크/조인 프레임워크
포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 뒤 서브 테스트의 결과들을 합쳐 전체 결과를 얻을 수 있다.
1) RecursiveTask 활용
recursiveTask를 사용하려면 compute메서드를 구현해야 한다. compute메서드는 서브태스크로 분할하는 로직과 분할할 수 없는 서브 태스크의 결과를 정의하는 메서드이다.
public static long measureSumPerf(Function<Long, Long> adder, long n) {
long fastest = Long.MAX_VALUE;
for(int i = 0; i < 100; i++) {
long start = System.nanoTime();
long sum = adder.apply(n);
long duration = (System.nanoTime() - start) / 1_000_000;
// System.out.println("Result: " + sum);
if(duration < fastest) fastest = duration;
}
return fastest;
}
package com.example.demo;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
private final long[] numbers; //더할 숫자 배열
private final int start;
private final int end;
public static final long THRESHOLD = 10_000;
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
//메인태스크의 서브태스크를 재귀적으로 만들 때 사용할 비공개 생성
private ForkJoinSumCalculator(long[] numbers, int start, int end){
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start; // 해당 태스크에서 더할 배열의 길이
if(length <= THRESHOLD){
return computeSequqntially(); //기준값과 같거나 작으면 순차적으로 결과 계산
}
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start+length/2);
leftTask.fork(); //ForkJoinPool의 다른 스레드로 새로 생성한 태스크를 비동기로 실행
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start+length/2, end);
Long rightResult = rightTask.compute(); //동기 실행 (이때 추가 분할이 일어날 수 있음)
Long leftResult = leftTask.join(); //서브태스크의 결과를 읽거나, 결과가 없다면 기다린다. (비동기로 실행했음으로)
return leftResult+rightResult;
}
private long computeSequqntially(){ //더 분할할 수 없을때 서브태스크의 결과를 계산하는 단순한 알고리즘
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
public static long forkJoinSum(long n){
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
}
System.out.println("ForkJoin sum done in : "+measureSumPerf(ForkJoinSumCalculator::forkJoinSum, 10_000_000)+" msecs");
연산 시간을 출력해보면 41 mesc 로 병렬스트림을 활용했을 때보다 성능이 나빠짐을 확인할 수 있다.
이유는 전체스트림을 long[]으로 변환했기 때문이다.
포크/조인 프레임워크를 제대로 사용하는 방법
1. ForkJoinPool의 invoke 메서드를 사용하지 말아야 한다.
> compute나 fork 메서드를 직접 호출
2. 서브태스크에 fork 메서드 호출을 통해 ForkJoinPool 일정을 조절할 수 있다.
3. 포크/조인 프레임워크의 병렬 계산은 디버깅하기 어렵다.
4. 멀티코어에서 포크/조인 프레임워크를 사용하는 것이 무조건 빠를 거라고 생각하지 말아야 한다.
포크/조인 프레임워크 작업 훔치기
일반적으로 코어 개수만큼 태스크를 분할한 뒤 모든 CPU 코어에서 분할된 태스크를 실행하여 같은 크기의 태스크는 동일하게 작업이 끝날 것이라 생각하지만 실제론 그렇지 않다. 실제론 작업완료 시간이 크게 달라질 수도 있다.
이 문제를 해결하기 위한 방법으론 작업 훔치기가 있다.
작업 훔치기는 모든 스레드를 거의 동일하게 분할한다. 각각의 스레드는 자신에게 할당된 태스크들을 포함한 이중 연결 리스트를 참조하여 큐의 헤드에서 태스크를 가져와 작업을 진행한다. 이때 작업을 완료하여 할일이 없는 스레드는 유효 상태가 아닌 다른 스레드 큐의 꼬리에서 작업을 훔쳐온다. 모든 작업이 끝날때까지 반복한다.
그러므로 분할된 스레드의 종료시간이 비슷해질 수 있다.
Spliterator 인터페이스
iterator 처럼 소스의 요서 탐색 기능을 제공한다. 단, spliterator는 병렬 작업에 특화되어 있다.
1) tryAdvance 메서드 : 요소를 하나씩 순차적으로 소비한 뒤 탐색할 요소가 남아있으면 true 반환 (iterator 동일)
2) trySplit 메서드 : 일부 요소를 분할해서 두번째 spliterator를 생성
3) estimateSize 메서드 : 탐색해야 할 요소 수를 제공
4) characteristics 메서드 : spliterator 자체의 특성 집할을 포함하는 int를 반환
단어 개수 구하기 예제
package com.example.demo;
public class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
//문자열의 문자를 하나씩 탐색
public WordCounter accumulate(Character c){
if(Character.isWhitespace(c)) {
return lastSpace ? this : new WordCounter(counter, true);
} else {
return lastSpace ? new WordCounter(counter + 1, false) : this;
}
}
public WordCounter combine(WordCounter wordCounter){
return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace);
}
public int getCounter(){
return counter;
}
}
private int countWords(Stream<Character> stream){
WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
WordCounter::accumulate,
WordCounter::combine);
return wordCounter.getCounter();
}
1. 함수형으로 단어 개수 구하기 (직렬)
@Test
@Description("함수형으로 단어 개수 구하기")
void parallelStreamTest7_5() throws Exception{
// given
String SENTENCE =
"Nel mezzo del cammin di nostra vita "+
"mi ritrovai in una selva oscura "+
"ch la dritta via era smarrita ";
// when
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
// then
System.out.println("Found "+ countWords(stream) + " words");
}
2. 함수형으로 단어 개수 구하기 (병렬 - parallel)
@Test
@Description("함수형으로 단어 개수 구하기 (parallel로 병렬처리)")
void parallelStreamTest7_6() throws Exception{
// given
String SENTENCE =
"Nel mezzo del cammin di nostra vita "+
"mi ritrovai in una selva oscura "+
"ch la dritta via era smarrita ";
// when
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
// then
System.out.println("Found "+ countWords(stream.parallel()) + " words");
}
결과 : 40 (실제 단어 개수 - 19)
-> 반복형 대신 함수형으로 구현함으로 직접 스레드를 동기화하지 않고 병렬스트림으로 작업할 수 있다.
하지만 임의의 위치에서 분할됨으로 결과값이 틀릴 수 있다.
3. 함수형으로 단어 개수 구하기 - Spliterator 인터페이스를 커스텀하여 단어가 끝나는 위치에서만 분할하도록 정의
package com.example.demo;
import java.util.Spliterator;
import java.util.function.Consumer;
public class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar = 0;
public WordCounterSpliterator (String string){
this.string = string;
}
@Override
public boolean tryAdvance(Consumer<? super Character> action) {
action.accept(string.charAt(currentChar++)); // 현재 문자 소비
return currentChar < string.length(); //소비할 문자가 남아있으면 true 반환
}
@Override
public Spliterator<Character> trySplit() {
int currentSize = string.length() - currentChar;
if(currentSize < 10){
return null; // 분할할 문자열을 순차적으로 처리할 만큼 작아졌음을 알리는 null 반환
}
//파싱할 문자열의 중간을 분할 위치로 설정
for (int splitPos = currentSize/2 + currentChar; splitPos < string.length(); splitPos++ ){
//다음 공백일 나올 떄까지 분할 위치를 뒤로 이동
if (Character.isWhitespace(string.charAt(splitPos))){
//처음부터 분할 위치까지 문자열을 파싱할 새로은 spliterator 생성
Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos));
//시작 위치를 분할 위치로 설정
currentChar = splitPos;
//공백을 찾았고 문자열을 분리했으므로 루프 종
return spliterator;
}
}
return null;
}
@Override
public long estimateSize() {
return string.length() - currentChar;
}
@Override
public int characteristics() {
return ORDERED
+ SIZED
+ SUBSIZED
+ NONNULL // null 존재 X
+ IMMUTABLE // 문자열 자체가 분변 클래스이므로 문자열을 파싱하면서 속성이 추가되지 않음
;
}
}
@Test
@Description("함수형으로 단어 개수 구하기 (병렬 처리 : 임의의 위치가 아닌 단어가 끝나는 위치에서만 분할)")
void parallelStreamTest7_7() throws Exception{
// given
String SENTENCE =
"Nel mezzo del cammin di nostra vita "+
"mi ritrovai in una selva oscura "+
"ch la dritta via era smarrita ";
// when
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
//두번째 인자를 true 넘김으로 병렬 스트림 생성
Stream<Character> stream = StreamSupport.stream(spliterator, true);
// then
System.out.println("Found "+ countWords(stream) + " words");
}
결과 : 19개
#참고서적
모던 자바 인 액션 - 저 : 라울-게이브리얼 우르마, 마리오 푸스코, 앨런 마이크로프트
- 출판사 : 한빛미디어
- 발행 : 2019년 08월 01일
'IT > 자바' 카테고리의 다른 글
[Effective Java] Item 1 생성자 대신 정적 팩토리 메서드를 고려하라 (0) | 2021.07.11 |
---|---|
Chapter 8 컬렉션 API 개선 (0) | 2020.10.04 |
Chapter6 스트림으로 데이터 수집 (0) | 2020.09.20 |
Chapter3 람다 표현식 (0) | 2020.09.07 |
Chapter2 동작 파라미터화 코드 전달하기 (0) | 2020.08.30 |