본문 바로가기

Prog. Langs & Tools/RxJS

[RxJS] Ch03. 핵심 연산자

 

이전 RxJS 포스팅에서는 다음과 같은 주제들에 대해 다루어 보았다.

이번 포스팅에서는 위에서 살펴본 개념들을 바탕으로 옵저버블을 만들어 보고, 모든 데이터 소스에 대한 시간에 따른 데이터 입출력을 옵저버블 컨텍스트로 관리하는 법을 배우려고 한다.

 

이런 상황을 가정해 보자

  • 사용자가 서버에 장기 실행 ajax 요청을 보내고, 호출 후 다른 버튼을 눌러 다른 페이지로 이동하였다.
  • 특정 데이터를 사용할 수 있게 데이터를 풀링했는데 예외가 발생하여 이제 데이터를 사용할 수 없게 되었다.

우리가 사용자에게 무한히 시스템 리소스를 사용할 수 있게 하지는 못하고 따라서 이러한 경우 문제가 발생할 수 있다. 이를 해결하기 위해 RxJS를 사용할 수가 있다. RxJS에 존재하는 스트림은 그 결정적 수명이 거의 전적으로 프로그래머에 의해 정의되는 객체이다.

일반적으로 자바스크립트는 메모리 관리에 대한 지원이 거의 없으며 C/C++처럼 메모리의 수명을 제어하는 정교한 방법이 거의 없다. 즉 메모리에서 객체 할당 및 해제를 완벽하게 제어할 수는 없는 것이다. 이는 자바스크립트가 배우기도 쉽고 사용하기도 쉬운 언어로 만들어 주었지만, 반면 어플리케이션 안에서 무슨 일이 일어나는지에 대해서는 알기가 어렵게 만들었다.

자바스크립트에서는 객체의 수명을 가비지 컬렉터(Garbage Collector)가 제어한다. 가비지 컬렉터는 어플리케이션을 실행하는 런타임 엔진에서 작동하는 프로세스로, 주기적으로 실행되어 사용되지 않는 참조와 관련된 메모리를 해제한다. 가비지 컬렉터는 다양한 객체 사이에 유지되는 함수를 추적하여 그 작업을 수행하는데, 이를 참조 카운팅(reference counting)이라고 한다. 가비지 컬렉터가 더 이상 참조되지 않는 객체를 감지하면 해당 객체는 메모리 해제 대상이 되고 사용되지 않는 참조를 찾지 못하면 메모리 누수가 발생한다.

 

RxJS는 스트림의 수명이 옵저버블을 구독할 때 시작한다. 이렇게 필요할 때만 데이터를 만드는 객체를 지연 데이터 소스(lazy data source)라고 하며, 자바스크립트의 엄격한 즉시 평가(eager evaluation)와는 대조된다. 지연 할당이 즉시 할당에 비해서 가지는 이점은 메모리 관리를 효율적으로 가져갈 수 있다는 점이다. 즉시 할당을 하게 되면 메모리를 초과 할당하는 경우가 빈번하게 발생할 수 있는데, 지연 할당을 하게 되면 그러한 리스크가 줄어들게 된다.

예를 들어 1부터 시작하여 무한한 수의 자연수를 가진 배열이라고 생각하고, slice() 메서드를 통해 앞에 5개만 가져오는 함수를 가정해 보자. 즉시 평가를 사용하게 되면 먼저 1부터 무한대까지의 배열을 생성하고 그 다음 5개의 요소를 취한다. 배열을 생성하는 과정에서 메모리가 부족하게 되고 결국 브라우저는 종료하게 된다. 반면 지연 평가는 처음 다섯 개의 요소만 생성해서 취하면 된다. 표현식의 전체 계산은 표현식의 결과가 필요할 때 까지 기다리는 것이다.

 

지연 할당: 옵저버블 구독

RxJS는 두 가지 방법으로 데이터의 조기 할당을 방지한다. 첫 번째 방법이 지연 구독 메커니즘이고, 두 번째는 옵저버블이 데이터를 메모리에 정적으로 보유하지 않고 이벤트가 방출되는 즉시 데이터를 전달하는 방법이다. 먼저 지연 구독부터 살펴보도록 하자. 지연 구독은 옵저버블이 관심 있는 이벤트로 활성화 될 때까지 휴면 상태로 유지됨을 의미한다. 아래의 예제를 살펴보자.

const source$ = Rx.Observable.create(observer => {
  let i = 0;
  setInterval(() => {
    observer.next(i++); // 무언가가 멈추게 할 때까지 500ms마다 이벤트를 계속 방출
  }, 500);
});

옵저버블을 활성화하려면 먼저 옵저버가 subscribe()로 source$를 구독해야 한다. subscribe()를 호출하면 휴면 상태에 있는 옵저버블을 깨워 값을 생성하고 위의 예제의 경우 0.5초마다 1,2,3, ... 의 이벤트에 대한 할당을 시작한다. 지연 구독은 옵저버블이 기본적으로 데이터를 보유하지 않음을 의미한다. 일정 간격으로 생성된 이벤트는 처리된 후 해제된다. 이러한 방식은 이벤트가 해제됨으로 메모리 누수의 원인이 되는 무한한 메모리 증가를 걱정할 필요가 없다.

 

구독 해제: 명시적 취소

메모리를 할당하는 것 만큼 해제시키는 것도 중요하다. 자바스크립트의 가비지 컬렉터는 똑똑하게 메모리를 정리하지만, 해당 객체에 대한 참조가 사용되지 않거나 네이티브 이벤트 처리 코드에서 자주 발생하는 참조 주기가 없는 경우에만 작동한다. 따라서 객체를 초기화한 후 객체에 대한 참조를 제거하지 않으면 어플리케이션에서 해당 메모리를 복구할 수 없다. 이는 메모리 누수로 이어질 가능성이 있다.

RxJS에서는 이러한 문제를 해결하기 위해 구독 취소를 담당할 수 있는데 subscribe() 호출에서 반환된 Subscription 타입 객체를 통해 구독 관리를 할 수 있다. 이 객체에 소스 스트림을 해제하는 식으로 구현을 하면 가능하다. 옵저버블 사용을 끝내고 이벤트를 수신하지 않으려면 unsubscribe()를 호출하여 해당 이벤트를 해제하면 된다.

const mouseClicks = Rx.Observable.fromEvent(document, 'mouseup');
const subscription = mouseClicks.subscribe(someMouseClickObserver);

subscription.unsubscribe(); // 스트림을 분해하고 할당된 객체를 반환한다.

 

RxJS 옵저버블은 이벤트 스트림을 취소하고 해제하는 단순한 메커니즘을 제공한다. 하지만 이 단순성은 다른 JS API와 함께 사용하게 될 때 문제가 발생할 수 있다. 예를 들면 Promise를 감싸는 옵저버블을 취소하려고 할 때 옵저버블이 취소되어도 Promise는 중단되지 않고 결국 실행될 수가 있다. Promise는 객체가 실행되면 성공 또는 실패의 결과를 완수하도록 디자인 되었기 때문이다. 이처럼 단일 값(Promise)을 방출하도록 디자인된 API와 무한값(옵저버블)을 지원하는 API 간에 디자인 철학이 일치하지 않음을 명시하여야 한다.

 

앞선 포스팅에서도 여러차례 강조한 부분이지만 RxJS를 사용하면서 반응형으로 생각하는 연습을 계속 해야 한다. 반응형이 된다는 것은 미래의 어느 시점에 값이 전달될 때 프로그램이 무엇을 할지를 정의하는 것과 관련이 있다. 여기서 RxJS는 논리적 데이터 흐름을 표현하는 스트림을 만드는 데 필요한 연산자가 있어서 의미있게 사용할 수 있다. 연산자는 대상 옵저버블 객체를 변경하지 않고 체인을 계속 유지하는 새로운 옵저버블 객체를 반환한다. 이러한 연산자들은 이벤트가 데이터 소스를 떠난 후 소비자에게 도달하기 전에 이벤트를 검사, 변경, 생성, 지연하는 데 사용될 수 있다. 지금부터 RxJS에서 자주 사용되는 연산자들을 하나씩 살펴보도록 하자.

 

map: 옵저버블에 연산자 매핑하기

함수형 프로그래밍에서 map은 함수를 적용하여 옵저버블로 실행되는 데이터의 특징을 변경하기 때문에 변환(transformation) 연산 범주에 속한다. map :: x -> f(x) 라는 표기법을 사용하면 주어진 값 x에 대해 입력 x를 출력 f(x)와 연결할 수 있다.

const addSixPercent = x => x + (x * .06);
Rx.Observable.of(10.0, 20.0, 30.0, 40.0)
  .map(addSixPercent)  // 이 함수를 옵저버블 소스의 각 값에 적용합니다.
  .subscribe(console.log);  // -> 10.6, 21.2, 31.8, 42.4

옵저버블의 map()은 불변이다. 즉, 원본을 변환하지 않고 전달된 값을 변환한다는 의미이다. 그리고 RxJS 연산자는 모두 순수함수이므로 소스 옵저버블을 매핑하면 함수의 결과를 담은 새로운 옵저버블이 매핑된다.

 

filter: 원하지 않는 이벤트 필터링하기

필터링은 스트림에서 원하지 않는 요소를 제거하는 프로세스이다. 요소를 제거하는 기준은 셀렉터(selector) 함수로 전달된다. 숫자 입력으로 입력 상자에 제한을 설정하는 예제를 살펴보도록 하자.

const isNumericalKeyCode = code => code >= 48 && code <= 57;
const input = document.querySelector('#input');

Rx.Observable.fromEvent(input, 'keyup')
  .pluck('keycode') // 옵저버블을 통과하는 객체에서 이 속성을 추출
  .filter(isNumericalKeyCode) // 숫자 범위의 키만 허용
  .subscribe(code => console.log(String.fromCharCode(code)));

filter 연산자를 통해 원치 않는 마우스 클릭이나 터치 이벤트 등을 무시하는 데에도 사용할 수 있다. 이 연산자를 통해 스트림에서 값을 제거하는 역할을 한다.

 

reduce: 결과 누적하기

컬렉션의 각 항목을 단편적이 아닌 전체적으로 보고 싶을 때가 있다. 이럴 때는 리덕션(reduction) 연산을 통해 연산 결과가 컬렉션이 아닌 단일 값으로 출력한다. 이러한 누적(accumulator) 함수는 모든 요소에서 호출되며 현재 누계와 새 값이 매개 변수로 제공된다. 아래 예제를 한 번 살펴보자.

const add = (x, y) => x + y;

Rx.Observable.from([
  {
    date: '2020-11-21',
    amount: 1000
  },
  {
    date: '2020-11-22',
    amount: 2000
  },
  {
    date: '2020-11-23',
    amount: 3000  
  }
])
  .pluck('amount') // amount 속성 추출
  .reduce(add, 0) // add 함수로 amount 값을 계산
  .subscribe(console.log); // 6000

reduce는 첫 번째 시드 값으로 초기화 된(위의 예제는 0이다) 옵저버블 시퀀스에 누적 함수를 적용한다. reduce()는 단일 값을 반환하는데 부분 누적도 필요해서 이 경우는 변형인 scan()을 사용한다. scan은 리덕션 프로세스가 진행되는 동안 각 중간 결과를 반환한다.

 

이렇게 RxJS의 다양한 연산자를 알아보았는데 한 가지 기억해야 할 것은 이러한 메서드 자체로는 스트림에서 작업이 실행되지 않는다. 오직 구독자만이 이 작업을 할 수 있다. 연산자가 옵저버블에서 호출되면 미래 값으로 옵저버블을 구성한다. 배열과 옵저버블의 차이점을 꼽자면 배열이 현재 일어나는 일을 나타내는 반면, 옵저버블은 미래의 작업을 나타낸다는 점이 될 수가 있다.

함수형 프로그래밍은 지연 함수 체인을 만들 수 있어야 한다. 따라서 위의 옵저버블 연산자들과 집계(aggregate) 함수를 조합할 수 있어야 한다. 기억해야 할 점은 옵저버블 시퀀스는 독립적이어야 한다는 점이다. 참고로 Lodash.js나 Underscore.js 같은 함수형 라이브러리는 이를 지원한다.

독립적인 옵저버블 파이프라인은 사이드 이펙트가 없어야 하고 옵저버블의 컨텍스트에 벗어나는 참조를 허용하지 않는다. 이는 비즈니스 로직이 순수하면 전체 프로그램도 순수하고 안정적이라는 점을 명심해야 한다. 옵저버블이 외부의 컨텍스트를 참조하게 되면 옵저버블은 더이상 무상태(stateless)가 아니며 상태와 옵저버블이 상호 의존하게 된다. 이러한 오류를 방지하기 위해서는 옵저버블 연산자에 전달되는 콜백 함수는 작고 지역적이어야 하고 옵저버블 내부는 나머지 어플리케이션과 분리되어 완전한 무상태를 유지하여야 한다.

아래 예제는 filter()와 반대의 기능을 가지는 exclude() 연산자를 작성한 코드이다. 코드에서 볼 수 있듯이 모든 연산자는 완전히 새로운 옵저버블을 생성하고 자체 방식으로 데이터를 변환하여 체인의 다음 구독자에게 위임한다.

function exclude(predicate) {
  return Rx.Observable.create(subscriber => { // 새로운 결과와 함께 반환할 새로운 옵저버블 컨텍스트를 생성
    let source = this; // 람다 함수 안에 있으므로 this는 외부 범위를 가리킴
    return source.subscribe(value => {
      try { // 사용자가 제공한 콜백에서 오류를 잡아냅니다.
        if(!predicate(value)) subscriber.next(value); // 체인의 새 연산자에서 다음 값을 구독자에게 전달
      } catch(e) {
        subscriber.error(e);
      }
    },
    e => subscriber.error(e),
    () => subscriber.complete());
  });
}

Rx.Obervable.prototype.exclude = exclude; // 옵저버블 프로토타입을 확장하여 연산자를 추가

옵저버블의 대표적인 집계 연산자는 다음과 같다.

  • take(count): 필터링 연산자, 옵저버블 시퀀스에서 지정된 양(count)의 연속적인 요소를 반환한다. 나중에 다른 무한 스트림에서 유한한 이벤트 집합을 추출할 때 유용
  • first, last: take 함수의 기능 추가 버전, 옵저버블 스트림의 첫 번째 또는 마지막 요소를 각각 반환
  • min, max: 필터링 연산자, 각각 유한 스트림의 최솟값, 최댓값을 반환하는 숫자를 방출하는 옵저버블에서 작업
  • do: 유틸리티 연산자, 일부 타입의 부가 작용을 수행하기 위해 옵저버블 시퀀스의 각 요소에 대한 동작을 호출

집계 연산자를 사용한 예제는 아래와 같다.

Rx.Observable.from(candidates)
  .pluck('experience')
  .take(2) // 처음 두 요소만 사용
  .do(val => console.log(val)) // 출력 루틴을 수행하고 옵저버블 시퀀스를 따라 전달
  .subscribe();

 

오늘 다루었던 RxJS 핵심 연산자 관련 내용을 요약하면 다음과 같다.

  • 스트림은 JS 네이티브 이벤트 시스템보다 개선된 구독 취소, 구독 해제를 위한 자체 메커니즘을 제공한다.
  • Observable 데이터 타입은 배열과 유사한 모델을 사용하여 연산자를 순차적으로 적용할 수 있는 플루언트 함수 체인을 가능하게 한다.
  • JS의 네이티브 Promise와 달리 옵저버블에는 취소 및 해제를 위한 내장 기능이 있다.
  • 옵저버블 시퀀스의 연산자에 주입되는 함수는 어플리케이션의 비즈니스 로직을 포함하며 부가 작용이 없어야 한다.
  • 옵저버블은 무제한 연결할 수 있는 연산자를 자체적으로 포함한다.
  • 연산자는 서로 독립적으로 작동하며 이전 연산자의 결과로만 작업한다.
  • 사용된 연산자의 순서와 타입에 따라 옵저버블의 성능 특성과 동작이 결정된다.

 

참고자료

  • <RxJS 반응형 프로그래밍> 폴 대니얼스, 루이스 아텐시오 저