본문 바로가기

Prog. Langs & Tools/RxJS

[RxJS] Ch07. 반응형 스트림 적용하기 - 하

지난 포스팅에 이어서 반응형 스트림의 중첩 옵저버블 처리 및 비동기 스트림에 대해 다루어 보려고 한다.

 

중첩 옵저버블 처리

우리는 지난 포스팅에서 스트림의 출력을 동시에 하나로 결합하는 방법을 배웠다. 조합하는 방법에 따라 옵저버블 자체에서 다른 옵저버블을 방출하기도 한다. 아래와 같은 상황을 중첩 옵저버블 구조라고 한다.

중첩 옵저버블은 특정 동작 때문에 결과를 소스 옵저버블로 반환해야 하는 후속 비동기 작업을 유발하거나 시작하는 경우에 유용하다. 지금까지는 스칼라 값을 반환하는 함수를 주로 매핑하였으나, 중첩 옵저버블은 매핑된 함수가 다른 옵저버블에 매핑된 옵저버블 또는 옵저버블의 옵저버블을 반환하는 상황을 나타낼 때 사용한다. 이러한 상황은 함수형 프로그래밍에서 빈번하게 발생하는데, 그 이유는 map() 프로토콜이 구조 보존(structure preserving) 함수이기 때문이다.

옵저버블을 실제 시나리오에서 어떻게 사용하는지 살펴보자. 데이터를 스트리밍하는 검색창을 구현할 때 사용한 코드이다.

const search$ = Rx.Observable.fromEvent(inputText, 'keyup')
  ...
  .map(query => sendRequest(testData, query))
  .subscribe(...)

만약 여기에서 sendRequest()가 실제 옵저버블을 반환했다면 Observable<Observable<Array>> 타입이 생성된다. 따라서 구독자는 그 안의 Observable<Array>를 직접 처리해 주어야 한다. 이는 바람직한 동작이 아니다. 위의 search$는 중첩 옵저버블이다. 그런데 옵저버는 래핑된 옵저버블 값의 계층에 반응해서는 안된다. 반응하게 되면 불필요한 노출 또는 적절한 캡슐화의 결핍이 된다. 이들은 모든 비즈니스 규칙을 적용하여 얻은 기본 데이터를 항상 수신해야 하며, 따라서 이 계층을 어떻게든 단일 계층으로 평탄화 하거나 풀어야 한다.

이는 search$가 구독자에게 간단한 검색 결과 스트림으로 표시되게 스트림을 병합할 수 있다. 이렇게 하려면 mergeMap() 연산자를 알아야 하는데, mergeMap() 연산자는 내부 옵저버블을 다시 단일 옵저버블 구조로 압축하는 추가 로직을 가지고 있다. 

const search$ = Rx.Observable.fromEvent(inputText, 'keyup')
  ...
  .mergeMap(query => sendRequest(testData, query)) // 결과를 병합하고 query로 생성된 옵저버블 값으로 전환한다.
  .subscribe(...)

 

 

객체 지향 개발 경험만 있다면 데이터 구조를 평탄화 하는 작업은 익숙하지 않을 수 있다. 이는 옵저버블이 옵저버블을 통해 흐르는 데이터를 관리하고 제어하기 때문이다. 이 개념은 데이터의 컨테이너화라고 한다. 이로 인해 가질 수 있는 장점은 앞서도 여러 차례 언급하였고 다음과 같은 것들이 있다.

  • 불변성 및 사이드 이펙트가 없는 코드 유지
  • 많은 이벤트 유형을 원활하게 지원하기 위한 데이터 처리 추상화
  • 선언적으로 연산자 체인을 만들기 위한 고차함수 사용

옵저버블 평탄화 개념은 중첩 옵저버블에서 값을 추출하고 중첩 구조를 통합하여 사용자가 한 수준만 보게 하는 것을 의미한다. 매핑과 평탄화의 비교는 아래의 그림을 통해 이해할 수 있다.

 

비동기 스트림

실시간으로 변화하는 주식의 주가 같은 데이터를 종목 별로 가져오려고 할 때 비동기 스트림을 사용한다.

예를 들면 페이스북(티커: FB)의 주가를 가져온다고 할 때 위의 그림과 같이 Promise 기반 ajax 호출을 연결한다. API에서 생성된 CSV 문자열은 정리하여 하나의 문자열로 파싱해야 한다. 여기서는 원격 서비스에서 데이터를 가져와 결과를 게시할 수 있는 옵저버블 반환 함수를 생성한다. 그리고 이 함수는 실시간으로 피드를 폴링하기 위해 2초 간격으로 실행되는 함수와 결합한다.

const csv = str => str.split(/,\s*/); // CSV 문자열에서 배열을 만드는 헬퍼 함수

const webservice = ... // 주가 정보를 받아오는 API 링크

const requestQuote$ = symbol =>
  Rx.Observable.fromPromise(
    ajax(webservice.replace(/\$symbol/, symbol))) // Promise 기반 ajax()로 서비스 조회
    .map(res => res.replace(/"/g, ''))
    .do(console.log)
    .map(csv); // 출력을 정리하고 파싱

const twoSecond$ = Rx.Observable.interval(2000);

// 한 스트림을 다른 스트림에 매핑하기
const fetchDataInterval$ = symbol => twoSecond$
  .mergeMap(() => requestQuote$(symbol));

두 개의 독립적인 스트림을 결합하거나 매핑을 할 때 mergeMap()으로 이를 수행할 수 있다. fetchDataInterval$은 종목 코드를 가져와 2초마다 주가 데이터를 요청하는 스트림을 만드는 함수를 생성한다. 근본적으로 mergeMap()의 의미는 매핑된 옵저버블을 병합하여 스트림(맵)의 특성을 변환하는 것이다. 이 연산자는 여러 수준의 옵저버블을 포함하는 복잡한 비즈니스 프로세스를 조율하는 데 사용할 수 있어서 매우 유용하다.

여기에 추가적으로 distinchUntilChanged()라는 필터링 연산자를 추가하면, 주가 변동이 없을 때 DOM을 갱신하지 않아 주가의 변경 사항이 감지될 때만 스트림이 옵저버에 전이 되게 된다. 

이렇게 되면 단순히 2초마다 처리하는 것보다 더 최적화가 이루어졌음을 알 수 있다.

자주 사용되는 조인 연산자에는 다음과 같은 것들이 있다.

  • mergeMap() : 함수를 반환하는 옵저버블을 소스 옵저버블의 각 항목 값에 매핑하고 출력 옵저버블을 평탄화한다. (map() ... merge())
  • concatMap() : mergeMap()과 비슷하게 병합이 연속적으로 발생한다. 즉, 각 옵저버블은 이전 옵저버블이 완료될 때까지 대기한다. (map() ... concatAll())
  • switchMap() : mergeMap()과 비슷하지만, 최근에 매핑된 옵저버블 값만 표시한다. 즉, 가장 최근 값을 방출하는 매핑된 옵저버블로 전환하여 이전의 내부 옵저버블을 취소한다. (map() ... switch())

 

concatMap으로 드래그 앤 드롭하기

드래그 앤 드롭을 구현하려면 세 가지 유형의 마우스 이벤트를 식별해야 한다. 첫 번째는 드래그의 시작인 mousedown, 두번째는 마우스 버튼을 떼는 mouseup, 마지막으로 드래그를 추적하려면 mousemove 이다. 각각은 스트림을 모델링을 하면 된다. 드래그는 사용자가 마우스 버튼을 클릭하면 시작되고, 사용자가 마우스 버튼을 놓거나 mouseup 이벤트가 발생하면 멈춘다.

웹 페이지에서 특정 컨텐츠를 드래그하는 예제 코드는 다음과 같다. 모든 부가 작용이 옵저버에 전달되므로 추상화 수준이 높다.

const panel = document.querySelector('#dragTarget'); // 드래그하려는 패널 또는 대상에 대한 참조
const mouseDown$ = Rx.Observable.fromEvent(panel, 'mousedown'); // 패널 또는 대상에 대한 mousedown 이벤트의 옵저버블
const mouseUp$ = Rx.Observable.fromEvent(panel, 'mouseup'); // 패널 또는 대상에 대한 mouseup 이벤트의 옵저버블
const mouseMove$ = Rx.Observable.fromEvent(panel, 'mousemove'); // 패널 또는 대상에 대한 mousemove 이벤트의 옵저버블

const drag$ = mouseDown$.concatMap(() => mouseMove$.takeUntil(mouseUp$));

drag$.subscribe(event => {
  panel.style.left = event.clientX + 'px';
  panel.style.top = event.clientY + 'px';
});

여기에서 사용된 takeUntil()이라는 take()의 변형 연산자는 노티파이어(notifier) 옵저버블이 값을 방출할 때까지 소스 옵저버블이 값을 방출하게 한다. 노티파이어 옵저버블의 개념은 RxJS에서 자주 사용되는데, 어떤 종류의 상호 작용을 시작하거나 중지하는 신호로 사용된다. 여기서는 mouseup 이벤트가 발생할 때까지 mousedown 이벤트와 연결된 모든 mousemove 이벤트를 기져온다.

터치 인터페이스로 드래그를 구현하는 경우 스트림 선언에서 이벤트 이름을 touchmove, touchstart, touctend로 변경하기만 하면 된다. 비즈니스 로직은 동일하게 유지되며 코드 또한 정확히 동일하게 작동한다.

concatAll()이 동작하는 로직은 아래의 그림을 참고하자.

 

이번 포스팅에서는 옵저버블을 중첩하는 방법, 결과를 평탄화 하여 옵저버에 적절한 데이터를 전달하는 방법, 그리고 복잡한 어플리케이션을 제작할 때 중첩되거나 병합된 스트림을 사용하는 방법 등에 대해서 살펴보았다. 다음 포스팅에서는 옵저버블을 더 조율하고 이들을 함께 사용하는 방법에 대해서 알아보려고 한다.

 

참고자료

  • <RxJS 반응형 프로그래밍> 폴 대니얼스, 루이스 아텐시오 저