博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
NSQ入门
阅读量:3960 次
发布时间:2019-05-24

本文共 3199 字,大约阅读时间需要 10 分钟。

简介

NSQ是Go语言编写的一个开源的实时分布式内存消息队列,其性能十分优异。

NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。它具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征

适合小型项目使用,用来学习消息队列实现原理、学习 golang channel知识以及如何用 go 来写分布式,为什么说适合小型小型项目使用因为,nsq 如果没有能力进行二次开发的情况存在的问题还是很多的。

Nsq特性

1. 支持无 SPOF 的分布式拓扑    2. 水平扩展(没有中间件,无缝地添加更多的节点到集群)    3. 低延迟消息传递 (性能)    4. 结合负载均衡和多播消息路由风格    5. 擅长面向流媒体(高通量)和工作(低吞吐量)工作负载    6. 主要是内存中(除了高水位线消息透明地保存在磁盘上)    7. 运行时发现消费者找到生产者服务(nsqlookupd)    8. 传输层安全性 (TLS)    9. 数据格式不可知    10. 一些依赖项(容易部署)和健全的,有界,默认配置    11. 任何语言都有简单 TCP 协议支持客户端库    12. HTTP 接口统计、管理行为和生产者(不需要客户端库发布)    13. 为实时检测集成了 statsd    14. 健壮的集群管理界面 (nsqadmin)

应用场景

异步处理

参照下图利用消息队列把业务流程中的非关键流程异步化,从而显著降低业务请求的响应时间。

在这里插入图片描述
![在这里插入图片描述](https://img-blog.csdnimg.cn/img_convert/eecec391b68a0080e9bee68d174d8994.png#pic_cente

应用解耦

通过使用消息队列将不同的业务逻辑解耦,降低系统间的耦合,提高系统的健壮性。后续有其他业务要使用订单数据可直接订阅消息队列,提高系统的灵活性。

在这里插入图片描述

流量削峰

类似秒杀(大秒)等场景下,某一时间可能会产生大量的请求,使用消息队列能够为后端处理请求提供一定的缓冲区,保证后端服务的稳定性。

在这里插入图片描述

NSQ架构

NSQ工作模式

在这里插入图片描述

Go操作NSQ

安装

go get -u github.com/nsqio/go-nsq

一个简单的生产者,消费者示例代码如下:

生产者

// nsq_producer/main.gopackage mainimport (	"bufio"	"fmt"	"os"	"strings"	"github.com/nsqio/go-nsq")// NSQ Producer Demovar producer *nsq.Producer// 初始化生产者func initProducer(str string) (err error) {
config := nsq.NewConfig() producer, err = nsq.NewProducer(str, config) if err != nil {
fmt.Printf("create producer failed, err:%v\n", err) return err } return nil}func main() {
nsqAddress := "127.0.0.1:4150" err := initProducer(nsqAddress) if err != nil {
fmt.Printf("init producer failed, err:%v\n", err) return } reader := bufio.NewReader(os.Stdin) // 从标准输入读取 for {
data, err := reader.ReadString('\n') if err != nil {
fmt.Printf("read string from stdin failed, err:%v\n", err) continue } data = strings.TrimSpace(data) if strings.ToUpper(data) == "Q" {
// 输入Q退出 break } // 向 'topic_demo' publish 数据 err = producer.Publish("topic_demo", []byte(data)) if err != nil {
fmt.Printf("publish msg to nsq failed, err:%v\n", err) continue } }}

消费者

// nsq_consumer/main.gopackage mainimport (	"fmt"	"os"	"os/signal"	"syscall"	"time"	"github.com/nsqio/go-nsq")// NSQ Consumer Demo// MyHandler 是一个消费者类型type MyHandler struct {
Title string}// HandleMessage 是需要实现的处理消息的方法func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body)) return}// 初始化消费者func initConsumer(topic string, channel string, address string) (err error) {
config := nsq.NewConfig() config.LookupdPollInterval = 15 * time.Second c, err := nsq.NewConsumer(topic, channel, config) if err != nil {
fmt.Printf("create consumer failed, err:%v\n", err) return } consumer := &MyHandler{
Title: "沙河1号", } c.AddHandler(consumer) // if err := c.ConnectToNSQD(address); err != nil { // 直接连NSQD if err := c.ConnectToNSQLookupd(address); err != nil {
// 通过lookupd查询 return err } return nil}func main() {
err := initConsumer("topic_demo", "first", "127.0.0.1:4161") if err != nil {
fmt.Printf("init consumer failed, err:%v\n", err) return } c := make(chan os.Signal) // 定义一个信号的通道 signal.Notify(c, syscall.SIGINT) // 转发键盘中断信号到c <-c // 阻塞}

转载地址:http://lzhzi.baihongyu.com/

你可能感兴趣的文章
Struts2+Spring3+Mybatis3开发环境搭建
查看>>
mongoDB入门必读(概念与实战并重)
查看>>
通俗易懂解剖jbpm4
查看>>
云盘 同步盘介绍 同步工具介绍
查看>>
rsync
查看>>
win7 英文版电脑 不睡眠,不休眠
查看>>
Bash中如何判断一个命令是否存在 查看当前目录下文件的个数
查看>>
makefile
查看>>
linux 文件权限
查看>>
部分简化字感觉不如繁体字有深意
查看>>
cgo 崩溃 64位地址截断引发的挂死问题
查看>>
drbd
查看>>
网络 IP
查看>>
网络路由
查看>>
网络 tcp 性能 可靠
查看>>
网络 https 握手
查看>>
去掉调试信息
查看>>
lsof 使用
查看>>
golang获取本机地址
查看>>
date 使用
查看>>