nsq介绍及源码阅读
简介
nsq客户端逻辑
nsq消费者
主要请参考nsq_tail
代码。nsqd
的回应消息处理代码为func (c *Conn) readLoop()
。
TCP消息流的二进制结构请参考官方文档:http://nsq.io/clients/tcp_protocol_spec.html
nsq消费者与nsqd
建立连接的流程如下:
- 当建立好TCP连接后,客户端必须发送一个 4 字节的 “magic” 标识码,表示通讯协议的版本。
V2
(4 个字节的 ASCII [space][space][V][2]) 消费用到的推送流协议(和发布用到的请求/响应协议) - 认证后,客户端可以发送
IDENTIFY
命令来停供常用的元数据(比如,更多的描述标识码)和协商特性。服务器会根据客户端请求的内容返回一个JSON数据或直接返回OK
- 然后,客户端还必须使用
SUB
命令订阅一个话题(Topic)和通道(Channel)。成功后服务器会返回OK
- 最后,还需要设置
RDY
状态。如果RDY
状态为 0 ,意味着客户端不会收到任何消息。因此需要设置一个RDY
状态值,例如设置100,不需要任何附加命令,将会有 100 条消息推送到客户端
消费消息数据时,需要给NSQD返回该消息是否成功被处理。只有成功被处理的消息,才真正从NSQ队列中删除不会再投递到任何消费者。
nsq生产者
主要参考https://github.com/nsqio/go-nsq项目中Producer
类的实现。
nsq生产者与nsqd
建立连接的流程如下:
- 当建立好TCP连接后,客户端必须发送一个 4 字节的 “magic” 标识码,表示通讯协议的版本。
V2
(4 个字节的 ASCII [space][space][V][2]) 消费用到的推送流协议(和发布用到的请求/响应协议) - 认证后,客户端可以发送
IDENTIFY
命令来停供常用的元数据(比如,更多的描述标识码)和协商特性。服务器会根据客户端请求的内容返回一个JSON数据或直接返回OK
,表明连接建立成功。
将消息投递到NSQD时,成功后NSQD会返回OK
。由于返回消息上没有ID,表明上看是不能做pipeline操作的。
不过由于在一条连接上NSQD的返回消息肯定与接收到的消息顺序一一对应,因此可以做pipeline操作,可以连续调用多次PUB/MPUB
命令,
但需要将这些命令保存下来,等待NSQD返回数据后再决定是否将这些命令标记完成还是标记为需要重新投递。
实现时,可以借用TCP的滑动窗口概念。如果滑动窗口为1,相当于每次调用PUB/MPUB
命令都需要等待服务器返回后才决定下一步操作,这就退化为同步操作。
nsqd内部处理逻辑
一个Topic可以有多Channel
,每个消息都会复制一份放入Channel
中,也就是说每个Channel
的数据都是独立的。如果消费速度更不上生产的速度,那么每个Channel
上的数据都会序列化到磁盘上,这里是一个坑,有可能会因此导致数据写磁盘多份。
另外,NSQ不能保证数据的消费顺序与生产顺序完全一致。
与nsqlookupd交互
代码调用路径如下:
nsqd.Main()
n.waitGroup.Wrap(func() { n.lookupLoop() })
func (n *NSQD) lookupLoop() : 91行: case val := <-n.notifyChan:
消息分发
func (t *Topic) messagePump()
这里进行消息的分发,直接将该topic下的消息推送给所有的channel上。
消息ID
func (n *NSQD) idPump()
这里生成新的消息ID,然后放入到 n.idChan
中。64位int64的guid生成算法参考https://github.com/bmizerany/noeqd,主要部分解释如下:
time - 41位 (当前毫秒数,一共69年)
配置好的机器ID - 10 bits - 一共支持1024个机器
顺序号 - 12 bits - 每个机器在同一毫秒内一共支持4096个
pub接口
发布一条消息到NSQ消息队列中。代码路径 func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error)
。
- 判断消息长度是否超过限制
- 获取topic名称
- 根据topic名称,获取
Topic
对象,最终会调用到这里:func (n *NSQD) GetTopic(topicName string) *Topic
- 如果topic存在,直接返回
Topic
对象 - 如果topic不存在,就创建一个:
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic
- 创建
Topic
之后,询问lookupd
,获取所有关注这个topic的channel列表,然后获取或创建这些Channel
。 - 创建一个新的Message:
msg := NewMessage(<-s.ctx.nsqd.idChan, body)
- 将该消息放到
Topic
上:err = topic.PutMessage(msg)