| 
                         如下是一个PSS(一主两备)架构的副本集,主节点除了与两个备节点执行数据复制之外,三个节点之间还会通过心跳感知彼此的存活。 
 
一旦主节点发生故障以后,备节点将在某个周期内检测到主节点处于不可达的状态,此后将由其中一个备节点事先发起选举并最终成为新的主节点。 这个检测周期  由electionTimeoutMillis 参数确定,默认是10s。 
 
接下来,我们通过一些源码看看该机制是如何实现的: 
<<来自 MongoDB 3.4源码>> 
db/repl/replication_coordinator_impl_heartbeat.cpp 
相关方法 
    - ReplicationCoordinatorImpl::_startHeartbeats_inlock 启动各成员的心跳
 
    - ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget 调度任务-(计划)向成员发起心跳
 
    - ReplicationCoordinatorImpl::_doMemberHeartbeat 执行向成员发起心跳
 
    - ReplicationCoordinatorImpl::_handleHeartbeatResponse 处理心跳响应
 
    - ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate_inlock  调度保活状态检查定时器
 
    - ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock  取消并重新调度选举超时定时器
 
    - ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1 发起主动选举
 
 
db/repl/topology_coordinator_impl.cpp 
 
相关方法 
    - TopologyCoordinatorImpl::prepareHeartbeatRequestV1 构造心跳请求数据
 
    - TopologyCoordinatorImpl::processHeartbeatResponse 处理心跳响应并构造下一步Action实例
 
 
下面这个图,描述了各个方法之间的调用关系 
 
图-主要关系 
心跳的实现 
首先,在副本集组建完成之后,节点会通过ReplicationCoordinatorImpl::_startHeartbeats_inlock方法开始向其他成员发送心跳: 
- void ReplicationCoordinatorImpl::_startHeartbeats_inlock() { 
 -  const Date_t now = _replExecutor.now(); 
 -  _seedList.clear(); 
 -  //获取副本集成员 
 -  for (int i = 0; i < _rsConfig.getNumMembers(); ++i) { 
 -  if (i == _selfIndex) { 
 -  continue; 
 -  } 
 -  //向其他成员发送心跳 
 -  _scheduleHeartbeatToTarget(_rsConfig.getMemberAt(i).getHostAndPort(), i, now); 
 -  } 
 -  //仅仅是刷新本地的心跳状态数据 
 -  _topCoord->restartHeartbeats(); 
 -  //使用V1的选举协议(3.2之后) 
 -  if (isV1ElectionProtocol()) { 
 -  for (auto&& slaveInfo : _slaveInfo) { 
 -  slaveInfo.lastUpdate = _replExecutor.now(); 
 -  slaveInfo.down = false; 
 -  } 
 -  //调度保活状态检查定时器 
 -  _scheduleNextLivenessUpdate_inlock(); 
 -  } 
 - } 
 
  
在获得当前副本集的节点信息后,调用_scheduleHeartbeatToTarget方法对其他成员发送心跳, 
这里_scheduleHeartbeatToTarget 的实现比较简单,其真正发起心跳是由 _doMemberHeartbeat 实现的,如下: 
- void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget(const HostAndPort& target, 
 -  int targetIndex, 
 -  Date_t when) { 
 -  //执行调度,在某个时间点调用_doMemberHeartbeat 
 -  _trackHeartbeatHandle( 
 -  _replExecutor.scheduleWorkAt(when, 
 -  stdx::bind(&ReplicationCoordinatorImpl::_doMemberHeartbeat, 
 -  this, 
 -  stdx::placeholders::_1, 
 -  target, 
 -  targetIndex))); 
 - } 
 
  
ReplicationCoordinatorImpl::_doMemberHeartbeat 方法的实现如下: 
- void ReplicationCoordinatorImpl::_doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData, 
 -  const HostAndPort& target, 
 -  int targetIndex) { 
 -  LockGuard topoLock(_topoMutex); 
 -  //取消callback 跟踪 
 -  _untrackHeartbeatHandle(cbData.myHandle); 
 -  if (cbData.status == ErrorCodes::CallbackCanceled) { 
 -  return; 
 -  } 
 -  const Date_t now = _replExecutor.now(); 
 -  BSONObj heartbeatObj; 
 -  Milliseconds timeout(0); 
 -  //3.2 以后的版本 
 -  if (isV1ElectionProtocol()) { 
 -  const std::pair<ReplSetHeartbeatArgsV1, Milliseconds> hbRequest = 
 -  _topCoord->prepareHeartbeatRequestV1(now, _settings.ourSetName(), target); 
 -  //构造请求,设置一个timeout 
 -  heartbeatObj = hbRequest.first.toBSON(); 
 -  timeout = hbRequest.second; 
 -  } else { 
 -  ... 
 -  } 
 -  //构造远程命令 
 -  const RemoteCommandRequest request( 
 -  target, "admin", heartbeatObj, BSON(rpc::kReplSetMetadataFieldName << 1), nullptr, timeout); 
 -  //设置远程命令回调,指向_handleHeartbeatResponse方法 
 -  const ReplicationExecutor::RemoteCommandCallbackFn callback = 
 -  stdx::bind(&ReplicationCoordinatorImpl::_handleHeartbeatResponse, 
 -  this, 
 -  stdx::placeholders::_1, 
 -  targetIndex); 
 -  _trackHeartbeatHandle(_replExecutor.scheduleRemoteCommand(request, callback)); 
 - } 
 
                          (编辑:泰州站长网) 
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! 
                     |