# 分布式锁介绍
分布式锁是一种控制在分布式系统环境下,多个节点对共享资源进行访问的一种同步机制。其主要目的是为了确保在分布式计算环境中,当多个节点并发访问共享资源时,只有一个节点能够成功,防止数据的不一致性。
分布式锁的基本原理主要是通过一种全局可见的方式(例如数据库、Redis、ETCD等)来创建一个锁,当一个节点获取到锁后,其他节点就无法再获取到锁,直到锁被释放。这样就可以保证在任何时刻,对共享资源的访问都是互斥的。
如下是分布式锁的几种实现方法:
- 基于缓存(如Redis)的分布式锁
使用缓存系统(如Redis)的原子操作(比如SETNX)来实现分布式锁。当一个进程需要访问共享资源时,尝试在缓存中设置一个键值对,表示获取到锁。如果设置成功,则表示获取到锁;如果设置失败,则表示锁已被占用,需要等待。为了防止死锁,可以设置键值对的过期时间。当持有锁的进程完成资源访问后,删除键值对,释放锁。
- 基于分布式协调服务(如etcd)的分布式锁
使用分布式协调服务(如etcd)的原子操作(比如Compare-and-Swap)来实现分布式锁。当一个进程需要访问共享资源时,尝试在etcd中设置一个键值对,表示获取到锁。如果设置成功,则表示获取到锁;如果设置失败,则表示锁已被占用,需要等待。为了防止死锁,可以设置键值对的过期时间。当持有锁的进程完成资源访问后,删除键值对,释放锁。
- 基于数据库的分布式锁
使用数据库表作为锁,当一个进程需要访问共享资源时,先向数据库插入一条记录,表示获取到锁。其他进程在访问资源之前也会尝试插入记录,如果插入失败则表示锁已被占用,需要等待。当持有锁的进程完成资源访问后,删除锁记录,释放锁。这种实现方式简单,但在高并发场景下可能会导致数据库压力过大。
# 流程引擎中的应用
这里提及分布式锁,是因为我们流程引擎中有一类Crontab周期任务,这需要在这个分布式系统中选举出一个主节点来进行Crontab任务的监控和分发,否则如果系统中每个节点都进行Crontab任务的分发,会导致任务重复下发运行。而选举出主节点,则需要用到分布式锁的技术。成为主节点以后,在自己生命周期内,还需要持续的去续约,保证在自己还存活的状态下,只有自己一个主节点进行Crontab任务的监控分发功能。当这个主节点出现异常不能提供服务时,则这个集群会通过选举再次选出一个新的主节点。
# 分布式锁和本地互斥锁的区别
分布式锁和互斥锁都是用于实现多个进程或线程之间对共享资源的互斥访问的同步机制,但它们的应用场景和实现方式有所不同。
应用场景:
分布式锁:主要应用于分布式系统中,用于解决多个节点之间对共享资源的互斥访问。
本地互斥锁:主要应用于单个系统、多线程环境中,用于解决多个线程之间对共享资源的互斥访问。
实现方式:
分布式锁:实现分布式锁需要借助一种全局可见的方式来创建和管理锁,例如使用数据库、Redis、Etcd等。分布式锁的实现需要考虑锁的性能、可靠性、安全性等因素。
本地互斥锁:互斥锁通常由操作系统或编程语言的库提供,它们的实现通常基于底层硬件原语(如原子操作等)或操作系统提供的同步原语(如信号量、临界区等),例如Java中的synchronized关键字、Python中的threading.Lock、Golang中的sync.Mutex等。。
性能和可扩展性:
分布式锁:由于分布式锁需要通过网络和外部系统(如数据库、Redis、ETCD等)来实现,因此它的性能相对较低。但分布式锁可以应用于多个节点之间的同步,具有较好的可扩展性。
本地互斥锁:互斥锁通常在本地内存中实现,因此性能较高。但互斥锁只能应用于单个系统、多线程环境,不适用于分布式系统中的同步。
# 分布式锁的几种实现方式
# 基于Redis实现分布式锁
下面的内容主要是基于Redis的方式来实现分布式锁,我会通过一个个的问题来循序渐进地引导。
一般加锁和解锁的流程如下:
Step1::获取锁:SETNX key value
Step2:执行其他操作
Step3:释放锁:DEL key
# setnx获取锁
在前面,我们提到分布式锁的其中一个特点就是要保证互斥性。
实际在Redis中,已经提供了一个SET
命令,通过参数组合的SETNX
这个命令是原子性的,可以保证互斥性(即只有一个客户端可以持有锁)。
SETNX
命令的意思是:如果key不存在则创建。
命令:
SET key value NX
# 如何处理死锁
在上面的内容,我们已经通过setnx
命令的互斥特性获得锁,但是这里存在一个问题:
如果在执行Step2的过程中,客户端出现crash,这时候会导致锁没有释放,从而出现死锁的问题,这个也是我们在第二部分内容中介绍分布式锁的特点---不会死锁,需要解决的其中一个问题。
为了保证不会发生死锁,在使用setnx
命令的时候给锁加一个过期时间即可。这样即使客户端在获取锁的时候发生crash,锁最终也会因为过期时间而释放。
命令:
set key value NX [EX seconds|PX milliseconds]
# 如何处理超时时,锁被其他客户端释放的问题
如下图所示,我们再考虑一种情况。如果一个客户端A在获取锁之后设置了超时时间,然后开始执行其他操作,但是这个操作很耗时从而超过了设定的超时时间。这时候锁被释放了,然后另一个客户端B获得了锁并进行其他操作。这时,第一个客户端A执行完以后,它还以为自己持有锁,就会进行解锁操作,实际上这时候它释放的是第二个客户端B持有的锁,最终导致了问题发生。
这里,我们分析可以发现有2个问题需要解决:
- 超时时间的设置
- 如何保证加锁和解锁是同一个客户端
(1)超时时间的设置
对于这个问题,一般有两种方式解决:
- 将过期时间设置的足够长,确保代码逻辑在锁释放之前能够执行完成,具体多长看自己的业务来定
- 为获取锁的线程增加一个守护线程,为将要过期但是未释放的锁增加有效时间
(2)如何保证加锁和解锁是同一个客户端
还是setnx
命令,我们每次加锁的时候给这个锁分配一个唯一的id,然后解锁的时候就释放这个id即可,这样就可以保证加锁和解锁都是同一个客户端。实现流程如下(伪代码):
# 加锁
uuid = UUID.RandomUUID().ToString()
set key uuid NX EX 60
# 业务逻辑
# 解锁
if redis.get(key) == uuid
return del key
else
return 0
在释放锁之前,我们查看锁的id是否是加锁时的id,如果不是,说明已经超时释放了,就不再进行解锁操作;否则就解锁。这样就实现了实现了加锁和解锁都是同一个客户端。
那么到这里是不是就完事了呢?还没!!请继续看下面的内容。
# 解锁操作(读+删除)如何保证原子性
在上一节里面,当进行解锁操作时,它是分两步执行的:
Step1:获取锁的id并判断是否是当前客户端设置的id
Step2:删除锁
这里面就有一个问题,判断id和删除锁的操作不是原子性的。如果客户端在执行Step1的时候,在删除锁之前发生了超时,后面删除锁释放的就是其他客户端获取的锁,这样也是没法保证加锁和解锁是同一个客户端的特性。
解决思路是保证Step1和Step2的操作在一个事务中进行,保证其原子性。
这里就可以用到lua脚本,我们把Step1和Step2的Get、判断、Del操作放在一个lua脚本中执行,如下:
$script = '
if redis.call("get",KEYS[1]) == ARGV[1]
then
return redis.call("del",KEYS[1])
else
return 0
end
';
$token = uniqid(mt_rand(), true);
if ($redis->set('my:lock', $token, ['NX', 'EX' => 10])) {
# todo
$redis->eval($script, ["my:lock", $token], 1);
} else {
echo 'get lock failed!';
}
到这里,我们就把在Redis单机情况下的分布式锁给实现了。但是这里由于Redis是单机模式,如果机器挂了,就直接无法提供服务了。所以很多时候我们会搭建一个主从的架构,来提高Redis集群的容灾能力。Redis会把操作的指令记录在本地内存buffer中,然后将buffer中的指令同步到从节点。
# 看门狗模式
在上一步中,我们会发现还有一个问题,那就是 todo
里面的业务逻辑还没有执行完,但是锁已经过期释放了。简单粗暴的方法,可以把过期时间设置的长一点。当然我们还有另一个思路,就是可以给获得锁的线程,另外开一个定时守护线程,每隔10s中就去判断锁有没有释放,如果没有释放就对锁的过期时间延长(使用set key uniq_id ex timeout
,没有使用NX),当前开源的Redisson解决了这个问题。
到这里,我们给出完整的go实现代码:
package main
import (
"github.com/gomodule/redigo/redis"
"log"
"sync"
"time"
)
//====================================
// redis分布式锁+看门狗模式
//====================================
type RedisDistributedLock struct {
key string
token string
conn redis.Conn
timeout int // 秒
}
func NewRedisDistributedLock(key, token string, conn redis.Conn, timeout int) *RedisDistributedLock {
return &RedisDistributedLock{
key: key,
token: token,
conn: conn,
timeout: timeout,
}
}
func (lock *RedisDistributedLock) watchDog(interval time.Duration) {
defer func() {
log.Println("end watchDog:", lock.key, lock.token)
}()
log.Println("start watchDog:", lock.key, lock.token)
for {
// 对于未释放的锁,延长它的过期时间
time.Sleep(interval)
token, err := redis.String(lock.conn.Do("GET", lock.key))
if err != nil {
log.Println("GET", lock.key, "error:", err.Error())
return
}
// token已经释放(被其他线程获取锁并设置其他唯一token值)
if token != lock.token {
log.Println(lock.key, lock.token, "release lock")
return
}
// 延长过期时间
_, err = redis.String(lock.conn.Do("SET", lock.key, lock.token, "EX", lock.timeout))
if err != nil {
log.Println("SET", lock.key, lock.token, "EX", lock.timeout, "error:", err.Error())
return
} else {
log.Println("extend=>", "SET", lock.key, lock.token, "EX", lock.timeout)
}
}
}
func (lock *RedisDistributedLock) tryLock() (ok bool, err error) {
var result string
// set key value ex 10 nx,操作成功会返回"OK"字符串,失败返回nil
result, err = redis.String(lock.conn.Do("SET", lock.key, lock.token, "EX", lock.timeout, "NX"))
if result == "OK" {
go lock.watchDog(10 * time.Second)
return true, nil
} else {
return false, err
}
}
func (lock *RedisDistributedLock) Unlock() (err error) {
//====================================
//1、保证加锁和解锁是同一个线程(防止去解别的线程的锁)
//2、保证获取锁和删除锁是原子的
//====================================
code := `if redis.call("get",KEYS[1]) == ARGV[1]
then
return redis.call("del",KEYS[1])
else
return 0
end`
lua := redis.NewScript(1, code)
cnt, err := redis.Int(lua.Do(lock.conn, lock.key, lock.token))
if err != nil || cnt == 0 {
log.Printf("Unlock key:%s, token:%s error:%v, cnt:%d\n", lock.key, lock.token, err, cnt)
} else {
log.Printf("Unlock key:%s, token:%s success, cnt:%d\n", lock.key, lock.token, cnt)
}
return
}
func main() {
redisAddress := "127.0.0.1:6379"
RedisPasswd := "test:test"
RdsPool := &redis.Pool{
MaxIdle: 300,
MaxActive: 0, //设置MaxActive,设MaxActive=0(表示无限大)或者足够大
IdleTimeout: 10 * time.Second,
Wait: true, //设置Wait=true,当程序执行get(),无法获得可用连接时,将会暂时阻塞
Dial: func() (conn redis.Conn, e error) {
conn, err := redis.Dial("tcp", redisAddress)
if err != nil {
return nil, err
}
_, err = conn.Do("AUTH", RedisPasswd)
if err != nil {
conn.Close()
return nil, err
}
return conn, err
},
}
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func(wg *sync.WaitGroup, idx int) {
log.Printf("[%d]start...\n", idx)
defer wg.Done()
// redis连接
conn := RdsPool.Get()
defer conn.Close()
// 创建分布式锁
v := NewRedisDistributedLock("master", "hdhsksk", conn, 10)
if ok, err := v.tryLock(); ok != true || err != nil {
log.Printf("[%d] get lock failed, err:%v\n", idx, err)
return
}
log.Printf("[%d] get lock success\n", idx)
time.Sleep(20 * time.Second)
v.Unlock()
}(&wg, i)
}
wg.Wait()
}
但是这种主从模式,为分布式锁也带来了另一个问题。
如何解决Redis集群下failover的情况
如果在Redis主从架构中,主节点挂掉,系统会切换到从节点(failover)。但是偶遇Redis的主从复制是异步的,这可能导致在failover过程中丧失锁的互斥性,下面我们考虑一种情况:
- 客户端A成功获取锁,
- 主节点挂掉,但是指令(存储锁的key)还没有同步到从节点。
- 从节点成为了新的主节点,客户端B从新从新的主节点获取锁。
这个时候客户端A和B同时持有了同一个资源的锁,锁的互斥性被打破。
像这种Redis集群的情况,要实现分布式锁,要考虑的问题就比前面单节点Redis复杂多了。
但是不用担心,Redis的创始人已经提供了一个官方的解决方案:RedLock算法[^ 4 ],见下面内容。
RedLock算法原理
它基于N个完全独立的Redis节点,原理如下:
加锁步骤:
获取当前时间(毫秒数)
按顺序依次向N个Redis节点执行***加锁***操作。这个加锁操作跟前面单节点Redis的加锁操作一样,包含超时时间和唯一的字符串id。
这里为了保证在某个Redis节点不可用的时候算法能够正常运行,这个加锁操作还有一个超时时间,它要远小于锁的超时时间(几十毫秒)。客户端在向某个Redis节点获取锁失败后,就立即开始尝试下一个Redis节点。这里的失败包括Redis节点不可用,也包括该Redis节点上的锁已经被其他客户端持有等等情况。
计算整个加锁过程总共消耗的时间,计算方法是用当前时间减去第1步记录的时间。如果客户端从大多数Redis节点(>= N/2+1)成功获取到锁,并且获取锁总共消耗的时间没有超过锁的超时时间,那么这时才认为客户端最终加锁成功;否则,认为最终加锁失败。
如果最终加锁成功,那么这个锁的超时时间应该重新计算,它等于最初的锁的超时时间减去上一步计算出来的加锁消耗的时间。
如果最终加锁失败(可能由于获取到锁的Redis节点个数小于N/2+1,也可能是整个获取锁的过程消耗的时间超过了锁最初的超时时间),那么客户端应该立即想所有Redis节点发起解锁操作(见下面)。
解锁步骤:
- 解锁过程很简单,向所有Redis发起解锁操作,不用管这些节点当时在加锁的时候是成功还是失败,解锁过程跟前面介绍的单节点Redis解锁过程一样。
这一块的算法,官方已经提供了各种语言的实现,不需要我们重复去造轮子:
- Redlock-rb (opens new window) (Ruby implementation). There is also a fork of Redlock-rb (opens new window) that adds a gem for easy distribution and perhaps more.
- Redlock-py (opens new window) (Python implementation).
- Aioredlock (opens new window) (Asyncio Python implementation).
- Redlock-php (opens new window) (PHP implementation).
- PHPRedisMutex (opens new window) (further PHP implementation)
- cheprasov/php-redis-lock (opens new window) (PHP library for locks)
- Redsync (opens new window) (Go implementation).
- Redisson (opens new window) (Java implementation).
- Redis::DistLock (opens new window) (Perl implementation).
- Redlock-cpp (opens new window) (C++ implementation).
- Redlock-cs (opens new window) (C#/.NET implementation).
- RedLock.net (opens new window) (C#/.NET implementation). Includes async and lock extension support.
- ScarletLock (opens new window) (C# .NET implementation with configurable datastore)
- Redlock4Net (opens new window) (C# .NET implementation)
- node-redlock (opens new window) (NodeJS implementation). Includes support for lock extension.
当然RedLock的算法在网上也有很多争论,想要了解具体内容的可以自己上网查[^ 5 ],这里就不做展开。
# 基于ETCD实现分布式锁
etcd是一个高可用的分布式键值存储系统,它主要用于配置管理和服务发现。由于etcd的两个特性:一是强一致性,二是租约机制,因此也可以用于实现分布式锁。
- 强一致性:etcd保证所有的读写操作都是强一致的,这意味着当一个进程获取到锁(在etcd中创建了一个键值对)后,其他所有的进程都能立即看到这个变化,从而知道锁已经被占用。
- 租约机制:etcd支持为键值对设置一个租约,当租约到期时,etcd会自动删除该键值对。这个特性可以用来实现锁的自动过期,防止因为进程崩溃导致的锁无法释放的问题。
基于etcd实现分布式锁的主要原理是利用etcd客户端库提供的concurrency
包以及租约(lease)机制来创建和管理锁。
以下是基于etcd实现分布式锁的具体步骤:
1、创建一个租约(lease),用于设置锁的过期时间,防止死锁。例如,使用etcd的Go客户端:
ttl := 30 // 设置租约过期时间
session, err := concurrency.NewSession(client, concurrency.WithTTL(ttl))
这里,ttl
表示租约的过期时间(以秒为单位)。
2、当一个进程需要获取锁时,尝试获取锁。例如:
mutex := concurrency.NewMutex(session, lockKey)
err = mutex.Lock(context.Background())
这里,lockKey
表示锁的键名。如果操作成功,则表示获取到锁;如果CAS操作失败,则表示锁已被占用,需要等待。
3、当持有锁的进程完成资源访问后,删除键值对,释放锁,例如:
err = mutex.Unlock(context.Background())
如下是完整的代码示例:
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/client/v3/concurrency"
"log"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
func main() {
// 创建etcd客户端
cfg := clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
}
client, err := clientv3.New(cfg)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 锁的键名
lockKey := "/my-lock"
// 创建一个新的会话
ttl := 30 // 设置租约过期时间
session, err := concurrency.NewSession(client, concurrency.WithTTL(ttl))
if err != nil {
log.Fatal(err)
}
defer session.Close()
// 创建一个分布式锁
mutex := concurrency.NewMutex(session, lockKey)
// 尝试获取锁
fmt.Println("Trying to acquire lock...")
err = mutex.Lock(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Println("Lock acquired")
// 访问共享资源
fmt.Println("Accessing shared resource...")
time.Sleep(10 * time.Second)
// 释放锁
err = mutex.Unlock(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Println("Lock released")
}
基于etcd实现的分布式锁具有较高的可靠性和性能,适用于分布式系统中的同步控制。但需要注意的是,在使用etcd实现分布式锁时,要确保etcd集群本身的可用性和稳定性,以免影响锁的正常使用。
# 基于DB实现分布式锁
实现原理
通过MySQL实现分布式锁,主要是利用数据库表作为锁,当一个进程需要访问共享资源时,先向数据库插入一条记录,表示获取到锁。其他进程在访问资源之前也会尝试插入记录,如果插入失败则表示锁已被占用,需要等待。当持有锁的进程完成资源访问后,删除锁记录,释放锁。
# 悲观锁和乐观锁
悲观锁和乐观锁是两种常见的并发控制策略,它们主要用于解决多个进程或线程在访问共享资源时可能出现的数据不一致问题。
- 悲观锁:
悲观锁的思想是假设在多个进程或线程访问共享资源时,总是存在数据被修改的风险,因此在访问资源之前先加锁,确保同一时刻只有一个进程或线程可以访问资源。
悲观锁的实现通常依赖于底层的同步原语,如数据库的行锁、表锁等,或者编程语言提供的互斥锁、信号量等。当一个进程或线程获取到悲观锁后,其他进程或线程必须等待锁释放才能访问资源。
悲观锁适用于高并发、竞争激烈的场景,但在锁的持有时间较长时,可能导致性能下降和资源浪费。
- 乐观锁:
乐观锁的思想是假设在多个进程或线程访问共享资源时,大部分情况下都不会出现数据被修改的情况,因此在访问资源时不加锁,而是在更新资源时检查数据是否被修改过。
乐观锁的实现通常依赖于数据的版本号或者时间戳等机制。当一个进程或线程需要更新资源时,先检查资源的版本号或时间戳是否发生变化,如果没有变化,则表示资源未被其他进程或线程修改,可以进行更新;如果发生变化,则表示资源已被其他进程或线程修改,需要重新获取资源并尝试更新。
# 悲观锁实现方式
以下是通过MySQL实现分布式锁的具体步骤:
1、创建一张用于存储锁信息的表,例如:
CREATE TABLE `distributed_lock` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`lock_name` varchar(255) NOT NULL,
`lock_time` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `lock_name` (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
这里,lock_name
字段用于标识锁的名称,设置为唯一索引,以保证同一时刻只有一个进程可以插入相同的锁名称。
2、当一个进程需要获取锁时,尝试向distributed_lock
表中插入一条记录,例如:
INSERT INTO distributed_lock (lock_name, lock_time) VALUES ('my_lock', NOW());
如果插入成功,则表示获取到锁;如果插入失败(例如因为唯一索引冲突),则表示锁已被占用,进程需要等待。
3、当持有锁的进程完成资源访问后,删除锁记录,释放锁,例如:
DELETE FROM distributed_lock WHERE lock_name = 'my_lock';
这种通过MySQL实现分布式锁的方式简单易用,但在高并发场景下可能会导致数据库压力过大。此外,还需要注意防止死锁的问题,例如可以通过设置锁的过期时间,或者在获取锁时加入超时机制等方式来解决。
当然前面的设计同样也会遇到一个问题:就是当一个客户端成功获取锁后,由于异常出现崩溃导致没有释放锁,那么这就有可能导致这个锁一直被占用导致其他客户端无法获取。这里我们可以启一个定时任务,评估每种类型的锁的持有时间,如果超过这个时间就任务是节点出现问题导致没有释放锁,就可以进行删除。
# Airflow中乐观锁的使用
例如比较知名的开源调度平台Airflow,就使用了乐观锁。在Airflow中,任务实例(Task Instance)的状态会在数据库中进行存储和更新。为了保证在并发环境下任务实例状态的正确性,Airflow使用了乐观锁机制。
在Airflow中,任务实例的状态信息存储在task_instance
表中,该表有一个state
字段表示任务实例的状态,以及一个version
字段表示数据的版本。每次数据更新时,version
字段的值将递增。
以下是Airflow中使用乐观锁的一个示例:
1、当需要更新任务实例的状态时,首先查询任务实例的状态及其版本号,例如:
SELECT state, version FROM task_instance WHERE task_id = 'task1' AND execution_date = '2021-01-01 00:00:00';
假设查询结果为:state='running', version=1
。
2、更新任务实例的状态时,检查版本号是否发生变化,如果没有变化,则更新状态并递增版本号;如果发生变化,则表示任务实例的状态已被其他进程或线程修改,需要重新获取状态并尝试更新。例如:
UPDATE task_instance SET state='success', version=version+1 WHERE task_id = 'task1' AND execution_date = '2021-01-01 00:00:00' AND version=1;
如果更新成功(受影响的行数大于0),则表示乐观锁生效,任务实例的状态未被其他进程或线程修改;如果更新失败(受影响的行数等于0),则表示任务实例的状态已被其他进程或线程修改,需要重新获取状态并尝试更新。
# 乐观锁的实现
乐观锁主要是通过数据版本控制(如版本号或时间戳)来实现的。在MySQL中,我们可以使用一个额外的字段(如version
)来表示数据的版本。以下是使用MySQL实现乐观锁的一个例子:
1、首先,创建一个包含版本号字段的表,例如:
CREATE TABLE `product` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL,
`price` decimal(10,2) NOT NULL,
`version` int(11) NOT NULL DEFAULT 1,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
这里,version
字段用于表示数据的版本,每次数据更新时,version
字段的值将递增。
2、当需要更新数据时,先查询数据及其版本号,例如:
SELECT id, name, price, version FROM product WHERE id = 1;
假设查询结果为:id=1, name='item1', price=100, version=1
。
3、更新数据时,检查版本号是否发生变化,如果没有变化,则更新数据并递增版本号;如果发生变化,则表示数据已被其他进程或线程修改,需要重新获取数据并尝试更新。例如:
UPDATE product SET name='item1_updated', price=120, version=version+1 WHERE id=1 AND version=1;
如果更新成功(受影响的行数大于0),则表示乐观锁生效,数据未被其他进程或线程修改;如果更新失败(受影响的行数等于0),则表示数据已被其他进程或线程修改,需要重新获取数据并尝试更新。