使用Cettia构建实时Web应用程序第2部分

本教程的第二部分讨论套接字和套接字组的特性和状态,以及缩放Cettia。

使用Cettia构建实时Web应用程序第2部分

广播事件

要将事件发送到多个套接字,您可以创建一个套接字,向该套接字添加套接字并发送迭代到该套接字上的事件。它应该可以工作,但是socket是有状态的而不是可序列化的,这意味着调用者应该总是检查每次这个套接字是否可用; 无法在导线的另一侧处理此插座。Cettia以功能性的方式解决了这些问题。

应用程序创建一个套接字动作并将其传递给服务器的其中一个查找器方法。

服务器找到相应的套接字并执行传递的动作,逐个传递套接字。

在这里,动作表示用于处理给定参数的功能接口。通过这种方式,您可以将状态管理委托给服务器,并通过构建套接字操作来关注套接字处理; 您还可以序列化并向群集中的其他服务器广播一个操作而不是套接字,然后让服务器为其自己的套接字执行操作。

查找方法来查找服务器中的所有套接字 server.all(Action action)。将以下chat 事件处理程序添加 到套接字处理程序以将给定chat 事件发送 到服务器中的每个套接字:

socket.on("chat", data -> {

server.all((Action & Serializable) s -> {

s.send("chat", data);

});

});

不要将它与通过注册的套接字处理程序混淆 server.onsocket(Action action)。Finder方法包括 server.all 处理服务器中的现有套接字(集群中的每个服务器), server.onsocket 并且要初始化由此新接受的套接字 server。

事实上,编写和提交一个动作只有在你需要做一些比发送事件更复杂的事情时才有用。如果它不那么复杂,可以使用一行代码完成 Sentence。chat 在一行中重写上述 事件处理程序:

socket.on("chat", data -> server.all().send("chat", data));

Sentence 是由服务器创建并返回的,当它的finder方法被调用时没有动作,即 server.all()。Sentence与上面一样, 每个方法 send都映射到预先实现的通用套接字操作,因此如果执行该方法,则其映射操作将根据被调用的查找器方法与服务器找到的套接字一起执行。这就是为什么上述两个代码片段完全相同的原因。

要演示 chat 事件处理程序,请在一个选项卡中打开2个套接字,或在每个浏览器中打开2个浏览器和一个套接字。在跟踪套接字的状态时,将第1部分的logState 事件处理程序添加到内置事件中很方便 。

var socket1 = cettia.open("http://127.0.0.1:8080/cettia");

socket1.on("chat", data => console.log("socket1", data));

var socket2 = cettia.open("http://127.0.0.1:8080/cettia");

socket2.on("chat", data => console.log("socket2", data));

一旦所有套接字打开,选择其中一个并发送一个 chat 事件。然后,你应该看到一个聊天活动通过发送 socket1 和广播 socket1 和 socket2。

socket1.send("chat", "Is it safe to invest in Bitcoin?");

您可能急于回答这个问题。在控制台上使用它。

使用特定的套接字

在大多数情况下,您可能会处理代表特定实体的一组套接字,而不是简单的所有套接字。例如,实体可以是登录到多个浏览器的用户,进入聊天室的用户,游戏中的红队员等等。正如所解释的,服务器的finder方法接受一个标准来查找套接字和一个用找到的套接字执行的动作,这里使用的标准是标签。Cettia允许在套接字中添加和移除标签,并提供查找器方法来查找标签套接字,例如查询数据库。

作为一个简单的例子,我们来编写 myself 事件处理程序,它将给定的事件发送到使用我的用户名标记的套接字。这里,这些套接字表示一个名为我自己的实体。假设用户名包含在username URI中的命名查询参数 中,并且URI编码为safe。例如,如果套接字的URI是 /cettia?username=alice,那么套接字处理程序将通过添加 alice 标记到套接字 socket.tag(String tagName),并且在myself 分派事件时 ,服务器将查找包含该alice 标记的 套接字 server.byTag(String... names) 并将事件发送给它们。

这是一个myself 事件处理程序的实现 。假设有一种方法被称为 findUsernameParameter 从给定的URI查找用户名参数。

String username = findUsernameParameter(socket.uri());

socket.tag(username).on("myself", data -> server.byTag(username).send("myself", data));

要测试 myself 事件处理程序,请在一个选项卡中打开3个套接字,或在每个浏览器中打开3个浏览器和一个套接字

var socket1 = cettia.open("http://127.0.0.1:8080/cettia?username=alice");

socket1.on("myself", data => console.log("socket1", data));

var socket2 = cettia.open("http://127.0.0.1:8080/cettia?username=alice");

socket2.on("myself", data => console.log("socket2", data));

var socket3 = cettia.open("http://127.0.0.1:8080/cettia?username=bob");

socket3.on("myself", data => console.log("socket3", data));

一旦所有套接字打开后,选择其中一个并发送一个 myself 事件。

socket2.send("myself", "A private message for me");

你应该看到,发送的事件 socket2 被广播到 socket1 和 socket2,但不 socket3,他的用户名是不同的。这样,如果您向自己发送一条直接消息,无论您使用哪个浏览器或设备,它都将广播到您打开套接字的每个浏览器和设备,这对于改善多设备用户体验非常有用。

您可能需要比较 myself 与事件echo 和 chat 事件,从第1部分。运行以下代码并找出这些事件之间的不同之处。

[socket1, socket2, socket3].forEach((socket, i) => {

const log = data => console.log(`socket${i + 1}`, data);

socket.on("echo", log).on("chat", log);

});

断开处理

到目前为止,我们只处理打开状态的套接字,但断开连接是不可避免的。如果任何事件由于断开而无法发送给用户,并且在连接恢复时尽管延迟发送,但情况变得复杂。并非所有断线都是相同的; 它们在断开和重新连接之间的时间段中变化。通常,临时断开比永久断开更常见,特别是在移动环境中,并且每种情况的用户体验都不相同。如果某些事件由于暂时断开而延迟几秒钟后传递,则客户可以将它们视为按时交付,但如果延迟由于永久断开而延迟几分钟或几小时,则可能会更好发送关于错过活动的电子邮件。

Cettia将临时断开定义为在60秒内重新连接后的断开连接。它将套接字的生命周期设计为不受临时断开影响,并提供事件驱动的方式来处理断开连接。这是一个服务器端示例,用于发送由于下一个连接断开连接而导致事件失败的事件。附加以下导入:

import java.util.Queue;

import java.util.concurrent.ConcurrentLinkedQueue;

以下代码给套接字处理程序:

Queue queue = new ConcurrentLinkedQueue<>();

socket.oncache(args -> queue.offer(args));

socket.onopen(v -> {

while (socket.state() == ServerSocket.State.OPENED && !queue.isEmpty()) {

Object[] args = queue.poll();

socket.send((String) args[0], args[1], (Action>) args[2], (Action>) args[3]);

}

});

socket.ondelete(v -> queue.forEach(args -> System.out.println(socket + " missed event - name: " + args[0] + ", data: " + args[1])));

请参阅第1部分的套接字生命周期部分,了解服务器的socket 事件与套接字的 open 事件之间的区别 ,以及套接字 open 和 delete 事件的发送时间。默认情况下,客户端重新连接到服务器,延迟间隔由延迟500和比率2(500,1000,2000,4000 ...)的几何级数确定。

  • 如果send 调用该方法时套接字没有活动连接 ,cache 则会使用用于调用send 方法的参数数组触发该 事件 。在这种情况下,您可以决定并收集下次重新连接时发送的事件 queue。

  • 如果 open 事件被触发,queue 通过一个新的连接逐个发送项目来清除 。即使在 open 事件中,您也应该检查套接字是否已打开,以免干扰 queue 。

  • 如果 delete 事件被触发并且 queue 不为空,则必须根据您希望提供的其他事件的用户体验与应用程序的其他构建块一起工作。例如,可以使用数据库来存储错过的事件,并在下次访问服务时显示它们。可以使用推送通知系统来通知用户错过的事件,并且可以使用SMTP服务器发送错过事件的摘要电子邮件。

请注意,在编写套接字操作并将其提交给服务器时,您无需关心给定套接字的状态。即使套接字没有连接并且未能发送事件,也可以在cache 处理程序中安全地处理它们 。

模拟暂时断开的最简单方法是设置一个 name 选项来打开套接字并刷新网页。该 name 选项是浏览上下文中的一个标识符,以允许套接字name 在下一页共享相同的 选项,并继承当前页面中套接字的生命周期。由于此选项可帮助在页面导航过程中恢复错过的事件,因此将实时Web功能添加到多页面应用程序时非常有用。打开开发人员工具 index.html 并运行以下代码片段:

var socket1 = cettia.open("http://127.0.0.1:8080/cettia", {name: "main"});

socket1.on("chat", data => console.log("socket1", "message", data.message, "with", Date.now() - data.sentAt, "ms delay"));

刷新网页,然后 socket1 应该断开连接。在刷新的页面上运行以下代码片段:

var socket2 = cettia.open("http://127.0.0.1:8080/cettia");

socket2.once("open", () => socket2.send("chat", {message: "ㅇㅅㅇ", sentAt: Date.now()}));

socket2.on("chat", data => console.log("socket2", "message", data.message, "with", Date.now() - data.sentAt, "ms delay"));

发送的聊天事件 socket2 无法到达, socket1 因为它没有活动连接,而是将事件缓存在队列中 socket1。如果您在刷新的页面上再次运行第一个代码段,以便 socket1延长生命周期,则应该看到 socket1 接收缓存的事件。当然,如果您推迟运行第一个代码片段1分钟,您将看到 socket1 调度该 delete 事件,因此它的缓存事件在服务器中记录为错过的事件。

缩放Cettia应用程序

最后但并非最不重要的是扩展应用程序。如前所述,任何发布 - 订阅消息系统都可用于水平扩展Cettia应用程序,并且不需要对现有应用程序进行任何修改。缩放Cettia应用程序的想法非常简单:

当调用服务器的其中一个finder方法时,它会将此方法调用序列化为消息并将其发布到群集。

当一个服务器从集群接收到一些消息时,它将反序列化为方法调用并将其应用于其自己的套接字。

无论传递给finder方法的套接字动作多么复杂,只要它是可序列化的,它就可以在其他服务器中的套接字中执行; 你不需要担心很多序列化。由Sentence 所提供的操作 都是可序列化的,并且您可以使用Java 8的强制表达式简单地进行操作,就像 (Action & Serializable) socket -> {} 您必须使用普通操作一样。

在本教程中,我们将使用Hazelcast作为发布 - 订阅消息系统。添加以下依赖项:

com.hazelcast

hazelcast

3.9.3

com.hazelcast

hazelcast-client

3.9.3

现在,添加以下导入:

import com.hazelcast.config.Config;

import com.hazelcast.core.HazelcastInstance;

import com.hazelcast.core.ITopic;

import com.hazelcast.instance.HazelcastInstanceFactory;

import io.cettia.ClusteredServer;

import java.util.Map;

将Cettia零件的第一行替换为 Server server = new DefaultServer();以下行:

ClusteredServer server = new ClusteredServer();

ClusteredServer 类有两种方法:

  1. onpublish(Action> action) - 服务器拦截并序列化对包装服务器的finder方法调用,并将它们传递给参数操作。该操作应该将其发布到群集。

  2. messageAction() - 此操作将已发布的消息进行反序列化,并调用已包装的服务器的查找程序方法。应该在从群集到达时用消息调用它。

为了给你一个想法server.onpublish(message -> server.messageAction().on(message));, ClusteredServer 将会和你的想法 完全一样 DefaultServer。将下面的代码添加到 CettiaConfigListener#contextInitialized 方法中:

// Hazelcast part

HazelcastInstance hazelcast = HazelcastInstanceFactory.newHazelcastInstance(new Config());

ITopic> topic = hazelcast.getTopic("cettia");

// It publishes messages given by the server

server.onpublish(message -> topic.publish(message));

// It relays published messages to the server

topic.addMessageListener(message -> server.messageAction().on(message.getMessageObject()));

现在,如果应用程序server.all 使用动作调用 ,则传递的动作将被序列化并广播到群集中的所有服务器,并由群集中的每个服务器反序列化并执行。让我们在端口8080上重新启动服务器,打开一个新的shell,并通过运行在端口8090上启动另一台服务器 mvn jetty:run -Djetty.port=8090。然后您会看到8080和8090上的Hazelcast节点形成群集。

要测试实现,请在每个端口的一个选项卡中打开2个套接字,或者在每个浏览器中打开2个浏览器和一个套接字:

var socket1 = cettia.open("http://127.0.0.1:8080/cettia");

socket1.on("chat", data => console.log("socket1", data));

var socket2 = cettia.open("http://127.0.0.1:8090/cettia");

socket2.on("chat", data => console.log("socket2", data));

所有套接字打开后,选择其中一个并发送 chat 事件:

socket1.send("chat", "Greetings from 8080");

正如您所看到的,chat 从8080上连接到服务器的客户端发送的 事件会传播到连接到8090和8080上的服务器的客户端。

至于部署,毕竟它只是一个Web应用程序,所以您可以像往常一样部署应用程序并配置环境。请记住,您应该启用“粘性会话”来部署集群式Cettia应用程序。需要管理由多个传输组成的套接字生命周期,并启用由多个HTTP请求响应交换组成的HTTP传输。

结论

Cettia 是一个功能齐全的实时Web应用程序框架,可用于实时在服务器和客户端之间交换事件。在分离关注原则之后,框架分为三层:一个I / O框架不可知层,用于在JVM上的任何I / O框架上运行Cettia应用程序; 传输层以提供可靠的全双工消息信道; 和一个套接字层来提供优雅的模式,以在实时网络中实现更好的用户体验。这种多层体系结构只允许专注于应用级实时事件处理,并且在技术堆栈上有更大的选择自由度。

在本教程中,我们已经了解了Cettia团队在Cettia中做出的关键设计决策背后的原因,以及构建实时导向应用程序所需的各种模式和功能,而不会影响Cettia,因此,我们已经构建了入门套件。


分享到:


相關文章: