3 回答
TA贡献1871条经验 获得超8个赞
自从其他两个答案(尽管信息非常丰富,实际上是错误的)以来,我发布了一个快速更新,并且我不希望其他人从我接受的答案中得到误导。您不仅可以使用 zmq 做到这一点,它实际上是默认行为。
诀窍是,如果您的发布者客户端在不断丢弃消息之前从未连接到订阅者服务器(这就是为什么我认为它不缓冲消息),但是如果您的发布者连接到订阅者并且您重新启动订阅者,则发布者将缓冲消息直到达到 HWM,这正是我所要求的...所以简而言之,发布者想知道另一端是否有人接受消息,只有在此之后它才会缓冲消息...
这是一些示例代码,演示了这一点(您可能需要进行一些基本编辑才能编译它)。
我只使用了这个依赖项org.zeromq:jeromq:0.5.1。
zmq-publisher.kt
fun main() {
val uri = "tcp://localhost:3006"
val context = ZContext(1)
val socket = context.createSocket(SocketType.PUB)
socket.hwm = 10000
socket.linger = 0
"connecting to $uri".log()
socket.connect(uri)
fun publish(path: String, msg: Msg) {
">> $path | ${msg.json()}".log()
socket.sendMore(path)
socket.send(msg.toByteArray())
}
var count = 0
while (notInterrupted()) {
val msg = telegramMessage("message : ${++count}")
publish("/some/feed", msg)
println()
sleepInterruptible(1.second)
}
}
而且当然zmq-subscriber.kt
fun main() {
val uri = "tcp://localhost:3006"
val context = ZContext(1)
val socket = context.createSocket(SocketType.SUB)
socket.hwm = 10000
socket.receiveTimeOut = 250
"connecting to $uri".log()
socket.bind(uri)
socket.subscribe("/some/feed")
while (true) {
val path = socket.recvStr() ?: continue
val bytes = socket.recv()
val msg = Msg.parseFrom(bytes)
"<< $path | ${msg.json()}".log()
}
}
尝试先在没有订阅者的情况下运行发布者,然后当您启动订阅者时,您错过了到目前为止的所有消息...现在不重新启动发布者,停止订阅者等待一段时间并再次启动它。
这是我的一项服务实际上从中受益的示例......这是结构[current service]sub:server <= pub:client[service being restarted]sub:server <=* pub:client[multiple publishers]
因为我在中间重新启动服务,所有发布者开始缓冲他们的消息,每秒观察约 200 条消息的最终服务观察到下降到 0(那些 1 或 2 是心跳),然后突然爆发 1000 多条消息,因为所有发布者都刷新了他们的缓冲区(重新启动大约需要 5 秒)...实际上我在这里没有丢失任何消息...
请注意,您必须有subscriber:server <= publisher:client
一对(这样发布者就知道“我需要将这些消息传递到一个地方”(您可以尝试绑定发布者并连接订阅者,但您将不会再看到发布者缓冲消息,因为它是有问题的,如果刚刚断开连接的订阅者这样做是因为它不再需要数据或因为它失败)
TA贡献1830条经验 获得超9个赞
首先,欢迎来到 Zen-of-Zero 的世界,其中延迟最为重要
序言:
ZeroMQ 是由 Pieter HINTJENS 的团队设计的,该团队由经验丰富的大师组成,Martin SUSTRIK 名列第一。该设计经过专业设计,以避免任何不必要的延迟。那么询问是否有(有限的)持久性?不,先生,尚未确认 -PUB/SUB
可扩展的正式通信模式原型不会内置它,因为增加了问题并降低了性能和可扩展性(附加延迟、附加处理、附加内存管理)。
如果需要(有限的)持久性(对于缺少远程 SUB 端代理的连接),请随意在应用程序端实现它,或者可以设计和实现一种新的 ZMTP 兼容的此类行为模式原型,扩展 ZeroMQ 框架,如果此类工作进入稳定且公开接受的状态,但不要求高性能、消除延迟的标准已经打磨了PUB/SUB
几乎线性的可扩展性 ad astra,以朝这个方向进行修改。这绝对不是一条路可走。
解决方案 ?
应用程序端可以使用双指针循环缓冲区轻松实现您添加的逻辑,以某种(应用程序端管理)的方式工作- Persistence-PROXY,但位于PUB
发送方的前面。
如果您的设计还喜欢使用最近提供的内置 ZeroMQ 组件来设置额外socket_monitor
的控制层并在那里接收事件流(如图所示),那么您的设计可能会成功地从 ZeroMQ 内部细节中挤出一些额外的酱料从 PUB 端Context
实例“内部”,其中一些额外的网络和连接管理相关事件可能会给您的(应用程序端管理)带来更多信息-持久性代理
然而,请注意
该
_zmq_socket_monitor()_
方法仅支持面向连接的传输,即TCP、IPC和TIPC。
因此,如果计划使用任何最终有趣的传输类,人们可能会直接忘记这一点{ inproc:// | norm:// | pgm:// | epgm:// | vmci:// }
小心 !
“...zmq 优化主题发布?就像如果您持续
topic
快速发布大约 100 个字符长的内容,它实际上是topic
每次都发送还是映射到某个 int 并随后发送 int...?”
告诉你:
“它总是会发布
topic.
当我使用该pub-sub
模式时,我通常发布topic
第一个消息,然后发布实际消息,因此在订阅者中,我只读取第一帧并忽略它,然后读取实际消息”
ZeroMQ 不是这样工作的。没有什么是“单独”<topic>
后面跟着一个<message-body>
,而是相反
主题过滤TOPIC
的机械化工作方式非常不同。
1)你永远不知道,谁.connect()
-s:
即几乎可以肯定版本 2.x 直到版本 4.2+ 将以不同的方式处理主题过滤(ZMTP:RFC 定义初始功能版本握手,以让Context
-instance决定必须使用哪个版本的主题过滤:
版本 2.x用于将所有消息移动到所有对等方,并让所有SUB 端(版本 2.x+)传递消息(并让- SUB
side Context
-instance 处理本地topic
-list 过滤处理)
,而
4.2+ 版本肯定会topic
在 **PUB-side Context
-instance 上执行 -list 过滤处理(CPU 使用量增长,网络传输相反),所以你的 SUB-一方永远不会传递一个字节的“无用”读取“未订阅”的消息。
2)(您可以,但是)不需要将“主题”分离到由此暗示的多帧消息的第一帧中。也许恰恰相反(在高性能、低延迟的分布式系统设计中这样做是一种相当反模式。
主题过滤过程被定义并按字节工作,从左到右,每个主题列表成员值与传递的消息有效负载进行模式匹配。
添加额外的数据、额外的帧管理处理只会增加端到端延迟和处理开销。这样做从来都不是一个好主意,而不是正确的分布式系统设计工作。
尾声:
职业比赛中没有轻而易举的胜利,也没有唾手可得的成果分布式系统设计,如果越少低延迟或超低延迟是设计目标。
另一方面,请确保 ZeroMQ 框架是在考虑到这一点的情况下制定的,并且这些努力以稳定的、最终性能良好平衡的工具集为顶峰,这些工具集智能(设计)、快速(运行)和可扩展(因为地狱可能会这样)由于这种设计智慧,人们喜欢使用正确的信号/消息服务。
希望您对 ZeroMQ 感到满意,并随意在您选择的应用程序套件内的 ZeroMQ 层“前面”添加任何附加功能集。
TA贡献1815条经验 获得超10个赞
发布者无法在没有任何连接的情况下缓冲消息,它只会删除任何新消息:
来自文档:
如果发布者没有连接的订阅者,那么它将简单地删除所有消息。
这意味着您的缓冲区需要不受 Zeromq 的关注。您的缓冲区可以是列表、数据库或您选择的任何其他存储方法,但您不能使用发布者来执行此操作。
现在下一个问题是处理如何检测订阅者已连接/断开连接。这需要告诉我们何时需要开始从缓冲区读取/填充缓冲区。
我建议使用Socket.monitor
和监听 和ZMQ_EVENT_CONNECTED
,ZMQ_EVENT_DISCONNECTED
因为它们会告诉您客户端何时连接/断开连接,从而使您能够切换到填充您选择的缓冲区。当然,可能还有其他不直接涉及 Zeromq 的方法,但这由您决定。
添加回答
举报