使用RethinkDB构建实时应用的简介

发布于:2021-01-11 10:10:22

0

1134

0

RethinkDB 构建 应用

JSON文档存储库RethinkDB专为跨多台计算机的可伸缩性而构建,它是一种使用简单查询语言的分布式数据库。这是入门方法。

RethinkDB 是用于构建实时Web应用程序的开源数据库。开发人员可以将查询变成实时供稿,而不是轮询更改,而实时供稿将实时更新不断推送到应用程序。RethinkDB的流式更新简化了实时后端架构,通过使更改传播成为应用程序持久层的本机部分,消除了多余的麻烦。

除了为实时应用程序开发提供独特功能外,RethinkDB还受益于一些有用的特性,这些特性有助于提供愉悦的开发人员体验。RethinkDB是一种无模式JSON文档存储,旨在实现可伸缩性和易用性,具有易于分片,支持分布式联接和表达性查询语言的功能。

本教程将演示如何使用RethinkDB和Node.js构建实时Web应用程序。它将使用Socket.io将实时更新传达给前端。如果您想继续,可以 安装RethinkDB 或 在云中运行它。

ReQL的第一步

RethinkDB查询语言(ReQL)将自身嵌入用于构建应用程序的编程语言中。ReQL被设计为流利的API,您可以将它们链接在一起以组成查询。

在开始构建应用程序之前,让我们花一些时间来探索查询语言。尝试查询的最简单方法是使用RethinkDB的管理控制台,该控制台通常在端口8080上运行。您可以在Data Explorer选项卡的文本字段中键入RethinkDB查询,然后运行它们以查看输出。数据资源管理器提供自动完成和语法突出显示功能,在学习ReQL时可能会有所帮助。

默认情况下,RethinkDB创建一个名为test的数据库。让我们从向testdatabase添加表开始:

r.db("test").tableCreate("fellowship")

现在,让我们向该表添加一组九个JSON文档:

r.table("fellowship").insert([   { name: "Frodo", species: "hobbit" },   { name: "Sam", species: "hobbit" },   { name: "Merry", species: "hobbit" },   { name: "Pippin", species: "hobbit" },   { name: "Gandalf", species: "istar" },   { name: "Legolas", species: "elf" },   { name: "Gimili", species: "dwarf" },   { name: "Aragorn", species: "human" },   { name: "Boromir", species: "human" } ])

当您运行上面的命令时,数据库将输出一个数组,该数组包含为所有新文档生成的主键。它还将告诉您成功插入了多少新记录。现在我们已经在数据库中有了一些记录,让我们尝试使用ReQL的filter命令来获取团契的霍比特人:

r.table("fellowship").filter({species:"hobbit"})

filter命令检索与提供的布尔表达式匹配的文档。在这种情况下,我们特别希望文件中物种属性等于霍比特人。如果要执行更多操作,可以将其他命令链接到查询。例如,您可以使用以下查询来更改所有霍比特人的种类属性的值:

r.table("fellowship").filter({species: "hobbit"})                      .update({species: "halfling"})

ReQL甚至有一个内置的http命令,您可以使用该命令从公共Web API中获取数据。在以下示例中,我们使用http命令从受欢迎的subreddit中获取当前帖子。完整查询检索帖子,按分数排序,然后显示前五个条目中的几个属性:

r.http("http://www.reddit.com/r/aww.json")("data")("children")("data")  .orderBy(r.desc("score")).limit(5).pluck("score", "title", "url")

如您所见,ReQL对于许多特殊数据分析非常有用。您可以使用它以多种有趣的方式对复杂的JSON数据结构进行切片和切块。如果您想了解更多关于ReQL,你可以参考 API参考文档,则 ReQL引进 的RethinkDB网站上。

在Node.js和Express中使用RethinkDB

既然您已经具备ReQL的基本工作知识,那么该开始构建应用程序了。我们将首先研究如何使用Node.js和Express制作一个API后端,该后端将ReQL查询的输出提供给最终用户。

npm中的rethinkdb模块提供RethinkDB的官方JavaScript客户端驱动程序。您可以在Node.js应用程序中使用它来编写和发送查询。以下示例显示如何执行简单查询并显示输出:

var r = require("rethinkdb");   r.connect().then(function(conn) {   return r.db("test").table("fellowship")           .filter({species: "halfling"}).run(conn)     .finally(function() { conn.close(); }); }) .then(function(cursor) {   return cursor.toArray(); }) .then(function(output) {   console.log("Query output:", output); }) .error(function(err) {   console.log("Failed:", err); });

connect方法建立与RethinkDB的连接。它返回一个连接句柄,您要在执行查询时将其提供给run命令。上面的示例在研究金表中找到所有半身人,然后在控制台中显示它们各自的JSON文档。它使用promise处理异步执行流,并确保在操作完成后正确关闭连接。

让我们在上面的示例中进行扩展,添加具有API端点的Express服务器,该端点使用户能够获取所需物种的所有研究金成员:

var app = require("express")(); var r = require("rethinkdb");   app.listen(8090); console.log("App listening on port 8090");   app.get("/fellowship/species/:species", function(req, res) {   r.connect().then(function(conn) {     return r.db("test").table("fellowship")             .filter({species: req.params.species}).run(conn)         .finally(function() { conn.close(); });   })   .then(function(cursor) { return cursor.toArray(); })   .then(function(output) { res.json(output); })   .error(function(err) { res.status(500).json({err: err}); }) });

如果您以前使用过Express,则上面的代码应该看起来非常直观。URL路径中的最后一个路径段表示一个变量,我们将其传递给ReQL查询中的filter命令,以便仅获取所需的文档。查询完成后,应用程序会将JSON输出中继给用户。如果查询未能完成,则应用程序将返回状态代码500并提供错误。

实时更新与更新

RethinkDB专为构建实时应用程序而设计。通过将changes命令附加到ReQL查询的末尾,可以获得实时的连续查询更新流。changes命令创建一个changefeed,当查询结果更改时,它将为您提供一个接收新记录的游标。以下代码演示了如何使用变更供稿来显示表更新:

r.connect().then(function(c) {   return r.db("test").table("fellowship").changes().run(c); }) .then(function(cursor) {   cursor.each(function(err, item) {     console.log(item);   }); });

每当研究金表中的数据更改时,都会执行cursor.each回调。您可以通过进行任意更改来自己测试。例如,在Boromir被兽人杀死之后,我们可以将其从团契中删除:

r.table("fellowship").filter({name:"Boromir"}).delete()

当查询从团契中删除Boromir时,演示应用程序将在stdout中显示以下JSON数据:

{   new_val: null,   old_val: {     id: '362ae837-2e29-4695-adef-4fa415138f90',     name: 'Boromir',     species: 'human'   } }

当变更供稿提供更新通知时,它们会告诉您记录的先前值和记录的新值。您可以将两者进行比较,以查看发生了什么变化。删除现有记录后,新值将为null。同样,当表接收新记录时,旧值是null。

changes命令当前可用于以下几种查询:get,between,filter,map,orderBy,min和max。计划在将来支持其他类型的查询,例如组操作。

实时计分板

让我们考虑一个更复杂的示例:带有排行榜的多人游戏。您想显示得分最高的前五名用户,并在列表更改时实时更新。RethinkDB更改提要使此操作变得容易。您可以将changefeed附加到包含orderBy和limit命令的查询。每当排名前五名的用户的得分或总体组成发生变化时,changefeed都会为您提供更新。

在开始设置变更提要之前,让我们开始使用数据资源管理器创建一个新表并用一些示例数据填充它:

r.db("test").tableCreate("players") r.table("players").indexCreate("score") r.table("players").insert([   {name: "Bill", score: 33},   {name: "Janet", score: 42},   {name: "Steve", score: 68}   ... ])

创建索引有助于数据库对指定属性进行更有效的排序-在本例中为得分。目前,如果您对索引进行订购,则只能将orderBy命令与changefeeds一起使用。

要检索当前的前五名球员及其得分,可以使用以下ReQL表达式:

r.db("test").table("scores").orderBy({index: r.desc("score")}).limit(3)

我们可以在最后添加changes命令,以获取更新流。为了使这些更新到达前端,我们将使用Socket.io,这是一个用于在服务器和客户端之间实现实时消息传递的框架。它支持多种传输方法,包括WebSockets。Socket.io用法的详细信息不在本文讨论范围之内,但是您可以通过访问Socket.io官方文档了解更多信息 。

以下代码使用sockets.emit将来自更新的更新广播到所有连接的Socket.io客户端:

var sockio = require("socket.io"); var app = require("express")(); var r = require("rethinkdb");   var io = sockio.listen(app.listen(8090), {log: false}); console.log("App listening on port 8090");   r.connect().then(function(conn) {   return r.table("scores").orderBy({index: r.desc("score")})     .limit(5).changes().run(conn); }) .then(function(cursor) {   cursor.each(function(err, data) {     io.sockets.emit("update", data);   }); });

在前端,您可以使用Socket.io客户端库来设置接收updateevent的处理程序:

var socket = io.connect();   socket.on("update", function(data) {   console.log("Update:", data); });

这是一个很好的开始,但是我们需要一种在用户第一次加载页面时填充初始列表值的方法。为此,让我们扩展服务器,以便在用户首次连接时通过Socket.io广播当前排行榜:

var getLeaders = r.table("scores").orderBy({index: r.desc("score")}).limit(5);   r.connect().then(function(conn) {   return getLeaders.changes().run(conn); }) .then(function(cursor) {   cursor.each(function(err, data) {     io.sockets.emit("update", data);   }); });   io.on("connection", function(socket) {   r.connect().then(function(conn) {     return getLeaders.run(conn)       .finally(function() { conn.close(); });   })   .then(function(output) { socket.emit("leaders", output); }); });

在两种情况下,该应用程序都使用相同的基础ReQL表达式,因此我们可以将其存储在变量中以方便重用。ReQL的方法链使其非常有助于这种可组合性。

为了结束演示,让我们构建一个完整的前端。为简单起见,我将使用Polymer的数据绑定系统。让我们从定义模板开始:

<template id="scores" is="auto-binding">   <ul>     <template repeat="{{user in users}}">       <li><strong>{{user.name}}:</strong> {{user.score}}</li>     </template>   </ul> </template>

它使用repeat属性为每个用户插入一个li标签。li标签的内容显示用户名及其当前分数。接下来,让我们编写JavaScript代码:

var scores = document.querySelector("#scores"); var socket = io.connect();   socket.on("leaders", function(data) {   scores.users = data; });   socket.on("update", function(data) {   for (var i in scores.users)     if (scores.users[i].id === data.old_val.id) {       scores.users[i] = data.new_val;       scores.users.sort(function(x,y) { return y.score - x.score });       break;     } });

Leader事件的处理程序仅从服务器获取数据并将其分配给存储用户的模板变量。更新处理程序有点复杂。它在排行榜中找到与old_val相关的条目,然后将其替换为新数据。

当排行榜中某个用户的分数发生变化时,它将用一个具有更新编号的新记录替换旧记录。如果排行榜中的某个用户被以前不存在的用户所取代,它将用一个用户的记录替换另一个用户的记录。上面的代码将正确处理这两种情况。

当然,changefeed更新并不能帮助我们维持用户的实际订单。为了解决该问题,我们只需在每次更新后对用户数组进行排序。Polymer的数据绑定系统将确保实际的DOM表示始终反映所需的顺序。

演示应用程序现已完成,您可以通过运行查询来更改用户分数来对其进行测试。在数据资源管理器中,您可以尝试运行以下内容:

r.table("scores").filter({name: "Bill"})  .update({score: r.row("score").add(100)})

当您更改用户分数的值时,您将看到排行榜更新以反映这些更改。

下一步

常规数据库主要围绕查询/响应工作流进行设计,该工作流很好地映射到Web的传统请求/响应模型。但是,像WebSockets这样的现代技术使构建实时流更新的应用程序成为可能,而没有HTTP请求的延迟或开销。

RethinkDB是第一个专门为实时Web设计的开源数据库。变更提要提供了一种构建查询的方法,该查询可以连续推出实时更新,而无需进行常规轮询。