# 同步编程和异步编程

异步化其实就是异步编程,相对的就是同步编程,这两种是不同的编程模型,它们的核心区别在于任务的执行方式和处理顺序。以下是它们的主要区别:

  • 同步编程(Synchronous Programming)
    • 任务按顺序执行。一个任务必须完成后,才能执行下一个任务。
    • 如果某个任务耗时较长,比如从网络获取数据,整个程序会等待该任务完成,导致其他任务处于阻塞状态。
  • 异步编程(Asynchronous Programming)
    • 任务可以并发执行,且不需要等待前一个任务完成。
    • 异步任务可以在等待(比如I/O操作)时,先让其他任务执行,提高程序的响应速度和效率。

同步编程的优势在于简单易用,但在处理复杂并发任务时性能较差。异步编程虽然复杂度更高,但能更好地处理I/O密集型任务,提高系统的性能和响应速度。

# 异步编程思想

异步编程(Asynchronous Programming)广泛应用于Web服务器、数据处理、GUI应用程序等领域。例如,Node.js通过异步I/O机制处理大量并发请求,从而提高了服务器的吞吐量。

这种技术旨在实现程序在执行耗时操作(如I/O操作、网络请求、数据库查询等)时不阻塞主线程或主流程。它通过将耗时任务的执行与主程序流程分离,使得程序能够继续处理其他任务,直到耗时操作完成后,再执行相应的回调或处理。

而在同步编程中,程序需要等待某一任务完成后才能继续执行后续代码,这往往导致程序的阻塞,特别是遇到需要等待响应的操作时。而异步编程则避免了这种等待,通过调度机制在任务完成后回调处理。

例如下图是同步编程和异步编程的请求顺序,两种不同的技术直接就导致下面两个请求响应耗时的差别,明显可以看到异步编程方式响应速度更快,因为没有阻塞等待结果。

Python Synchronous Asynchronous Web Model Synchronous vs. Asynchronous Programming: What's the Difference? - BuildFire

以下是异步化技术的几个关键点:

非阻塞操作:传统的同步操作需要程序等待某一任务完成后才能执行下一步操作,而异步操作则通过非阻塞的方式执行,程序不会被卡住,能够并行处理多个任务。

回调机制:回调机制是最早期且广泛使用的异步编程技术之一。在使用异步操作时,程序不直接返回结果,而是传递一个回调函数。异步操作完成后,系统会自动调用该回调函数并将结果传递给它。

消息队列:消息队列(Message Queue, MQ)是一种基于异步消息传递的机制,常用于实现异步化和解耦服务。它通过将消息放入队列中,发送者和接收者可以异步通信,从而实现任务的并发处理。在基于消息队列实现异步化时,生产者可以在不等待消费者处理消息的情况下继续执行其他任务,消费者可以独立于生产者的时间点从队列中读取消息并进行处理。

  • 消息发送者(生产者):服务A将需要处理的任务封装为消息,放入队列中,立即返回并继续其他任务。
  • 消息消费者(消费者):服务B从队列中读取消息并处理。由于服务B和服务A是松耦合的,服务A不需要等待B的处理完成。

事件驱动机制:事件驱动是一种基于回调的异步模式,通过事件循环来调度任务。Node.js中的事件驱动机制是异步I/O操作的典型代表,所有I/O操作都是异步的,事件循环负责调度任务。事件循环是一个无限循环,监听事件并分发处理。主线程可以继续处理其他任务,当事件发生时,相关的回调函数会被调度执行。

多线程与多进程:异步化技术在某些情况下通过多线程或多进程实现,使得不同的任务能够并行运行,最大化利用多核处理器的计算能力。与异步I/O不同的是,多线程和多进程允许多个任务真正同时运行。

# 基于消息队列实现异步化

# 实现原理

消息队列通过以下两个角色进行异步通信:

  • 生产者(Producer):负责将消息发布到消息队列中。生产者将需要处理的任务封装为消息,放入队列后即可继续执行其他操作。
  • 消费者(Consumer):从消息队列中获取消息并处理。消费者可以是多个,能够并发处理队列中的消息,处理完毕后继续监听队列。

# 实现流程

(1)生产者发送消息

在异步系统中,生产者执行任务时,将任务相关信息封装为消息并发送到消息队列。此时生产者的主要任务已经完成,可以继续其他工作,而不需要等待任务结果。

(2)消息队列存储消息

消息队列接收生产者发送的消息,并将其放入一个先进先出(FIFO)队列中。消息会按照发送的顺序存储,等待消费者处理。

(3)消费者处理消息

消费者是独立的进程或服务,它从消息队列中拉取消息进行处理。消费者可以是多个,允许并发处理多个任务。在完成消息处理后,消费者可以选择确认处理结果(如ACK机制),以告知消息队列该消息已经成功处理,也可以通过另一个消息队列或持久化存储返回。

# 代码实现举例

使用Golang和Redis来实现消息队列,通常会利用Redis的LIST数据结构。Redis提供了LPUSHBRPOP命令,分别用于将消息放入队列和从队列中获取消息。通过这种方式,可以轻松实现基于消息队列的异步化处理。

在下面的示例中,我们将使用Redis作为消息队列的基础,通过一个生产者将任务发送到队列中,多个消费者从队列中异步消费消息并处理任务。

# 环境准备

  1. 安装Golang Redis客户端库:go-redis (opens new window)
  2. 确保已安装并运行了Redis服务。

使用以下命令安装go-redis库:

go get github.com/go-redis/redis/v8

# 代码示例

假设我们要模拟一个订单处理系统,订单服务将订单任务异步发送到Redis队列中,而多个消费者会从队列中获取订单并处理。

# 1. 生产者代码:将订单消息放入Redis队列

生产者的任务是将订单消息放入Redis的队列中。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
    "log"
    "time"
)

// 初始化Redis客户端
var ctx = context.Background()
var rdb = redis.NewClient(&redis.Options{
    Addr: "localhost:6379", // Redis地址
    DB:   0,                // Redis数据库
})

func main() {
    // 模拟发送订单任务到Redis队列中
    for i := 1; i <= 5; i++ {
        orderID := fmt.Sprintf("order_%d", i)
        err := sendOrderToQueue(orderID)
        if err != nil {
            log.Printf("Failed to send order: %v\n", err)
        } else {
            log.Printf("Order %s has been sent to the queue.\n", orderID)
        }
        time.Sleep(1 * time.Second) // 模拟每秒发送一笔订单
    }
}

// 将订单消息推送到Redis的任务队列
func sendOrderToQueue(orderID string) error {
    return rdb.LPush(ctx, "order_queue", orderID).Err()
}
# 2. 消费者代码:从Redis队列中异步获取并处理消息

消费者从Redis队列中获取订单消息并进行处理。我们使用BRPOP命令来阻塞地等待消息,一旦有消息进入队列,就开始处理。

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
    "log"
    "time"
)

// 初始化Redis客户端
var ctx = context.Background()
var rdb = redis.NewClient(&redis.Options{
    Addr: "localhost:6379", // Redis地址
    DB:   0,                // Redis数据库
})

func main() {
    // 启动两个消费者,模拟并发处理任务
    go processOrders("Consumer_1")
    go processOrders("Consumer_2")

    // 防止主协程退出
    select {}
}

// 消费者从队列中异步处理订单
func processOrders(consumerName string) {
    for {
        // 从Redis队列中阻塞式地获取订单消息
        order, err := rdb.BRPop(ctx, 0*time.Second, "order_queue").Result()
        if err != nil {
            log.Printf("%s failed to get order: %v\n", consumerName, err)
            continue
        }

        // 模拟订单处理
        orderID := order[1] // BRPop返回的结果是一个包含队列名和消息的切片
        log.Printf("%s processing order: %s\n", consumerName, orderID)
        time.Sleep(2 * time.Second) // 模拟订单处理时间
        log.Printf("%s finished processing order: %s\n", consumerName, orderID)
    }
}
# 3. 运行示例
  1. 启动Redis服务器。

  2. 运行生产者程序,将订单任务发送到Redis队列:

    go run producer.go
    
  3. 运行消费者程序,消费者会异步从Redis队列中获取任务并处理:

    go run consumer.go
    

# 代码解析

生产者(Producer)

  • LPUSH:将订单消息推送到Redis的order_queue中。Redis的LPUSH命令将消息推送到列表的左侧,类似于在队列的头部插入消息。
  • 生产者通过每隔1秒发送一个订单的方式,模拟异步发送任务的场景。

消费者(Consumer)

  • BRPOP:消费者使用Redis的BRPOP命令从队列的右侧阻塞式地弹出消息。BRPOP会阻塞等待,直到有新消息进入队列,保证消费者不会忙轮询等待消息。
  • 每个消费者处理完一个订单后,会继续等待下一个订单消息,模拟了异步任务处理的并发场景。
  • 消费者是通过go关键字并发运行的,这意味着可以启动多个消费者来从同一个队列中并发处理消息。

并发处理

  • 消费者Consumer_1Consumer_2分别是两个独立的Go协程(Goroutine),它们从同一个Redis队列order_queue中获取订单消息。由于Redis队列的特性,消息只会被一个消费者获取并处理,从而实现了任务的并发分发和处理。

优势

  1. 异步任务处理:生产者将任务放入队列,消费者可以异步获取任务,双方不必同步等待。
  2. 松耦合:生产者和消费者通过队列解耦,彼此独立,生产者只负责将任务放入队列,消费者可以在不同的时间获取和处理任务。
  3. 高可扩展性:可以轻松通过增加更多消费者来提升任务处理的并发能力,保证系统在高并发场景下的性能。
  4. 可靠性:Redis的持久化选项可以确保即使在系统崩溃的情况下,队列中的任务不会丢失。当然这里也可以用Kafka等其他消息队列中间件。

# 工作流引擎中的异步化实现

如下图,每个触发器会创建execution实例。如果在触发时创建实例就立即存储到DB,可能会出现高频的上报数据导致数据库高负载。从而极大影响接口的性能,导致接口响应超时、并发度下降。

为了应付高频大量的调用场景,可以在触发器创建execution实例到落入数据库之间,通过增加消息队列进行存储,把任务异步化。后端通过worker集群进行消费,可以很好控制任务并发量,实现频率控制,降低DB的负载。

同时,针对不同的触发器类型,其生产的任务推送到不同的队列,避免出现类似api触发器大量任务阻碍了其他类型触发器的任务。

image-20231009094943277

由于execution的主键uid是通过snowflake算法生成的,可以不使用数据库的自增长主键ID作为uid,减少了db的操作。接口的响应速度可以大大提升。

image-20230927124803024