【RabbitMQ】Go实现

RabbitMQ是一个消息代理,底层为队列,支持AMQP(高级消息队列协议)。

消息队列的作用

  • 应用解耦:生产者和消费者之间通过消息传递进行通信,而非直接调用。
  • 异步通信:消息的发送方可以在不等待响应的情况下继续执行,消息会被存储在队列中等待接收者处理,提高了系统的吞吐量。
  • 削峰填谷:在遇到瞬时的高流量情况下,消息队列作为缓冲区,防止数据过载。

消息队列的缺点

  • 降低系统的可用性:系统引入外部依赖越多,越容易挂掉。
  • 系统复杂度提高:需要面对各种问题,比如保证消息没有被重复消费、处理信息丢失的情况、保证消息传递的顺序性。
  • 一致性问题:多个系统处理消息可能成功可能不成功,造成数据不一致。

Go使用RabbitMQ

首先,安装rabbitmq

choco install rabbitmq

安装后,会自动启动服务,默认运行在本地5672端口,管理界面为localhost:15672,默认用户和密码均为guest

接下来用go编写两段程序send.goreceive.go模拟消息发送方和接收方。

send.go:

package main

import (
	"context"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// 连接到RabbitMQ服务器(broker)
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("connection.open: %s", err)
	}
	defer conn.Close()
	// 创建channel(amqp.Channel)
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("channel.open: %s", err)
	}
	defer ch.Close()

	// 声明一个队列
	ch.QueueDeclare(
		"hello", // 队列名称,若该队列不存在则创建
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)

	// 发送消息到队列,超时时间设为2秒
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel() // 带超时context的固定写法
	msg := "Hello World!"
	ch.PublishWithContext(
		ctx,
		"",      // exchange
		"hello", // exchange为空时,key为队列名称
		false,   // mandatory
		false,   // immediate
		amqp.Publishing{ // 消息内容
			ContentType: "text/plain", // MIME content type
			Body:        []byte(msg),
		},
	)
}

receive.go

package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// 连接到RabbitMQ服务器(broker)
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("connection.open: %s", err)
	}
	defer conn.Close()
	// 创建channel(amqp.Channel)
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("channel.open: %s", err)
	}
	defer ch.Close()

	// 声明一个队列,这里不需要声明队列,因为send.go已经声明过了
	// 如果声明队列参数不一致,会报错
	// ch.QueueDeclare(
	// 	"hello", // 队列名称,若该队列不存在则创建
	// 	false,   // durable
	// 	false,   // delete when unused
	// 	false,   // exclusive
	// 	false,   // no-wait
	// 	nil,     // arguments
	// )

	// 接收消息,返回go语言原生channel(只读)
	deliverieCh, err := ch.Consume(
		"hello", // queue
		"",      // consumer
		true,    // auto-ack
		false,   // exclusive
		false,   // no-local
		false,   // no-wait
		nil,     // args
	)
	if err != nil {
		log.Fatalf("basic.consume: %s", err)
	}

	// 从deliverieCh中读取消息
	for delivery := range deliverieCh {
		log.Printf("Received a message: %s", delivery.Body)
	}
}

运行send.go后可以在管理页面看到创建了一个队列。

运行receive.go后可以看到打印出接收到的消息。程序并没有结束运行,因为deliverieCh并没有关闭,所以consumer进程会一直阻塞等待下一条消息的到来。

2024/07/24 15:16:48 Received a message: Hello World!

回到管理页面可以看到该消息队列多了一个consumer

按下Ctrl+C结束consumer进程,可以看到该队列下的consumer没有了(就不截图了)。

此后可以测试启动receive.go并反复执行send.go(可以把声明队列注释掉),都是可以收到消息的。


【RabbitMQ】Go实现
https://sheep-in-box.github.io/2024/07/24/【RabbitMQ】Go实现/
作者
Sheep
发布于
2024年7月24日
许可协议