짜집기한 글입니다.
TL;DR
- Stream은 데이터, 패킷 ,비트 등의 일련 연속성을 갖는 흐름을 의미하는데-음성,영상,데이터 등의 작은 조각들을 하나의 줄기를 이루어 전송하는 데이터열이다.
- 스트림은 데이터를 청크 단위로 쪼개 처리해야 할 때 적절하게 사용할 수 있다.
- Node Stream은 결국 EventEmitter의 인스턴스이다. 따라서 이벤트 핸들러를 작성할 수 있다.
Stream 이란?
- Stream은 추상적인 개념으로 데이터를 통일된 방식으로 다루기 위한 가상의 개념이다.
- 큰 데이터를 다룰 때나, 외부 소스로부터 데이터를 가져올 때 chunk해서 가져온다.
종류
Writable: 데이터를 쓸 수 있는 스트림
- 쓸 수 있는 스트림(ex. fs.createWriteStream())
- fs.createWriteStream
- process.stdout
- 클라이언트 입장의 HTTP 요청
- 서버 입장의 HTTP 응답
http.createServer((request, reponse) => {
response.wrtie(JSON.stringfy(reponse))
})_
Readable: 데이터를 읽을 수 있는 스트림
- 읽을 수 있는 스트림(ex. fs.createReadStream())
- fs.createReadStream 로부터 나오는 Stream 객체
- 서버 입장의 HTTP 요청 - 클라이언트의 요청을 읽어낼 수 있는 Stream
- 클라이언트 입장의 HTTP 응답 - 서버로부터 돌아온 응답을 읽어낼 수 있는 Stream
http.createServer((request, response) => { request.on('data', () => {}) })
Duplex: Readable & Writable 스트림
- 읽고 쓸 수 있는 스트림(ex. net.Socket)
- TCP sockets - 네트워크를 통해 주고받음
- zlib streams - 압축 알고리즘 적용된 스트림
- crypto streams - 암호화 알고리즘 모듈
Transform: 데이터를 쓰고 읽을 때 데이터를 수정하거나 변환 할 수 있는 Duplex 스트림
- 데이터를 쓰고 읽을 때 데이터를 수정하거나 변형할 수 있는 Duplex 스트림(ex.zlib.createDeflate())
- 데이터가 읽기 가능 스트림을 통과하면서 변환되는 스트림입니다. 예를 들어, 압축 또는 암호화 작업이 이에 해당합니다.
- zlib streams
- crypto streams
개선 예시
1. 간단한 개선 예시
개선 전
아래 서버를 돌리면, 파일을 한번에 읽어 메모리에 적재하기 때문에 많은 메모리를 사용한다.
const fs = require("fs")
const server = require("http").createServer()
server.on("request", (req, res) => {
fs.readFile("./big.file", (err, data) => {
if (err) throw err
res.end(data)
})
})
server.listen(8000)
개선 후
- 파일을 읽어 응답해주는 건 같지만, 스트림을 제공받아 청크 단위로 순차적 응답하기 때문에 메모리 사용량이 매우 적어진다. -> 파일을 메모리에 버퍼로 잡지 않고 스트림 처리하여 메모리 사용량을 개선
const fs = require("fs") const server = require("http").createServer() server.on("request", (req, res) => { const src = fs.createReadStream("./big.file") src.pipe(res) }) server.listen(8000)
2. 실무 개선 예시
개선 전
- 한 달에 한번 인보이스를 발행하는 잡을 구현했는데 간혹 인스턴스가 죽는 증상이 발견됨.
- 1차 원인은 어떠한 연유로 ping에 응답하지 못했고, livenessProbe 실패, 컨테이너 재시작되는 것이었음
- 테스트해보니 typeorm을 통해 대량의 데이터를 가져와 생성, 수정 시도할 경우 처리량이 따라가지 못햇음 (약 2600개)
// 2600개
const invoices = this.invoiceRepository.find({})
...
invoices.forEach((invoice) => {
invoice.issue();
});
this.invoiceRepository.save(inoivces);
개선 후
- 잡 처리 시간이 늘어나더라도 stream을 얻고, 청크 단위로 처리하도록 개선했음
// NOTE: 1개식 청크해서 처리
await pipelime(
this.invoiceRepository.stream(),
new Writable(
objectMode: true,
async write(chunk, encoding, callback) {
try {
invoice.issue()
thiz.invoiceRepository.save(invoice);
}
}
))
참고
댓글