본문 바로가기
Backend/Node

[Node] Stream 이란 (실제 개선 예시)

by BenKangKang 2024. 2. 21.

짜집기한 글입니다.

TL;DR

- Stream은 데이터, 패킷 ,비트 등의 일련 연속성을 갖는 흐름을 의미하는데-음성,영상,데이터 등의 작은 조각들을 하나의 줄기를 이루어 전송하는 데이터열이다.

- 스트림은 데이터를 청크 단위로 쪼개 처리해야 할 때 적절하게 사용할 수 있다.

- Node Stream은 결국 EventEmitter의 인스턴스이다. 따라서 이벤트 핸들러를 작성할 수 있다.

Stream 이란?

  • Stream은 추상적인 개념으로 데이터를 통일된 방식으로 다루기 위한 가상의 개념이다.
  • 큰 데이터를 다룰 때나, 외부 소스로부터 데이터를 가져올 때 chunk해서 가져온다.

종류

Writable: 데이터를 쓸 수 있는 스트림

  • 쓸 수 있는 스트림(ex. fs.createWriteStream())
  • fs.createWriteStream
  • process.stdout
  • 클라이언트 입장의 HTTP 요청
  • 서버 입장의 HTTP 응답
http.createServer((request, reponse) => {
    response.wrtie(JSON.stringfy(reponse))
})_

Readable: 데이터를 읽을 수 있는 스트림

  • 읽을 수 있는 스트림(ex. fs.createReadStream())
  • fs.createReadStream 로부터 나오는 Stream 객체
  • 서버 입장의 HTTP 요청 - 클라이언트의 요청을 읽어낼 수 있는 Stream
  • 클라이언트 입장의 HTTP 응답 - 서버로부터 돌아온 응답을 읽어낼 수 있는 Stream
    http.createServer((request, response) => {
      request.on('data', () => {})
    })

Duplex: Readable & Writable 스트림

  • 읽고 쓸 수 있는 스트림(ex. net.Socket)
  • TCP sockets - 네트워크를 통해 주고받음
  • zlib streams - 압축 알고리즘 적용된 스트림
  • crypto streams - 암호화 알고리즘 모듈

Transform: 데이터를 쓰고 읽을 때 데이터를 수정하거나 변환 할 수 있는 Duplex 스트림

  • 데이터를 쓰고 읽을 때 데이터를 수정하거나 변형할 수 있는 Duplex 스트림(ex.zlib.createDeflate())
  • 데이터가 읽기 가능 스트림을 통과하면서 변환되는 스트림입니다. 예를 들어, 압축 또는 암호화 작업이 이에 해당합니다.
  • zlib streams
  • crypto streams

개선 예시

1. 간단한 개선 예시

개선 전

아래 서버를 돌리면, 파일을 한번에 읽어 메모리에 적재하기 때문에 많은 메모리를 사용한다.

const fs = require("fs")
const server = require("http").createServer()

server.on("request", (req, res) => {
  fs.readFile("./big.file", (err, data) => {
    if (err) throw err

    res.end(data)
  })
})

server.listen(8000)

개선 후

  • 파일을 읽어 응답해주는 건 같지만, 스트림을 제공받아 청크 단위로 순차적 응답하기 때문에 메모리 사용량이 매우 적어진다. -> 파일을 메모리에 버퍼로 잡지 않고 스트림 처리하여 메모리 사용량을 개선
    const fs = require("fs")
    const server = require("http").createServer()
    server.on("request", (req, res) => {
    const src = fs.createReadStream("./big.file")
    src.pipe(res)
    })
    
    server.listen(8000)

 

2. 실무 개선 예시
개선 전

- 한 달에 한번 인보이스를 발행하는 잡을 구현했는데 간혹 인스턴스가 죽는 증상이 발견됨.
- 1차 원인은 어떠한 연유로 ping에 응답하지 못했고, livenessProbe 실패, 컨테이너 재시작되는 것이었음
- 테스트해보니 typeorm을 통해 대량의 데이터를 가져와 생성, 수정 시도할 경우 처리량이 따라가지 못햇음 (약 2600개)

 

// 2600개
const invoices = this.invoiceRepository.find({})
...
invoices.forEach((invoice) => {
invoice.issue();
});

this.invoiceRepository.save(inoivces);

 

 

개선 후

- 잡 처리 시간이 늘어나더라도 stream을 얻고, 청크 단위로 처리하도록 개선했음

// NOTE: 1개식 청크해서 처리
await pipelime(
	this.invoiceRepository.stream(),
	new Writable(
		objectMode: true,
		async write(chunk, encoding, callback) {
			try {
				invoice.issue()
				thiz.invoiceRepository.save(invoice);
		}
	}
))

 

참고

댓글