快速开始:
1 2 3
| ./nsqlookupd ./nsqd --lookupd-tcp-address=127.0.0.1:4160 -max-msg-size 104857600 ./nsqadmin --lookupd-http-address=127.0.0.1:4161
|
1. NSQ
NSQ是Go语言编写的一个开源的实时分布式内存消息队列,其性能十分优异。
1.1 NSQ组成部分
NSQ由3个守护程序组成:
1.2 拓扑结构
- producer:消息发送者
- consumer:消息接收者
1.3 分发模式
- topic:消息主题
- channel:消息通道,消息主题的消息复制到每一个channel里
消息是从topic -> channel
(每个channel接收该topic的所有消息的副本)多播的,但是从channel -> consumers
均匀分布(每个消费者接收该channel的一部分消息)。
1.4 消息通道流程
- 消息默认不持久化,可以配置成持久化模式。nsq采用的方式是
内存+硬盘
的模式,当内存到达一定程度时就会将数据持久化到硬盘。
- 如果将
--mem-queue-size
设置为0,所有的消息将会存储到磁盘。
- 服务器重启时也会将当时在内存中的消息持久化。
- 每条消息至少传递一次。
- 消息不保证有序。
2. 快速启动
快速启动
2.1 nsqlookupd
nsqlookupd是维护所有nsqd状态、提供服务发现的守护进程。它能为消费者查找特定topic
下的nsqd提供了运行时的自动发现服务。 它不维持持久状态,也不需要与任何其他nsqlookupd实例协调以满足查询。因此根据你系统的冗余要求尽可能多地部署nsqlookupd
节点。它们消耗的资源很少,可以与其他服务共存。我们的建议是为每个数据中心运行至少3个集群。
开启第一个终端
nsqlookupd
相关配置项如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| -broadcast-address string 该lookupd节点的地址(默认为操作系统主机名)(默认为“ PROSNAKES.local”) -config string 配置文件的路径 -http-address string <addr>:<port> 侦听HTTP客户端 (默认为 "0.0.0.0:4161") -inactive-producer-timeout duration 生产者自上次ping后将保留在活动列表中的持续时间(默认值为5m0s) -log-prefix string 日志消息前缀(默认为 "[nsqlookupd] ") -tcp-address string <addr>:<port> 侦听TCP客户端 (默认 "0.0.0.0:4160") -tombstone-lifetime duration 如果保留注册,生产者将保持墓碑状态的持续时间(默认为45s) -verbose 启用详细日志记录 -version 打印版本信息
|
2.2 nsqd
nsqd是一个守护进程,它接收、排队并向客户端发送消息。
开启第二个终端
1 2 3
| ./nsqd --lookupd-tcp-address=127.0.0.1:4160 -max-msg-size 104857600 ./nsqd -broadcast-address=127.0.0.1 ./nsqd -broadcast-address=127.0.0.1 -lookupd-tcp-address=127.0.0.1:4160
|
nsqdq
相关配置项如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| -auth-http-address value <addr>:<port> 查询身份验证服务器(可以多次给出) -broadcast-address string 将使用lookupd注册的地址(默认为OS主机名)(默认为“ PROSNAKES.local”) -config string 配置文件的路径 -data-path string 存储磁盘支持的消息的路径 -deflate 启用deflate功能协商(客户端压缩)(默认为true) -e2e-processing-latency-percentile value 要跟踪的消息处理时间百分位数(以float(0,1.0])为单位(可以指定多次,也可以用逗号分隔'1.0,0.99,0.95',默认为无) -e2e-processing-latency-window-time duration 在这段时间内计算端到端延迟分位数(即:60s仅显示过去60秒的分位数计算结果)(默认值为10m0s) -http-address string <addr>:<port> 侦听HTTP客户端(默认为“ 0.0.0.0:4151”) -http-client-connect-timeout duration HTTP连接超时(默认为2秒) -http-client-request-timeout duration HTTP请求超时(默认5秒) -https-address string <addr>:<port> 侦听HTTPS客户端(默认为“ 0.0.0.0:4152”) -log-prefix string 日志消息前缀(默认为“ [nsqd]”) -lookupd-tcp-address value lookupd TCP地址(可以多次给出) -max-body-size int 单个命令正文的最大大小(默认5242880) -max-bytes-per-file int 滚动前每个磁盘队列文件的字节数(默认为104857600) -max-deflate-level int 客户端可以协商的最大压缩压缩级别(>值==> nsqd CPU使用率)(默认为6) -max-heartbeat-interval duration 客户端心跳之间的最大客户端可配置持续时间(默认为1m0s) -max-msg-size int 单个消息的最大大小(以字节为单位)(默认为1048576) -max-msg-timeout duration 消息超时之前的最大持续时间(默认为15m0s) -max-output-buffer-size int 客户端输出缓冲区的最大客户端可配置大小(以字节为单位)(默认值65536) -max-output-buffer-timeout duration 刷新到客户端之间的最大客户端可配置持续时间(默认为1s) -max-rdy-count int 客户端的最大RDY计数(默认为2500) -max-req-timeout duration 消息的最大重新排队超时(默认为1h0m0s) -mem-queue-size int 保留在内存中的消息数(每个主题/通道)(默认为10000) -msg-timeout string 自动请求消息之前要等待的时间(默认为“ 1m0s”) -node-id int 消息ID的唯一部分,(int)范围为[0,1024)(默认为主机名的哈希)(默认为616) -snappy 启用快捷的特性协商(客户端压缩)(默认为true) -statsd-address string UDP <addr>:<port> 用于推送统计信息的statsd守护进程 -statsd-interval string 推送到statsd之间的持续时间(默认为“1m0s”) -statsd-mem-stats 切换发送内存和GC统计信息到statsd(默认为true) -statsd-prefix string 用于发送到statsd的密钥的前缀(%s用于主机替换)(默认为“nsq.%s”) -sync-every int 每个磁盘队列的消息数fsync(默认2500) -sync-timeout duration 每个磁盘队列fsync持续时间(默认为2s) -tcp-address string <addr>:<port> 监听TCP客户端(默认为“0.0.0.0:4150”) -tls-cert string 证书文件路径 -tls-client-auth-policy string 客户端证书认证策略(“要求”或“要求-验证”) -tls-key string 密钥文件路径 -tls-min-version value 可接受的最小SSL/TLS版本('ssl3.0'、'tls1.0'、'tls1.1'或'tls1.2')(默认值769) -tls-required 客户端连接需要TLS (true, false, tcp-https) -tls-root-ca-file string 证书颁发机构文件的路径 -verbose 启用详细日志 -version 打印版本信息 -worker-id 不要使用这个,使用 --node-id
|
2.3 nsqadmin
一个实时监控集群状态、执行各种管理任务的Web管理平台。
可选择开启
1
| ./nsqadmin --lookupd-http-address=127.0.0.1:4161
|
使用浏览器打开http://127.0.0.1:4171/
访问管理界面。
nsqadmin
相关的配置项如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| -allow-config-from-cidr string 允许HTTP请求/config端点的CIDR(默认为“127.0.0.1/8”) -config string 配置文件路径 -graphite-url string graphite HTTP 地址 -http-address string <addr>:<port> 监听HTTP客户端(默认为“0.0.0.0:4171”) -http-client-connect-timeout duration HTTP连接超时(默认为2s) -http-client-request-timeout duration HTTP请求超时(默认5s) -http-client-tls-cert string HTTP客户端的证书文件的路径 -http-client-tls-insecure-skip-verify 配置HTTP客户端以跳过对TLS证书的验证 -http-client-tls-key string HTTP客户端的密钥文件的路径 -http-client-tls-root-ca-file string HTTP客户端的CA文件的路径 -log-prefix string 日志消息前缀(默认为“[nsqadmin]”) -lookupd-http-address value lookupd HTTP地址(可能多次给出) -notification-http-endpoint string 将向其发送管理操作的POST通知的HTTP端点(完全限定) -nsqd-http-address value nsqd HTTP地址(可能多次给出) -proxy-graphite 代理HTTP请求graphite -statsd-counter-format string statsd的实现应用的计数器统计键格式化。如果不需要格式化,则将其设置为空字符串。(默认“stats.counters. % s.count”) -statsd-gauge-format string 测量统计statsd实现应用的键格式。如果不需要格式化,则将其设置为空字符串。(默认“stats.gauges. % s”) -statsd-interval duration 时间间隔nsqd被配置为推到statsd(必须匹配nsqd)(默认为1m0s) -statsd-prefix string 用于发送到statsd的密钥的前缀(%s用于主机替换,必须匹配nsqd)(默认为“nsq.%s”) -version 打印版本信息
|
3. pynsq
3.1 基础收发代码
https://pynsq.readthedocs.io/en/latest/writer.html
3.2 生产者单独数据包发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import logging import nsq import tornado logging.basicConfig(level=logging.INFO)
writer = nsq.Writer(['127.0.0.1:4150'])
@tornado.gen.coroutine def do_pub(): yield tornado.gen.sleep(1) writer.pub("test_topic", bytes("hello world",enconding='utf-8'))
tornado.ioloop.IOLoop.instance().run_sync(do_pub) print("wrote one message to nsq")
|
Writer参数
- nsqd_tcp_addresses – 一个序列,其元素的格式为“ address:port”,与该作者应发布到的nsqd实例相对应
- name – 用于记录消息的字符串(默认为第一个nsqd地址)
- **kwargs – 传递给nsq.AsyncConn初始化
3.3 消费者接收数据包
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| import time import nsq import threading import tornado
''' 使用tornado.ioloop.IOLoop.instance().start()子线程接收数据 ''' data = []
def get_flow():
def handler(message): global data message.enable_async() data.append(str(message.body, encoding='utf-8')) print(message) print(str(message.body, encoding='utf-8')) message.finish() return True
r = nsq.Reader( message_handler=handler, nsqd_tcp_addresses=['127.0.0.1:4150'], topic='flow', channel='read', max_in_flight=10 ) tornado.ioloop.IOLoop.instance().start() nsq.run()
t = threading.Thread(target=get_flow) t.start()
for i in range(10000): print("machine learning") print(len(data)) time.sleep(1)
|
Reader参数
- message_handler – 对收到的每条消息执行的可调用对象
- topic – 指定所需的NSQ主题
- channel – 指定所需的NSQ通道
- name – 用于记录消息的字符串(默认为“ topic:channel”)
- nsqd_tcp_addresses – 该读取器应连接到的nsqd实例的字符串地址序列
- lookupd_http_addresses – 该阅读器应查询nsqlookupd实例的字符串地址序列,以查询指定主题的生产者
- max_tries – 读者尝试处理一条消息的最大尝试次数,之后该消息将被自动丢弃
- max_in_flight – 该阅读器将流水处理的最大消息数。 该值将在已配置/已发现的nsqd生产者之间平均分配
- lookupd_poll_interval – 查询所有提供的nsqlookupd实例之间的时间(以秒为单位)。 最初将引入基于该值的随机时间,以便在运行多个读取器时增加抖动
- lookupd_poll_jitter – 要添加到lookupd池循环中的最大抖动分数。 即使多个使用者同时重新启动,这也有助于平均分配请求。
- lookupd_connect_timeout – 等待建立与nsqlookupd的连接的时间(以秒为单位)
- lookupd_request_timeout – 等待nsqlookupd请求完成的时间(以秒为单位)。
- low_rdy_idle_timeout – 在重新分配RDY计数的状态下(即max_in_flight <num_producers),等待生产者发出消息的时间(以秒为单位)
- max_backoff_duration – 我们允许退避状态持续的最长时间(以秒为单位)
- **kwargs – 传递给nsq.AsyncConn初始化
3.4 Tornado
1 2 3
| http_server = tornado.httpserver.HTTPServer(application) http_server.listen(options.port) tornado.ioloop.IOLoop.instance().start()
|
前两句是启动服务器,启动服务器之后,还需要启动 IOLoop 的实例,这样可以启动事件循环机制,配合非阻塞的 HTTP Server 工作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import threading import tornado.ioloop import tornado.web import time
class MainHandler(tornado.web.RequestHandler): def get(self): self.write("Hello, world!\n")
def start_tornado(*args, **kwargs): application = tornado.web.Application([ (r"/", MainHandler), ]) application.listen(8888) print "Starting Torando" tornado.ioloop.IOLoop.instance().start() print "Tornado finished"
def stop_tornado(): ioloop = tornado.ioloop.IOLoop.instance() ioloop.add_callback(ioloop.stop) print "Asked Tornado to exit"
def main():
t = threading.Thread(target=start_tornado) t.start() time.sleep(5) stop_tornado() t.join()
if __name__ == "__main__": main()
|
开始用Tornado:从Hello World开始
如何停止Tornado Web服务器
参考链接
https://www.liwenzhou.com/posts/Go/go_nsq/
https://jiajunhuang.com/articles/2020_08_15-nsq.md.html
感谢观看,如有不足或错误,恳请大佬指正 (^_^)#