- 期望的行为
- 实际行为
- 我试过的方法
- 复制步骤
- 研究
期望的行为
将从多个 api 请求接收到的多个可读流通过管道传输到单个可写流。
api 响应来自 ibm-watson 的 textToSpeech.synthesize()方法。
需要多个请求的原因是服务对文本输入有 5KB 限制。
因此,例如 18KB 的字符串需要四个请求才能完成。
实际行为
可写流文件不完整,乱码。
应用程序似乎“挂起”。
当我尝试在音频播放器中打开不完整的 .mp3 文件时,它说它已损坏。
打开和关闭文件的过程似乎会增加文件的大小——就像打开文件会以某种方式提示更多数据流入一样。
如果输入越大,不良行为就越明显,例如 4000 字节或更少的四个字符串。
我的尝试
我已经尝试了几种方法来使用 npm 包 combined-stream 将可读流通过管道传输到单个可写流或多个可写流, combined-stream2 , multistream和 archiver它们都会导致文件不完整。我的最后一次尝试没有使用任何包,并显示在下面的 Steps To Reproduce 部分中。
因此,我质疑我的应用程序逻辑的每个部分:
01. What is the response type of a watson text to speech api request?
text to speech docs ,说api响应类型是:
Response type: NodeJS.ReadableStream|FileObject|Buffer
我很困惑响应类型是三种可能的事情之一。
在我所有的尝试中,我一直假设它是一个可读流。
02. Can I make multiple api requests in a map function?
03. Can I wrap each request within a
promise()and resolve theresponse?04. Can I assign the resulting array to a
promisesvariable?05. Can I declare
var audio_files = await Promise.all(promises)?06. After this declaration, are all responses 'finished'?
07. How do I correctly pipe each response to a writable stream?
08. How do I detect when all pipes have finished, so I can send file back to client?
对于问题 2 - 6,我假设答案是"is"。
我认为我的失败与问题 7 和 8 有关。
复制步骤
您可以使用由四个随机生成的文本字符串组成的数组来测试此代码,这些字符串的字节大小分别为 3975、3863、3974 和3629 字节 - here is a pastebin of that array .
// route handler
app.route("/api/:api_version/tts")
.get(api_tts_get);
// route handler middleware
const api_tts_get = async (req, res) => {
var query_parameters = req.query;
var file_name = query_parameters.file_name;
var text_string_array = text_string_array; // eg: https://pastebin.com/raw/JkK8ehwV
var absolute_path = path.join(__dirname, "/src/temp_audio/", file_name);
var relative_path = path.join("./src/temp_audio/", file_name); // path relative to server root
// for each string in an array, send it to the watson api
var promises = text_string_array.map(text_string => {
return new Promise((resolve, reject) => {
// credentials
var textToSpeech = new TextToSpeechV1({
iam_apikey: iam_apikey,
url: tts_service_url
});
// params
var synthesizeParams = {
text: text_string,
accept: 'audio/mp3',
voice: 'en-US_AllisonV3Voice'
};
// make request
textToSpeech.synthesize(synthesizeParams, (err, audio) => {
if (err) {
console.log("synthesize - an error occurred: ");
return reject(err);
}
resolve(audio);
});
});
});
try {
// wait for all responses
var audio_files = await Promise.all(promises);
var audio_files_length = audio_files.length;
var write_stream = fs.createWriteStream(`${relative_path}.mp3`);
audio_files.forEach((audio, index) => {
// if this is the last value in the array,
// pipe it to write_stream,
// when finished, the readable stream will emit 'end'
// then the .end() method will be called on write_stream
// which will trigger the 'finished' event on the write_stream
if (index == audio_files_length - 1) {
audio.pipe(write_stream);
}
// if not the last value in the array,
// pipe to write_stream and leave open
else {
audio.pipe(write_stream, { end: false });
}
});
write_stream.on('finish', function() {
// download the file (using absolute_path)
res.download(`${absolute_path}.mp3`, (err) => {
if (err) {
console.log(err);
}
// delete the file (using relative_path)
fs.unlink(`${relative_path}.mp3`, (err) => {
if (err) {
console.log(err);
}
});
});
});
} catch (err) {
console.log("there was an error getting tts");
console.log(err);
}
}
textToSpeech.synthesize(synthesizeParams)
.then(audio => {
audio.pipe(fs.createWriteStream('hello_world.mp3'));
})
.catch(err => {
console.log('error:', err);
});
据我所知,这似乎适用于单个请求,但不适用于多个请求。
研究
关于可读和可写流、可读流模式(流动和暂停)、'data'、'end'、'drain' 和 'finish' 事件、pipe()、fs.createReadStream() 和 fs。 createWriteStream()
Almost all Node.js applications, no matter how simple, use streams in some manner...
const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a Readable Stream
// `res` is an http.ServerResponse, which is a Writable Stream
let body = '';
// get the data as utf8 strings.
// if an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');
// readable streams emit 'data' events once a listener is added
req.on('data', (chunk) => {
body += chunk;
});
// the 'end' event indicates that the entire body has been received
req.on('end', () => {
try {
const data = JSON.parse(body);
// write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});
https://nodejs.org/api/stream.html#stream_api_for_stream_consumers
Readable streams have two main modes that affect the way we can consume them...they can be either in the
pausedmode or in theflowingmode. All readable streams start in the paused mode by default but they can be easily switched toflowingand back topausedwhen needed...just adding adataevent handler switches a paused stream intoflowingmode and removing thedataevent handler switches the stream back topausedmode.
https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93
Here’s a list of the important events and functions that can be used with readable and writable streams
The most important events on a readable stream are:
The
dataevent, which is emitted whenever the stream passes a chunk of data to the consumer Theendevent, which is emitted when there is no more data to be consumed from the stream.The most important events on a writable stream are:
The
drainevent, which is a signal that the writable stream can receive more data. Thefinishevent, which is emitted when all data has been flushed to the underlying system.
https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93
.pipe()takes care of listening for 'data' and 'end' events from thefs.createReadStream().
https://github.com/substack/stream-handbook#why-you-should-use-streams
.pipe()is just a function that takes a readable source stream src and hooks the output to a destination writable streamdst
https://github.com/substack/stream-handbook#pipe
The return value of the
pipe()method is the destination stream
https://flaviocopes.com/nodejs-streams/#pipe
By default, stream.end() is called on the destination
Writablestream when the sourceReadablestream emits'end', so that the destination is no longer writable. To disable this default behavior, theendoption can be passed asfalse, causing the destination stream to remain open:
https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options
The
'finish'event is emitted after thestream.end()method has been called, and all data has been flushed to the underlying system.
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
console.log('All writes are now complete.');
});
https://nodejs.org/api/stream.html#stream_event_finish
If you're trying to read multiple files and pipe them to a writable stream, you have to pipe each one to the writable stream and and pass
end: falsewhen doing it, because by default, a readable stream ends the writable stream when there's no more data to be read. Here's an example:
var ws = fs.createWriteStream('output.pdf');
fs.createReadStream('pdf-sample1.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample2.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample3.pdf').pipe(ws);
https://stackoverflow.com/a/30916248
You want to add the second read into an eventlistener for the first read to finish...
var a = fs.createReadStream('a');
var b = fs.createReadStream('b');
var c = fs.createWriteStream('c');
a.pipe(c, {end:false});
a.on('end', function() {
b.pipe(c)
}
https://stackoverflow.com/a/28033554
相关的谷歌搜索:
how to pipe multiple readable streams to a single writable stream? nodejs
涉及相同或相似主题的问题,没有权威答案(或可能“过时”):
How to pipe multiple ReadableStreams to a single WriteStream?
Piping to same Writable stream twice via different Readable stream
最佳答案
这里要解决的核心问题是异步性。您几乎拥有它:您发布的代码的问题是您将所有源流并行且无序地输送到目标流中。这意味着 data block 将从不同的音频流中随机流出 - 即使您的 end 事件也会在没有 end 的情况下超越 pipe > 过早关闭目标流,这可以解释为什么重新打开后它会增加。
您想要的是按顺序传递它们 - 您甚至在引用时发布了解决方案
You want to add the second read into an eventlistener for the first read to finish...
或作为代码:
a.pipe(c, { end:false });
a.on('end', function() {
b.pipe(c);
}
这会将源流按顺序通过管道传输到目标流中。
使用您的代码,这意味着将 audio_files.forEach 循环替换为:
await Bluebird.mapSeries(audio_files, async (audio, index) => {
const isLastIndex = index == audio_files_length - 1;
audio.pipe(write_stream, { end: isLastIndex });
return new Promise(resolve => audio.on('end', resolve));
});
注意 bluebird.js mapSeries 的用法在这里。
关于您的代码的进一步建议:
const & let 而不是 var 并考虑使用 camelCase进一步阅读,组合原生 Node 流的限制:https://github.com/nodejs/node/issues/93
关于node.js - 如何将多个可读流(从多个 api 请求)传输到单个可写流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57157632/
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
总的来说,我对ruby还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题
给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我有多个ActiveRecord子类Item的实例数组,我需要根据最早的事件循环打印。在这种情况下,我需要打印付款和维护日期,如下所示:ItemAmaintenancerequiredin5daysItemBpaymentrequiredin6daysItemApaymentrequiredin7daysItemBmaintenancerequiredin8days我目前有两个查询,用于查找maintenance和payment项目(非排他性查询),并输出如下内容:paymentrequiredin...maintenancerequiredin...有什么方法可以改善上述(丑陋的)代
我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚
Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack
我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何