# 同步编程和异步编程
异步化其实就是异步编程,相对的就是同步编程,这两种是不同的编程模型,它们的核心区别在于任务的执行方式和处理顺序。以下是它们的主要区别:
- 同步编程(Synchronous Programming)
- 任务按顺序执行。一个任务必须完成后,才能执行下一个任务。
- 如果某个任务耗时较长,比如从网络获取数据,整个程序会等待该任务完成,导致其他任务处于阻塞状态。
- 异步编程(Asynchronous Programming)
- 任务可以并发执行,且不需要等待前一个任务完成。
- 异步任务可以在等待(比如I/O操作)时,先让其他任务执行,提高程序的响应速度和效率。
同步编程的优势在于简单易用,但在处理复杂并发任务时性能较差。异步编程虽然复杂度更高,但能更好地处理I/O密集型任务,提高系统的性能和响应速度。
# 异步编程思想
异步编程(Asynchronous Programming)广泛应用于Web服务器、数据处理、GUI应用程序等领域。例如,Node.js通过异步I/O机制处理大量并发请求,从而提高了服务器的吞吐量。
这种技术旨在实现程序在执行耗时操作(如I/O操作、网络请求、数据库查询等)时不阻塞主线程或主流程。它通过将耗时任务的执行与主程序流程分离,使得程序能够继续处理其他任务,直到耗时操作完成后,再执行相应的回调或处理。
而在同步编程中,程序需要等待某一任务完成后才能继续执行后续代码,这往往导致程序的阻塞,特别是遇到需要等待响应的操作时。而异步编程则避免了这种等待,通过调度机制在任务完成后回调处理。
例如下图是同步编程和异步编程的请求顺序,两种不同的技术直接就导致下面两个请求响应耗时的差别,明显可以看到异步编程方式响应速度更快,因为没有阻塞等待结果。
以下是异步化技术的几个关键点:
非阻塞操作:传统的同步操作需要程序等待某一任务完成后才能执行下一步操作,而异步操作则通过非阻塞的方式执行,程序不会被卡住,能够并行处理多个任务。
回调机制:回调机制是最早期且广泛使用的异步编程技术之一。在使用异步操作时,程序不直接返回结果,而是传递一个回调函数。异步操作完成后,系统会自动调用该回调函数并将结果传递给它。
消息队列:消息队列(Message Queue, MQ)是一种基于异步消息传递的机制,常用于实现异步化和解耦服务。它通过将消息放入队列中,发送者和接收者可以异步通信,从而实现任务的并发处理。在基于消息队列实现异步化时,生产者可以在不等待消费者处理消息的情况下继续执行其他任务,消费者可以独立于生产者的时间点从队列中读取消息并进行处理。
- 消息发送者(生产者):服务A将需要处理的任务封装为消息,放入队列中,立即返回并继续其他任务。
- 消息消费者(消费者):服务B从队列中读取消息并处理。由于服务B和服务A是松耦合的,服务A不需要等待B的处理完成。
事件驱动机制:事件驱动是一种基于回调的异步模式,通过事件循环来调度任务。Node.js中的事件驱动机制是异步I/O操作的典型代表,所有I/O操作都是异步的,事件循环负责调度任务。事件循环是一个无限循环,监听事件并分发处理。主线程可以继续处理其他任务,当事件发生时,相关的回调函数会被调度执行。
多线程与多进程:异步化技术在某些情况下通过多线程或多进程实现,使得不同的任务能够并行运行,最大化利用多核处理器的计算能力。与异步I/O不同的是,多线程和多进程允许多个任务真正同时运行。
# 基于消息队列实现异步化
# 实现原理
消息队列通过以下两个角色进行异步通信:
- 生产者(Producer):负责将消息发布到消息队列中。生产者将需要处理的任务封装为消息,放入队列后即可继续执行其他操作。
- 消费者(Consumer):从消息队列中获取消息并处理。消费者可以是多个,能够并发处理队列中的消息,处理完毕后继续监听队列。
# 实现流程
(1)生产者发送消息
在异步系统中,生产者执行任务时,将任务相关信息封装为消息并发送到消息队列。此时生产者的主要任务已经完成,可以继续其他工作,而不需要等待任务结果。
(2)消息队列存储消息
消息队列接收生产者发送的消息,并将其放入一个先进先出(FIFO)队列中。消息会按照发送的顺序存储,等待消费者处理。
(3)消费者处理消息
消费者是独立的进程或服务,它从消息队列中拉取消息进行处理。消费者可以是多个,允许并发处理多个任务。在完成消息处理后,消费者可以选择确认处理结果(如ACK机制),以告知消息队列该消息已经成功处理,也可以通过另一个消息队列或持久化存储返回。
# 代码实现举例
使用Golang和Redis来实现消息队列,通常会利用Redis的LIST
数据结构。Redis提供了LPUSH
和BRPOP
命令,分别用于将消息放入队列和从队列中获取消息。通过这种方式,可以轻松实现基于消息队列的异步化处理。
在下面的示例中,我们将使用Redis作为消息队列的基础,通过一个生产者将任务发送到队列中,多个消费者从队列中异步消费消息并处理任务。
# 环境准备
- 安装Golang Redis客户端库:go-redis (opens new window)
- 确保已安装并运行了Redis服务。
使用以下命令安装go-redis
库:
go get github.com/go-redis/redis/v8
# 代码示例
假设我们要模拟一个订单处理系统,订单服务将订单任务异步发送到Redis队列中,而多个消费者会从队列中获取订单并处理。
# 1. 生产者代码:将订单消息放入Redis队列
生产者的任务是将订单消息放入Redis的队列中。
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"log"
"time"
)
// 初始化Redis客户端
var ctx = context.Background()
var rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis地址
DB: 0, // Redis数据库
})
func main() {
// 模拟发送订单任务到Redis队列中
for i := 1; i <= 5; i++ {
orderID := fmt.Sprintf("order_%d", i)
err := sendOrderToQueue(orderID)
if err != nil {
log.Printf("Failed to send order: %v\n", err)
} else {
log.Printf("Order %s has been sent to the queue.\n", orderID)
}
time.Sleep(1 * time.Second) // 模拟每秒发送一笔订单
}
}
// 将订单消息推送到Redis的任务队列
func sendOrderToQueue(orderID string) error {
return rdb.LPush(ctx, "order_queue", orderID).Err()
}
# 2. 消费者代码:从Redis队列中异步获取并处理消息
消费者从Redis队列中获取订单消息并进行处理。我们使用BRPOP
命令来阻塞地等待消息,一旦有消息进入队列,就开始处理。
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"log"
"time"
)
// 初始化Redis客户端
var ctx = context.Background()
var rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis地址
DB: 0, // Redis数据库
})
func main() {
// 启动两个消费者,模拟并发处理任务
go processOrders("Consumer_1")
go processOrders("Consumer_2")
// 防止主协程退出
select {}
}
// 消费者从队列中异步处理订单
func processOrders(consumerName string) {
for {
// 从Redis队列中阻塞式地获取订单消息
order, err := rdb.BRPop(ctx, 0*time.Second, "order_queue").Result()
if err != nil {
log.Printf("%s failed to get order: %v\n", consumerName, err)
continue
}
// 模拟订单处理
orderID := order[1] // BRPop返回的结果是一个包含队列名和消息的切片
log.Printf("%s processing order: %s\n", consumerName, orderID)
time.Sleep(2 * time.Second) // 模拟订单处理时间
log.Printf("%s finished processing order: %s\n", consumerName, orderID)
}
}
# 3. 运行示例
启动Redis服务器。
运行生产者程序,将订单任务发送到Redis队列:
go run producer.go
运行消费者程序,消费者会异步从Redis队列中获取任务并处理:
go run consumer.go
# 代码解析
生产者(Producer)
LPUSH
:将订单消息推送到Redis的order_queue
中。Redis的LPUSH
命令将消息推送到列表的左侧,类似于在队列的头部插入消息。- 生产者通过每隔1秒发送一个订单的方式,模拟异步发送任务的场景。
消费者(Consumer)
BRPOP
:消费者使用Redis的BRPOP
命令从队列的右侧阻塞式地弹出消息。BRPOP
会阻塞等待,直到有新消息进入队列,保证消费者不会忙轮询等待消息。- 每个消费者处理完一个订单后,会继续等待下一个订单消息,模拟了异步任务处理的并发场景。
- 消费者是通过
go
关键字并发运行的,这意味着可以启动多个消费者来从同一个队列中并发处理消息。
并发处理
- 消费者
Consumer_1
和Consumer_2
分别是两个独立的Go协程(Goroutine),它们从同一个Redis队列order_queue
中获取订单消息。由于Redis队列的特性,消息只会被一个消费者获取并处理,从而实现了任务的并发分发和处理。
优势
- 异步任务处理:生产者将任务放入队列,消费者可以异步获取任务,双方不必同步等待。
- 松耦合:生产者和消费者通过队列解耦,彼此独立,生产者只负责将任务放入队列,消费者可以在不同的时间获取和处理任务。
- 高可扩展性:可以轻松通过增加更多消费者来提升任务处理的并发能力,保证系统在高并发场景下的性能。
- 可靠性:Redis的持久化选项可以确保即使在系统崩溃的情况下,队列中的任务不会丢失。当然这里也可以用Kafka等其他消息队列中间件。
# 工作流引擎中的异步化实现
如下图,每个触发器会创建execution实例。如果在触发时创建实例就立即存储到DB,可能会出现高频的上报数据导致数据库高负载。从而极大影响接口的性能,导致接口响应超时、并发度下降。
为了应付高频大量的调用场景,可以在触发器创建execution实例到落入数据库之间,通过增加消息队列进行存储,把任务异步化。后端通过worker集群进行消费,可以很好控制任务并发量,实现频率控制,降低DB的负载。
同时,针对不同的触发器类型,其生产的任务推送到不同的队列,避免出现类似api触发器大量任务阻碍了其他类型触发器的任务。
由于execution的主键uid是通过snowflake算法生成的,可以不使用数据库的自增长主键ID作为uid,减少了db的操作。接口的响应速度可以大大提升。