如何在 Node.Js 中从字符串创建流?

IT技术 javascript string node.js stream inputstream
2021-01-20 12:32:52

我正在使用一个库ya-csv,它需要一个文件或一个流作为输入,但我有一个字符串。

如何将该字符串转换为 Node 中的流?

6个回答

作为@substack纠正我#node,新的流API在节点V10使这更容易:

const Readable = require('stream').Readable;
const s = new Readable();
s._read = () => {}; // redundant? see update below
s.push('your text here');
s.push(null);

...之后,您可以自由地将其通过管道传输或以其他方式将其传递给您的目标消费者。

它不像恢复单行那样干净,但它确实避免了额外的依赖。

更新:到目前为止,在 v0.10.26 到 v9.2.1 中,如果您没有设置push直接从 REPL 提示调用将因not implemented异常_read而崩溃。它不会在函数或脚本内崩溃。如果不一致使您紧张,包括noop.)

@eye_mew 你首先需要 require('stream')
2021-03-18 12:32:52
@dopatramannull告诉流它已完成读取所有数据并关闭流
2021-03-19 12:32:52
来自文档(链接):“所有可读流实现都必须提供一种_read从底层资源获取数据方法。”
2021-04-04 12:32:52
看起来你不应该这样做。引用文档:“该readable.push()方法仅供可读实现者调用,并且只能从readable._read()方法内部调用。”
2021-04-06 12:32:52
为什么要推null入流的缓冲区?
2021-04-10 12:32:52

不要使用 Jo Liss 的简历答案。它在大多数情况下都可以工作,但在我的情况下,它让我失去了 4 或 5 个小时的错误查找时间。不需要第三方module来执行此操作。

新答案

var Readable = require('stream').Readable

var s = new Readable()
s.push('beep')    // the string you want
s.push(null)      // indicates end-of-file basically - the end of the stream

这应该是一个完全兼容的可读流。有关如何正确使用流的更多信息,请参见此处

旧答案:只需使用本机 PassThrough 流:

var stream = require("stream")
var a = new stream.PassThrough()
a.write("your string")
a.end()

a.pipe(process.stdout) // piping will work as normal
/*stream.on('data', function(x) {
   // using the 'data' event works too
   console.log('data '+x)
})*/
/*setTimeout(function() {
   // you can even pipe after the scheduler has had time to do other things
   a.pipe(process.stdout) 
},100)*/

a.on('end', function() {
    console.log('ended') // the end event will be called properly
})

请注意,不会发出 'close' 事件(流接口不需要)。

@Finn 如果没有任何参数,您就不需要 javascript 中的括号
2021-03-24 12:32:52
不要在 2018 年使用“var”!但是常量
2021-04-02 12:32:52

从节点 10.17 开始,stream.Readable 有一种from方法可以轻松地从任何可迭代对象(包括数组文字)创建流:

const { Readable } = require("stream")

const readable = Readable.from(["input string"])

readable.on("data", (chunk) => {
  console.log(chunk) // will be called once with `"input string"`
})

请注意,至少在 10.17 和 12.3 之间,字符串本身是可迭代的,因此Readable.from("input string")可以工作,但每个字符发出一个事件。Readable.from(["input string"])将为数组中的每一项(在本例中为一项)发出一个事件。

另请注意,在以后的节点中(可能是 12.3,因为文档说当时函数已更改),不再需要将字符串包装在数组中。

https://nodejs.org/api/stream.html#stream_stream_readable_from_iterable_options

根据stream.Readable.from出于性能原因调用 Readable.from(string) 或 Readable.from(buffer) 不会迭代字符串或缓冲区以匹配其他流语义。
2021-03-29 12:32:52
我的错。该函数是在 10.7 中添加的,并按照我最初描述的方式运行。从那时起,字符串不再需要包含在数组中(从 12.3 开始,它不再单独迭代每个字符)。
2021-03-30 12:32:52

只需创建streammodule的新实例并根据您的需要对其进行自定义:

var Stream = require('stream');
var stream = new Stream();

stream.pipe = function(dest) {
  dest.write('your string');
  return dest;
};

stream.pipe(process.stdout); // in this case the terminal, change to ya-csv

或者

var Stream = require('stream');
var stream = new Stream();

stream.on('data', function(data) {
  process.stdout.write(data); // change process.stdout to ya-csv
});

stream.emit('data', 'this is my string');
此代码打破了流约定。pipe()应该至少返回目标流。
2021-03-23 12:32:52
如果您使用此代码,则不会调用结束事件。这不是创建可以普遍使用的流的好方法。
2021-03-30 12:32:52

编辑: 加思的答案可能更好。

我的旧答案文本保留在下面。


将一个字符串转换成流,你可以使用一个暂停通过流:

through().pause().queue('your string').end()

例子:

var through = require('through')

// Create a paused stream and buffer some data into it:
var stream = through().pause().queue('your string').end()

// Pass stream around:
callback(null, stream)

// Now that a consumer has attached, remember to resume the stream:
stream.resume()
Resumer 很棒,但是如果您希望可以将流传递给未知的消费者,“在 nextTick 上自动恢复流”可能会产生惊喜!如果元数据的数据库保存成功,我有一些代码将内容流通过管道传输到文件。那是一个潜伏的bug,它恰好在db write 立即返回成功时成功!后来我将事情重构到一个异步块中,然后轰隆一声,流永远不可读。教训:如果您不知道谁将使用您的流,请坚持使用 through().pause().queue('string').end() 技术。
2021-03-18 12:32:52
我花了大约 5 个小时来调试我的代码,因为我使用了这个答案的恢复部分。如果您愿意,那就太好了..将其删除
2021-03-24 12:32:52
@substack 恢复者的建议对我来说非常有效。谢谢!
2021-04-04 12:32:52
我无法让 zeMirco 的解决方案适用于我的用例,但resumer效果很好。谢谢!
2021-04-11 12:32:52