最近想学习一下redis源码,先看一下redis通信流程。由于功力有限,不足之处望大家指正。服务端和客户端通信,一般都是服务端先启动,那先从服务端的源码看起。
首先启动服务端会做一些初始化动作,初始化事件处理器状态,先看一下事件处理器状态的结构
//事件处理器的状态
typedef struct aeEventLoop {
// 目前已注册的最大描述符
int maxfd; /* highest file descriptor currently registered */
// 目前已追踪的最大描述符
int setsize; /* max number of file descriptors tracked */
// 用于生成时间事件 id
long long timeEventNextId;
// 最后一次执行时间事件的时间
time_t lastTime; /* Used to detect system clock skew */
// 已注册的文件事件
aeFileEvent *events; /* Registered events */
// 已就绪的文件事件
aeFiredEvent *fired; /* Fired events */
// 时间事件
aeTimeEvent *timeEventHead;
// 事件处理器的开关
int stop;
// 多路复用库的私有数据
void *apidata; /* This is used for polling API specific data */
// 在处理事件前要执行的函数
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
//事件状态
typedef struct aeApiState {
// epoll_event 实例描述符
int epfd;
// 事件槽
struct epoll_event *events;
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
// 初始化事件槽空间
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
// 创建 epoll 实例
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
// 赋值给 eventLoop
eventLoop->apidata = state;
return 0;
}
上面创建的epoll句柄和初始化的事件槽保存到传入的eventLoop事件对象中。这个事件对象保存在全局的一个redisserver中,redisServer中结构体成员很多,这里只展示一个
struct redisServer {
//…
// 事件状态
aeEventLoop *el;
// 一个链表,保存了所有客户端状态结构
list *clients; /* List of active clients */
/…
};
aeEventLoop *el 存储刚才创建的事件状态
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
int s, rv;
char _port[6]; /* strlen(“65535″) */
struct addrinfo hints, *servinfo, *p;
snprintf(_port,6,”%d”,port);
memset(&hints,0,sizeof(hints));
hints.ai_family = af;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE; /* No effect if bindaddr != NULL */
if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
anetSetError(err, “%s”, gai_strerror(rv));
return ANET_ERR;
}
for (p = servinfo; p != NULL; p = p->ai_next) {
if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
continue;
if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) goto error;
goto end;
}
if (p == NULL) {
anetSetError(err, “unable to bind socket”);
goto error;
}
error:
s = ANET_ERR;
end:
freeaddrinfo(servinfo);
return s;
}
上面的函数用来打开监听端口
// 为 TCP 连接关联连接应答(accept)处理器
// 用于接受并应答客户端的 connect() 调用
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, //使文件读关联一个函数
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
“Unrecoverable error creating server.ipfd file event.”);
}
}
/*
* 根据 mask 参数的值,监听 fd 文件的状态,
* 当 fd 可用时,执行 proc 函数
*/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
if (fd >= eventLoop->setsize) return AE_ERR;
// 取出文件事件结构
aeFileEvent *fe = &eventLoop->events[fd];
// 监听指定 fd 的指定事件
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
// 设置文件事件类型,以及事件的处理器
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
// 私有数据
fe->clientData = clientData;
// 如果有需要,更新事件处理器的最大 fd
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
aeCreateFileEvent函数用来注册回调用,参数aeEventLoop *eventLoop就是前面初始化的事件处理器的状态,当AE_READABLE产生时就会调用acceptTcpHandler函数,这时是有客户端connect了。前面已经初始化了一定数量的处理器,aeApiAddEvent把所有的事件对象都注册到epoll,后面接着设置对应AE_READABLE和AE_WRITABLE对应的回调函数。
/* File event structure
*
* 文件事件结构
*/
typedef struct aeFileEvent {
// 监听事件类型掩码,
// 值可以是 AE_READABLE 或 AE_WRITABLE ,
// 或者 AE_READABLE | AE_WRITABLE
int mask; /* one of AE_(READABLE|WRITABLE) */
// 读事件处理器
aeFileProc *rfileProc;
// 写事件处理器
aeFileProc *wfileProc;
// 多路复用库的私有数据
void *clientData;
} aeFileEvent;
上面是文件事件结构的结构体,对应的读和写的回调函数都保存在aeFileEvent(文件事件)中,aeFileEvent(文件事件)就是aeEventLoop(事件处理器状态)的成员,aeEventLoop(事件处理器状态)就是redisServer结构体中aeEventLoop *el(事件状态成员),所有的这些都保存在全局的redisServer结构体中。接下来就是事件处理器主循环中
/*
* 事件处理器的主循环
*/
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 如果有需要在事件处理前执行的函数,那么运行它
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 开始处理事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);//一直循环调用这个函数等到消息
}
}
//处理所有已到达的时间事件,以及所有已就绪的文件事件。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// 获取最近的时间事件
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
// 如果时间事件存在的话
// 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
long now_sec, now_ms;
/* Calculate the time missing for the nearest
* timer to fire. */
// 计算距今最近的时间事件还要多久才能达到
// 并将该时间距保存在 tv 结构中
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec – now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) – now_ms)*1000;
tvp->tv_sec –;
} else {
tvp->tv_usec = (shortest->when_ms – now_ms)*1000;
}
// 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
// 执行到这一步,说明没有时间事件
// 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
// 设置文件事件不阻塞
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
// 文件事件可以阻塞直到有事件到达为止
tvp = NULL; /* wait forever */
}
}
// 处理文件事件,阻塞时间由 tvp 决定
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
// 从已就绪数组中获取事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & … code: maybe an already processed
* event removed an element that fired and we still didnt
* processed, so we check if the event is still valid. */
// 读事件
if (fe->mask & mask & AE_READABLE) {
// rfired 确保读/写事件只能执行其中一个
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 写事件
if (fe->mask & mask & AE_WRITABLE) {
printf(“can writable\n”);
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
// 执行时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
/*
* 获取可执行事件
*/
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
// 等待时间
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);//epoll_wait用于向用户进程返回ready list
// 有至少一个事件就绪?
if (retval > 0) {
int j;
// 为已就绪事件设置相应的模式
// 并加入到 eventLoop 的 fired 数组中
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
// 返回已就绪事件个数
return numevents;
}
aeProcessEvents一直被循环调用用来处理就绪的文件事件(时间事件这里不考虑),通过调用aeApiPoll中的epoll_wait等待事件的促发。
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
/* A fired event
*
* 已就绪事件
*/
typedef struct aeFiredEvent {
// 已就绪文件描述符
int fd;
// 事件类型掩码,
// 值可以是 AE_READABLE 或 AE_WRITABLE
// 或者是两者的或
int mask;
} aeFiredEvent;
上面列出了epoll结构体和aeFiredEvent(已就绪事件结构体),aeFiredEvent属于事件处理器状态(aeEventLoop)成员,并循环保存就绪文件事件对应中已就绪描述符和其类型,这些又都保存在事件处理器状态(aeEventLoop)中。函数返回到aeProcessEvents中,然后走对应的回调(这时候还没讲回调关联对应的函数)。
现在假如有客户端来连接了,按前面说的,套接字变的可读,acceptTcpHandler被调用,acceptTcpHandler函数接收客户端的连接,并为客户端创建状态,并注册读取客户端命令的函数readQueryFromClient。并把创建的客户端保存在redisServer里面的list *clients里面。
/*
* 创建一个新客户端
*/
redisClient *createClient(int fd) {
printf(“———–%s——–\n”,__FUNCTION__);
// 分配空间
redisClient *c = zmalloc(sizeof(redisClient));
/* passing -1 as fd it is possible to create a non connected client.
* This is useful since all the Redis commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
// 当 fd 不为 -1 时,创建带网络连接的客户端
// 如果 fd 为 -1 ,那么创建无网络连接的伪客户端
// 因为 Redis 的命令必须在客户端的上下文中使用,所以在执行 Lua 环境中的命令时
// 需要用到这种伪终端
if (fd != -1) {
// 非阻塞
anetNonBlock(NULL,fd);
// 禁用 Nagle 算法
anetEnableTcpNoDelay(NULL,fd);
// 设置 keep alive
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// 绑定读事件到事件 loop (开始接收命令请求)
if (aeCreateFileEvent(server.el,fd,AE_READABLE, //客户端连接上之后,再为客户端关联一个读数据的函数。之前关联的建立连接
readQueryFromClient, c) == AE_ERR) //没有建立连接之前关联建立函数,建立连接之后关联读数据的函数
{
close(fd);
zfree(c);
return NULL;
}
}
// 初始化各个属性
// 默认数据库
selectDb(c,0);
// 套接字
c->fd = fd;
// 名字
c->name = NULL;
// 回复缓冲区的偏移量
c->bufpos = 0;
// 查询缓冲区
c->querybuf = sdsempty();
// 查询缓冲区峰值
c->querybuf_peak = 0;
// 命令请求的类型
c->reqtype = 0;
// 命令参数数量
c->argc = 0;
// 命令参数
c->argv = NULL;
// 当前执行的命令和最近一次执行的命令
c->cmd = c->lastcmd = NULL;
// 查询缓冲区中未读入的命令内容数量
c->multibulklen = 0;
// 读入的参数的长度
c->bulklen = -1;
// 已发送字节数
c->sentlen = 0;
// 状态 FLAG
c->flags = 0;
// 创建时间和最后一次互动时间
c->ctime = c->lastinteraction = server.unixtime;
// 认证状态
c->authenticated = 0;
// 复制状态
c->replstate = REDIS_REPL_NONE;
// 复制偏移量
c->reploff = 0;
// 通过 ACK 命令接收到的偏移量
c->repl_ack_off = 0;
// 通过 AKC 命令接收到偏移量的时间
c->repl_ack_time = 0;
// 客户端为从服务器时使用,记录了从服务器所使用的端口号
c->slave_listening_port = 0;
// 回复链表
c->reply = listCreate();
// 回复链表的字节量
c->reply_bytes = 0;
// 回复缓冲区大小达到软限制的时间
c->obuf_soft_limit_reached_time = 0;
// 回复链表的释放和复制函数
listSetFreeMethod(c->reply,decrRefCountVoid);
listSetDupMethod(c->reply,dupClientReplyValue);
// 阻塞类型
c->btype = REDIS_BLOCKED_NONE;
// 阻塞超时
c->bpop.timeout = 0;
// 造成客户端阻塞的列表键
c->bpop.keys = dictCreate(&setDictType,NULL);
// 在解除阻塞时将元素推入到 target 指定的键中
// BRPOPLPUSH 命令时使用
c->bpop.target = NULL;
c->bpop.numreplicas = 0;
c->bpop.reploffset = 0;
c->woff = 0;
// 进行事务时监视的键
c->watched_keys = listCreate();
// 订阅的频道和模式
c->pubsub_channels = dictCreate(&setDictType,NULL);
c->pubsub_patterns = listCreate();
c->peerid = NULL;
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
// 如果不是伪客户端,那么添加到服务器的客户端链表中
if (fd != -1) listAddNodeTail(server.clients,c);
// 初始化客户端的事务状态
initClientMultiState(c);
// 返回客户端
return c;
}
/* With multiplexing we need to take per-client state.
* Clients are taken in a liked list.
*
* 因为 I/O 复用的缘故,需要为每个客户端维持一个状态。
*
* 多个客户端状态被服务器用链表连接起来。
*/
typedef struct redisClient {
// 套接字描述符
int fd;
// 当前正在使用的数据库
redisDb *db;
// 当前正在使用的数据库的 id (号码)
int dictid;
// 客户端的名字
robj *name; /* As set by CLIENT SETNAME */
// 查询缓冲区
sds querybuf;
// 查询缓冲区长度峰值
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size */
// 参数数量
int argc;
// 参数对象数组
robj **argv;
// 记录被客户端执行的命令
struct redisCommand *cmd, *lastcmd;
// 请求的类型:内联命令还是多条命令
int reqtype;
// 剩余未读取的命令内容数量
int multibulklen; /* number of multi bulk arguments left to read */
// 命令内容的长度
long bulklen; /* length of bulk argument in multi bulk request */
// 回复链表
list *reply;
// 回复链表中对象的总大小
unsigned long reply_bytes; /* Tot bytes of objects in reply list */
// 已发送字节,处理 short write 用
int sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
// 创建客户端的时间
time_t ctime; /* Client creation time */
// 客户端最后一次和服务器互动的时间
time_t lastinteraction; /* time of the last interaction, used for timeout */
// 客户端的输出缓冲区超过软性限制的时间
time_t obuf_soft_limit_reached_time;
// 客户端状态标志
int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI … */
// 当 server.requirepass 不为 NULL 时
// 代表认证的状态
// 0 代表未认证, 1 代表已认证
int authenticated; /* when requirepass is non-NULL */
// 复制状态
int replstate; /* replication state if this is a slave */
// 用于保存主服务器传来的 RDB 文件的文件描述符
int repldbfd; /* replication DB file descriptor */
// 读取主服务器传来的 RDB 文件的偏移量
off_t repldboff; /* replication DB file offset */
// 主服务器传来的 RDB 文件的大小
off_t repldbsize; /* replication DB file size */
sds replpreamble; /* replication DB preamble. */
// 主服务器的复制偏移量
long long reploff; /* replication offset if this is our master */
// 从服务器最后一次发送 REPLCONF ACK 时的偏移量
long long repl_ack_off; /* replication ack offset, if this is a slave */
// 从服务器最后一次发送 REPLCONF ACK 的时间
long long repl_ack_time;/* replication ack time, if this is a slave */
// 主服务器的 master run ID
// 保存在客户端,用于执行部分重同步
char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */
// 从服务器的监听端口号
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
// 事务状态
multiState mstate; /* MULTI/EXEC state */
// 阻塞类型
int btype; /* Type of blocking op if REDIS_BLOCKED. */
// 阻塞状态
blockingState bpop; /* blocking state */
// 最后被写入的全局复制偏移量
long long woff; /* Last write global replication offset. */
// 被监视的键
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
// 这个字典记录了客户端所有订阅的频道
// 键为频道名字,值为 NULL
// 也即是,一个频道的集合
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
// 链表,包含多个 pubsubPattern 结构
// 记录了所有订阅频道的客户端的信息
// 新 pubsubPattern 结构总是被添加到表尾
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
sds peerid; /* Cached peer ID. */
/* Response buffer */
// 回复偏移量
int bufpos;
// 回复缓冲区
char buf[REDIS_REPLY_CHUNK_BYTES];
} redisClient;
当客户端发送命令过来时,epoll返回,readQueryFromClient被调用,注意回调函数的转变。没建立连接之前是关联acceptTcpHandler,建立连接之后关联readQueryFromClient函数读取客户端的数据。
/*
* 读取客户端的查询缓冲区内容
*/
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
printf(“———–%s——–\n”,__FUNCTION__);
redisClient *c = (redisClient*) privdata;
int nread, readlen;
size_t qblen;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
// 设置服务器的当前客户端
server.current_client = c;
// 读入长度(默认为 16 MB)
readlen = REDIS_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query
* buffer contains exactly the SDS string representing the object, even
* at the risk of requiring more read(2) calls. This way the function
* processMultiBulkBuffer() can avoid copying buffers to create the
* Redis Object representing the argument. */
if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= REDIS_MBULK_BIG_ARG)
{
int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
if (remaining < readlen) readlen = remaining;
}
// 获取查询缓冲区当前内容的长度
// 如果读取出现 short read ,那么可能会有内容滞留在读取缓冲区里面
// 这些滞留内容也许不能完整构成一个符合协议的命令,
qblen = sdslen(c->querybuf);
// 如果有需要,更新缓冲区内容长度的峰值(peak)
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
// 为查询缓冲区分配空间
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
// 读入内容到查询缓存
nread = read(fd, c->querybuf+qblen, readlen);//接收客户端发送过来的数据到
// 读入出错
if (nread == -1) {
if (errno == EAGAIN) {
nread = 0;
} else {
redisLog(REDIS_VERBOSE, “Reading from client: %s”,strerror(errno));
freeClient(c);
return;
}
// 遇到 EOF
} else if (nread == 0) {
redisLog(REDIS_VERBOSE, “Client closed connection”);
freeClient(c);
return;
}
if (nread) {
// 根据内容,更新查询缓冲区(SDS) free 和 len 属性
// 并将 \0 正确地放到内容的最后
sdsIncrLen(c->querybuf,nread);
// 记录服务器和客户端最后一次互动的时间
c->lastinteraction = server.unixtime;
// 如果客户端是 master 的话,更新它的复制偏移量
if (c->flags & REDIS_MASTER) c->reploff += nread;
} else {
// 在 nread == -1 且 errno == EAGAIN 时运行
server.current_client = NULL;
return;
}
// 查询缓冲区长度超出服务器最大缓冲区长度
// 清空缓冲区并释放客户端
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
bytes = sdscatrepr(bytes,c->querybuf,64);
redisLog(REDIS_WARNING,”Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)”, ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClient(c);
return;
}
// 从查询缓存重读取内容,创建参数,并执行命令
// 函数会执行到缓存中的所有内容都被处理完为止
processInputBuffer(c);
server.current_client = NULL;
}
收到客户端的命令之后就要分析并执行命令,然后被结果返给客户端。readQueryFromClient->processInputBuffer->processCommand->addReply->prepareClientToWrite。prepareClientToWrite这个函数就是注册回复客户端的函数sendReplyToClient。
int prepareClientToWrite(redisClient *c) {
// LUA 脚本环境所使用的伪客户端总是可写的
if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;
// 客户端是主服务器并且不接受查询,
// 那么它是不可写的,出错
if ((c->flags & REDIS_MASTER) &&
!(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;
// 无连接的伪客户端总是不可写的
if (c->fd <= 0) return REDIS_ERR; /* Fake client */
// 一般情况,为客户端套接字安装写处理器到事件循环
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
c->replstate == REDIS_REPL_ONLINE) &&
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
return REDIS_OK;
}
每一个阶段都关联一个回调函数,当事件触发后走回调函数。