kafka消费者(五):heartbeat

it2024-07-20  69

消费者定期向服务端的GroupCoordinator发送HeartbeatRequest来确定彼此在线。首先介绍心跳请求的格式,比较简单,依次包含group_id(String)、group_generation_id(int)、member_id(String)三个字段。HeartbeatResponse则只包含一个short类型的error_code。HeartbeatTask是一个实现DelayedTask接口的定时任务,负责定时发送HeartbeatRequest并处理其响应。

public void run(final long now) { if (generation < 0 || needRejoin() || coordinatorUnknown()) { // 判断是否需要发送心跳 //1. GroupCoordinator已经确认并且已经连接 //2. 不处于正在等在Partition分配结果的状态 //3. 之前的心跳正常收到响应,并没有过期 return; } if (heartbeat.sessionTimeoutExpired(now)) { // 检测心跳回应是否超时,如果超时,就认为GroupCoordinator宕机,调用coordinatorDead() coordinatorDead(); return; } if (!heartbeat.shouldHeartbeat(now)) { // 检测心跳是否到期,如果没有到期则把heartbeatTask对象重新添加到DelayTaskQueue中,等到其后期执行。如果已经到期则发送Heartbeat请求 client.schedule(this, now + heartbeat.timeToNextHeartbeat(now)); } else { //更新最近一次发送Heartbeat请求的时间,把requestInFlight设置为true,表示有未响应的心跳请求,防止重复发送。 heartbeat.sentHeartbeat(now); requestInFlight = true; //创建并缓存心跳请求,调用ConsumerNetworkClient.send()方法,把请求放入unsent集合中缓存。 RequestFuture<Void> future = sendHeartbeatRequest(); //添加监听器 future.addListener(new RequestFutureListener<Void>() { @Override public void onSuccess(Void value) { requestInFlight = false; long now = time.milliseconds(); //设置heartbeat.lastHeartbeatReceive=now heartbeat.receiveHeartbeat(now); long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now); client.schedule(HeartbeatTask.this, nextHeartbeatTime); } @Override public void onFailure(RuntimeException e) { requestInFlight = false; client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs); } }); } } protected void coordinatorDead() { if (this.coordinator != null) { log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId); //把unsent中缓存要发送给Coordinator节点的请求全部清空,并标记异常 client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE); this.coordinator = null; } }

发送心跳

public RequestFuture<Void> sendHeartbeatRequest() { //创建HeartbeatResponse HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.memberId); //使用HeartbeatCompletionHandler对RequestFuture<ClientResponse>进行适配 return client.send(coordinator, ApiKeys.HEARTBEAT, req) .compose(new HeartbeatCompletionHandler()); }

对心跳响应进行解析

protected abstract class CoordinatorResponseHandler<R, T> extends RequestFutureAdapter<ClientResponse, T> { //待处理的响应 protected ClientResponse response; //parse()和handle()两个抽象方法 public abstract R parse(ClientResponse response); public abstract void handle(R response, RequestFuture<T> future); @Override public void onFailure(RuntimeException e, RequestFuture<T> future) { // mark the coordinator as dead if (e instanceof DisconnectException) coordinatorDead(); //调用adapted对象的raise方法 future.raise(e); } @Override public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) { try { this.response = clientResponse; //解析ClientResponse,handle R responseObj = parse(clientResponse); handle(responseObj, future); } catch (RuntimeException e) { if (!future.isDone()) future.raise(e); } } }

处理心跳响应

private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> { @Override public HeartbeatResponse parse(ClientResponse response) { return new HeartbeatResponse(response.responseBody()); } @Override public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) { sensors.heartbeatLatency.record(response.requestLatencyMs()); //解析错误码,ILLEGAL_GENERATION Errors error = Errors.forCode(heartbeatResponse.errorCode()); if (error == Errors.NONE) { log.debug("Received successful heartbeat response for group {}", groupId); future.complete(null); } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP) { log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.", groupId, coordinator); coordinatorDead(); future.raise(error); } else if (error == Errors.REBALANCE_IN_PROGRESS) { log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId); AbstractCoordinator.this.rejoinNeeded = true; future.raise(Errors.REBALANCE_IN_PROGRESS); } else if (error == Errors.ILLEGAL_GENERATION) { //generationID过期,GroupCoordinator已经开始了新的rebalance操作,会把rejoinNeeded设置为true,重新加入消费者组。 log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId); AbstractCoordinator.this.rejoinNeeded = true; future.raise(Errors.ILLEGAL_GENERATION); } else if (error == Errors.UNKNOWN_MEMBER_ID) { //Coordinator识别不了Consumer,就清空memberId,重新加入消费者组 log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId); memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; AbstractCoordinator.this.rejoinNeeded = true; future.raise(Errors.UNKNOWN_MEMBER_ID); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else { future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message())); } } }

 

最新回复(0)