P:生产者,也就是要发送消息的程序。
C:消费者:消息的接收者,会一直等待消息到来。
简单模式就是单发单收,消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除。
这种模式就是多个消费者消费同一个队列中的消息,既然消费者多了那么就出现了消息分配的问题,所以对应着两种分配策略:
在这种模型中,多了一个Exchange角色,而且过程略有变化:
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)。
C:消费者,消息的接收者,会一直等待消息到来。
Queue:消息队列,接收消息、缓存消息。
Exchange:交换机(X),一方面,接收生产者发送的消息。另一方面,如何处理消息,递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下4种类型:
注意:Exchange负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失。
路由模式其实就是上述发布/订阅模式的交换机转发类型变成了Direct类型。在这种模式下:
Exchange不再把消息交给每一个绑定的队列,而是根据消息的routingkey进行判断,只有队列的
routingkey与消息的routingkey完全一致,才会接收到消息。
P:生产者,向Exchange发送消息,发送消息时,会指定一个routingkey。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给与routingkey完全匹配的队列。
C1:消费者,其所在队列指定了需要routingkey为error的消息。
C2:消费者,其所在队列指定了需要routingkey为info、error、warning的消息。
路由模式其实就是上述发布/订阅模式的交换机转发类型变成了Topic类型。在这种模式下:
队列的routingkey与消息的routingkey符合匹配规则,就可以接收到消息,有两种规则:
*:可以(只能)匹配一个单词。
#:可以匹配多个单词(或者零个)。
所以图中,routingkey为a.orange.b的消息就会被转发到Q1,而routingkey为Lazy.a.b.c的消息就会被发送到Q2。
安装API库
Go可以使用streadway/amqp库来操作rabbit,使用以下命令来安装:
gogetgithub.com/streadway/amqp
封装rabbitmq
接下来我们对streadway/amqp库的内容进行一个二次封装,封装为一个rabbitmq.go文件:
生产者
funcmain(){//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字producer:=rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/","simple")i:=0for{//每隔2s发送一次消息time.Sleep(time.Second*2)producer.Send("simple","simplemessage:"+strconv.Itoa(i))i=i+1}}消费者
funcmain(){consumer:=rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/","simple")//接收消息时,指定messages:=consumer.Consume()gofunc(){forch:=rangemessages{log.Printf("Receivedamessage:%s",ch.Body)//消费消息要用3stime.Sleep(time.Second*3)}}()select{}}运行结果:
2022/11/0518:54:47Receivedamessage:"simplemessage:0"2022/11/0518:54:52Receivedamessage:"simplemessage:1"2022/11/0518:54:57Receivedamessage:"simplemessage:2"
公平分发模式:
公平分发模式采用的是轮询机制,它会将数个任务按顺序平均分发给消费者。
funcmain(){//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字producer:=rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/","worker")i:=0for{//每隔2s发送一次消息time.Sleep(time.Second*2)producer.Send("worker","workermessage:"+strconv.Itoa(i))i=i+1}}消费者1
funcmain(){consumer1:=rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/","worker")//接收消息messages:=consumer1.Consume()gofunc(){forch:=rangemessages{log.Printf("Receivedamessage:%s",ch.Body)//消费消息要用3stime.Sleep(time.Second*3)}}()select{}}消费者2
funcmain(){consumer2:=rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/","worker")//接收消息messages:=consumer2.Consume()gofunc(){forch:=rangemessages{log.Printf("Receivedamessage:%s",ch.Body)//消费消息要用3stime.Sleep(time.Second*3)}}()select{}}运行结果:
#消费者12022/11/0519:45:03Receivedamessage:"workermessage:0"2022/11/0519:45:07Receivedamessage:"workermessage:2"2022/11/0519:45:11Receivedamessage:"workermessage:4"
#消费者22022/11/0519:45:05Receivedamessage:"workermessage:1"2022/11/0519:45:09Receivedamessage:"workermessage:3"2022/11/0519:45:13Receivedamessage:"workermessage:5"
可以发现,公平模式下,偶数消息都被发送给了消费者1,而奇数消息都被发送给了消费者2。
公平派遣模式:
公平派遣模式下发送端与公平分发相同,消费者端只需要加一段配置代码,我们可以将预取计数设置为1。这告诉RabbitMQ一次不要给消费者一个以上的消息。换句话说,在处理并确认上一条消息之前,不要将新消息发送给消费者。而是将其分派给不忙的下一个消费者。
关于消息的确认:
为了确保消息永不丢失,RabbitMQ支持消息确认。消费者发送回一个确认(acknowledgement),以告知RabbitMQ已经接收,处理了特定的消息,并且RabbitMQ可以自由删除它。
我们之前的代码中,RabbitMQ一旦向消费者传递了一条消息,便立即将其标记为删除(调用Consumer的第三个参数是autoAck,表示是否自动回复)。在这种情况下,如果你终止一个消费者那么你就可能会丢失这个任务,我们还将丢失所有已经交付给这个消费者的尚未消费的消息。如果一个消费者意外宕机了,那么我们希望将任务交付给其他消费者来消费者。
所以一旦向消费者传递了一条消息,就不能马上将其标记为删除,而是要手动确认。我们需要在创建消费者的时候将autoAck参数标记为false:
//Consume接收某个消息队列的消息func(q*RabbitMQ)Consume()<-chanamqp.Delivery{c,e:=q.channel.Consume(q.Name,//指定从哪个队列中接收消息"",false,//不自动确认消息false,false,false,nil,)failOnError(e,"接收消息失败!")returnc}然后每消费完一条消息需要调用Ack(false)函数手动回复。
funcmain(){//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字producer:=rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/","worker")i:=0for{//每隔2s发送一次消息time.Sleep(time.Second*2)producer.Send("worker","workermessage:"+strconv.Itoa(i))i=i+1}}消费端限流:
实现公平派遣模式我们需要设置消费者端一次只能消费一条消息,之前我们已经进行了封装,直接在消费者端调用即可:
//Qos配置queue参数func(q*RabbitMQ)Qos(){e:=q.channel.Qos(1,0,false)failOnError(e,"无法设置QoS")}消费者1
funcmain(){consumer1:=rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/","worker")//指定一次只消费一条消息,直到消费完才重新接收consumer1.Qos()//接收消息messages:=consumer1.Consume()gofunc(){forch:=rangemessages{log.Printf("Receivedamessage:%s",ch.Body)//消费消息要用10stime.Sleep(time.Second*10)//手动回复ch.Ack(false)}}()select{}}消费者2
funcmain(){consumer2:=rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/","worker")//指定一次只消费一条消息,直到消费完才重新接收consumer2.Qos()//接收消息messages:=consumer2.Consume()gofunc(){forch:=rangemessages{log.Printf("Receivedamessage:%s",ch.Body)//消费消息要用2stime.Sleep(time.Second*2)//手动回复ch.Ack(false)}}()select{}}运行结果:
#消费者12022/11/0520:31:26Receivedamessage:"workermessage:0"2022/11/0520:31:36Receivedamessage:"workermessage:5"
#消费者22022/11/0520:31:28Receivedamessage:"workermessage:1"2022/11/0520:31:30Receivedamessage:"workermessage:2"2022/11/0520:31:32Receivedamessage:"workermessage:3"2022/11/0520:31:34Receivedamessage:"workermessage:4"2022/11/0520:31:38Receivedamessage:"workermessage:6"2022/11/0520:31:40Receivedamessage:"workermessage:7"
funcmain(){producer:=rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/","queue")rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/","exchange1","fanout")i:=0for{time.Sleep(time.Second)//fanout模式下不用routingkeyproducer.Publish("exchange1","pubsubmessage:"+strconv.Itoa(i),"")i=i+1}}消费者1
funcmain(){//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字consumer1:=rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/","queue1")//队列绑定到exchangeconsumer1.Bind("exchange1","")//接收消息msgs:=consumer1.Consume()gofunc(){ford:=rangemsgs{log.Printf("Consumer1receivedamessage:%s",d.Body)d.Ack(false)}}()select{}}消费者2
funcmain(){//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字consumer2:=rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/","queue2")//队列绑定到exchangeconsumer2.Bind("exchange1","")//接收消息msgs:=consumer2.Consume()gofunc(){ford:=rangemsgs{log.Printf("Consumer2receivedamessage:%s",d.Body)d.Ack(false)}}()select{}}运行结果:
#消费者12022/11/0522:32:19Consumer1receivedamessage:"pubsubmessage:0"2022/11/0522:32:20Consumer1receivedamessage:"pubsubmessage:1"2022/11/0522:32:21Consumer1receivedamessage:"pubsubmessage:2"2022/11/0522:32:22Consumer1receivedamessage:"pubsubmessage:3"2022/11/0522:32:23Consumer1receivedamessage:"pubsubmessage:4"2022/11/0522:32:24Consumer1receivedamessage:"pubsubmessage:5"
#消费者22022/11/0522:32:19Consumer2receivedamessage:"pubsubmessage:0"2022/11/0522:32:20Consumer2receivedamessage:"pubsubmessage:1"2022/11/0522:32:21Consumer2receivedamessage:"pubsubmessage:2"2022/11/0522:32:22Consumer2receivedamessage:"pubsubmessage:3"2022/11/0522:32:23Consumer2receivedamessage:"pubsubmessage:4"2022/11/0522:32:24Consumer2receivedamessage:"pubsubmessage:5"
funcmain(){producer:=rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/","queue")//指定为direct类型rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/","exchange","direct")i:=0for{time.Sleep(time.Second)//如果是奇数,就发key1//如果是偶数,就发key2ifi%2!=0{producer.Publish("exchange","routingmessage:"+strconv.Itoa(i),"key1")}else{producer.Publish("exchange","routingmessage:"+strconv.Itoa(i),"key2")}i=i+1}}消费者1
funcmain(){//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字consumer1:=rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/","queue1")//队列绑定到exchangeconsumer1.Bind("exchange","key1")//接收消息msgs:=consumer1.Consume()gofunc(){ford:=rangemsgs{log.Printf("Consumer1receivedamessage:%s",d.Body)d.Ack(false)}}()select{}}消费者2
funcmain(){//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字consumer2:=rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/","queue2")//队列绑定到exchangeconsumer2.Bind("exchange","key2")//接收消息msgs:=consumer2.Consume()gofunc(){ford:=rangemsgs{log.Printf("Consumer2receivedamessage:%s",d.Body)d.Ack(false)}}()select{}}运行结果:
#消费者12022/11/0522:51:10Consumer1receivedamessage:"routingmessage:1"2022/11/0522:51:12Consumer1receivedamessage:"routingmessage:3"2022/11/0522:51:14Consumer1receivedamessage:"routingmessage:5"2022/11/0522:51:16Consumer1receivedamessage:"routingmessage:7"
#消费者22022/11/0522:51:11Consumer2receivedamessage:"routingmessage:0"2022/11/0522:51:13Consumer2receivedamessage:"routingmessage:2"2022/11/0522:51:15Consumer2receivedamessage:"routingmessage:4"2022/11/0522:51:17Consumer2receivedamessage:"routingmessage:6"
funcmain(){producer:=rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/","queue")//指定为topic类型rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/","exchange2","topic")variintfor{time.Sleep(time.Second)ifi%2!=0{producer.Publish("exchange2","topicmessage:"+strconv.Itoa(i),"a.test.b.c")}else{producer.Publish("exchange2","topicmessage:"+strconv.Itoa(i),"a.test.b")}i++}}消费者1
funcmain(){//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字consumer1:=rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/","queue1")//队列绑定到exchangeconsumer1.Bind("exchange2","*.test.*")//接收消息msgs:=consumer1.Consume()gofunc(){ford:=rangemsgs{log.Printf("Consumer1receivedamessage:%s",d.Body)d.Ack(false)}}()select{}}消费者2
funcmain(){//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字consumer2:=rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/","queue2")//队列绑定到exchangeconsumer2.Bind("exchange2","#.test.#")//接收消息msgs:=consumer2.Consume()gofunc(){ford:=rangemsgs{log.Printf("Consumer2receivedamessage:%s",d.Body)d.Ack(false)}}()select{}}运行结果:
#消费者12022/11/0523:09:53Consumer1receivedamessage:"topicmessage:0"2022/11/0523:09:55Consumer1receivedamessage:"topicmessage:2"2022/11/0523:09:57Consumer1receivedamessage:"topicmessage:4"2022/11/0523:09:59Consumer1receivedamessage:"topicmessage:6"
#消费者22022/11/0523:09:53Consumer2receivedamessage:"topicmessage:0"2022/11/0523:09:54Consumer2receivedamessage:"topicmessage:1"2022/11/0523:09:55Consumer2receivedamessage:"topicmessage:2"2022/11/0523:09:56Consumer2receivedamessage:"topicmessage:3"2022/11/0523:09:57Consumer2receivedamessage:"topicmessage:4"2022/11/0523:09:58Consumer2receivedamessage:"topicmessage:5"2022/11/0523:09:59Consumer2receivedamessage:"topicmessage:6"