typescript?实现RabbitMQ死信队列和延迟队列(订单10分钟未付归还库存)的过程

 更新时间:2024年03月29日 10:35:10   作者:熊明才  
RabbitMQ作为一款流行的消息队列服务,提供了死信队列(Dead?Letter?Exchange)功能,能够有效地处理消息被拒绝、消息过期以及队列达到最大长度等情况,本文将介绍如何利用RabbitMQ的死信队列来处理这三种情况,并提供了TypeScript示例代码,需要的朋友可以参考下
(福利推荐:【腾讯云】服务器最新限时优惠活动,云服务器1核2G仅99元/年、2核4G仅768元/3年,立即抢购>>>:9i0i.cn/qcloud

(福利推荐:你还在原价购买阿里云服务器?现在阿里云0.8折限时抢购活动来啦!4核8G企业云服务器仅2998元/3年,立即抢购>>>:9i0i.cn/aliyun

Manjaro安装RabbitMQ

安装

sudo pacman -S rabbitmq rabbitmqadmin

启动管理模块

sudo rabbitmq-plugins enable rabbitmq_management
sudo rabbitmq-server

管理界面
http://127.0.0.1:15672/
默认用户名和密码都是guest。
要使用 rabbitmqctl 命令添加用户并分配权限,您可以按照以下步骤进行操作:

1.添加用户

rabbitmqctl add_user mingcai password

请将 password 替换为您想要设置的实际密码。

2.分配权限

rabbitmqctl set_permissions -p / mingcai ".*" ".*" ".*"

这个命令将用户 mingcai 授予对所有虚拟主机的所有资源的读、写和管理权限。如果您只想给予特定权限,请适当调整正则表达式 ".*",以授予适当的权限。例如,如果您只想给予读取权限,可以使用 "^amq\."

3.可选步骤:设置用户角色

您可以将用户分配给不同的角色,以便更好地管理权限。例如,您可以将用户添加到 administrator 角色以获取管理员权限:

rabbitmqctl set_user_tags mingcai administrator

这样,用户 mingcai 就被赋予了管理员权限。

请确保您具有适当的权限来执行这些操作,并确保替换示例中的用户名和密码为您自己的实际值。

死信队列

标题:利用RabbitMQ死信队列处理消息的三种情况

在消息队列的应用中,处理异常情况和消息的延迟成为了一项重要的任务。RabbitMQ作为一款流行的消息队列服务,提供了死信队列(Dead Letter Exchange)功能,能够有效地处理消息被拒绝、消息过期以及队列达到最大长度等情况。本文将介绍如何利用RabbitMQ的死信队列来处理这三种情况,并提供了TypeScript示例代码。

1. 消息被拒绝

当消费者无法处理某条消息时,可以选择将其标记为“被拒绝”。这种情况下,我们可以配置RabbitMQ,将被拒绝的消息发送到一个死信队列,以后再处理。

// 引入amqplib库
import * as amqp from 'amqplib';
// 连接到RabbitMQ服务器
const connection = await amqp.connect('amqp://localhost');
// 创建Channel
const channel = await connection.createChannel();
// 定义队列
const queueName = 'my_queue';
await channel.assertQueue(queueName, {
  // 设置死信交换机
  deadLetterExchange: 'my_dead_letter_exchange'
});
// 消费消息
channel.consume(queueName, (msg) => {
  // 处理消息
  if (msg) {
    // 处理失败,拒绝消息并将其重新放回队列
    // channel.reject(msg, true); // 第二个参数设为 true 表示将消息重新放回队列
    // 处理失败,拒绝消息
    channel.reject(msg, false); // 第二个参数设为 false 表示将消息投递到死信队列
     // or 处理失败,拒绝消息并将其重新放回死信队列
    channel.nack(msg, false, false); // 第二个参数设为 false 表示不将消息重新放回原队列,第三个参数设为 false 表示不拒绝当前和之前所有未确认的消息
  }
});

2. 消息过期

有时候我们希望消息在一定时间内被处理,如果超过了这个时间,就认为消息已经过期。RabbitMQ允许我们设置消息的过期时间,并在消息过期后将其发送到死信队列。

// 发布消息
await channel.sendToQueue(queueName, Buffer.from('Hello'), {
  expiration: '60000' // 设置过期时间为60秒
});

3. 队列达到最大长度

为了避免队列过载,我们可以限制队列的最大长度。当队列达到最大长度时,新的消息将被拒绝,并发送到死信队列。

// 定义队列
await channel.assertQueue(queueName, {
  maxLength: 100, // 设置最大队列长度为100
  deadLetterExchange: 'my_dead_letter_exchange'
});

通过以上配置,我们可以利用RabbitMQ的死信队列来处理消息被拒绝、消息过期以及队列达到最大长度等情况,保证消息系统的稳定性和可靠性。

以上是利用TypeScript示例代码演示了如何在RabbitMQ中使用死信队列。希望这篇文章对你有所帮助!

延时队列

什么是延时队列?顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
延时队列在项目中的应用还是比较多的,尤其像电商类平台:
1、订单成功后,在30分钟内没有支付,自动取消订单
2、外卖平台发送订餐通知,下单成功后60s给用户推送短信。
3、如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
4、淘宝新建商户一个月内还没上传商品信息,将冻结商铺等

npm install amqplib --save
npm install @types/amqplib --save-dev

总结

rabbitmqadmin 使用入门

rabbitmqadmin 是 RabbitMQ 的命令行管理工具,可以用于执行各种管理任务,如创建队列、交换机,查看队列状态等。以下是一些基本的用法示例:

export RABBITMQ_SERVER=127.0.0.1
export RABBITMQ_PORT=5672   
export RABBITMQ_USER=mingcai     
export RABBITMQ_PASSWORD=password
rabbitmqadmin list exchanges

查看 RabbitMQ 服务器信息

rabbitmqadmin status

列出所有交换机

rabbitmqadmin list exchanges

列出所有队列

rabbitmqadmin list queues

创建一个交换机

rabbitmqadmin declare exchange name=my_exchange type=direct

创建一个队列

rabbitmqadmin declare queue name=my_queue

绑定队列到交换机

rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key

发送消息到指定交换机

rabbitmqadmin publish exchange=my_exchange routing_key=my_routing_key payload="Hello, RabbitMQ!"

获取队列消息

rabbitmqadmin get queue=my_queue

这些命令只是一些基本用法示例,rabbitmqadmin 工具支持更多功能和选项。你可以通过运行 rabbitmqadmin help 命令来获取更详细的帮助信息,或者查看官方文档以了解更多选项和使用方法。

延时3秒和8秒全部代码

// delayProducer.ts
import * as amqp from 'amqplib';
async function setupRouting() {
    const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');
    const channel = await connection.createChannel();
    const exchange = 'routing_exchange';
    // 定义 dlx-exchange
    const dlxExchangeName = 'dlx-exchange';
    // 声明交换机
    await channel.assertExchange(exchange, 'direct', {durable: true});
    await channel.assertExchange(dlxExchangeName, 'direct', { durable: true });//消息防止丢失
    const dlxqueueBindings= [
        {
            dlxQueueName: 'dlx-3_second_queue', routingKey: 'fast',
        },
        {
            dlxQueueName: 'dlx-8_second_queue', routingKey: 'slow'
        }
    ];
    for (const binding of dlxqueueBindings) {
        // 绑定延迟死信队列
        await channel.assertQueue(binding.dlxQueueName );
        //死信交换机和死信队列绑定 Routing key fast 的消息
        await channel.bindQueue(binding.dlxQueueName, dlxExchangeName, binding.routingKey); // 将 dlx-queue 绑定到死信交换机
    }
    // 定义队列和路由键的映射
    const queueBindings = [
        {
            queue: '3_second_queue', routingKey: 'fast', arguments: {
                'x-message-ttl': 3000, // TTL 设置为 3 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。
                'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称
            }
        },
        {
            queue: '8_second_queue', routingKey: 'slow', arguments: {
                'x-message-ttl': 8000, // TTL 设置为 8 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。
                'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称
            }
        }
    ];
    // 声明队列,并将队列绑定到交换机上
    for (const binding of queueBindings) {
        await channel.assertQueue(binding.queue, {durable: true, arguments: binding.arguments});
        await channel.bindQueue(binding.queue, exchange, binding.routingKey);
    }
    for (let i = 0; i < 10; i++) {
        await new Promise((resolve) => {
            setTimeout(() => {
                resolve(1)
            }, 1000)
        })
        const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });
        console.log('当前中国时间:', chinaTime);
        // 发送消息到交换机,并设置不同的路由键
        await sendMessage(channel, exchange, 'fast', `[${i}] ${chinaTime} Message for the fast queue`);
        await sendMessage(channel, exchange, 'slow', `[${i}] ${chinaTime} Message for the slow queue`);
    }
    // 关闭连接
    setTimeout(async () => {
        await channel.close();
        await connection.close();
    }, 10000); // 在 10 秒后关闭连接
}
async function sendMessage(channel: amqp.Channel, exchange: string, routingKey: string, message: string) {
    channel.publish(exchange, routingKey, Buffer.from(message));
    console.log(`Sent message '${message}' with routing key '${routingKey}'`);
}
setupRouting().catch(console.error);
//消费者 dlx-3_second_queue.ts
import * as amqp from 'amqplib';
async function setupRouting() {
    const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');
    const channel = await connection.createChannel();
    let queue = 'dlx-3_second_queue'
    // 定义队列和路由键的映射
    await channel.consume(queue, (msg) => {
        if (msg !== null) {
            const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });
            console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);
            channel.ack(msg); // 确认消息已被处理
        }
    });
}
setupRouting().catch(console.error);
//dlx-8_second_queue.ts
import * as amqp from 'amqplib';
async function setupRouting() {
    const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');
    const channel = await connection.createChannel();
    let queue = 'dlx-8_second_queue'
    // 定义队列和路由键的映射
    await channel.consume(queue, (msg) => {
        if (msg !== null) {
            const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });
            console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);
            channel.ack(msg); // 确认消息已被处理
        }
    });
}
setupRouting().catch(console.error);

到此这篇关于typescript 实现RabbitMQ死信队列和延迟队列(订单10分钟未付归还库存)的文章就介绍到这了,更多相关RabbitMQ死信队列和延迟队列内容请搜索程序员之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持程序员之家!

相关文章

  • 关于爬虫和反爬虫的简略方案分享

    关于爬虫和反爬虫的简略方案分享

    这篇文章主要给大家介绍了一些关于爬虫和反爬虫的简略方案的相关资料,文中介绍的非常详细,对大家理解和学习爬虫与反爬虫具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。
    2018-01-01
  • 联邦学习FedAvg中模型聚合过程的理解分析

    联邦学习FedAvg中模型聚合过程的理解分析

    这篇文章主要为大家介绍了FedAvg中模型聚合过程的理解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-05-05
  • git?push?origin?HEAD:refs/for/master?的意思分析

    git?push?origin?HEAD:refs/for/master?的意思分析

    这篇文章主要介绍了git?push?origin?HEAD:refs/for/master?的意思,补充介绍了git?提交代码常用命令,本文给大家介绍的非常详细,需要的朋友可以参考下
    2023-01-01
  • ChatGPT 中文调教指南总结

    ChatGPT 中文调教指南总结

    ChatGPT是一个训练有素的大型语言模型,可以帮助你回答各种问题,本文介绍了ChatGPT 中文调教指南,感兴趣的可以了解一下
    2023-05-05
  • mathtype的下载与使用技巧超详细教程

    mathtype的下载与使用技巧超详细教程

    这篇文章主要介绍了mathtype的下载与使用超详细教程,包括mathtype使用技巧常用快捷键,本文给大家讲解的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-01-01
  • 基于Jupyter notebook搭建Spark集群开发环境的详细过程

    基于Jupyter notebook搭建Spark集群开发环境的详细过程

    Jupyter Notebook是一个开源并且使用很广泛项目,本文介绍如何基于Jupyter notebook搭建Spark集群开发环境,通过实例截图相结合给大家介绍的非常详细,需要的朋友参考下吧
    2021-10-10
  • git版本库介绍及本地创建的三种场景方式

    git版本库介绍及本地创建的三种场景方式

    这篇文章主要为大家介绍了git版本库以及本地创建的三种场景方式图文教程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-04-04
  • 基于Python和Java实现单词计数(Word Count)

    基于Python和Java实现单词计数(Word Count)

    Spark框架也是MapReduce-like模型,采用“分治-聚合”策略来对数据分布进行分布并行处理,本文就来利用Spark实现单词统计的功能,需要的可以参考一下
    2023-05-05
  • 数据分析2020年全国各省高考成绩分布情况

    数据分析2020年全国各省高考成绩分布情况

    这篇文章主要介绍了数据分析2020年全国各省高考成绩分布情况,顺便可以用这个数据看每个省市的一本线划分比率,还有其他相关的数据,需要的朋友可以参考下
    2020-07-07
  • Viso 2019 下载与激活方法

    Viso 2019 下载与激活方法

    Visio 是一款专门绘制流程示意图工具,由于很多刚入职的小白没有安装过visio,今天小编抽空给大家分享下Viso 2019 下载与激活方法,感兴趣的朋友一起看看吧
    2023-02-02

最新评论

?


http://www.vxiaotou.com