이번 포스팅에서는 지금까지 배웠던 RxJS 옵저버블의 기본기를 바탕으로 중첩된 옵저버블 또는 스트림 안의 스트림과 이들의 비동기 제어를 하기 위한 기능을 살펴보려고 한다.
앞선 포스팅에서 옵저버블이 시간에 따른 이벤트 시퀀스라는 개념을 다졌고, 이 옵저버블은 이벤트가 전달되고 변형되는 오케스트레이터로 생각할 수가 있다. 지금까지는 옵저버블 시퀀스를 대부분 독립적으로 처리하는 방법을 이야기 했으며, 이벤트가 언제 방출되든 상관없이 배열과 같은 방식으로 옵저버블의 모든 이벤트에 연산자를 적용하였다. 그리고 combineLatest()와 같은 RxJS 연산자를 사용하면 한 스트림에서 이벤트가 전파되어 다른 곳에서 반응을 일으킬 수 있기 때문에 RxJS와 반응형 패러다임이 빛을 발함을 알 수가 있었다.
이번 포스팅에서는 merge(), switch(), concat()와 같은 연산자들로 실제 문제들을 조금 더 정교하게 해결해 보려고 한다. 어플리케이션의 복잡도에 따라 로직에는 여러 옵저버블 간의 상호 작용이 수반되며, 이 때 옵저버블에는 다양한 타입의 데이터가 포함될 수 있다. 이를 올바르게 다루기 위해 데이터를 단일 소스로 운반하는 다른 옵저버블에 매핑해야 하는 필요성에 기인한 데이터 타입의 평탄화(flattening a data type)라는 FP의 중요한 원칙에 중점을 둔다.
하나를 위한 모두, 모두를 위한 하나
옵저버블에 시간을 추가함으로써 이벤트가 보내진 시각을 기준으로 지연, 필터링, 조작할 수 있는 새로운 차원이 도입되었다. 다양한 타입의 연산자와 셀렉터 함수로 이벤트가 다운스트림의 생성 시간과 방법을 결정하므로 스트림 내의 데이터가 변환되는 방식을 비즈니스 로직이 완전히 제어할 수 있다. 지금까지는 단일 스트림에 집중했지만, 이 장에서는 연산자 사용을 좀 더 확장하여 복잡한 흐름을 만들어 이를 단일 흐름으로 결합하는 기법을 알아본다.
다중 스트림 시나리오는 실제 환경에서 흔히 볼 수 있다. 코드에 비즈니스 로직을 많이 주입할 수록 복잡도가 커지므로 스트림이 얽히게 된다. 이럴 때 중첩된 콜백 함수에만 의존하거나 오로지 Promise에 의존하는 비동기 흐름을 처리하는 시스템의 기하급수적인 복잡성 증가보다 이러한 다중 스트림 시나리오가 유지 관리에 더욱 더 수월하다.
반응형 선언문
반응형 선언문(Reactive Manifesto)에 따르면 반응형 디자인의 핵심 원칙 중 하나는 탄력성이다. 탄력적인 시스템은 다양한 작업 부하에서도 반응형 특성을 유지한다. RxJS는 코드의 작업 방식을 재작성하거나 리팩터링하지 않아도 다양한 입력 속도를 가진 여러 데이터 소스를 다른 방식으로 결합할 수 있기 때문에 반응형 프로그램을 훌륭하게 구현한다.
터치 이벤트를 예를 들어 생각해 보자. 어플리케이션에 터치 지원을 추가하려면 이벤트와 로직 두 세트를 도입해야 한다. 만약 적절한 디자인이 없다면 이를 위해 완전히 새로운 이벤트 핸들러를 만들어야 한다. 그런데 만약 반응형으로 생각을 해 본다면 이들 모두는 같은 채널을 지나는 다른 스트림에 지나지 않는다. 마우스를 사용하든지, 터치를 사용하든지 키 입력과 다른 HTTP 호출, 시간 간격, 애니메이션 등 복잡한 UI와 결합해야 하는 이벤트를 구동하는 스트림은 대부분 이러한 방식으로 작동한다.
마우스 이벤트에 mousedown, mouseup, mousemove가 있듯이, 터치 이벤트에는 touchstart, touchend, touchmove가 있다. 이 말은 마우스 이벤트와 마찬가지로 터치 이벤트에서 동일한 동작을 시연하고 모바일 브라우저에서 코드를 장동하게 하려면 새로운 스트림을 최소 3개 만들어야 한다는 뜻이다. 예를 들면 touchend는 mouseup과 같으므로 다음과 같이 touchend 이벤트를 사용할 수 있다.
const mouseUp$ = Rx.Observable.fromEvent(document, 'mouseup');
const touchEnd$ = Rx.Observable.fromEvent(document, 'touchend');
mouseUp$.subscribe(/* 마우스 클릭 처리 */);
touchEnd$.subscribe(/* 터치 처리 */);
하지만 이 코드는 바람직하지 않다. 동일한 코드일 가능성이 큰 구독 영역이 두 개 존재하며 이 둘 사이에서 공유되어야 하는 모든 코드는 외부 공유 상태가 필요하다. 그리고 두 개의 구독을 추적해야 해서 잠재적인 메모리 누수 영역이 하나 더 생기게 된다. 따라서 아래처럼 두 스트림의 동기화를 걱정할 필요 없이 단일 코드 블록으로 두 구독을 모두 관리할 수 있다면 훨씬 효율적이다.
다중 스트림을 하나로 결합하고 단일 옵저버로 모든 스트림을 처리하는 방법에는 여러 가지가 있다.
- 스트림을 병합하여 이벤트 인터리빙(interleaving)하기
- 스트림을 연결하여 이벤트 순서 유지하기
- 최신 스트림 데이터로 전환하기
스트림을 병합하여 이벤트 인터리빙하기
다중 스트림을 결합하는 가장 간단한 연산자는 merge()이며 이해하기가 쉽다. 이 연산자를 사용하는 목적은 깔때기처럼 다중 스트림의 이벤트를 도착한 순서대로 하나의 옵저버블에 전달하는 데 있다. merge()는 두 이벤트에서 OR 연산을 수행하는 것과 같다.
const source1$ = Rx.Observable.interval(1000)
.map(x => `Source 1 ${x}`)
.take(3);
const source2$ = Rx.Observable.interval(1000)
.map(x => `Source 2 ${x}`)
.take(3);
let source0$ = Rx.Observable
merge(source0$, source1$, source2$).subscribe(console.log);
merge()에서 생성된 결과 스트림은 1초마다 source1과 source2의 값을 교대로 방출한다. merge()는 생성 연산자로 볼 수 있는데, 둘 이상의 옵저버블을 조합하여 새 스트림을 만든다. 인스턴스 형태로 사용할 때 옵저버블이 소스 옵저버블에 매핑되거나 투영되었다고 말한다.
두 스트림 사이에서 merge() 연산자가 정적 형식일 때와 인스턴스 형식일 때 다음과 같이 차이를 확인해 볼 수 있다.
// 정적 형식으로 두 스트림을 결합한 코드
Rx.Observable.merge(mouseUp$, touchEnd$).subscribe(/* 단일 옵저버가 두 이벤트 중 하나를 소비함 */);
// 인스턴스 형식으로 두 스트림을 결합한 코드
mouseUp$.merge(touchEnd$).subscribe(/* 단일 옵저버가 두 이벤트 중 하나를 소비함 */);
병합 스트림에 순서가 있을 때 고려해야 할 점은, merge() 연산자는 이벤트가 수신된 순서대로 각 스트림의 이벤트를 인터리빙한다. 그리고 각 스트림은 독립적이지만 시퀀스의 전체 출력에 영향을 준다. 타입이 좀 더 정적으로 정해진 언어에서 컴파일러는 병합할 수 있는 타입을 종종 제한한다. 이런 경우 예측 가능한 타입이 된다. 자바스크립트는 이러한 제약조건이 없어서 호환되지 않은 타입을 병합하기는 쉽지만 유연한 장점을 가진 반면 스트림 중간에 예상하지 못한 오류가 발생할 수 있다.
조금 다른 주제이지만, 함수형 프로그래밍을 사용할 때 if/else와 같은 명령문은 피해야 한다. 각 이벤트는 다른 이벤트와 호환이 되어야 하고 모든 이벤트에서 방출되는 데이터를 동일한 옵저버 코드에서 소비하려면 최소한 동일한 프로토콜이나 구조를 따라야 한다. RxJS를 사용하면 for/of, if/else 등의 명령을 map(), filter() 조합으로 대체할 수 있다.
merge()에서 기억해야 할 점은 병합된 옵저버블에서 메모리에 현존하는 모든 데이터를 방출한다는 점이다. 인터리빙은 interval()이나 마우스 움직임 같이 이벤트가 비동기로 도착할 때 발생하지만, 데이터를 동기로 불러오면 다음 방출 전에 하나의 전체 스트림을 방출한다.
const source1$ = Rx.Observable.of(1, 2, 3);
const source2$ = Rx.Observable.of('a', 'b', 'c');
Rx.Observable.merge(source1$, source2$).subscribe(console.log);
여기서 merge()가 숫자와 문자를 번갈아 가면서 진행한다고 생각할 수 있지만, merge()는 모든 숫자 다음에 모든 문자를 반복한다. 이는 데이터가 동기로 방출되기 때문이다.
스트림을 연결하여 이벤트 순서 유지하기
merge() 연산자는 소스 스트림에서 이벤트를 받는 순서대로 모든 옵저버블 데이터를 출력하는 방식을 사용한다. 그러나 어떤 시나리오에서는 인터리빙 하는 대신 옵저버블 시퀀스 전체의 순서를 유지하는데 관심이 있을 수 있다. 즉 옵저버블 두 개가 있을 때 하나에서 모든 이벤트를 받은 다음에 그 다음 옵저버블에서 이벤트를 받는 식인 것이다. 이러한 유형의 작업을 두 스트림의 연결(concatenation)이라고 한다.
const source$ = concat(...streams);
merge() 연산자는 모든 소스 옵저버블을 즉시 구독할 수 있지만, concat() 연산자는 한 번에 하나씩 옵저버블만 구독할 수 있다. concat()은 각 기본 스트림에 대한 구독을 계속 관리하지만, 한 번에 하나의 구독만 보유하고 다음 구독 전에 순서대로 처리한다.
const source1$ = Rx.Observable.range(1, 3).delay(3000);
const source2$ = Rx.Observable.of('a', 'b', 'c');
const result = Rx.Observable.concat(source1$, source2$);
result.subscribe(console.log);
위의 예제에서 첫 번째 스트림이 3초 지연되므로 a,b,c가 먼저 방출 된다고 예상하지만 concat은 순서를 유지하기 때문에 다음 스트림을 추가하기 전에 첫 번째 스트림을 완료한다.
만약에 마우스 이벤트 스트림과 같이 무한 스트림을 먼저 놓고 그 다음 스트림을 연결(concat)하면 첫 번째 스트림이 끝나지 않기 때문에 두 번째 스트림을 옵저버가 절대 볼 수 없는 상황이 발생할 수도 있다. 즉, 정상적인 상황에서는 마우스 이벤트 옵저버블만 처리한다. 이런 경우 연결된 옵저버블을 보려면 파이프라인의 어떤 지점에서 해당 스트림을 종료하거나 취소해야 한다. 예를 들면 take() 메서드를 사용하여 처음 몇 개의 이벤트만 취하는 식으로 할 수 있다.
최신 옵저버블 데이터로 전환하기
병합과 연결 모두 입력 스트림 데이터를 파이프라인으로 전달한다. 만약 새로운 시퀀스가 방출될 때 첫 번째 시퀀스 취소하기 처럼 다른 동작을 원한다고 가정하면 다음과 같이 코드를 작성해볼 수 있다.
Rx.Observable.fromEvent(document, 'click') // 페이지에서 클릭 이벤트를 듣는다.
.map(click => Rx.Observable.range(1, 3)) // 다른 옵저버블을 소스 옵저버블에 매핑한다.
.switch() // switch를 사용하여 매핑된 옵저버블에서 데이터를 방출한다.
.subscribe(console.log)
이 코드를 실행하면 마우스 클릭 후 숫자 1, 2, 3이 출력된다. 클릭 이벤트가 발생하면 클릭 이벤트는 취소되고 숫자 1에서 3까지 출력하는 다른 옵저버블로 대체되는 것이다. 이벤트가 전환되지만 구독자는 이 부분을 보지 못한다.
switch()는 검색 스트림에 적합한 연산자이다. 검색 스트림에 적합한 이유는 keyup 이벤트가 곧바로 실행되어서 옵저버에 데이터를 주입할 필요가 없기 때문이다. 옵저버블에 소스를 매핑할 때 switch()는 새 구독이 들어오면 이전 구독을 취소한다. 이러한 방식으로 다운스트림 옵저버블은 오래되었거나 더 이상 관심 없는 연산자를 제거하여 항상 최신 데이터를 취할 수 있다.
세 가지 연산자 merge(), concat(), switch()는 각각 다른 방법으로 옵저버블 데이터를 평탄화하여 그룹화했다. 이와 별개로 중첩 옵저버블을 처리하는 고차 옵저버블이 있다. 중첩 옵저버블을 처리하는 방법은 다음 포스팅에서 이어서 다뤄보려고 한다.
참고자료
- <RxJS 반응형 프로그래밍> 폴 대니얼스, 루이스 아텐시오 저
'Prog. Langs & Tools > RxJS' 카테고리의 다른 글
[RxJS] Ch07. 반응형 스트림 적용하기 - 하 (0) | 2021.04.26 |
---|---|
[RxJS] Ch05. RxJS에서의 시간 - 하 (0) | 2021.04.05 |
[RxJS] Ch04. RxJS에서의 시간 - 상 (0) | 2020.12.17 |
[RxJS] Ch03. 핵심 연산자 (0) | 2020.11.23 |
[RxJS] Ch02. RxJS로 배우는 반응형 프로그래밍 (0) | 2020.10.30 |