快速开始:

1
2
3
./nsqlookupd
./nsqd --lookupd-tcp-address=127.0.0.1:4160 -max-msg-size 104857600
./nsqadmin --lookupd-http-address=127.0.0.1:4161

1. NSQ

NSQ是Go语言编写的一个开源的实时分布式内存消息队列,其性能十分优异。

1.1 NSQ组成部分

NSQ由3个守护程序组成:

  • nsqlookupd是管理拓扑信息并提供最终一致的发现服务的守护程序。

  • nsqd是接收,排队并将消息传递到客户端的守护程序。

  • nsqadmin是一个Web UI,用于实时群集管理任务。

1.2 拓扑结构

拓扑结构

  • producer:消息发送者
  • consumer:消息接收者

1.3 分发模式

分发模式

  • topic:消息主题
  • channel:消息通道,消息主题的消息复制到每一个channel里

消息是从topic -> channel(每个channel接收该topic的所有消息的副本)多播的,但是从channel -> consumers均匀分布(每个消费者接收该channel的一部分消息)。

1.4 消息通道流程

消息流程

  • 消息默认不持久化,可以配置成持久化模式。nsq采用的方式是 内存+硬盘 的模式,当内存到达一定程度时就会将数据持久化到硬盘。
    • 如果将--mem-queue-size设置为0,所有的消息将会存储到磁盘。
    • 服务器重启时也会将当时在内存中的消息持久化。
  • 每条消息至少传递一次。
  • 消息不保证有序。

2. 快速启动

快速启动

2.1 nsqlookupd

nsqlookupd是维护所有nsqd状态、提供服务发现的守护进程。它能为消费者查找特定topic下的nsqd提供了运行时的自动发现服务。 它不维持持久状态,也不需要与任何其他nsqlookupd实例协调以满足查询。因此根据你系统的冗余要求尽可能多地部署nsqlookupd节点。它们消耗的资源很少,可以与其他服务共存。我们的建议是为每个数据中心运行至少3个集群。

开启第一个终端

1
./nsqlookupd

nsqlookupd相关配置项如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-broadcast-address string
该lookupd节点的地址(默认为操作系统主机名)(默认为“ PROSNAKES.local”)
-config string
配置文件的路径
-http-address string
<addr>:<port> 侦听HTTP客户端 (默认为 "0.0.0.0:4161")
-inactive-producer-timeout duration
生产者自上次ping后将保留在活动列表中的持续时间(默认值为5m0s)
-log-prefix string
日志消息前缀(默认为 "[nsqlookupd] ")
-tcp-address string
<addr>:<port> 侦听TCP客户端 (默认 "0.0.0.0:4160")
-tombstone-lifetime duration
如果保留注册,生产者将保持墓碑状态的持续时间(默认为45s)
-verbose
启用详细日志记录
-version
打印版本信息

2.2 nsqd

nsqd是一个守护进程,它接收、排队并向客户端发送消息。

开启第二个终端

1
2
3
./nsqd --lookupd-tcp-address=127.0.0.1:4160 -max-msg-size 104857600
./nsqd -broadcast-address=127.0.0.1
./nsqd -broadcast-address=127.0.0.1 -lookupd-tcp-address=127.0.0.1:4160

nsqdq相关配置项如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
-auth-http-address value
<addr>:<port> 查询身份验证服务器(可以多次给出)
-broadcast-address string
将使用lookupd注册的地址(默认为OS主机名)(默认为“ PROSNAKES.local”)
-config string
配置文件的路径
-data-path string
存储磁盘支持的消息的路径
-deflate
启用deflate功能协商(客户端压缩)(默认为true
-e2e-processing-latency-percentile value
要跟踪的消息处理时间百分位数(以float(0,1.0])为单位(可以指定多次,也可以用逗号分隔'1.0,0.99,0.95',默认为无)
-e2e-processing-latency-window-time duration
在这段时间内计算端到端延迟分位数(即:60s仅显示过去60秒的分位数计算结果)(默认值为10m0s)
-http-address string
<addr>:<port> 侦听HTTP客户端(默认为“ 0.0.0.0:4151”)
-http-client-connect-timeout duration
HTTP连接超时(默认为2秒)
-http-client-request-timeout duration
HTTP请求超时(默认5秒)
-https-address string
<addr>:<port> 侦听HTTPS客户端(默认为“ 0.0.0.0:4152”)
-log-prefix string
日志消息前缀(默认为“ [nsqd]”)
-lookupd-tcp-address value
lookupd TCP地址(可以多次给出)
-max-body-size int
单个命令正文的最大大小(默认5242880)
-max-bytes-per-file int
滚动前每个磁盘队列文件的字节数(默认为104857600)
-max-deflate-level int
客户端可以协商的最大压缩压缩级别(>值==> nsqd CPU使用率)(默认为6)
-max-heartbeat-interval duration
客户端心跳之间的最大客户端可配置持续时间(默认为1m0s)
-max-msg-size int
单个消息的最大大小(以字节为单位)(默认为1048576)
-max-msg-timeout duration
消息超时之前的最大持续时间(默认为15m0s)
-max-output-buffer-size int
客户端输出缓冲区的最大客户端可配置大小(以字节为单位)(默认值65536)
-max-output-buffer-timeout duration
刷新到客户端之间的最大客户端可配置持续时间(默认为1s)
-max-rdy-count int
客户端的最大RDY计数(默认为2500)
-max-req-timeout duration
消息的最大重新排队超时(默认为1h0m0s)
-mem-queue-size int
保留在内存中的消息数(每个主题/通道)(默认为10000)
-msg-timeout string
自动请求消息之前要等待的时间(默认为“ 1m0s”)
-node-id int
消息ID的唯一部分,(int)范围为[0,1024)(默认为主机名的哈希)(默认为616)
-snappy
启用快捷的特性协商(客户端压缩)(默认为true)
-statsd-address string
UDP <addr>:<port> 用于推送统计信息的statsd守护进程
-statsd-interval string
推送到statsd之间的持续时间(默认为“1m0s”)
-statsd-mem-stats
切换发送内存和GC统计信息到statsd(默认为true)
-statsd-prefix string
用于发送到statsd的密钥的前缀(%s用于主机替换)(默认为“nsq.%s”)
-sync-every int
每个磁盘队列的消息数fsync(默认2500)
-sync-timeout duration
每个磁盘队列fsync持续时间(默认为2s)
-tcp-address string
<addr>:<port> 监听TCP客户端(默认为“0.0.0.0:4150”)
-tls-cert string
证书文件路径
-tls-client-auth-policy string
客户端证书认证策略(“要求”或“要求-验证”)
-tls-key string
密钥文件路径
-tls-min-version value
可接受的最小SSL/TLS版本('ssl3.0''tls1.0''tls1.1''tls1.2')(默认值769)
-tls-required
客户端连接需要TLS (true, false, tcp-https)
-tls-root-ca-file string
证书颁发机构文件的路径
-verbose
启用详细日志
-version
打印版本信息
-worker-id
不要使用这个,使用 --node-id

2.3 nsqadmin

一个实时监控集群状态、执行各种管理任务的Web管理平台。

可选择开启

1
./nsqadmin --lookupd-http-address=127.0.0.1:4161

使用浏览器打开http://127.0.0.1:4171/访问管理界面。

nsqadmin相关的配置项如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
-allow-config-from-cidr string
允许HTTP请求/config端点的CIDR(默认为“127.0.0.1/8”)
-config string
配置文件路径
-graphite-url string
graphite HTTP 地址
-http-address string
<addr>:<port> 监听HTTP客户端(默认为“0.0.0.0:4171”)
-http-client-connect-timeout duration
HTTP连接超时(默认为2s)
-http-client-request-timeout duration
HTTP请求超时(默认5s)
-http-client-tls-cert string
HTTP客户端的证书文件的路径
-http-client-tls-insecure-skip-verify
配置HTTP客户端以跳过对TLS证书的验证
-http-client-tls-key string
HTTP客户端的密钥文件的路径
-http-client-tls-root-ca-file string
HTTP客户端的CA文件的路径
-log-prefix string
日志消息前缀(默认为“[nsqadmin]”)
-lookupd-http-address value
lookupd HTTP地址(可能多次给出)
-notification-http-endpoint string
将向其发送管理操作的POST通知的HTTP端点(完全限定)
-nsqd-http-address value
nsqd HTTP地址(可能多次给出)
-proxy-graphite
代理HTTP请求graphite
-statsd-counter-format string
statsd的实现应用的计数器统计键格式化。如果不需要格式化,则将其设置为空字符串。(默认“stats.counters. % s.count”)
-statsd-gauge-format string
测量统计statsd实现应用的键格式。如果不需要格式化,则将其设置为空字符串。(默认“stats.gauges. % s”)
-statsd-interval duration
时间间隔nsqd被配置为推到statsd(必须匹配nsqd)(默认为1m0s)
-statsd-prefix string
用于发送到statsd的密钥的前缀(%s用于主机替换,必须匹配nsqd)(默认为“nsq.%s”)
-version
打印版本信息

3. pynsq

3.1 基础收发代码

https://pynsq.readthedocs.io/en/latest/writer.html

3.2 生产者单独数据包发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import logging
import nsq
import tornado
logging.basicConfig(level=logging.INFO)

writer = nsq.Writer(['127.0.0.1:4150'])

@tornado.gen.coroutine
def do_pub():
yield tornado.gen.sleep(1)
writer.pub("test_topic", bytes("hello world",enconding='utf-8'))

tornado.ioloop.IOLoop.instance().run_sync(do_pub)
print("wrote one message to nsq")

Writer参数

  • nsqd_tcp_addresses – 一个序列,其元素的格式为“ address:port”,与该作者应发布到的nsqd实例相对应
  • name – 用于记录消息的字符串(默认为第一个nsqd地址)
  • **kwargs – 传递给nsq.AsyncConn初始化

3.3 消费者接收数据包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import time
import nsq
import threading
import tornado

'''
使用tornado.ioloop.IOLoop.instance().start()子线程接收数据
'''
data = []

def get_flow():

def handler(message):
global data
message.enable_async()
data.append(str(message.body, encoding='utf-8'))
print(message)
print(str(message.body, encoding='utf-8'))
message.finish()
return True

r = nsq.Reader(
message_handler=handler,
nsqd_tcp_addresses=['127.0.0.1:4150'],
topic='flow',
channel='read',
max_in_flight=10
)
tornado.ioloop.IOLoop.instance().start()
nsq.run()

t = threading.Thread(target=get_flow)
t.start()

for i in range(10000):
print("machine learning")
print(len(data))
time.sleep(1)

Reader参数

  • message_handler – 对收到的每条消息执行的可调用对象
  • topic – 指定所需的NSQ主题
  • channel – 指定所需的NSQ通道
  • name – 用于记录消息的字符串(默认为“ topic:channel”)
  • nsqd_tcp_addresses – 该读取器应连接到的nsqd实例的字符串地址序列
  • lookupd_http_addresses – 该阅读器应查询nsqlookupd实例的字符串地址序列,以查询指定主题的生产者
  • max_tries – 读者尝试处理一条消息的最大尝试次数,之后该消息将被自动丢弃
  • max_in_flight – 该阅读器将流水处理的最大消息数。 该值将在已配置/已发现的nsqd生产者之间平均分配
  • lookupd_poll_interval – 查询所有提供的nsqlookupd实例之间的时间(以秒为单位)。 最初将引入基于该值的随机时间,以便在运行多个读取器时增加抖动
  • lookupd_poll_jitter – 要添加到lookupd池循环中的最大抖动分数。 即使多个使用者同时重新启动,这也有助于平均分配请求。
  • lookupd_connect_timeout – 等待建立与nsqlookupd的连接的时间(以秒为单位)
  • lookupd_request_timeout – 等待nsqlookupd请求完成的时间(以秒为单位)。
  • low_rdy_idle_timeout – 在重新分配RDY计数的状态下(即max_in_flight <num_producers),等待生产者发出消息的时间(以秒为单位)
  • max_backoff_duration – 我们允许退避状态持续的最长时间(以秒为单位)
  • **kwargs – 传递给nsq.AsyncConn初始化

3.4 Tornado

1
2
3
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(options.port)
tornado.ioloop.IOLoop.instance().start()

前两句是启动服务器,启动服务器之后,还需要启动 IOLoop 的实例,这样可以启动事件循环机制,配合非阻塞的 HTTP Server 工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 从另一个线程停止Torando的解决方案。
import threading
import tornado.ioloop
import tornado.web
import time


class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world!\n")

def start_tornado(*args, **kwargs):
application = tornado.web.Application([
(r"/", MainHandler),
])
application.listen(8888)
print "Starting Torando"
tornado.ioloop.IOLoop.instance().start()
print "Tornado finished"

def stop_tornado():
ioloop = tornado.ioloop.IOLoop.instance()
ioloop.add_callback(ioloop.stop)
print "Asked Tornado to exit"

def main():

t = threading.Thread(target=start_tornado)
t.start()
time.sleep(5)
stop_tornado()
t.join()

if __name__ == "__main__":
main()

开始用Tornado:从Hello World开始

如何停止Tornado Web服务器

参考链接

https://www.liwenzhou.com/posts/Go/go_nsq/

https://jiajunhuang.com/articles/2020_08_15-nsq.md.html

感谢观看,如有不足或错误,恳请大佬指正 (^_^)#