编辑
2023-11-16
Redis源码阅读
00
请注意,本文编写于 540 天前,最后修改于 540 天前,其中某些信息可能已经过时。

目录

复制方式
主从复制四大阶段
1.初始化阶段
2.建立连接
3.主从握手
4.复制类型判断与执行
状态机
初始化中状态机的变化
建立连接时状态机变化
主从握手阶段
复制类型判断与执行

复制方式

  • 全量复制:传输RDB文件
  • 增量复制:只传递从库断连期间主库收到的写命令
  • 长连接同步:复制完成后,主节点把之后收到的写命令持续发送给从节点

主从复制四大阶段

1.初始化阶段

  • 方式1:在从库上执行 replicaof <masterip> <masterport> 命令
  • 方式2:在从库配置文件中配置 replicaof <masterip> <masterport>
  • 方式3:从库启动时带上参数 --replicaof <masterip> <masterport>

2.建立连接

用获取到的主库IP和端口去连接主库,并开始监听主库发来的数据

3.主从握手

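从库先向主库发送PING,主库回复PONG;然后从库(如果配置了masterauth,会先发送AUTH认证)通过REPLCONF把自己的监听端口(以及配置了replica-announce-ip时的IP地址)发给主库,并告知自己对PSYNC协议的支持情况(capa eof、capa psync2)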

4.复制类型判断与执行

握手之后,从库向主库发送PSYNC命令,主库根据这条命令决定执行全量复制、增量复制,或者进行错误处理。

状态机

初始化中状态机的变化

redisServer结构体中与主从复制(从库侧)相关的字段

c
/* Excerpt of struct redisServer (the enclosing struct is not shown here):
 * the fields a replica uses to reach and authenticate with its master. */
/* Replication (slave) */
char *masteruser;        /* AUTH with this user and masterauth with master */
sds masterauth;          /* AUTH with this password with master */
char *masterhost;        /* Hostname of master */
int masterport;          /* Port of master */
int repl_timeout;        /* Timeout after N seconds of master idle */
client *master;          /* Client that is master for this slave */
client *cached_master;   /* Cached master to be reused for PSYNC. */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */

server.c的main函数会调用initServerConfig初始化配置,状态机的状态就是在这个函数里第一次被设置的(repl_state = REPL_STATE_NONE):

c
/* Excerpt from initServerConfig(): initial replication settings.
 * The replica state machine starts at REPL_STATE_NONE (no active
 * replication); transfer-related fields start empty (NULL / -1). */
server.cached_master = NULL;
server.master_initial_offset = -1;
server.repl_state = REPL_STATE_NONE;
server.repl_transfer_tmpfile = NULL;
server.repl_transfer_fd = -1;
server.repl_transfer_s = NULL;
server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;

执行REPLICAOF命令时,实际调用的是下面的replicaofCommand函数;它校验参数后调用replicationSetMaster,把状态机置为REPL_STATE_CONNECT并发起连接:

c
void replicaofCommand(client *c) { /* SLAVEOF is not allowed in cluster mode as replication is automatically * configured using the current address of the master node. */ if (server.cluster_enabled) { addReplyError(c,"REPLICAOF not allowed in cluster mode."); return; } if (server.failover_state != NO_FAILOVER) { addReplyError(c,"REPLICAOF not allowed while failing over."); return; } /* The special host/port combination "NO" "ONE" turns the instance * into a master. Otherwise the new master address is set. */ if (!strcasecmp(c->argv[1]->ptr,"no") && !strcasecmp(c->argv[2]->ptr,"one")) { if (server.masterhost) { replicationUnsetMaster(); sds client = catClientInfoString(sdsempty(),c); serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')", client); sdsfree(client); } } else { long port; if (c->flags & CLIENT_SLAVE) { /* If a client is already a replica they cannot run this command, * because it involves flushing all replicas (including this * client) */ addReplyError(c, "Command is not valid when client is a replica."); return; } if (getRangeLongFromObjectOrReply(c, c->argv[2], 0, 65535, &port, "Invalid master port") != C_OK) return; /* Check if we are already attached to the specified master */ if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) && server.masterport == port) { serverLog(LL_NOTICE,"REPLICAOF would result into synchronization " "with the master we are already connected " "with. No operation performed."); addReplySds(c,sdsnew("+OK Already connected to specified " "master\r\n")); return; } /* There was no previous master or the user specified a different one, * we can continue. 
*/ replicationSetMaster(c->argv[1]->ptr, port); sds client = catClientInfoString(sdsempty(),c); serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')", server.masterhost, server.masterport, client); sdsfree(client); } addReply(c,shared.ok); } ************************************************************************ /* Set replication to the specified master address and port. */ void replicationSetMaster(char *ip, int port) { int was_master = server.masterhost == NULL; sdsfree(server.masterhost); server.masterhost = NULL; if (server.master) { freeClient(server.master); } disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */ /* Setting masterhost only after the call to freeClient since it calls * replicationHandleMasterDisconnection which can trigger a re-connect * directly from within that call. */ server.masterhost = sdsnew(ip); server.masterport = port; /* Update oom_score_adj */ setOOMScoreAdj(-1); /* Here we don't disconnect with replicas, since they may hopefully be able * to partially resync with us. We will disconnect with replicas and force * them to resync with us when changing replid on partially resync with new * master, or finishing transferring RDB and preparing loading DB on full * sync with new master. */ cancelReplicationHandshake(0); /* Before destroying our master state, create a cached master using * our own parameters, to later PSYNC with the new master. */ if (was_master) { replicationDiscardCachedMaster(); replicationCacheMasterUsingMyself(); } /* Fire the role change modules event. */ moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED, REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA, NULL); /* Fire the master link modules event. 
*/ if (server.repl_state == REPL_STATE_CONNECTED) moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, REDISMODULE_SUBEVENT_MASTER_LINK_DOWN, NULL); server.repl_state = REPL_STATE_CONNECT; // serverLog(LL_NOTICE,"Connecting to MASTER %s:%d", server.masterhost, server.masterport); connectWithMaster(); }

建立连接时状态机变化

c
/*
 * Replication state of a replica. It is kept in server.repl_state so the
 * replica remembers what it has to do next.
 *
 * The handshake states, from REPL_STATE_RECEIVE_PING_REPLY through
 * REPL_STATE_RECEIVE_PSYNC_REPLY, must stay in this order.
 */
typedef enum {
    /* No active replication */
    REPL_STATE_NONE = 0,
    /* Must connect to master */
    REPL_STATE_CONNECT,
    /* Connecting to master */
    REPL_STATE_CONNECTING,

    /* ---- Handshake states (order matters) ---- */
    REPL_STATE_RECEIVE_PING_REPLY,   /* Wait for PING reply */
    REPL_STATE_SEND_HANDSHAKE,       /* Send handshake sequence to master */
    REPL_STATE_RECEIVE_AUTH_REPLY,   /* Wait for AUTH reply */
    REPL_STATE_RECEIVE_PORT_REPLY,   /* Wait for REPLCONF listening-port reply */
    REPL_STATE_RECEIVE_IP_REPLY,     /* Wait for REPLCONF ip-address reply */
    REPL_STATE_RECEIVE_CAPA_REPLY,   /* Wait for REPLCONF capa reply */
    REPL_STATE_SEND_PSYNC,           /* Send PSYNC */
    REPL_STATE_RECEIVE_PSYNC_REPLY,  /* Wait for PSYNC reply */
    /* ---- End of handshake states ---- */

    /* Receiving .rdb from master */
    REPL_STATE_TRANSFER,
    /* Connected to master */
    REPL_STATE_CONNECTED
} repl_state;

在Redis的周期任务replicationCron中,如果状态为REPL_STATE_CONNECT,就会发起到主库的连接:

c
/* Excerpt from replicationCron(): while the replica state machine is in
 * REPL_STATE_CONNECT, log and (re)try connectWithMaster() on each tick.
 * connectWithMaster() leaves the state unchanged on failure, so a failed
 * attempt is retried on a later tick. */
/* Check if we should connect to a MASTER */
if (server.repl_state == REPL_STATE_CONNECT) {
    serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
        server.masterhost, server.masterport);
    connectWithMaster();
}

这个任务1000ms执行一次

c
int connectWithMaster(void) { server.repl_transfer_s = connCreate(connTypeOfReplication()); if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport, server.bind_source_addr, syncWithMaster) == C_ERR) { serverLog(LL_WARNING,"Unable to connect to MASTER: %s", connGetLastError(server.repl_transfer_s)); connClose(server.repl_transfer_s); server.repl_transfer_s = NULL; return C_ERR; } server.repl_transfer_lastio = server.unixtime; server.repl_state = REPL_STATE_CONNECTING; //改变状态 serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started"); return C_OK; }

主从握手阶段

c
/* Send a PING to check the master is able to reply without errors. */ if (server.repl_state == REPL_STATE_CONNECTING) { serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event."); /* Delete the writable event so that the readable event remains * registered and we can wait for the PONG reply. */ connSetReadHandler(conn, syncWithMaster); connSetWriteHandler(conn, NULL); server.repl_state = REPL_STATE_RECEIVE_PING_REPLY; //改变状态 /* Send the PING, don't check for errors at all, we have the timeout * that will take care about this. */ err = sendCommand(conn,"PING",NULL); if (err) goto write_error; return; } -------------------------------------------------- /* This handler fires when the non blocking connect was able to * establish a connection with the master. */ void syncWithMaster(connection *conn) { char tmpfile[256], *err = NULL; int dfd = -1, maxtries = 5; int psync_result; /* If this event fired after the user turned the instance into a master * with SLAVEOF NO ONE we must just return ASAP. */ if (server.repl_state == REPL_STATE_NONE) { connClose(conn); return; } /* Check for errors in the socket: after a non blocking connect() we * may find that the socket is in error state. */ if (connGetState(conn) != CONN_STATE_CONNECTED) { serverLog(LL_WARNING,"Error condition on socket for SYNC: %s", connGetLastError(conn)); goto error; } /* Send a PING to check the master is able to reply without errors. */ if (server.repl_state == REPL_STATE_CONNECTING) { serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event."); /* Delete the writable event so that the readable event remains * registered and we can wait for the PONG reply. */ connSetReadHandler(conn, syncWithMaster); connSetWriteHandler(conn, NULL); server.repl_state = REPL_STATE_RECEIVE_PING_REPLY; /* Send the PING, don't check for errors at all, we have the timeout * that will take care about this. 
*/ err = sendCommand(conn,"PING",NULL); if (err) goto write_error; return; } /* Receive the PONG command. */ if (server.repl_state == REPL_STATE_RECEIVE_PING_REPLY) { err = receiveSynchronousResponse(conn); /* The master did not reply */ if (err == NULL) goto no_response_error; /* We accept only two replies as valid, a positive +PONG reply * (we just check for "+") or an authentication error. * Note that older versions of Redis replied with "operation not * permitted" instead of using a proper error code, so we test * both. */ if (err[0] != '+' && strncmp(err,"-NOAUTH",7) != 0 && strncmp(err,"-NOPERM",7) != 0 && strncmp(err,"-ERR operation not permitted",28) != 0) { serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err); sdsfree(err); goto error; } else { serverLog(LL_NOTICE, "Master replied to PING, replication can continue..."); } sdsfree(err); err = NULL; server.repl_state = REPL_STATE_SEND_HANDSHAKE; } if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) { /* AUTH with the master if required. */ if (server.masterauth) { char *args[3] = {"AUTH",NULL,NULL}; size_t lens[3] = {4,0,0}; int argc = 1; if (server.masteruser) { args[argc] = server.masteruser; lens[argc] = strlen(server.masteruser); argc++; } args[argc] = server.masterauth; lens[argc] = sdslen(server.masterauth); argc++; err = sendCommandArgv(conn, argc, args, lens); if (err) goto write_error; } /* Set the slave port, so that Master's INFO command can list the * slave listening port correctly. */ { int port; if (server.slave_announce_port) port = server.slave_announce_port; else if (server.tls_replication && server.tls_port) port = server.tls_port; else port = server.port; sds portstr = sdsfromlonglong(port); err = sendCommand(conn,"REPLCONF", "listening-port",portstr, NULL); sdsfree(portstr); if (err) goto write_error; } /* Set the slave ip, so that Master's INFO command can list the * slave IP address port correctly in case of port forwarding or NAT. 
* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */ if (server.slave_announce_ip) { err = sendCommand(conn,"REPLCONF", "ip-address",server.slave_announce_ip, NULL); if (err) goto write_error; } /* Inform the master of our (slave) capabilities. * * EOF: supports EOF-style RDB transfer for diskless replication. * PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>. * * The master will ignore capabilities it does not understand. */ err = sendCommand(conn,"REPLCONF", "capa","eof","capa","psync2",NULL); if (err) goto write_error; server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY; return; } if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY && !server.masterauth) server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; /* Receive AUTH reply. */ if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY) { err = receiveSynchronousResponse(conn); if (err == NULL) goto no_response_error; if (err[0] == '-') { serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err); sdsfree(err); goto error; } sdsfree(err); err = NULL; server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; return; } /* Receive REPLCONF listening-port reply. */ if (server.repl_state == REPL_STATE_RECEIVE_PORT_REPLY) { err = receiveSynchronousResponse(conn); if (err == NULL) goto no_response_error; /* Ignore the error if any, not all the Redis versions support * REPLCONF listening-port. */ if (err[0] == '-') { serverLog(LL_NOTICE,"(Non critical) Master does not understand " "REPLCONF listening-port: %s", err); } sdsfree(err); server.repl_state = REPL_STATE_RECEIVE_IP_REPLY; return; } if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY && !server.slave_announce_ip) server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; /* Receive REPLCONF ip-address reply. */ if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY) { err = receiveSynchronousResponse(conn); if (err == NULL) goto no_response_error; /* Ignore the error if any, not all the Redis versions support * REPLCONF ip-address. 
*/ if (err[0] == '-') { serverLog(LL_NOTICE,"(Non critical) Master does not understand " "REPLCONF ip-address: %s", err); } sdsfree(err); server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; return; } /* Receive CAPA reply. */ if (server.repl_state == REPL_STATE_RECEIVE_CAPA_REPLY) { err = receiveSynchronousResponse(conn); if (err == NULL) goto no_response_error; /* Ignore the error if any, not all the Redis versions support * REPLCONF capa. */ if (err[0] == '-') { serverLog(LL_NOTICE,"(Non critical) Master does not understand " "REPLCONF capa: %s", err); } sdsfree(err); err = NULL; server.repl_state = REPL_STATE_SEND_PSYNC; } /* Try a partial resynchronization. If we don't have a cached master * slaveTryPartialResynchronization() will at least try to use PSYNC * to start a full resynchronization so that we get the master replid * and the global offset, to try a partial resync at the next * reconnection attempt. */ if (server.repl_state == REPL_STATE_SEND_PSYNC) { if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) { err = sdsnew("Write error sending the PSYNC command."); abortFailover("Write error to failover target"); goto write_error; } server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; return; } /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC_REPLY. */ if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) { serverLog(LL_WARNING,"syncWithMaster(): state machine error, " "state should be RECEIVE_PSYNC but is %d", server.repl_state); goto error; } psync_result = slaveTryPartialResynchronization(conn,1); if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ /* Check the status of the planned failover. We expect PSYNC_CONTINUE, * but there is nothing technically wrong with a full resync which * could happen in edge cases. 
*/ if (server.failover_state == FAILOVER_IN_PROGRESS) { if (psync_result == PSYNC_CONTINUE || psync_result == PSYNC_FULLRESYNC) { clearFailoverState(); } else { abortFailover("Failover target rejected psync request"); return; } } /* If the master is in an transient error, we should try to PSYNC * from scratch later, so go to the error path. This happens when * the server is loading the dataset or is not connected with its * master and so forth. */ if (psync_result == PSYNC_TRY_LATER) goto error; /* Note: if PSYNC does not return WAIT_REPLY, it will take care of * uninstalling the read handler from the file descriptor. */ if (psync_result == PSYNC_CONTINUE) { serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization."); if (server.supervised_mode == SUPERVISED_SYSTEMD) { redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections in read-write mode.\n"); } return; } /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC * and the server.master_replid and master_initial_offset are * already populated. */ if (psync_result == PSYNC_NOT_SUPPORTED) { serverLog(LL_NOTICE,"Retrying with SYNC..."); if (connSyncWrite(conn,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { serverLog(LL_WARNING,"I/O error writing to MASTER: %s", connGetLastError(conn)); goto error; } } /* Prepare a suitable temp file for bulk transfer */ if (!useDisklessLoad()) { while(maxtries--) { snprintf(tmpfile,256, "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); if (dfd != -1) break; sleep(1); } if (dfd == -1) { serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno)); goto error; } server.repl_transfer_tmpfile = zstrdup(tmpfile); server.repl_transfer_fd = dfd; } /* Setup the non blocking download of the bulk file. 
*/ if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) { char conninfo[CONN_INFO_LEN]; serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo))); goto error; } server.repl_state = REPL_STATE_TRANSFER; server.repl_transfer_size = -1; server.repl_transfer_read = 0; server.repl_transfer_last_fsync_off = 0; server.repl_transfer_lastio = server.unixtime; return; no_response_error: /* Handle receiveSynchronousResponse() error when master has no reply */ serverLog(LL_WARNING, "Master did not respond to command during SYNC handshake"); /* Fall through to regular error handling */ error: if (dfd != -1) close(dfd); connClose(conn); server.repl_transfer_s = NULL; if (server.repl_transfer_fd != -1) close(server.repl_transfer_fd); if (server.repl_transfer_tmpfile) zfree(server.repl_transfer_tmpfile); server.repl_transfer_tmpfile = NULL; server.repl_transfer_fd = -1; server.repl_state = REPL_STATE_CONNECT; return; write_error: /* Handle sendCommand() errors. */ serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err); sdsfree(err); goto error; }

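以上就是从库一侧完整的握手过程:PING → AUTH(可选)→ REPLCONF listening-port → REPLCONF ip-address(可选)→ REPLCONF capa → PSYNC。其中AUTH和REPLCONF命令是一次性发出的,之后每次回调读取一条回复并推进状态机。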

复制类型判断与执行

经过前面的握手之后,从库的状态来到了REPL_STATE_SEND_PSYNC,表示要发送PSYNC命令:此时调用slaveTryPartialResynchronization发送PSYNC,状态变为REPL_STATE_RECEIVE_PSYNC_REPLY;收到回复后再次调用它解析结果,+FULLRESYNC表示全量复制,+CONTINUE表示增量复制:

c
//全量复制 if (!strncmp(reply,"+FULLRESYNC",11)) { char *replid = NULL, *offset = NULL; /* FULL RESYNC, parse the reply in order to extract the replid * and the replication offset. */ replid = strchr(reply,' '); if (replid) { replid++; offset = strchr(replid,' '); if (offset) offset++; } if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) { serverLog(LL_WARNING, "Master replied with wrong +FULLRESYNC syntax."); /* This is an unexpected condition, actually the +FULLRESYNC * reply means that the master supports PSYNC, but the reply * format seems wrong. To stay safe we blank the master * replid to make sure next PSYNCs will fail. */ memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1); } else { memcpy(server.master_replid, replid, offset-replid-1); server.master_replid[CONFIG_RUN_ID_SIZE] = '\0'; server.master_initial_offset = strtoll(offset,NULL,10); serverLog(LL_NOTICE,"Full resync from master: %s:%lld", server.master_replid, server.master_initial_offset); } sdsfree(reply); return PSYNC_FULLRESYNC; } //增量复制 if (!strncmp(reply,"+CONTINUE",9)) { /* Partial resync was accepted. */ serverLog(LL_NOTICE, "Successful partial resynchronization with master."); /* Check the new replication ID advertised by the master. If it * changed, we need to set the new ID as primary ID, and set * secondary ID as the old master ID up to the current offset, so * that our sub-slaves will be able to PSYNC with us after a * disconnection. */ char *start = reply+10; char *end = reply+9; while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++; if (end-start == CONFIG_RUN_ID_SIZE) { char new[CONFIG_RUN_ID_SIZE+1]; memcpy(new,start,CONFIG_RUN_ID_SIZE); new[CONFIG_RUN_ID_SIZE] = '\0'; if (strcmp(new,server.cached_master->replid)) { /* Master ID changed. */ serverLog(LL_NOTICE,"Master replication ID changed to %s",new); /* Set the old ID as our ID2, up to the current offset+1. 
*/ memcpy(server.replid2,server.cached_master->replid, sizeof(server.replid2)); server.second_replid_offset = server.master_repl_offset+1; /* Update the cached master ID and our own primary ID to the * new one. */ memcpy(server.replid,new,sizeof(server.replid)); memcpy(server.cached_master->replid,new,sizeof(server.replid)); /* Disconnect all the sub-slaves: they need to be notified. */ disconnectSlaves(); } }

在整个过程中,主库不需要维护这样的状态机:整个流程都是由从库主动发起的,主库只需要根据从库发来的命令进行回复,是响应式的。

本文作者:yowayimono

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!