概览
本文分以下几点:
- Eureka的高可用实现
a. eureka客户端与服务端通讯机制
b. eureka服务端与其他服务节点的通讯
c. eureka服务端的自我保护机制 - Eureka与ZK的比较
a. CAP模型
b. 使用场景
Eureka高可用实现
Eureka的高可用通过以下三点实现:
a. eureka客户端与服务端通讯机制
b. eureka服务端与其他服务节点的通讯
c. eureka服务端的自我保护机制
eureka客户端与服务端通讯机制
可以看下客户端与Eureka服务端之间的通讯的概述流程:
同时为了更好的说明高可用的实现,在这里大概描述一下Eureka服务端的存储结构:
服务注册
客户端请求
POST /eureka/v2/apps
接口,将客户端的服务信息上传PeerAwareInstanceRegistryImpl.java
399行:1
2
3
4
5
6
7
8int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// 本地注册
super.register(info, leaseDuration, isReplication);
// 协同其他eureka服务节点同步注册
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);Eureka服务端将客户端信息保存至
Registry
注册信息载体内将此次添加变更增加到变更队列
AbstractInstanceRegistry.java
194行:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
// 读锁
read.lock();
// 存储本次注册信息(appName -> instanceId -> instanceInfo)
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// ...其他逻辑
// 将本次添加变更加入最近变更队列内
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// 设置租约的上一次更新时间
registrant.setLastUpdatedTimestamp();
// 执行缓存清空
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
} finally {
// 读锁解锁
read.unlock();
}
}清空读写缓存(先清理APP、再清理ALL_APPS、最后清理ALL_APPS_DELTA)
ResponseCacheImpl.java
251行:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
for (Key.KeyType type : Key.KeyType.values()) {
for (Version v : Version.values()) {
// 删除appName本身的缓存
// 删除ALL_APPS的缓存
// 删除ALL_APPS_DELTA的缓存
invalidate(
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
);
// 存在虚拟地址名称的,删除虚拟地址为key的缓存
if (null != vipAddress) {
invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
}
// 存在安全虚拟地址名称的,删除安全虚拟地址为key的缓存
if (null != secureVipAddress) {
invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
}
}
}
}
ResponseCacheImpl.java
277行:
1 | public void invalidate(Key... keys) { |
判断不存存在其他eureka服务节点,或者本次请求为同步请求则完成注册
PeerAwareInstanceRegistryImpl.java
620行:1
2
3if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}否则,获取所有其他的服务节点,排除本身服务,将本次注册同步至其他eureka服务节点
PeerAwareInstanceRegistryImpl.java
624行:1
2
3
4
5
6
7
8for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// 排除自身
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
// 同步其他服务节点
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
服务续约
- 客户端请求
PUT /eureka/v2/apps/{appId}
- 根据appName获取租约
- 更新租约
- 同步至其他eureka服务节点
续约类似心跳机制,客户端会按照默认时间的30秒定时做一次续约,如果超过3次没有成功,则服务端会将该客户端剔除。
服务取消注册
- 客户端请求
DELETE /eureka/v2/apps/{appId}
- 根据appName获取租约
- 设置租约过期时间
evictionTimestamp
为当前时间 - 将此次删除增加到变更队列中
- 清空读写缓存(先清理APP、再清理ALL_APPS、最后清理ALL_APPS_DELTA)
服务剔除
Eureka服务端会在内部会初始化一个
Timer
定时器用于定时调度处理剔除任务;剔除时间间隔为evictionIntervalTimerInMs
AbstractInstanceRegistry.java
1212行:1
2
3
4
5
6
7
8
9protected void postInit() {
// ....
// 设置新的过期任务
evictionTaskRef.set(new EvictionTask());
// 任务调度
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}判断是否启用自我保护,如果禁用,则不进行服务剔除
判断 上一分钟实际的续约次数 <=
numberOfRenewsPerMinThreshold
,则会触发自我保护机制,停止服务剔除PeerAwareInstanceRegistryImpl.java
474行:1
2
3
4
5
6// 判断自我保护是否禁用
if (!isSelfPreservationModeEnabled()) {
return true;
}
// 自我保护机制阈值判断
return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;遍历
Registry
内所有的租约信息,判断当前租约是否过期将这些过期的租约放置到一个过期列表内
AbstractInstanceRegistry.java
597行:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
// 遍历租约
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
// 判断租约是否过期
// additionalLeaseMs是补偿时间,防止由于GC或者本地时间造成的一个时间误差,确保能够按照预期时间执行
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
// 将过期租约加入过期列表
expiredLeases.add(lease);
}
}
}
}计算可被剔除的过期实例数(过期数 = Math.min(过期列表大小, (本地租约数 - 本地租约数 * 续约百分比)))
AbstractInstanceRegistry.java
612行:1
2
3
4
5
6// 获取本地租约数
int registrySize = (int) getLocalRegistrySize();
// 计算租约
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
// 实际需要剔除的租约个数
int evictionLimit = registrySize - registrySizeThreshold;使用洗牌算法找出需要剔除的n个实例
AbstractInstanceRegistry.java
620行:1
2
3
4
5
6
7
8// 按照剔除数遍历,每次的交换对象都是基于上次的随机结果
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// 筛选出需要交换位置的索引next并与i交换位置
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
// 剔除逻辑
}设置租约过期时间
evictionTimestamp
为当前时间- 将此次删除增加到变更队列中
- 清空读写缓存
eureka服务端与其他服务节点的通讯
eureka服务端与其他服务节点的通讯主要包含两部分:
- eureka服务端启动时候自动拉取其他服务节点的注册信息并落入本地
Registry
中 - 一旦有诸如
register
,renew
,cancel
请求,则会将这些请求通过线程池自动同步至其他服务节点
启动初始化
Servlet容器初始化,调用eureka环境初始化以及eureka上下文初始化
EurekaBootstrap.java
111行:1
2
3
4
5
6
7
8
9
10
11
12
13
14public void contextInitialized(ServletContextEvent event) {
try {
// 初始化环境
initEurekaEnvironment();
// 初始化上下文
initEurekaServerContext();
ServletContext sc = event.getServletContext();
sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
} catch (Throwable e) {
logger.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}初始化上下文的过程中会开始同步其他eureka服务节点的信息
EurekaBootstrap.java
147行:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53protected void initEurekaServerContext() throws Exception {
// ...其他逻辑
ApplicationInfoManager applicationInfoManager = null;
// 初始化eureka客户端,用于做为获取其他eureka服务信息的client
if (eurekaClient == null) {
EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
? new CloudInstanceConfig()
: new MyDataCenterInstanceConfig();
applicationInfoManager = new ApplicationInfoManager(
instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
} else {
applicationInfoManager = eurekaClient.getApplicationInfoManager();
}
// 初始化其他服务实例感知的注册器
PeerAwareInstanceRegistry registry;
if (isAws(applicationInfoManager.getInfo())) {
// aws 服务
// ...
} else {
registry = new PeerAwareInstanceRegistryImpl(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
}
// 为其他eureka服务注册eurekaNode实例
PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
registry,
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
applicationInfoManager
);
// servlet容器初始化
// ...
// 从其他服务拷贝注册信息
int registryCount = registry.syncUp();
// 等待接收请求
registry.openForTraffic(applicationInfoManager, registryCount);
// 其他操作
// ...
}
同步逻辑:
1 | public int syncUp() { |
- 完成后开始接收请求
过程中同步
以注册同步为例子。
根据请求类型判断执行动作
PeerAwareInstanceRegistry.java
648行:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
// 调用PeerEurekaNode的register方法执行注册动作的同步
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;将任务提交至任务处理器
TaskDispatcher
进行PeerEurekaNode.java
135行:1
2
3
4
5
6
7
8
9
10
11long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
// 同步任务信息
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);调用Jersey2执行REST请求
AbstractJersey2EurekaHttpClient.java
85行:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
Response response = null;
try {
// 封装请求
Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request();
addExtraProperties(resourceBuilder);
// 设置x-netflix-discovery-replication头信息为true,其他eureka服务节点接收后会知晓这是个同步请求
addExtraHeaders(resourceBuilder);
// 执行请求并获取结果
response = resourceBuilder
.accept(MediaType.APPLICATION_JSON)
.acceptEncoding("gzip")
.post(Entity.json(info));
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey2 HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
Eureka自我保护机制
Eureka是Netflix为了解决现有AWS的注册服务无法解决的一些场景而专门研发的。在设计之初,就考虑到高可用特性,防止AWS突然性的大规模断点造成服务不可用的情况设计了Eureka的自我保护机制。
Eureka的自我保护机制设定为:服务总数 每分钟续约数(60s / 客户端续约间隔) 自我保护阈值因子
举个例子说明:
如果某个应用A有100个服务实例,那么按照公式计算,它在一分钟内续约次数必须 >= 170。
如果在上一个分钟内,续约数 > 170,那么服务正常,某个实例就算失败也只会认为是客户端存在问题,会被剔除;
如果在上一个分钟内,续约数 < 170,那么Eureka就会认为是Eureka服务存在问题,会停止剔除流程,保护现有的注册信息,防止服务大规模下线。
Eureka与ZK的比较
CAP模型
Eureka是AP模型,ZK是CP模型。前者强调高可用,即使在某些情况下不同region看到的视图可能不一致;而后者强调的是强一致性,且在超过一定阈值后会造成ZK集群整体不可用
应用场景
场景 | Eureka | ZK |
---|---|---|
数据分发和订阅 | × | √ |
异地大规模集群需求的注册中心 | √ | × |
小规模单机房注册中心 | √ | √ |