* 이 글은 Mario Casciaro, Luciano Mammino가 저서한 <Node.js 디자인 패턴 바이블> 서적을 참고한 게시글입니다.
서적 정보 : http://www.kyobobook.co.kr/product/detailViewKor.laf?mallGb=KOR&ejkGb=KOR&barcode=9788931464283
Node.js 디자인 패턴 바이블 - 교보문고
검증된 패턴과 기술을 이용한 수준 높은 Node.js | 이 책은 이미 Node.js를 처음 접한 후 이제 생산성, 디자인 품질 및 확장성 측면에서 최대한 활용하고자 하는 개발자를 대상으로 합니다. 이 책은
www.kyobobook.co.kr
스트림 코어 모듈에서 사용하는 4가지 기본 추상 클래스 (Readable, Writable, Duplex, Transform) 중 Readable에 대해 알아보자.
0. Readable Stream
Readable 스트림은 데이터 소스 자체를 스트림 형태로 읽을 수 있는 객체이다.
우리가 잘 알고 있는 요놈도 스트림인데,
우리가 콘솔에서 입력하는 값을 Readable 스트림의 형태로 치환하여 프로그램(consumer)에게 넘겨준다.
마찬가지로 process.stdout은 Writable 스트림이며, 따라서 이렇게 짧은 코드로 받아쓰기 프로그램이 가능하다.
process.stdin.pipe(process.stdout);
pipe는 이전 글에서도 잠깐 언급했지만(추후 제대로 포스팅해볼 예정), stream 사이를 연결해주는 파이프라인으로 생각하면 된다.
1. Flowing 모드
Readable 스트림에서 데이터를 가져오는 과정은 크게 2가지로 나뉘는데, 먼저 Flowing 모드부터 살펴보자.
Flowing 모드는 Readable 스트림에서 던져주는 "data" 이벤트를 감지하고, 스트림에서 소유권을 포기한 chunk 데이터를 가져온다.
여기서 핵심은 소유권을 포기했다는 부분인데, 우선 간단히 코드를 보자.
// Flowing Mode
process.stdin
.on("data", (data) => {
console.log(data.toString());
})
.on("end", () => console.log("End of Stream"));
"data" 이벤트 리스너를 등록하고, data(디폴트는 Buffer 객체)가 넘어올 때마다 이를 출력해주고 있다.
문제는, 이 "data" 이벤트의 경우 스트림에서는 더이상 넘겨준 data를 가지고 있지 않고 오직 consumer가 직접 처리를 해야한다.
뭐 어차피 받아와서 쓸건데 무슨 문제가 있겠느냐만,
만약 consumer 역할(여기서 콜백)을 하는 부분의 처리 속도가 데이터가 들어오는 처리 속도를 감당할 수 없다면, 스트림이 포기한 데이터들은 더이상 사용할 수 없게 되는 것이다.
TCP 통신에서도 마찬가지일 수 있다. 서버 쪽에서 클라이언트가 넘겨주는 스트림을 Flowing 모드로 소비하고 있다면, 서버의 처리 속도에 대한 부담이 생길 것이다.
통상적으로 흔히 알려진 병목 현상이라고 하고, Back Pressure 메커니즘을 통해 해결하고 있다. Writable에서 좀더 다뤄볼 예정이다.
이를 해결하기 위해 Node.js에서는 별도의 내부 처리를 구현해서 제공했었지만, 지금은 Flowing 모드는 Stream 1 버전이고,
non-Flowing 모드를 Stream 2 버전으로 취하고 있다.
2. non-Flowing (pause)모드
non-Flowing에서는 더이상 "data" 이벤트를 감지하지 않는다.
대신 "readable" 이벤트를 감지하는데, 이는 "읽을 데이터가 존재" 혹은 "더이상 읽을 데이터가 존재하지 않음"을 나타내주는 이벤트이다.
우선 코드부터 보자.
process.stdin
.on("readable", () => {
let chunk;
while ((chunk = process.stdin.read())) {
console.log(chunk.toString());
}
})
.on("end", () => console.log("Finish"));
이는 위에서 보인 Flowing 모드와 동일하게 동작하는 코드이다.
눈에 띄게 다른 점은, "readable" 이벤트에서 더이상 데이터를 던져주지 않고, 대신 consumer가 직접 read() 함수를 호출하여 데이터를 가져온다는 점이다.
달리 말해, 데이터를 소비할 시점을 우리가 직접 정할 수 있다는 의미이다.
소비하는 속도가 느려지면, 스트림에서는 들어오는 데이터를 계속 버퍼에 쌓다가, 버퍼가 다 차면 OS에서 들어오는 데이터 패킷 속도를 조절하며 나아가 데이터를 전송하는 클라이언트 쪽에서도 전송 속도를 조절하여 패킷 드랍을 최소화하게 된다.
또한 while문에서 매번 read()의 결과가 null인지 체크하고 있다. 즉 더이상 읽을 데이터가 없다면 null을 반환해준다.
심지어 read()함수의 원형은 read([size = DEFAULT_BUFFER_SIZE])라서 우리가 원하는 양만큼의 사이즈만 가져올 수도 있다.
* "data" 이벤트 리스너와 "readable" 이벤트 리스너를 동시에 등록하면 좋지 않은 결과를 초래할 수 있다. read()함수가 호출되면 스트림에서는 "data" 이벤트를 emit하기 때문인데, 자세한 설명은 Node.js 공식 문서에 있다.
https://nodejs.org/dist/latest-v12.x/docs/api/stream.html#stream_class_stream_readable
Stream | Node.js v12.22.5 Documentation
Source Code: lib/stream.js A stream is an abstract interface for working with streaming data in Node.js. The stream module provides an API for implementing the stream interface. There are many stream objects provided by Node.js. For instance, a request to
nodejs.org
3. Readable 스트림 구현
간단하게 밑의 메소드를 가진 Readable 스트림을 구현해보자.
- _read(size) : 호출될 때마다 랜덤 문자열을 생성하다가, 확률적으로 스트림 버퍼의 끝에 null을 붙인다.
import { Readable } from "stream";
import * as crypto from "crypto";
export class MyReadable extends Readable {
constructor(options) {
super(options);
// 총 길이
this.emittedBytes = 0;
}
_read(size) {
const chunk = crypto.randomBytes(size);
this.push(chunk, "utf8");
this.emittedBytes += chunk.length;
// 0 ~ 9 랜덤 숫자중 7 이상이 나오면 push(null)
if (crypto.randomInt(0, 10) >= 7) {
this.push(null);
}
}
}
Readable.push() 함수에 (string, buffer) 등 데이터를 넣을 수 있다.
이를 Flowing, non-Flowing 모드로 각각 소비하는 코드를 작성해본다면,
const myReadable = new MyReadable();
// Flowing 모드
myReadable.on("data", (data) => {
console.log(data.toString());
});
// non-Flowing 모드
myReadable.on("readable", () => {
let chunk;
while ((chunk = myReadable.read())) {
console.log(chunk.toString());
}
});
이 정도가 될 것이다.
다음엔 Writable 스트림에 대해 정리해보도록 하겠다.
'Back-end > Node.js' 카테고리의 다른 글
스트림 코딩 (Stream Coding) - (4) Transform (0) | 2021.08.23 |
---|---|
스트림 코딩 (Stream Coding) - (3) Writable (0) | 2021.08.22 |
스트림 코딩 (Stream Coding) - (1) 버퍼(buffer)와 스트림(stream) (0) | 2021.08.17 |
콜백(Callback)의 비동기 Control flow 패턴 (2) - 순차 반복(sequential iteration) (0) | 2021.07.31 |
콜백(Callback)의 비동기 Control flow 패턴 (1) - 콜백 지옥(Callback Hell) (0) | 2021.07.31 |