# 事件驱动架构

事件驱动机制的核心思想是,当某个特定的事件发生时,会触发相应的处理程序(事件处理器)来响应这个事件。这样的设计模式使得应用程序更加灵活、可扩展和易于维护。

事件驱动机制的主要组成部分包括:

  1. 事件源(Event Source):事件源是产生事件的对象或组件,例如按钮、定时器或者网络连接等。
  2. 事件(Event):事件可以由各种来源触发,如人工操作、API调用、定时器等。
  3. 事件监听器(Event Listener):事件监听器是一个接口或者守护进程服务,它定义了接收那些事件。当事件发生时,事件监听器负责将事件传递给相应的事件处理器。
  4. 事件处理器(Event Handler):事件处理器是用来处理特定事件的程序或方法。这里是调用流程引擎来执行任务。

事件驱动架构如下图所示:

它是基于生产者和消费者的模式实现,在工程实现上,可以基于Redis消息队列或者Kafka实现。

  • 生产者(Producers):产生事件并将其传递给消息队列或主题。
  • 消费者(Consumers):从消息队列或主题中获取事件并处理它们。
  • 消息队列/主题(消息队列/Topic):用于存储和传递事件的中间件,可以基于Redis、Kafka等中间件。
image-20240305193947258

# 基于Redis的轻量级综合实践

对于轻量级的系统,例如每日任务量在百万级别以内的,可以直接基于Redis的消息队列实现,如下是代码示例。

生产者

无限循环,每次循环将一个格式化的消息推送到名为 "queue" 的 Redis 列表中,模拟事件产生的过程。

func producer(rdb *redis.Client) {
	for i := 0; ; i++ {
		err := rdb.LPush(ctx, "queue", fmt.Sprintf("message-%d", i)).Err()
		if err != nil {
			fmt.Println("producer error:", err)
		} else {
			fmt.Println("produced message:", i)
		}
		time.Sleep(1 * time.Second)
	}
}

消费者

无限循环,每次循环从名为 "queue" 的 Redis 列表中阻塞获取一条消息,模拟消费事件的过程。

func consumer(rdb *redis.Client) {
	for {
		result, err := rdb.BRPop(ctx, 0*time.Second, "queue").Result()
		if err != nil {
			fmt.Println("consumer error:", err)
		} else {
			fmt.Println("consumed message:", result[1])
		}
	}
}

主函数

main.go

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
	"time"
)

var ctx = context.Background()

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "test:test",
		DB:       0,
	})

	err := rdb.Ping(ctx).Err()
	if err != nil {
		panic(err)
	}

	go producer(rdb)
	go consumer(rdb)

	select {}
}




在代码中,我们首先创建了一个 Redis 客户端,启动两个 goroutine 分别执行 producer 和 consumer 函数。使用 select {} 保持主 goroutine 永远运行,以保持程序运行。

要运行此代码,请确保你已经在本地安装了 Redis 并运行在默认端口 6379 上。然后,使用以下命令运行代码:

go run main.go

你应该会看到类似如下的输出:

produced message: 0
consumed message: message-0
produced message: 1
consumed message: message-1
produced message: 2
consumed message: message-2
produced message: 3
consumed message: message-3

这个例子仅用于演示如何使用 Go 和 Redis 实现一个简单的生产者和消费者消息队列。在实际应用中,你可能需要考虑更多的异常处理、并发控制和优雅地关闭程序等问题。

# 基于Kafka的高性能综合实践

如果业务系统的任务量非常大,每日在百万级别以上,这时候需要考虑使用Kafka消息引擎。

Kafka主要用于构建实时数据流处理应用程序,以满足高吞吐量、低延迟和高可用性等需求。

Kafka具有以下主要特点:

  1. 高吞吐量:Kafka可以处理大量的写入和读取操作,每秒可以处理数百万次操作。这使得它非常适合处理大量实时数据。
  2. 分布式:Kafka可以在多个服务器上部署,以实现负载均衡和容错。这意味着,即使某个服务器出现故障,Kafka仍然可以继续运行,不会影响到整个系统的性能。
  3. 持久性:Kafka将数据存储在磁盘上,这意味着即使系统崩溃,数据也不会丢失。此外,Kafka还支持数据备份,以进一步确保数据的安全性。
  4. 实时处理:Kafka可以实时处理数据流,这使得它非常适用于实时分析和监控系统。
  5. 可扩展性:Kafka可以轻松地扩展到更多的服务器和集群,以满足不断增长的数据处理需求。

Kafka的主要组件包括:

  • Producer

    生产者是负责将数据发送到Kafka的应用程序或服务。它们将数据发布到一个或多个Kafka主题。

  • Broker

    代理是Kafka集群中的服务器,它们负责接收来自生产者的消息并将其存储在磁盘上。代理还向消费者提供消息。

  • Topic

    主题是Kafka中的消息分类,用于将消息分组。生产者将消息发送到特定主题,而消费者从特定主题订阅消息。

  • Partition

    分区是Kafka主题的子集,用于将数据分布在多个服务器上,以实现负载均衡和容错。

  • Consumer

    消费者是订阅Kafka主题并处理消息的应用程序或服务。消费者可以是分布式的,以便并行处理大量消息。

  • Consumer Group:

    Consumer Group(消费者组)是允许一个或多个Kafka消费者共享一个公共ID,以协同处理数据。

    当多个消费者共享同一个消费者组ID时,Kafka会将主题的每个分区的数据分发给消费者组中的一个消费者。这样,每条消息只会被消费者组中的一个消费者处理,从而实现负载均衡和并行处理。

    如果消费者组中的一个消费者发生故障,Kafka会自动将其处理的分区重新分配给组中的其他消费者,这样可以保证数据的持续处理,提高系统的可用性。

    同时,消费者组也支持消费者的动态扩展和缩减。当需要处理更多数据时,可以增加消费者组中的消费者数量,Kafka会自动重新分配分区;当消费者过多时,可以减少消费者数量,Kafka同样会自动进行分区的重新分配。

image-20240305200413935

在实现上,我们可以根据事件的类型,例如是start、同步api或异步api或定时器等进行区分不同的主题。可以将任务数据流分散,避免集中堆积在一起。

image-20240305201031046

创建Topic

const Host = "10.64.119.59"


func CreateTopic(topics []string) {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Consumer.Return.Errors = true
	brokers := []string{Host + ":9092"}

	admin, err := sarama.NewClusterAdmin(brokers, config)
	if err != nil {
		log.Fatalf("Failed to create cluster admin: %v", err)
	}
	defer func() {
		if err := admin.Close(); err != nil {
			log.Fatalf("Failed to close cluster admin: %v", err)
		}
	}()

	for _, topic := range topics {

		detail := &sarama.TopicDetail{
			NumPartitions:     1, // 设置分区数量
			ReplicationFactor: 1, // 设置副本因子
			ConfigEntries:     make(map[string]*string),
		}
		err = admin.CreateTopic(topic, detail, false)
		if err != nil {
			log.Fatalf("Failed to create topic: %v", err)
		}

		fmt.Printf("Topic %s created successfully\n", topic)
	}

}

消费者

// MyConsumerGroupHandler 实现 sarama.ConsumerGroup 接口,作为自定义ConsumerGroup
type MyConsumerGroupHandler struct {
	name  string
	count int64
}

// Setup 执行在 获得新 session 后 的第一步, 在 ConsumeClaim() 之前
func (MyConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

// Cleanup 执行在 session 结束前, 当所有 ConsumeClaim goroutines 都退出时
func (MyConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim 具体的消费逻辑
func (h MyConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			fmt.Printf("[consumer] name:%s topic:%q partition:%d offset:%d, value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, msg.Value)
			// 标记消息已被消费 内部会更新 consumer offset
			sess.MarkMessage(msg, "")
			h.count++
			if h.count%10000 == 0 {
				fmt.Printf("name:%s 消费数:%v\n", h.name, h.count)
			}

			// todo
			// 执行具体的业务

		case <-sess.Context().Done():
			return nil
		}
	}
}

func ConsumerGroup(topic, group, name string, wg *sync.WaitGroup) {
	defer wg.Done()
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cg, err := sarama.NewConsumerGroup([]string{Host + ":9092"}, group, config)
	if err != nil {
		log.Fatal("NewConsumerGroup err: ", err)
	}
	defer cg.Close()
	handler := MyConsumerGroupHandler{name: name}

	for {
		fmt.Println("running: ", name)

		err = cg.Consume(ctx, []string{topic}, handler)
		if err != nil {
			log.Println("Consume err: ", err)
		}

		if ctx.Err() != nil {
			return
		}
	}

}

生产者

type NodeVal struct {
	TriggerTemplate string
	Engine          string
	ExecutionId     string
	AppInstId       string
	Value           string
}

func produceEvent(producer sarama.SyncProducer, nodeVal *NodeVal) {
	topic := nodeVal.TriggerTemplate
	value := nodeVal.Value

	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(value),
	}

	_, _, err := producer.SendMessage(msg)
	if err != nil {
		log.Fatalln("Error send message", err)
	}

	fmt.Printf("[producer] topic:%s produce msg:%s\n", topic, value)
}

主函数

main.go

package main

import (
	"context"
	"fmt"
	"github.com/IBM/sarama"
	"log"
	"sync"
)

const Host = "10.64.119.59"

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Consumer.Return.Errors = true
	brokers := []string{Host + ":9092"}
	topics := []string{"start", "sync-api", "async-api", "form-trigger", "crontab"}

	// 创建topic
	//CreateTopic(topics)

	var client sarama.Client
	var err error
	client, err = sarama.NewClient(brokers, config)
	if err != nil {
		return
	}
	defer client.Close()

	var producer sarama.SyncProducer
	producer, err = sarama.NewSyncProducerFromClient(client)
	if err != nil {
		return
	}
	defer producer.Close()

	// 模拟一个节点实例
	nodeVal := NodeVal{
		TriggerTemplate: "start",
		Engine:          "standard",
		ExecutionId:     "1541815603606036480",
		AppInstId:       "inst-0001",
		Value: `{
    "instId": "xxx",
    "name": "start节点",
    "description": "start周期任务",
    "template": "start",
    "positions": [],
    "connections": [],
    "parameters": {},
    "errorHandler": {},
    "loopHandler": {},
    "timeout": 0,
    "isIgnore": false,
    "runtimes": []
}`,
	}
	go produceEvent(producer, &nodeVal)

	var wg sync.WaitGroup
	wg.Add(len(topics))

	for _, topic := range topics {
		group := topic
		go ConsumerGroup(topic, group, fmt.Sprintf("%s#%s", group, Host), &wg)
	}

	wg.Wait()
}

运行测试代码:

go run main.go

生产者运行后,你会看到类似如下输出:

running:  sync-api#10.64.119.59
running:  async-api#10.64.119.59
running:  start#10.64.119.59
running:  form-trigger#10.64.119.59
running:  crontab#10.64.119.59
[producer] topic:start produce msg:{
    "instId": "xxx",
    "name": "start节点",
    "description": "start周期任务",
    "template": "start",
    "positions": [],
    "connections": [],
    "parameters": {},
    "errorHandler": {},
    "loopHandler": {},
    "timeout": 0,
    "isIgnore": false,
    "runtimes": []
}

消费者运行后,你会看到类似如下输出:

[consumer] name:start#10.64.119.59 topic:"start" partition:0 offset:18, value:{
    "instId": "xxx",
    "name": "start节点",
    "description": "start周期任务",
    "template": "start",
    "positions": [],
    "connections": [],
    "parameters": {},
    "errorHandler": {},
    "loopHandler": {},
    "timeout": 0,
    "isIgnore": false,
    "runtimes": []
}

最后更新: 9/22/2024, 10:43:02 PM