# 事件驱动架构
事件驱动机制的核心思想是,当某个特定的事件发生时,会触发相应的处理程序(事件处理器)来响应这个事件。这样的设计模式使得应用程序更加灵活、可扩展和易于维护。
事件驱动机制的主要组成部分包括:
- 事件源(Event Source):事件源是产生事件的对象或组件,例如按钮、定时器或者网络连接等。
- 事件(Event):事件可以由各种来源触发,如人工操作、API调用、定时器等。
- 事件监听器(Event Listener):事件监听器是一个接口或者守护进程服务,它定义了接收那些事件。当事件发生时,事件监听器负责将事件传递给相应的事件处理器。
- 事件处理器(Event Handler):事件处理器是用来处理特定事件的程序或方法。这里是调用流程引擎来执行任务。
事件驱动架构如下图所示:
它是基于生产者和消费者的模式实现,在工程实现上,可以基于Redis消息队列或者Kafka实现。
- 生产者(Producers):产生事件并将其传递给消息队列或主题。
- 消费者(Consumers):从消息队列或主题中获取事件并处理它们。
- 消息队列/主题(消息队列/Topic):用于存储和传递事件的中间件,可以基于Redis、Kafka等中间件。
# 基于Redis的轻量级综合实践
对于轻量级的系统,例如每日任务量在百万级别以内的,可以直接基于Redis的消息队列实现,如下是代码示例。
生产者
无限循环,每次循环将一个格式化的消息推送到名为 "queue" 的 Redis 列表中,模拟事件产生的过程。
func producer(rdb *redis.Client) {
for i := 0; ; i++ {
err := rdb.LPush(ctx, "queue", fmt.Sprintf("message-%d", i)).Err()
if err != nil {
fmt.Println("producer error:", err)
} else {
fmt.Println("produced message:", i)
}
time.Sleep(1 * time.Second)
}
}
消费者
无限循环,每次循环从名为 "queue" 的 Redis 列表中阻塞获取一条消息,模拟消费事件的过程。
func consumer(rdb *redis.Client) {
for {
result, err := rdb.BRPop(ctx, 0*time.Second, "queue").Result()
if err != nil {
fmt.Println("consumer error:", err)
} else {
fmt.Println("consumed message:", result[1])
}
}
}
主函数
main.go
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"time"
)
var ctx = context.Background()
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "test:test",
DB: 0,
})
err := rdb.Ping(ctx).Err()
if err != nil {
panic(err)
}
go producer(rdb)
go consumer(rdb)
select {}
}
在代码中,我们首先创建了一个 Redis 客户端,启动两个 goroutine 分别执行 producer 和 consumer 函数。使用 select {} 保持主 goroutine 永远运行,以保持程序运行。
要运行此代码,请确保你已经在本地安装了 Redis 并运行在默认端口 6379 上。然后,使用以下命令运行代码:
go run main.go
你应该会看到类似如下的输出:
produced message: 0
consumed message: message-0
produced message: 1
consumed message: message-1
produced message: 2
consumed message: message-2
produced message: 3
consumed message: message-3
这个例子仅用于演示如何使用 Go 和 Redis 实现一个简单的生产者和消费者消息队列。在实际应用中,你可能需要考虑更多的异常处理、并发控制和优雅地关闭程序等问题。
# 基于Kafka的高性能综合实践
如果业务系统的任务量非常大,每日在百万级别以上,这时候需要考虑使用Kafka消息引擎。
Kafka主要用于构建实时数据流处理应用程序,以满足高吞吐量、低延迟和高可用性等需求。
Kafka具有以下主要特点:
- 高吞吐量:Kafka可以处理大量的写入和读取操作,每秒可以处理数百万次操作。这使得它非常适合处理大量实时数据。
- 分布式:Kafka可以在多个服务器上部署,以实现负载均衡和容错。这意味着,即使某个服务器出现故障,Kafka仍然可以继续运行,不会影响到整个系统的性能。
- 持久性:Kafka将数据存储在磁盘上,这意味着即使系统崩溃,数据也不会丢失。此外,Kafka还支持数据备份,以进一步确保数据的安全性。
- 实时处理:Kafka可以实时处理数据流,这使得它非常适用于实时分析和监控系统。
- 可扩展性:Kafka可以轻松地扩展到更多的服务器和集群,以满足不断增长的数据处理需求。
Kafka的主要组件包括:
Producer
生产者是负责将数据发送到Kafka的应用程序或服务。它们将数据发布到一个或多个Kafka主题。
Broker
代理是Kafka集群中的服务器,它们负责接收来自生产者的消息并将其存储在磁盘上。代理还向消费者提供消息。
Topic
主题是Kafka中的消息分类,用于将消息分组。生产者将消息发送到特定主题,而消费者从特定主题订阅消息。
Partition
分区是Kafka主题的子集,用于将数据分布在多个服务器上,以实现负载均衡和容错。
Consumer
消费者是订阅Kafka主题并处理消息的应用程序或服务。消费者可以是分布式的,以便并行处理大量消息。
Consumer Group:
Consumer Group(消费者组)是允许一个或多个Kafka消费者共享一个公共ID,以协同处理数据。
当多个消费者共享同一个消费者组ID时,Kafka会将主题的每个分区的数据分发给消费者组中的一个消费者。这样,每条消息只会被消费者组中的一个消费者处理,从而实现负载均衡和并行处理。
如果消费者组中的一个消费者发生故障,Kafka会自动将其处理的分区重新分配给组中的其他消费者,这样可以保证数据的持续处理,提高系统的可用性。
同时,消费者组也支持消费者的动态扩展和缩减。当需要处理更多数据时,可以增加消费者组中的消费者数量,Kafka会自动重新分配分区;当消费者过多时,可以减少消费者数量,Kafka同样会自动进行分区的重新分配。
在实现上,我们可以根据事件的类型,例如是start、同步api或异步api或定时器等进行区分不同的主题。可以将任务数据流分散,避免集中堆积在一起。
创建Topic
const Host = "10.64.119.59"
func CreateTopic(topics []string) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Consumer.Return.Errors = true
brokers := []string{Host + ":9092"}
admin, err := sarama.NewClusterAdmin(brokers, config)
if err != nil {
log.Fatalf("Failed to create cluster admin: %v", err)
}
defer func() {
if err := admin.Close(); err != nil {
log.Fatalf("Failed to close cluster admin: %v", err)
}
}()
for _, topic := range topics {
detail := &sarama.TopicDetail{
NumPartitions: 1, // 设置分区数量
ReplicationFactor: 1, // 设置副本因子
ConfigEntries: make(map[string]*string),
}
err = admin.CreateTopic(topic, detail, false)
if err != nil {
log.Fatalf("Failed to create topic: %v", err)
}
fmt.Printf("Topic %s created successfully\n", topic)
}
}
消费者
// MyConsumerGroupHandler 实现 sarama.ConsumerGroup 接口,作为自定义ConsumerGroup
type MyConsumerGroupHandler struct {
name string
count int64
}
// Setup 执行在 获得新 session 后 的第一步, 在 ConsumeClaim() 之前
func (MyConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
// Cleanup 执行在 session 结束前, 当所有 ConsumeClaim goroutines 都退出时
func (MyConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
// ConsumeClaim 具体的消费逻辑
func (h MyConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case msg, ok := <-claim.Messages():
if !ok {
return nil
}
fmt.Printf("[consumer] name:%s topic:%q partition:%d offset:%d, value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, msg.Value)
// 标记消息已被消费 内部会更新 consumer offset
sess.MarkMessage(msg, "")
h.count++
if h.count%10000 == 0 {
fmt.Printf("name:%s 消费数:%v\n", h.name, h.count)
}
// todo
// 执行具体的业务
case <-sess.Context().Done():
return nil
}
}
}
func ConsumerGroup(topic, group, name string, wg *sync.WaitGroup) {
defer wg.Done()
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cg, err := sarama.NewConsumerGroup([]string{Host + ":9092"}, group, config)
if err != nil {
log.Fatal("NewConsumerGroup err: ", err)
}
defer cg.Close()
handler := MyConsumerGroupHandler{name: name}
for {
fmt.Println("running: ", name)
err = cg.Consume(ctx, []string{topic}, handler)
if err != nil {
log.Println("Consume err: ", err)
}
if ctx.Err() != nil {
return
}
}
}
生产者
type NodeVal struct {
TriggerTemplate string
Engine string
ExecutionId string
AppInstId string
Value string
}
func produceEvent(producer sarama.SyncProducer, nodeVal *NodeVal) {
topic := nodeVal.TriggerTemplate
value := nodeVal.Value
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(value),
}
_, _, err := producer.SendMessage(msg)
if err != nil {
log.Fatalln("Error send message", err)
}
fmt.Printf("[producer] topic:%s produce msg:%s\n", topic, value)
}
主函数
main.go
package main
import (
"context"
"fmt"
"github.com/IBM/sarama"
"log"
"sync"
)
const Host = "10.64.119.59"
func main() {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Consumer.Return.Errors = true
brokers := []string{Host + ":9092"}
topics := []string{"start", "sync-api", "async-api", "form-trigger", "crontab"}
// 创建topic
//CreateTopic(topics)
var client sarama.Client
var err error
client, err = sarama.NewClient(brokers, config)
if err != nil {
return
}
defer client.Close()
var producer sarama.SyncProducer
producer, err = sarama.NewSyncProducerFromClient(client)
if err != nil {
return
}
defer producer.Close()
// 模拟一个节点实例
nodeVal := NodeVal{
TriggerTemplate: "start",
Engine: "standard",
ExecutionId: "1541815603606036480",
AppInstId: "inst-0001",
Value: `{
"instId": "xxx",
"name": "start节点",
"description": "start周期任务",
"template": "start",
"positions": [],
"connections": [],
"parameters": {},
"errorHandler": {},
"loopHandler": {},
"timeout": 0,
"isIgnore": false,
"runtimes": []
}`,
}
go produceEvent(producer, &nodeVal)
var wg sync.WaitGroup
wg.Add(len(topics))
for _, topic := range topics {
group := topic
go ConsumerGroup(topic, group, fmt.Sprintf("%s#%s", group, Host), &wg)
}
wg.Wait()
}
运行测试代码:
go run main.go
生产者运行后,你会看到类似如下输出:
running: sync-api#10.64.119.59
running: async-api#10.64.119.59
running: start#10.64.119.59
running: form-trigger#10.64.119.59
running: crontab#10.64.119.59
[producer] topic:start produce msg:{
"instId": "xxx",
"name": "start节点",
"description": "start周期任务",
"template": "start",
"positions": [],
"connections": [],
"parameters": {},
"errorHandler": {},
"loopHandler": {},
"timeout": 0,
"isIgnore": false,
"runtimes": []
}
消费者运行后,你会看到类似如下输出:
[consumer] name:start#10.64.119.59 topic:"start" partition:0 offset:18, value:{
"instId": "xxx",
"name": "start节点",
"description": "start周期任务",
"template": "start",
"positions": [],
"connections": [],
"parameters": {},
"errorHandler": {},
"loopHandler": {},
"timeout": 0,
"isIgnore": false,
"runtimes": []
}