又到了金三银四的时候,大家都按耐不住内心的躁动,我在这里给大家分享下之前面试中遇到的一个知识点(zookeeper应用场景),希望对大家有些帮助。如有不足,欢迎大佬们指点指点。
ZooKeeper 是分布式应用程序的分布式开源协调服务。它公开了一组简单的api,分布式应用程序可以基于这些api实现更高级别的同步、配置维护、分组和命名服务。它被设计为易于编程,并使用一种数据模型,该模型以熟悉的文件系统目录树结构为风格。它在 Java 中运行,并具有 Java 和 C 的绑定。

众所周知,协调服务很难做好。它们特别容易出现竞争条件和死锁等错误。ZooKeeper背后的动机是减轻分布式应用程序从头开始实现协调服务的负担。

下面的代码都需要一个序列化类,所以放在最前面声明
/**
* @author admin
*/
public class MyZkSerializer implements ZkSerializer {
String charset = "UTF-8";
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes, charset);
} catch (UnsupportedEncodingException e) {
throw new ZkMarshallingError(e);
}
}
@Override
public byte[] serialize(Object obj) throws ZkMarshallingError {
try {
return String.valueOf(obj).getBytes(charset);
} catch (UnsupportedEncodingException e) {
throw new ZkMarshallingError(e);
}
}
}
假设咱们的项目部署在5台机子上形成一个集群,那么这5个实例在启动时读取的配置信息应该是一样的,同时一旦咱们的配置信息更改了,需要马上通知到这5个实例上并生效,这就是配置中心的功能。

必要条件
1、znode能存储数据
2、Watch能监听数据改变
实现方式
// 1 将单个配置放到zookeeper上
public void putZk() {
ZkClient client = new ZkClient("192.168.10.11:2181");
client.setZkSerializer(new MyZkSerializer());
String configPath = "/config1";
String value = "1111111";
if (client.exists(configPath)) {
client.writeData(configPath, value);
} else {
client.createPersistent(configPath, value);
}
client.close();
}
// 需要配置的服务都从zk上取,并注册watch来实时获得配置更新
public void getConfigFromZk() {
ZkClient client = new ZkClient("192.168.10.11:2181");
client.setZkSerializer(new MyZkSerializer());
String configPath = "/config1";
String value = client.readData(configPath);
System.out.println("从zk读到配置config1的值为:" + value);
// 监控配置的更新,基于watch实现发布订阅功能
client.subscribeDataChanges(configPath, new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
// TODO 配置删除业务处理
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("获得更新的配置值:" + data);
}
});
// 这里只是为演示实时获取到配置值更新而加的等待。实际项目应用中根据具体场景写(可用阻塞方式)
try {
Thread.sleep(5 * 60 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 将配置文件的内容存放到zk节点上
public void putConfigFile2ZK() throws IOException {
File f = new File(this.getClass().getResource("/config.xml").getFile());
FileInputStream fin = new FileInputStream(f);
byte[] datas = new byte[(int) f.length()];
fin.read(datas);
fin.close();
ZkClient client = new ZkClient("192.168.10.11:2181");
client.setZkSerializer(new BytesPushThroughSerializer());
String configPath = "/config2";
if (client.exists(configPath)) {
client.writeData(configPath, datas);
} else {
client.createPersistent(configPath, datas);
}
client.close();
}
获取整个配置文件的方式跟步骤1类似,只不过需要解析对应的配置文件而已。
注册中心主要存储注册实例应用的名称和ip地址,供其他服务通过RPC来调用,其他服务只关心你的服务名是啥,而不必关心你的服务器地址对不对,有没有上线。

首先是服务发现问题,当一个实例启动后会向zookeeper创建一个临时节点,并存入自己的服务信息(包括应用名和ip等),其他服务通过zookeeper拿到该实例的注册信息即可调用。
一旦该服务宕机了或者主动下线,那么该临时节点则会被删除,其他服务通过watch监听到下线通知,也就不会在去调用该服务。
在一个主从部署的集群里,一般master实例负责所有请求的读写功能,其他slave实例同步master的数据,一旦master节点不可用了,那么就需要从他的slave实例中重新选举一个节点作为master实例。


首先是实例去竞争创建临时决定(Master节点),谁创建成功谁就是master,否则就是slave。
同时所有的实例都需要去servers节点(临时节点)注册自己的服务信息,方便通过该节点获取到所有在线的实例,有点类似注册中心的意思。
下面咱们通过代码来模拟一下master选举
/**
* @author yinfeng
*/
public class Server {
private final String cluster;
private final String name;
private final String address;
private final String path, value;
private String master;
public Server(String cluster, String name, String address) {
super();
this.cluster = cluster;
this.name = name;
this.address = address;
path = "/" + this.cluster + "Master";
value = "name:" + name + " address:" + address;
final ZkClient client = new ZkClient("192.168.10.11:2181");
client.setZkSerializer(new MyZkSerializer());
final Thread thread = new Thread(() -> {
electionMaster(client);
});
thread.setDaemon(true);
thread.start();
}
/**
* 选举方法
**/
public void electionMaster(ZkClient client) {
try {
client.createEphemeral(path, value);
master = client.readData(path);
System.out.println(value + "创建节点成功,成为Master");
} catch (ZkNodeExistsException e) {
master = client.readData(path);
System.out.println("Master为:" + master);
}
// 为阻塞自己等待而用
final CountDownLatch cdl = new CountDownLatch(1);
// 注册watcher
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("-----监听到节点被删除");
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
client.subscribeDataChanges(path, listener);
// 让自己阻塞
if (client.exists(path)) {
try {
cdl.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
// 醒来后,取消watcher
client.unsubscribeDataChanges(path, listener);
// 递归调自己(下一次选举)
electionMaster(client);
}
}
咱们通过启动多个服务来看看是否测试成功
public static void main(String[] args) {
// 测试时,依次开启多个Server实例java进程,然后停止获取的master的节点,看谁抢到Master
Server s = new Server("cluster1", "server1", "192.168.1.11:8991");
Server s1 = new Server("cluster1", "server2", "192.168.1.11:8992");
Server s2 = new Server("cluster1", "server3", "192.168.1.11:8993");
Server s3 = new Server("cluster1", "server4", "192.168.1.11:8994");
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

可以看到功能一切正常
队列的定义是先进先出,而在分布式环境下保证先进先出的队列就是分布式队列,有点类似于消息队列。

由上图可知,zookeeper主要通过顺序节点来保证队列的先进先出。
分布式锁指的是控制分布式系统不同进程共同访问共享资源的一种锁的实现。 如果在不同的系统或同一个系统的不同主机之间共享和竞争某个临界资源,往往需要互斥来防止彼此干扰,避免出现脏数据或非业务数据,保证数据一致性。
实现原理是zookeeper节点不可重名和watch的监听通知机制,使用临时节点主要是为了避免获取锁的节点由于异常原因无法释放锁而导致出现死锁情况。

竞争锁流程如下图:

代码实现如下
/**
* @author yinfeng
*/
public class ZKDistributeLock implements Lock {
private String lockPath;
private ZkClient client;
// 锁重入计数
private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();
public ZKDistributeLock(String lockPath) {
super();
this.lockPath = lockPath;
client = new ZkClient("192.168.10.11:2181");
client.setZkSerializer(new MyZkSerializer());
}
@Override
public boolean tryLock() {
// 锁重入不会阻塞
if (this.reentrantCount.get() != null) {
int count = this.reentrantCount.get();
if (count > 0) {
this.reentrantCount.set(++count);
return true;
}
}
// 创建节点
try {
client.createEphemeral(lockPath);
this.reentrantCount.set(1);
} catch (ZkNodeExistsException e) {
return false;
}
return true;
}
@Override
public void unlock() {
// 重入释进行放锁处理
if (this.reentrantCount.get() != null) {
int count = this.reentrantCount.get();
if (count > 1) {
this.reentrantCount.set(--count);
return;
} else {
this.reentrantCount.set(null);
}
}
client.delete(lockPath);
}
@Override
public void lock() {
// 如果获取不到锁,阻塞等待
if (!tryLock()) {
// 没获得锁,阻塞自己
waitForLock();
// 再次尝试
lock();
}
}
private void waitForLock() {
final CountDownLatch cdl = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("----收到节点被删除了-------------");
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
client.subscribeDataChanges(lockPath, listener);
// 阻塞自己
if (this.client.exists(lockPath)) {
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 取消注册
client.unsubscribeDataChanges(lockPath, listener);
}
@Override
public void lockInterruptibly() {
}
@Override
public boolean tryLock(long time, TimeUnit unit) {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
咱们在写个测试类试一下效果,通过多线程来模拟多实例竞争锁
public static void main(String[] args) {
// 并发数
int currency = 50;
// 循环屏障
final CyclicBarrier cb = new CyclicBarrier(currency);
// 多线程模拟高并发
for (int i = 0; i < currency; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "---------我准备好---------------");
// 等待一起出发
try {
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
ZKDistributeLock lock = new ZKDistributeLock("/distLock11");
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " 获得锁!");
try {
Thread.sleep(1000 * 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() + " 释放锁!");
}
}
).start();
}
}

可以看到功能是正常的,但也有个很明显的问题,就是一旦释放锁之后所有的实例(线程)都会收到通知然后去重新竞争锁,当实例的数量达到一定程度之后,那么势必会对zookeeper造成很大的带宽和性能消耗,严重的话可能会把zookeeper集群搞挂了,这种情况也叫惊群效应,所以只通过顺序节点实现分布式锁还是有一定的问题的,下面咱们再来优化一下。
既然通过临时节点会造成惊群效应,那么咱们是否能将临时和顺序节点结合起来,通过最小的那个zNode节点来视为获得锁的标志呢?
答案是肯定能的,当释放锁时只通知他的下一个节点即可,完美的避免了惊群效应的发生。
原理图如下

流程图如下

接着咱们通过代码来实现吧
/**
* @author yinfeng
*/
public class ZKDistributeImproveLock implements Lock {
/**
* 利用临时顺序节点来实现分布式锁
* 获取锁:取排队号(创建自己的临时顺序节点),然后判断自己是否是最小号,如是,则获得锁;不是,则注册前一节点的watcher,阻塞等待
* 释放锁:删除自己创建的临时顺序节点
*/
private final String lockPath;
private final ZkClient client;
private ThreadLocal<String> currentPath = new ThreadLocal<>();
private ThreadLocal<String> beforePath = new ThreadLocal<>();
/**
* 锁重入计数
*/
private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();
public ZKDistributeImproveLock(String lockPath) {
super();
this.lockPath = lockPath;
client = new ZkClient("192.168.10.11:2181");
client.setZkSerializer(new MyZkSerializer());
if (!this.client.exists(lockPath)) {
try {
this.client.createPersistent(lockPath);
} catch (ZkNodeExistsException ignored) {
}
}
}
@Override
public boolean tryLock() {
// 重入则直接返回获得锁成功
if (this.reentrantCount.get() != null) {
int count = this.reentrantCount.get();
if (count > 0) {
this.reentrantCount.set(++count);
return true;
}
}
if (this.currentPath.get() == null) {
currentPath.set(this.client.createEphemeralSequential(lockPath + "/", "aaa"));
}
// 获得所有的子节点
List<String> children = this.client.getChildren(lockPath);
// 排序list
Collections.sort(children);
// 判断当前节点是否是最小的
if (currentPath.get().equals(lockPath + "/" + children.get(0))) {
this.reentrantCount.set(1);
return true;
} else {
// 取到前一个
// 得到字节的索引号
int curIndex = children.indexOf(currentPath.get().substring(lockPath.length() + 1));
beforePath.set(lockPath + "/" + children.get(curIndex - 1));
}
return false;
}
@Override
public void lock() {
if (!tryLock()) {
// 阻塞等待
waitForLock();
// 再次尝试加锁
lock();
}
}
private void waitForLock() {
final CountDownLatch cdl = new CountDownLatch(1);
// 注册watcher
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("-----监听到节点被删除");
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
client.subscribeDataChanges(this.beforePath.get(), listener);
// 让自己阻塞
if (this.client.exists(this.beforePath.get())) {
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 醒来后,取消watcher
client.unsubscribeDataChanges(this.beforePath.get(), listener);
}
@Override
public void unlock() {
// 重入的释放锁处理
if (this.reentrantCount.get() != null) {
int count = this.reentrantCount.get();
if (count > 1) {
this.reentrantCount.set(--count);
return;
} else {
this.reentrantCount.set(null);
}
}
// 删除节点
this.client.delete(this.currentPath.get());
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
最后咱们再来测试一下
public static void main(String[] args) {
// 并发数
int currency = 50;
// 循环屏障
final CyclicBarrier cb = new CyclicBarrier(currency);
// 多线程模拟高并发
for (int i = 0; i < currency; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "---------我准备好---------------");
// 等待一起出发
try {
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
ZKDistributeImproveLock lock = new ZKDistributeImproveLock("/distLock");
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " 获得锁!");
try {
Thread.sleep(1000 * 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() + " 释放锁!");
}
}).start();
}
}

可以看到功能是正常的,同时在释放锁的时候只通知了下一节点,没有出现惊群效应,非常完美。
在微服务和分布式的时代,zookeeper作为协调服务的代表,在面试中很容易被问到,希望大家能掌握这方面的知识,提高自己的核心竞争力,在谈薪的时候拿到最高的那个区间。
最后,外出打工不易,希望各位兄弟找到自己心仪的工作,虎年发发发! 也希望兄弟们能关注、点赞、收藏、评论支持一波,非常感谢大家!
对于具有离线功能的智能手机应用程序,我正在为Xml文件创建单向文本同步。我希望我的服务器将增量/差异(例如GNU差异补丁)发送到目标设备。这是计划:Time=0Server:hasversion_1ofXmlfile(~800kiB)Client:hasversion_1ofXmlfile(~800kiB)Time=1Server:hasversion_1andversion_2ofXmlfile(each~800kiB)computesdeltaoftheseversions(=patch)(~10kiB)sendspatchtoClient(~10kiBtransferred)Cl
我构建了两个需要相互通信和发送文件的Rails应用程序。例如,一个Rails应用程序会发送请求以查看其他应用程序数据库中的表。然后另一个应用程序将呈现该表的json并将其发回。我还希望一个应用程序将存储在其公共(public)目录中的文本文件发送到另一个应用程序的公共(public)目录。我从来没有做过这样的事情,所以我什至不知道从哪里开始。任何帮助,将不胜感激。谢谢! 最佳答案 无论Rails是什么,几乎所有Web应用程序都有您的要求,大多数现代Web应用程序都需要相互通信。但是有一个小小的理解需要你坚持下去,网站不应直接访问彼此
我尝试运行2.x应用程序。我使用rvm并为此应用程序设置其他版本的ruby:$rvmuseree-1.8.7-head我尝试运行服务器,然后出现很多错误:$script/serverNOTE:Gem.source_indexisdeprecated,useSpecification.Itwillberemovedonorafter2011-11-01.Gem.source_indexcalledfrom/Users/serg/rails_projects_terminal/work_proj/spohelp/config/../vendor/rails/railties/lib/r
刚入门rails,开始慢慢理解。有人可以解释或给我一些关于在application_controller中编码的好处或时间和原因的想法吗?有哪些用例。您如何为Rails应用程序使用应用程序Controller?我不想在那里放太多代码,因为据我了解,每个请求都会调用此Controller。这是真的? 最佳答案 ApplicationController实际上是您应用程序中的每个其他Controller都将从中继承的类(尽管这不是强制性的)。我同意不要用太多代码弄乱它并保持干净整洁的态度,尽管在某些情况下ApplicationContr
我是一个Rails初学者,但我想从我的RailsView(html.haml文件)中查看Ruby变量的内容。我试图在ruby中打印出变量(认为它会在终端中出现),但没有得到任何结果。有什么建议吗?我知道Rails调试器,但更喜欢使用inspect来打印我的变量。 最佳答案 您可以在View中使用puts方法将信息输出到服务器控制台。您应该能够在View中的任何位置使用Haml执行以下操作:-puts@my_variable.inspect 关于ruby-on-rails-如何在我的R
是否可以在应用程序中包含的gem代码中知道应用程序的Rails文件系统根目录?这是gem来源的示例:moduleMyGemdefself.included(base)putsRails.root#returnnilendendActionController::Base.send:include,MyGem谢谢,抱歉我的英语不好 最佳答案 我发现解决类似问题的解决方案是使用railtie初始化程序包含我的模块。所以,在你的/lib/mygem/railtie.rbmoduleMyGemclassRailtie使用此代码,您的模块将在
无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD
导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵
在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList()Obt
我们目前正在为ROR3.2开发自定义cms引擎。在这个过程中,我们希望成为我们的rails应用程序中的一等公民的几个类类型起源,这意味着它们应该驻留在应用程序的app文件夹下,它是插件。目前我们有以下类型:数据源数据类型查看我在app文件夹下创建了多个目录来保存这些:应用/数据源应用/数据类型应用/View更多类型将随之而来,我有点担心应用程序文件夹被这么多目录污染。因此,我想将它们移动到一个子目录/模块中,该子目录/模块包含cms定义的所有类型。所有类都应位于MyCms命名空间内,目录布局应如下所示:应用程序/my_cms/data_source应用程序/my_cms/data_ty