【RabbitMQ】Go实现
RabbitMQ是一个消息代理,底层为队列,支持AMQP(高级消息队列协议)。
消息队列的作用
- 应用解耦:生产者和消费者之间通过消息传递进行通信,而非直接调用。
- 异步通信:消息的发送方可以在不等待响应的情况下继续执行,消息会被存储在队列中等待接收者处理,提高了系统的吞吐量。
- 削峰填谷:在遇到瞬时的高流量情况下,消息队列作为缓冲区,防止数据过载。
消息队列的缺点
- 降低系统的可用性:系统引入外部依赖越多,越容易挂掉。
- 系统复杂度提高:需要面对各种问题,比如保证消息没有被重复消费、处理信息丢失的情况、保证消息传递的顺序性。
- 一致性问题:多个系统处理消息可能成功可能不成功,造成数据不一致。
Go使用RabbitMQ
首先,安装rabbitmq
choco install rabbitmq
安装后,会自动启动服务,默认运行在本地5672
端口,管理界面为localhost:15672
,默认用户和密码均为guest
接下来用go编写两段程序send.go
和receive.go
模拟消息发送方和接收方。
send.go
:
package main
import (
"context"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
// 连接到RabbitMQ服务器(broker)
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("connection.open: %s", err)
}
defer conn.Close()
// 创建channel(amqp.Channel)
ch, err := conn.Channel()
if err != nil {
log.Fatalf("channel.open: %s", err)
}
defer ch.Close()
// 声明一个队列
ch.QueueDeclare(
"hello", // 队列名称,若该队列不存在则创建
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
// 发送消息到队列,超时时间设为2秒
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() // 带超时context的固定写法
msg := "Hello World!"
ch.PublishWithContext(
ctx,
"", // exchange
"hello", // exchange为空时,key为队列名称
false, // mandatory
false, // immediate
amqp.Publishing{ // 消息内容
ContentType: "text/plain", // MIME content type
Body: []byte(msg),
},
)
}
receive.go
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
// 连接到RabbitMQ服务器(broker)
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("connection.open: %s", err)
}
defer conn.Close()
// 创建channel(amqp.Channel)
ch, err := conn.Channel()
if err != nil {
log.Fatalf("channel.open: %s", err)
}
defer ch.Close()
// 声明一个队列,这里不需要声明队列,因为send.go已经声明过了
// 如果声明队列参数不一致,会报错
// ch.QueueDeclare(
// "hello", // 队列名称,若该队列不存在则创建
// false, // durable
// false, // delete when unused
// false, // exclusive
// false, // no-wait
// nil, // arguments
// )
// 接收消息,返回go语言原生channel(只读)
deliverieCh, err := ch.Consume(
"hello", // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Fatalf("basic.consume: %s", err)
}
// 从deliverieCh中读取消息
for delivery := range deliverieCh {
log.Printf("Received a message: %s", delivery.Body)
}
}
运行send.go
后可以在管理页面看到创建了一个队列。
运行receive.go
后可以看到打印出接收到的消息。程序并没有结束运行,因为deliverieCh
并没有关闭,所以consumer进程会一直阻塞等待下一条消息的到来。
2024/07/24 15:16:48 Received a message: Hello World!
回到管理页面可以看到该消息队列多了一个consumer
按下Ctrl+C结束consumer进程,可以看到该队列下的consumer没有了(就不截图了)。
此后可以测试启动receive.go
并反复执行send.go
(可以把声明队列注释掉),都是可以收到消息的。
【RabbitMQ】Go实现
https://sheep-in-box.github.io/2024/07/24/【RabbitMQ】Go实现/