.stream()
从数据库流式传输记录,以便一次或分批使用,而无需首先将整个结果集缓冲在内存中。
await Something.stream(criteria)
.eachRecord(async (record)=>{
});
参数 | 类型 | 详细信息 | |
---|---|---|---|
1 | criteria | 用于匹配数据库中记录的 Waterline 标准。 |
使用以下之一
.eachRecord(async (record)=>{ ... })
.eachBatch(async (records)=>{ ... })
您提供给 eachRecord()
或 eachBatch()
的自定义函数将接收以下参数
参数 | 类型 | 详细信息 | |
---|---|---|---|
1 | record 或 records | 当前记录或当前记录批次。批次数组将始终包含至少一条记录,并且它将永远不会包含超过批次大小(默认情况下为三十)的记录。 |
名称 | 类型 | 何时 |
---|---|---|
UsageError | 如果传入的内容无效,则抛出。 | |
AdapterError | 如果数据库适配器出现问题,则抛出。 | |
Error | 如果发生任何其他意外情况,则抛出。 |
有关在 Sails 和 Waterline 中协商错误的示例,请参见 概念 > 模型和 ORM > 错误。
.stream()
方法几乎与 .find()
完全相同,只是它一次获取一批记录。每次加载一批记录时,都会调用您提供的迭代器函数一次或多次。如果您使用的是 .eachRecord()
,则您的每个记录函数将对批次中的每条记录调用一次。否则,使用 .eachBatch()
,您的每个批次函数将使用整个批次调用一次。
这对于处理非常大的结果集非常有用,如果尝试同时将整个结果集保存在内存中,这些结果集可能会溢出服务器的可用 RAM。您可以使用 Waterline 的 .stream()
方法来执行您可能已经从 Mongo 游标中熟悉的那些操作:准备报告,在 shell 脚本中循环遍历和修改数据库记录,将大量数据从一个地方移动到另一个地方,执行复杂的转换,甚至协调 map/reduce 作业。
我们在下面探索四种示例情况
一个操作,它一次迭代数据库中名为 Finn 的用户
await User.stream({name:'Finn'})
.eachRecord(async (user)=>{
if (Math.random() > 0.5) {
throw new Error('Oops! This is a simulated error.');
}
sails.log(`Found a user ${user.id} named Finn.`);
});
一个操作,它使用动态生成的站点地图进行响应
// e.g. in an action that handles `GET /sitemap.xml`:
var sitemapXml = '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">';
await BlogPost.stream()
.limit(50000)
.sort('title ASC')
.eachRecord((blogPost)=>{
sitemapXml += (
'<url>\n'+
' <loc>https://blog.example.com/' + _.escape(encodeURIComponent(blogPost.slug))+'</loc>\n'+
' <lastmod>'+_.escape(blogPost.updatedAt)+'</lastmod>\n'+
'<changefreq>monthly</changefreq>\n'+
'</url>'
);
});
sitemapXml += '</urlset>';
.populate()
一个命令行脚本片段,它搜索名为“Bailey Bitterbumps”的人的令人毛骨悚然的评论,并将它们报告给当局
// e.g. in a shell script
var numReported = 0;
await Comment.stream({ author: 'Bailey Bitterbumps' })
.limit(1000)
.skip(40)
.sort('title ASC')
.populate('attachedFiles', {
limit: 3,
sort: 'updatedAt'
})
.populate('fromBlogPost')
.eachRecord(async (comment)=>{
var isCreepyEnoughToWorryAbout = comment.rawMessage.match(/creepy/) && comment.attachedFiles.length > 1;
if (!isCreepyEnoughToWorryAbout) {
return;
}
await sails.helpers.sendTemplateEmail.with({
template: 'email-creepy-comment-notification',
templateData: {
url: `https://blog.example.com/${comment.fromBlogPost.slug}/comments/${comment.slug}.`
},
to: '[email protected]',
subject: 'Creepy comment alert'
});
numReported++;
});
sails.log(`Successfully reported ${numReported} creepy comments.`);
如果我们运行上一个示例中的代码,我们将为每个令人毛骨悚然的评论发送一封电子邮件... 这可能很多,考虑到 Bailey Bitterbumps。这不仅速度慢,而且可能意味着向我们的 事务性电子邮件提供商 发送数千个单独的 API 请求,从而迅速超出我们的 API 速率限制。
对于这种情况,我们可以使用 .eachBatch()
来获取正在获取的整个记录批次,而不是一次处理单个记录,从而显着减少必要的 API 请求数量。
默认情况下,.stream()
使用 30 的批次大小。这意味着它将加载每批次最多 30 条记录;因此,如果您使用的是 .eachBatch()
,则每次调用您的自定义函数时,它将接收 1 到 30 条记录。
要增加或减少批次大小,请向 .eachBatch()
传递额外的参数
.eachBatch(100, async (records)=>{
console.log(`Got ${records.length} records.`);
})
在您的代码中使用
.eachBatch()
不一定比使用.eachRecord()
更有效率或效率更低。这是因为,无论您使用哪个迭代器,Waterline 都会一次向数据库请求多条记录(默认情况下为 30 条)。使用.eachBatch()
,您可以使用上面描述的额外参数轻松配置此批次大小。在使用.eachRecord
时也可以自定义批次大小(例如,为了避免被您正在使用的第三方 API 限速)。只需使用.meta()
即可。例如,.meta({batchSize: 100})
。
- 此方法可以与
await
、promise 链或 传统的 Node 回调 一起使用。.stream()
在从任何迭代器收到第一个错误时立即中止并抛出错误。.stream()
一次一个地按顺序对每条记录或批次运行提供的迭代器函数。在 Sails 1.1.0 之前,.stream()
的推荐用法期望迭代器调用回调(next
),该回调作为第二个参数提供。只要您在函数签名中不实际包含第二个参数,这就不再必要了。- 在 Sails v1.0 / Waterline 0.13 之前,此方法有一个较低级的界面,它公开了一个 可读的“对象流”。这很强大,但往往容易出错。新的与适配器无关的
.stream()
不依赖于发射器或任何特定类型的 Node 流。(需要以旧的方式工作吗?别担心,使用少量代码,您仍然可以轻松地使用新的界面构建与 streams2/streams3 兼容的可读“对象流”。)- 在这里 阅读更多关于创建
.stream()
的动机,包括其他示例、背景信息和实现细节。