Zookeeper:分布式命名服务

来自Wikioe
跳到导航 跳到搜索


关于“分布式命名服务”

“命名服务”是为系统中的资源提供标识能力。

ZooKeeper的命名服务主要是利用 ZooKeeper节点的树形分层结构和子节点的顺序维护能力,来为分布式系统中的资源命名。

应用场景

分布式API目录:为分布式系统中各种API接口服务的名称、链接地址,提供类似 JNDI(Java命名和目录接口)中的文件系统的功能。

借助于 ZooKeeper 的树形分层结构就能提供分布式的API调用功能。
  • 著名的 Dubbo 分布式框架就是应用了 ZooKeeper 的分布式的 JNDI 功能,大致的思路为:
    1. 服务提供者(Service Provider)在启动的时候,向 ZooKeeper 上的指定节点“/dubbo/${serviceName}/providers”写入自己的API地址,这个操作就相当于服务的公开。
    2. 服务消费者(Consumer)启动的时候,订阅节点“/dubbo/{serviceName}/providers”下的服务提供者的 URL 地址,获得所有服务提供者的API。


分布式的ID生成器:在分布式系统中,为每一个数据资源提供唯一性的ID标识功能。

在单体服务环境下,通常来说,可以利用数据库的主键自增功能,唯一标识一个数据资源。但是,在大量服务器集群的场景下,依赖单体服务的数据库主键自增生成唯一ID的方式,则没有办法满足高并发和高负载的需求。这时,就需要分布式的ID生成器,保障分布式场景下的ID唯一性。
在分布式系统中,分布式ID生成器的使用场景非常之多:
  • 大量的数据记录,需要分布式ID。
  • 大量的系统消息,需要分布式ID。
  • 大量的请求日志,如RESTful的操作记录,需要唯一标识,以便进行后续的用户行为分析和调用链路分析。
  • 分布式节点的命名服务,往往也需要分布式ID。


分布式节点的命名:一个分布式系统通常会由很多的节点组成,节点的数量不是固定的,而是不断动态变化的。

比如说,当业务不断膨胀和流量洪峰到来时,大量的节点可能会动态加入到集群中。而一旦流量洪峰过去了,就需要下线大量的节点。再比如说,由于机器或者网络的原因,一些节点会主动离开集群。
如何为大量的动态节点命名呢?一种简单的办法是可以通过配置文件,手动为每一个节点命名。但是,如果节点数据量太大,或者说变动频繁,手动命名则是不现实的,这就需要用到分布式节点的命名服务。

ID生成器

在分布式系统环境中,唯一ID系统需要满足以下需求:

  1. 全局唯一:不能出现重复ID。
  2. 高可用:ID生成系统是基础系统,被许多关键系统调用,一旦宕机,就会造成严重影响。

很明显,传统的数据库自增主键或者单体的自增主键,已经不能满足需求。


可能的分布式ID生成器方案:

  1. Java的UUID。
  2. 分布式缓存Redis生成ID:利用 Redis 的原子操作 INCRINCRBY,生成全局唯一的ID。
  3. Twitter 的 SnowFlake 算法。
  4. ZooKeeper 生成ID:利用ZooKeeper的顺序节点,生成全局唯一的ID。
  5. MongoDb 的 ObjectId:【???】
    MongoDB是一个分布式的非结构化 NoSQL 数据库,每插入一条记录会自动生成全局唯一的一个“_id”字段值,它是一个 12 字节的字符串,可以作为分布式系统中全局唯一的ID。


P.S. 关于“UUID”:

UUID 是“Universally Unique Identifier”的缩写,它是在一定的范围内(从特定的名字空间到全球)唯一的机器生成的标识符,所以,UUID 在其他语言中也叫“GUID”。

在Java中,生成UUID的代码:
    String uuid = UUID.randomUUID().toString()

UUID是经由一定的算法机器生成的,为了保证UUID的唯一性,规范定义了包括网卡MAC地址、时间戳、名字空间(Namespace)、随机或伪随机数、时序等元素,以及从这些元素生成UUID的算法。【UUID只能由计算机生成】

优点:UUID的优点是本地生成ID,不需要进行远程调用,时延低,性能高。
缺点:
    1、UUID过长,16字节共128位,通常以36字节长的字符串来表示,在很多应用场景不适用。
    2、UUID没有排序,无法保证趋势递增。

Zookeeper:分布式命名服务

ZooKeeper 实现分布式命名服务(分布式ID、分布式节点命名),主要是利用其顺序节点的特性:

ZooKeeper 的每一个节点都会为它的第一级子节点维护一份顺序编号(自动为创建后的节点路径在末尾加上一个数字),用于记录每个子节点创建的先后顺序,这个顺序编号是分布式同步的,也是全局唯一的。

    例如,在创建节点的时候只需要传入节点“/test_”,ZooKeeper自动会在“test_”后面补充数字顺序,例如“/test_0000000010”。


note:这个顺序值的最大上限就是整型的最大值。


在 ZooKeeper 节点的四种类型中,有两种自动编号的节点:

  1. PERSISTENT_SEQUENTIAL:持久化顺序节点,节点持久有效,可用于实现“分布式ID”。
  2. EPHEMERAL_SEQUENTIAL:临时顺序节点,节点随会话失效而删除,可用于实现“分布式节点命名”。


分布式ID

通过创建ZooKeeper的持久化顺序节点的方法,生成全局唯一的ID。


实现:

package com.crazymakercircle.zk.NameService;

import com.crazymakercircle.zk.ClientFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.ZooKeeper.CreateMode;
/**
*生成分布式ID
**/
public class IDMaker {
	//...省略其他的方法

	/**
	* 创建持久化顺序节点
	* @param pathPefix节点路径
	* @return 创建后的完整路径名称
	*/
	private String createSeqNode(String pathPefix) {
		try {
			// 创建一个ZNode顺序节点
			String destPath = client.create()
					.creatingParentsIfNeeded()
					.withMode(CreateMode.PERSISTENT_SEQUENTIAL)
					.forPath(pathPefix);
			return destPath;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}
	
	// 生成ID
	public String makeId(String nodeName) {
		// 创建新的zookeeper节点
		String str = createSeqNode(nodeName);
		if (null == str) {
			return null;
		}
		
		// 截取新节点的末尾序号作为新ID
		int index = str.lastIndexOf(nodeName);
		if (index >= 0) {
			index += nodeName.length();
			return index <= str.length() ? str.substring(index) : "";
		}
		return str;
	}
}


测试:

@Slf4j
public class IDMakerTester {
	@Test
	public void testMakeId() {
		IDMaker idMaker = new IDMaker();
		String nodeName = "/test/IDMaker/ID-";
		
		for (int i = 0; i< 10; i++) {
			String id = idMaker.makeId(nodeName);
			log.info("第"+ i + "个创建的id为:" + id);
		}
	}
}

结果:

第0个创建的id为:0000000010
第1个创建的id为:0000000011
//…..省略其他的输出

分布式节点命名

通过创建ZooKeeper的临时顺序节点的方法,生成集群节点名:

  1. 启动节点服务,连接 ZooKeeper,检查命名服务根节点是否存在,如果不存在,就创建系统的根节点。
  2. 在根节点下创建一个临时顺序 ZNode节点,取回 ZNode 的编号把它作为分布式系统中节点的 NODEID。
  3. 如果临时节点太多,可以根据需要删除临时顺序 ZNode 节点。


实现:

/**
* 集群节点的命名服务
**/
public class PeerNode {
	// ZooKeeper客户端
	private CuratorFramework client = null;
	
	private String pathRegistered = null;
	private static PeerNode singleInstance = null;
	
	// 单例模式
	public static PeerNodegetInst() {
		if (null == singleInstance) {
			singleInstance = new PeerNode();
			singleInstance.client = ZKclient.instance.getClient();
			singleInstance.init();
		}
		return singleInstance;
	}
	
	private PeerNode() {
	}
	
	// 初始化,在ZooKeeper中创建当前的分布式节点
	public void init() {
		// 使用标准的前缀,创建父节点,父节点是持久化的(方法省略)
		createParentIfNeeded(ServerUtils.MANAGE_PATH);
	
		// 创建一个ZNode节点
		try {
			pathRegistered = client.create()
					.creatingParentsIfNeeded()
					//创建一个非持久化的临时节点
					//临时节点的前缀,也需要提前定义
					.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
					.forPath(ServerUtils.pathPrefix);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	// 获取节点的编号
	public long getId() {
		String sid = null;
		
		if (null == pathRegistered) {
			throw new RuntimeException("节点注册失败");
		}
		
		int index = pathRegistered.lastIndexOf(ServerUtils.pathPrefix);
		if (index >= 0) {
			index += ServerUtils.pathPrefix.length();
			sid = index <= pathRegistered.length() ? pathRegistered.substring(index) : null;
		}
		
		if (null == sid) {
			throw new RuntimeException("分布式节点错误");
		}
		return Long.parseLong(sid);
	}
}

SnowFlakeID 算法实现(分布式ID算法)

Twitter 的 SnowFlake 算法(雪花算法)是一种著名的分布式服务器用户ID生成算法。

SnowFlake 算法所生成的ID是一个64bit的长整型数字,这个64bit被划分成四个部分:

SnowFlakeID的四个部分.jpg
  1. 第一位:占用1 bit,其值始终是0,没有实际作用。
  2. 时间戳:占用41 bit,精确到毫秒,总共可以容纳约69年的时间。
  3. 工作机器ID:占用10 bit,最多可以容纳1024个节点。
  4. 序列号:占用12 bit,最多可以累加到4095。这个值在同一毫秒同一节点上从0开始不断累加。


如上:在工作节点(工作机器ID位数)达到 1024 顶配的场景下,SnowFlake算法在同一毫秒内最多可以生成:1024*4096=4194304,总计400多万个ID,也就是说,在绝大多数并发场景下都是够用的。

  • 在 SnowFlake 算法中,第三个部分是工作机器ID,可以结合上一节的命名方法,并通过 ZooKeeper 管理 workId,免去了手动频繁修改集群节点去配置机器ID的麻烦。

上面的 bit 数分配只是一个官方的推荐,是可以微调的:

比方说,如果1024的节点数不够,可以增加3个bit,扩大到8192个节点;再比方说,如果每毫秒生成4096个ID比较多,可以从12 bit减小到使用10 bit,则每毫秒生成1024个ID。这样,单个节点 1 秒(1000毫秒)可以生成 1024*1000,也就是100多万个ID,数量也是巨大的;剩下的位数为剩余时间,还剩下 40 bit的时间戳,比原来少1位,则可以持续 32 年。


实现:

package com.crazymakercircle.zk.NameService;

/**
* SnowFlake ID 算法实现
**/
public class SnowflakeIdGenerator {
	// 单例
	public static SnowflakeIdGenerator instance = new SnowflakeIdGenerator();
	
	/**
	* 初始化唯一实例
	*
	* @param workerId节点Id,最大8191
	* @return 这个唯一实例
	*/
	public synchronized void init(long workerId) {
		if (workerId > MAX_WORKER_ID) {
			// ZooKeeper分配的workerId过大
			throw new IllegalArgumentException("woker Id wrong: " + workerId);
		}
		instance.workerId = workerId;
	}
	
	private SnowflakeIdGenerator() {
	}
	
	
	// 开始使用该算法的时间为: 2017-01-01 00:00:00
	private static final long START_TIME = 1483200000000L;

	// worker id 的bit数,最多支持8192个节点
	private static final int WORKER_ID_BITS = 13;
	// 序列号的bit数,支持单节点最高每毫秒的最大ID数为1024
	private final static int SEQUENCE_BITS = 10;

	// 最大的 worker id,8091:-1的补码右移13位, 然后取反
	private final static long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);
	// 最大的序列号,1023:-1的补码(二进制数的所有位均为1)右移10位, 然后取反
	private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);

	// worker 节点编号的移位
	private final static long APP_HOST_ID_SHIFT = SEQUENCE_BITS;
	// 时间戳的移位
	private final static long TIMESTAMP_LEFT_SHIFT = ORKER_ID_BITS + APP_HOST_ID_SHIFT;

	// 该项目的worker 节点 id
	private long workerId;

	// 上次生成ID的时间戳
	private long lastTimestamp = -1L;

	// 当前毫秒生成的序列号
	private long sequence = 0L;


	/**
	* Next id long.
	*
	* @return the nextId
	*/
	public Long nextId() {
		return generateId();
	}

	/**
	* 生成唯一id的具体实现
	*/
	private synchronizedlong generateId() {
		long current = System.currentTimeMillis();
		
		if (current < lastTimestamp) {
			// 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过,出现问题返回-1
			return -1;
		}
		
		if (current == lastTimestamp) {
			// 如果当前生成id的时间还是上次的时间,那么对sequence序列号进行+1
			sequence = (sequence + 1) & MAX_SEQUENCE;
			if (sequence == MAX_SEQUENCE) {
				// 当前毫秒生成的序列数已经大于最大值,那么阻塞到下一个毫秒再获取新的时间戳
				current = this.nextMs(lastTimestamp);
			}
		} else {
			// 当前的时间戳已经是下一个毫秒
			sequence = 0L;
		}
		
		// 更新上次生成ID的时间戳
		lastTimestamp = current;
		
		// 进行移位操作生成int64的唯一ID
		// 时间戳右移动23位
		long time = (current - START_TIME) << TIMESTAMP_LEFT_SHIFT;
		
		// workerId右移动10位
		long workerId = this.workerId<< APP_HOST_ID_SHIFT;
		
		return time | workerId | sequence;
	}
	
	/**
	* 阻塞到下一个毫秒
	*/
	private long nextMs(long timeStamp) {
		long current = System.currentTimeMillis();
		while (current <= timeStamp) {
			current = System.currentTimeMillis();
		}
		return current;
	}
}

上述代码是一个相对比较简单的 SnowFlake 实现版本:

  1. 在单节点上获得下一个ID,使用 Synchronized 控制并发,没有使用 CAS(Compare And Swap,比较并交换)的方式,是因为CAS不适合并发量非常高的场景;
  2. 如果在一台机器上当前毫秒的序列号已经增长到最大值 1023,则使用 while 循环等待直到下一毫秒;
  3. 如果当前时间小于记录的上一个毫秒值,则说明这台机器的时间回拨了,于是阻塞,一直等到下一毫秒。

测试:

package com.crazymakercircle.zk.NameService;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* 测试SnowflakeIdWorker、SnowflakeIdGenerator
**/
@Slf4j
public class SnowflakeIdTest {
	/**
	* 测试用例
	*/
	@Test
	public void snowflakeIdTest() {
		//获取机器节点的id
		long workId = SnowflakeIdWorker.instance.getId();
		//初始化id生成器
		SnowflakeIdGenerator.instance.init(workId);
		
		//创建一个线程池,并发生成id
		ExecutorService es = Executors.newFixedThreadPool(10);
		
		final HashSet idSet = new HashSet();
		Collections.synchronizedCollection(idSet);
		
		long start = System.currentTimeMillis();
		
		log.info(" 开始生产 *");
		for (int i = 0; i< 10; i++)
			es.execute(() -> {
				for (long j = 0; j < 5000000; j++) {
					long id = SnowflakeIdGenerator.instance.nextId();
					synchronized (idSet) {
						idSet.add(id);
					}
				}
			});

		//关闭线程池
		es.shutdown();
		try {
		es.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
		e.printStackTrace();
		}
		long end = System.currentTimeMillis();
		log.info(" 生成 id 结束");
		log.info("* 耗时: " + (end - start) + " ms!");
	}
}

总结

SnowFlake 算法的优点:

  1. 生成 ID 时不依赖于数据库,完全在内存生成,高性能和高可用性。
  2. 容量大,每秒可生成几百万个ID。
  3. ID 呈趋势递增,后续插入数据库的索引树时,性能较高。


SnowFlake 算法的缺点:

  1. 依赖于系统时钟的一致性,如果某台机器的系统时钟回拨了,有可能造成 ID 冲突,或者 ID 乱序。
  2. 在启动之前,如果这台机器的系统时间回拨过,那么有可能出现 ID 重复的危险。