从代码实现看分布式锁的原子性保证

了解一下Redis命令处理过程 及 Redis分布式锁

(1) Redis实现分布式锁

通过Redis SET key value NX 可以简单地实现分布式锁。

127.0.0.1:6379> SET key_lock value1 NX
OK
127.0.0.1:6379>
127.0.0.1:6379> SET key_lock value1 NX
(nil)
127.0.0.1:6379>

在解锁时可以先判断key是否存在,然后对比值是否相等,相等后再删除key,释放锁。

有没有想过一个问题,Redis SET key value NX 是怎么保证分布式锁的原子性的?


(2) Redis命令的处理过程

Redis Server 和客户端建立连接后,就会在事件驱动框架中注册可读事件,这就对应了客户端的命令请求。
而对于整个命令处理的过程来说,我认为主要可以分成五个阶段,它们分别对应了 Redis 源码中的不同函数。

  1. 接收请求,对应 acceptTcpHandler 函数
  2. 命令读取,对应 readQueryFromClient 函数;
  3. 命令解析,对应 processInputBuffer 函数;
  4. 命令执行,对应 processCommand 函数;
  5. 结果返回,对应 addReply 函数;

(2.1) 接收请求阶段-acceptTcpHandler()

// file: src/networking.c 
/*
 * 接收tcp处理器
 * 
 * @param *el
 * @param fd
 * @param *privdata
 * @param mask
 */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
 // ... 
 // 接收tcp请求
 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
 // ... 
 // 接收通用处理
 acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
 acceptTcpHandler -> anetTcpAccept -> acceptCommonHandler -> createClient -> readQueryFromClient

(2.2) 命令读取阶段-readQueryFromClient()

readQueryFromClient 函数会从客户端连接的 socket 中,读取最大为 readlen 长度的数据,readlen 值大小是宏定义 PROTO_IOBUF_LEN。该宏定义是在server.h文件中定义的,默认值为 16KB。

// file: src/networking.c
/**
 * @param *conn
 */ 
void readQueryFromClient(connection *conn) {
 // 从连接的私有数据获取client 
 // 在创建连接时把client放到了connection的private_data字段
 client *c = connGetPrivateData(conn);
 // 1024*16 = 16KB
 readlen = PROTO_IOBUF_LEN;
 // 
 qblen = sdslen(c->querybuf);
 // 为查询缓冲区分配空间
 c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
 // 调用read从描述符为fd的客户端socket中读取数据
 nread = connRead(c->conn, c->querybuf+qblen, readlen);
 // 客户端输入缓冲区中有更多的数据,请继续解析它,以防检查是否有要执行的完整命令。 
 processInputBuffer(c); 
}


(2.3) 命令解析-processInputBuffer()

// file: src/networking.c
/* 
 * 这个函数每次都被调用,在客户端结构'c'中,有更多的查询缓冲区需要处理,
 * 因为我们从socket中读取了更多的数据,或者因为客户端被阻塞并稍后重新激活,
 * 所以可能有待处理的查询缓冲区,已经 表示要处理的完整命令。
 * 
 * @param *c 客户端 
 */
void processInputBuffer(client *c) {
 // 输入缓冲区中有内容时继续处理 
 while(c->qb_pos < sdslen(c->querybuf)) {
 
 // 省略部分代码
 // 判断请求类型
 if (!c->reqtype) {
 // 根据客户端输入缓冲区的命令开头字符判断命令类型
 if (c->querybuf[c->qb_pos] == '*') {
 // 符合RESP协议的命令
 c->reqtype = PROTO_REQ_MULTIBULK;
 } else {
 // 管道类型命令
 c->reqtype = PROTO_REQ_INLINE;
 }
 }
 if (c->reqtype == PROTO_REQ_INLINE) { // 管道类型命令
 // 调用processInlineBuffer函数解析
 if (processInlineBuffer(c) != C_OK) break;
 // 如果 Gopher 模式并且我们得到零个或一个参数,则以 Gopher 模式处理请求。 
 // 为避免数据竞争,如果启用 io 线程读取查询,Redis 将不支持 Gopher。 
 if (server.gopher_enabled && !server.io_threads_do_reads &&
 ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
 c->argc == 0))
 {
 processGopherRequest(c);
 resetClient(c);
 c->flags |= CLIENT_CLOSE_AFTER_REPLY;
 break;
 }
 } else if (c->reqtype == PROTO_REQ_MULTIBULK) { // RESP协议命令
 // 调用processMultibulkBuffer函数解析
 if (processMultibulkBuffer(c) != C_OK) break;
 } else {
 serverPanic("Unknown request type");
 }
 // 批量处理可以看 <= 0 长度。
 if (c->argc == 0) {
 resetClient(c); // 重置客户端
 } else {
 // 如果我们处于 I/O 线程的上下文中,我们无法真正执行此处的命令。 
 // 我们所能做的就是将客户端标记为需要处理命令的客户端。 
 if (c->flags & CLIENT_PENDING_READ) {
 c->flags |= CLIENT_PENDING_COMMAND;
 break;
 }
 // 执行命令并重置客户端 
 if (processCommandAndResetClient(c) == C_ERR) {
 // 如果客户端不再有效,我们将避免退出此循环并稍后修剪客户端缓冲区。 
 // 所以在这种情况下我们会尽快返回。 
 return;
 }
 }
 }
} 
// file: src/networking.c
/* 
 * 此函数调用 processCommand(),但也为客户端执行一些在该上下文中有用的子任务:
 * 1. 它将当前客户端设置为客户端“c”。
 * 2. 如果处理了命令,则调用 commandProcessed()。
 *
 * 如果客户端因处理命令的副作用而被释放,则该函数返回 C_ERR,否则返回 C_OK。
 *
 * @param *c 客户端 
 */
int processCommandAndResetClient(client *c) {
 int deadclient = 0;
 server.current_client = c;
 // 处理命令
 if (processCommand(c) == C_OK) {
 commandProcessed(c);
 }
 
 return deadclient ? C_ERR : C_OK;
}


(2.4) 命令执行-processCommand()

// file: src/server.c
/**
 * 处理各种命令 get set del exits quit lpush sadd 等
 *
 * @param *c 
 */
int processCommand(client *c) {
 // 查找命令,并进行命令合法性检查,以及命令参数个数检查
 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
 // ... 省略其他命令处理逻辑 
 // 处理命令
 if (c->flags & CLIENT_MULTI &&
 c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
 c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
 { // 如果是 MULTI 事务,则入队
 queueMultiCommand(c);
 addReply(c,shared.queued);
 } else { 
 // 调用 call 直接处理
 call(c,CMD_CALL_FULL);
 c->woff = server.master_repl_offset;
 if (listLength(server.ready_keys))
 handleClientsBlockedOnKeys();
 }
 return C_OK;
}

(2.4.1) 查找对应命令-lookupCommand

/*
 * 查找命令
 */
struct redisCommand *lookupCommand(sds name) {
 // 
 return dictFetchValue(server.commands, name);
}

server.commands对应的redisCommandTable如下

struct redisCommand redisCommandTable[] = {
 {"get",getCommand,2,
 "read-only fast @string",
 0,NULL,1,1,1,0,0,0},
 /* Note that we can't flag set as fast, since it may perform an
 * implicit DEL of a large key. */
 {"set",setCommand,-3,
 "write use-memory @string",
 0,NULL,1,1,1,0,0,0},
 // 省略部分内容
}

server.commands是在populateCommandTable函数里赋值的

/* Populates the Redis Command Table starting from the hard coded list
 * we have on top of server.c file. */
void populateCommandTable(void) {
 int j;
 int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
 for (j = 0; j < numcommands; j++) {
 struct redisCommand *c = redisCommandTable+j;
 int retval1, retval2;
 /* Translate the command string flags description into an actual
 * set of flags. */
 if (populateCommandTableParseFlags(c,c->sflags) == C_ERR)
 serverPanic("Unsupported command flag");
 c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */
 retval1 = dictAdd(server.commands, sdsnew(c->name), c);
 /* Populate an additional dictionary that will be unaffected
 * by rename-command statements in redis.conf. */
 retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
 serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
 }
}

(2.4.2) 执行命令-call


/*
 * call() 是 Redis 执行命令的核心。
 *
 * @param *c
 * @param flags 
 */
void call(client *c, int flags) {
 // 要执行的redis命令
 struct redisCommand *real_cmd = c->cmd;
 // 调用命令处理函数 
 c->cmd->proc(c);
}
// file: src/t_string.c
/**
 * 
 * @param *c
 * @param flags
 * @param *key
 * @param *val
 * @param *expire
 * @param unit
 * @param *ok_reply
 * @param *abort_reply
 */
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
 // 64位精度整数 
 long long milliseconds = 0; /* initialized to avoid any harmness warning */
 if (expire) {
 if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
 return;
 if (milliseconds <= 0) {
 addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
 return;
 }
 if (unit == UNIT_SECONDS) milliseconds *= 1000;
 }
 // 如果有NX选项,那么查找key是否已经存在
 if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
 (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
 {
 // 如果已存在,返回空值
 addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
 return;
 }
 // 
 genericSetKey(c,c->db,key,val,flags & OBJ_SET_KEEPTTL,1);
 server.dirty++;
 // 设置过期
 if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
 // 发布key事件
 notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
 if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
 "expire",key,c->db->id);
 addReply(c, ok_reply ? ok_reply : shared.ok);
}

(2.5) 结果返回-addReply()

// file: src/networking.c 
/* -----------------------------------------------------------------------------
 * 更高级别的函数用于在客户端输出缓冲区上对数据进行排队。
 * 以下函数是命令实现将调用的函数。
 * -------------------------------------------------------------------------- */
/* 
 * 将对象“obj”字符串表示添加到客户端输出缓冲区。 
 * 
 * @param *c redis client 
 * @param *obj 命令执行的结果 类型是redisObject
 */
void addReply(client *c, robj *obj) {
 // 判断client是否可以接收新数据 (假客户端不能接收)
 if (prepareClientToWrite(c) != C_OK) return;
 // 根据redisobject格式把数据写入缓存
 if (sdsEncodedObject(obj)) { // obj如果是row或者embstr格式
 // 尝试将应答添加到客户端结构中的静态缓冲区。
 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
 // 将回复添加到回复列表中。
 _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr)); 
 } else if (obj->encoding == OBJ_ENCODING_INT) { // obj 是数字格式
 // 对于整数编码字符串,我们只需使用优化函数将其转换为字符串,并将结果字符串附加到输出缓冲区。
 char buf[32];
 // 数字转为字符串
 size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
 if (_addReplyToBuffer(c,buf,len) != C_OK)
 _addReplyProtoToList(c,buf,len);
 } else {
 serverPanic("Wrong obj->encoding in addReply()");
 }
}

(3) IO多路复用对命令原子性的影响

IO 多路复用机制是在 readQueryFromClient 函数执行前发挥作用的。它实际是在事件驱动框架中调用 aeApiPoll 函数,获取一批已经就绪的 socket 描述符。然后执行一个循环,针对每个就绪描述符上的读事件,触发执行 readQueryFromClient 函数。

在实际处理时,Redis 的主线程仍然是针对每个事件逐一调用回调函数进行处理的。而且对于写事件来说,IO 多路复用机制也是针对每个事件逐一处理的。

/*
 * 处理事件 返回处理完的事件个数
 *
 * 0 不做任何处理
 * 1 AE_FILE_EVENTS 处理文件事件
 * 2 AE_TIME_EVENTS 处理时间事件 
 * 3 AE_ALL_EVENTS 所有事件 
 * 4 AE_DONT_WAIT 
 * 8 AE_CALL_BEFORE_SLEEP 
 * 16 AE_CALL_AFTER_SLEEP 
 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
 int processed = 0, numevents;
 struct timeval tv, *tvp;
 // 如果eventLoop处理前的函数不为空,就执行
 if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
 eventLoop->beforesleep(eventLoop);
 // 调用多路复用 API,仅在超时或某些事件触发时返回 
 // 处理文件事件,阻塞时间由tvp决定 
 numevents = aeApiPoll(eventLoop, tvp);
 // 处理后的函数不为空
 /* After sleep callback. */
 if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
 eventLoop->aftersleep(eventLoop);
 for (j = 0; j < numevents; j++) {
 // 先从eventLoop->fired[j]获取已就绪事件结构体(aeFiredEvent) 获取fd后 再从eventLoop->events注册事件里获取对应的事件结构体(aeFileEvent) 
 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
 
 // ...
 // 如果可读
 if (!invert && fe->mask & mask & AE_READABLE) {
 // 调用读事件回调函数 对应 acceptTcpHandler 
 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
 fired++;
 fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
 }
 // 如果可写 触发写事件
 if (fe->mask & mask & AE_WRITABLE) {
 if (!fired || fe->wfileProc != fe->rfileProc) {
 // 调用写事件回调函数 对应 acceptTcpHandler 
 fe->wfileProc(eventLoop,fd,fe->clientData,mask);
 fired++;
 }
 }
 processed++;
 }
 
 return processed; /* return the number of processed file/time events */
}

即使使用了 IO 多路复用机制,命令的整个处理过程仍然可以由 IO 主线程来完成,也仍然可以保证命令执行的原子性。

参考资料

RedisIO模型 https://weikeqin.com/2022/01/...

Redis源码剖析与实战 学习笔记 Day14 14 | 从代码实现看分布式锁的原子性保证
https://time.geekbang.org/col...

作者:wkq2786130原文地址:https://segmentfault.com/a/1190000043362549

%s 个评论

要回复文章请先登录注册