大數據 Zookeepe 筆記大全(加強版)收藏+關注+轉發


大數據 Zookeepe 筆記大全(加強版)收藏+關注+轉發


一、概述

ZooKeeper是一個分佈式應用所涉及的分佈式的、開源的協調服務。

是Google的Chubby的開源實現

Zookeeper最早起源於雅虎的研究院的一個研究小組。在當時,研究人員發現,在雅虎內部很多大型的系統需要依賴一個類似的系統進行分佈式協調,但是這些系統往往存在分佈式單點問題。所以雅虎的開發人員就試圖開發一個通用的無單點問題的分佈式協調框架。在立項初期,考慮到很多項目都是用動物的名字來命名的(例如著名的Pig項目),雅虎的工程師希望給這個項目也取一個動物的名字。時任研究院的首席科學家Raghu Ramakrishnan開玩笑說:再這樣下去,我們這兒就變成動物園了。此話一出,大家紛紛表示就叫動物園管理員吧——因為各個以動物命名的分佈式組件放在一起,雅虎的整個分佈式系統看上去就像一個大型的動物園了,而Zookeeper正好用來進行分佈式環境的協調——於是,Zookeeper的名字由此誕生了。


分佈式的引用可以建立在同步配置管理、選舉、分佈式鎖、分組和命名等服務的更高級的實現的基礎之上。ZooKeeper意欲設計一個易於編程的環境,它的文件系統使用我們所熟悉的目錄樹結構。ZooKeeper使用Java所編寫,但是支持Java和C兩種編程語言。

二、ZooKeeper節點

ZooKeeper提供的命名空間與標準的文件系統非常相似。一個名稱是由通過斜線分割開來的路徑名序列所組成的。ZooKeeper中每一個節點都是通過路徑來識別。

ZooKeeper的節點是通過像樹一樣的結構來進行維護,並且並且每一個節點通過路徑標識以及訪問。除此之外,每個節點還擁有自身的一些信息,包括:數據、數據長度、創建時間、修改時間等等。從這樣一類既含有數據,又可以作為路徑表示的節點特點中可以看出,ZooKeeper的節點既可以被看做是一個文件,又可以看做是一個目錄,它同時具有二者的特點。


Znode還具有 原子性操作的特點:命名空間中。每一個Znode的數據將被原子地讀寫。讀操作將讀取與Znode相關的所有數據,寫操作將替換掉所有的數據,除此之外,每一個節點都有一個訪問控制列表,這個訪問控制列表規定了用戶操作的權限。

ZooKeeper節點是有生命週期的,這取決於節點的類型。在ZooKeeper中,節點類型可以分為持久節點(PERSISTENT)、臨時節點(EPHEMERAL),以及時序節點(SEQUENTIAL),具體在節點創建過程中。一般是組合使用,可以生成以下4種節點類型。


  • 持久節點(PERSISTENT)持久節點,是指在節點創建後,就一直存在,直到有刪除操作來主動清楚這個節點——不會因為創建該節點的客戶端端會話失效而消失。
  • 持久順序節點(PERSISTENT_SEQUENTIAL)類節點的基本特性和持久節點類型是一致的。額外的特性是,在ZK中,每個父節點會為他的第一級子節點維護一份時序,會記錄每個子節點創建的先後順序。
  • 臨時節點(EPHEMERAL)與持久節點不同的是,臨時節點的生命週期和客戶端會話綁定。也就是說,如果客戶端會話失效,那麼這個節點就會自動被清除掉。
  • 臨時順序節點(EPHEMERAL_SEQUENTIAL)類節點的基本特性和臨時子節點類型是一致的。額外的 特性是,在ZK中,每個父節點會為他的第一級

三、ZooKeeper使用場景

3.1 配置中心(數據發佈與訂閱)

在分佈式應用中為了實現對分佈式節點的統一配置,通常將服務中的配置文件集中存儲在一個配置服務中,例如 SpringCloud將配置信息存儲在Git/SVN中,Solr Cloud 則將配置數據集中存儲在Zookeeper中。這典型利用了Zookeeper節點的發佈訂閱特性。

3.2 命名服務/服務分組(Naming Service)

命名服務也是分佈式系統中比較常見的一類場景。在分佈式系統 中通過使用命名服務,客戶端應用能夠根據指定名字來獲取資源或者服務的地址,提供者等信息。被命名的實體可以是集群中的機器,提供的服務地址,遠程對象等等——這些我們都可以統稱他們為名字(Name)。其中較為常見的就是一些分佈式服務框架中的服務地址列表。通過調用ZK提供的創建節點的API,能夠很容易常見一個全局唯一的path,這個path就就可以作為一個名稱。繼而實現服務彈性部署。

3.3 分佈式鎖(場景)


Redis分佈式鎖 VS Zookeeper設計分佈式鎖的優勢

  • ZK有Watcher機制,實現簡單,客戶端無需重複讀取數據
  • Redis鎖不穩定,客戶端bug或者超時會導致鎖失效,zk客戶端掛掉znode直接銷燬(臨時節點)
  • zk相對公平,順序節點


四、ZooKeeper安裝

ZooKeeper的安裝模式分為三種,分別為:單機模式(stand-alone)、集群模式和集群偽分佈模式。ZooKeeper單機模式的安裝相對比較簡單,如果第一次接觸ZooKeeper建議安裝ZooKeeper單機模式或者偽集群模式。

下載地址:http://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

4.1 單機模式

zoo.cfg (存在於ZK安裝目錄下的conf目錄下)

<code>tickTime=2000        //session 過期時間為兩倍的 tickTime
dataDir=/var/zookeeper //zk數據文件
clientPort=2181 //外部服務端口/<code>

4.2 啟動

<code>[root@GuoJiafeng01 zookeeper-3.4.6]# ./bin/zkServer.sh  start ./conf/zk.conf
JMX enabled by default
Using config: ./conf/zk.conf
Starting zookeeper ... STARTED

進程名稱:QuorumPeerMain/<code>


五、ZK基本使用

5.1 啟動

<code>Usage: zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}/<code>

5.2 連接

<code>./bin/zkCli.sh -server 192.168.15.130:2181/<code>

5.3 其他指令

<code>connect host:port
get path [watch]
ls path [watch]
set path data [version]
rmr path
quit
printwatches on|off
create [-s] [-e] path data acl 默認持久節點 -s 順序節點 -e 臨時節點
stat path [watch]
close
ls2 path [watch]
history
setAcl path acl
getAcl path
addauth scheme auth
delete path [version]/<code>

5.4 close(關閉Session)

<code>關閉當前連接 ,在命令行頭部會顯示CLOSED/<code>

5.6 connect (鏈接ZooKeeper)

<code>connect localhost:2181/<code>

5.7 ls / ls2 (查看子節點)

<code>[zk: localhost:2181(CONNECTED) 19] ls /
[dubbo, hadoop-ha, zookeeper, baizhi]

--------------------------------------
[zk: localhost:2181(CONNECTED) 20] ls2 /
[dubbo, hadoop-ha, zookeeper, baizhi]
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x1001
cversion = 4
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 4/<code>

5.8 create(創建節點)

<code>[zk: localhost:2181(CONNECTED) 15] create /baizhi '1'
Created /baizhi
/<code>

5.9 delete(刪除節點)

<code>[zk: localhost:2181(CONNECTED) 21] delete /baizhi
[zk: localhost:2181(CONNECTED) 22] ls /
[dubbo, hadoop-ha, zookeeper]
/<code>

5.10 rmr (遞歸刪除節點)

<code>[zk: localhost:2181(CONNECTED) 28] ls /baizhi
[gjf]
---------------------------------
[zk: localhost:2181(CONNECTED) 44] rmr /baizhi/gjf
[zk: localhost:2181(CONNECTED) 46] ls /baizhi
[]
/<code>

5.11 quit(退出ZookeeperSHell客戶端)

<code>quit
/<code>

5.12 stat(查看節點狀態)

<code>[zk: localhost:2181(CONNECTED) 51] stat /baizhi
cZxid = 0x1004

ctime = Tue Mar 12 15:28:00 CST 2019
mZxid = 0x1004
mtime = Tue Mar 12 15:28:00 CST 2019
pZxid = 0x100f
cversion = 10
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0
/<code>


六、Java API操作

6.1 原生 API

1)Maven依賴

<code> 
<dependency>
<groupid>org.apache.zookeeper/<groupid>
<artifactid>zookeeper/<artifactid>
<version>3.4.6/<version>
<type>pom/<type>
/<dependency>
/<code>

2)相關操作

(1)獲得客戶端對象

<code> private  ZooKeeper zooKeeper;

@Before
public void getCline() throws Exception{

zooKeeper = new ZooKeeper("192.168.134.99:2181",2000,null);

}
/<code>

(2)獲取節點值

<code> @Test
public void getData()throws Exception{

/*獲取節點值*/
byte[] data = zooKeeper.getData("/baizhi/123", null, null);
System.out.println(new String(data));
}

/<code>

(3)獲取節點孩子

<code>  @Test
public void getChild() throws Exception{

/*獲取當前輸入節點下的子節點*/
List<string> children = zooKeeper.getChildren("/baizhi/gjf", null);
children.forEach((a)->{
System.out.println(a);
});

}
/<string>/<code>

(4)創建節點

<code>    @Test
public void createNode() throws Exception{


/*創建持久節點*/

Object value = null;

value = "12312312";


byte[] bytes = ((String) value).getBytes();


String s = zooKeeper.create("/baizhi/123", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(s);




}


/<code>

(5)更新節點值

<code> @Test
public void setData() throws Exception{

Stat stat = zooKeeper.setData("/baizhi/123", new String("123123").getBytes(), -1);

System.out.println(stat);


}
/<code>

(6)判斷節點是否存在

<code> @Test
public void exist() throws Exception{

Stat stat = zooKeeper.exists("/baizhi/13", false);

if (stat==null) {
System.out.println("該節點不存在!");
}else {
System.out.println("該節點存在!");
}

}

/<code>


6.2 zkCline API

1)Mavne依賴

<code><dependency>
<groupid>com.101tec/<groupid>
<artifactid>zkclient/<artifactid>
<version>0.8/<version>
/<dependency>
/<code>

2)核心API

<code> ZkClient client=new ZkClient("ip:port");                         
/<code>

3)相關操作

<code>package test;

import org.I0Itec.zkclient.ZkClient;

import java.util.List;

/**
* Create by GuoJF on 2019/3/12
*/
public class Main {
public static void main(String[] args) {


ZkClient zkClient = new ZkClient("192.168.134.5:2181");


/*
* 獲取根目錄下所有的node信息
*
* */
List<string> children = zkClient.getChildren("/");

children.forEach((a)->{
System.out.println(a);
});

}
}

/<string>/<code>

(1)創建節點

<code>    @Test
public void createNode() {

String result = zkClient.create("/baizhi/001", "001", CreateMode.PERSISTENT);

System.out.println(result);

}

/<code>

(2)獲取節點

<code> @Test
public void getData() {
Object readData = zkClient.readData("/baizhi/001");

System.out.println(readData.toString());

}
/<code>

(3)更新節點值

<code> @Test
public void setData() {
zkClient.writeData("/baizhi/001", "123123");
}
/<code>

(4)刪除節點

<code> @Test
public void delNode() {

boolean delete = zkClient.delete("/baizhi/001");
if (delete) {
System.out.println("刪除成功!");
} else {
System.out.println("刪除失敗或者節點不存在!");
}
}
/<code>

(5)判斷節點是否存在

<code> @Test
public void exist(){

boolean exists = zkClient.exists("/baizhi/001");
if (exists){
System.out.println("該節點已經存在!");
}else {
System.out.println("該節點不存在!");

}

}
/<code>

(6)獲取孩子節點

<code> @Test
public void getChild(){

List<string> children = zkClient.getChildren("/baizhi");

for (String child : children) {

System.out.println(child);
}
}
/<string>/<code>


6.3 Curator API

1)Maven依賴

<code> <dependency>
<groupid>org.apache.curator/<groupid>
<artifactid>curator-framework/<artifactid>
<version>2.7.1/<version>
/<dependency>
/<code>

2)相關操作

(1)獲取客戶端對象

<code>  private CuratorFramework curatorFramework;

@Before
public void getClient() {

/*
* 重連策略 四種實現
* ExponentialBackoffRetry、RetryNTimes、RetryOneTimes、RetryUntilElapsed
* */


ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(1000, 1000);

curatorFramework = CuratorFrameworkFactory.newClient("192.168.134.99:2181", backoffRetry);

curatorFramework.start();


}

/<code>

(2)創建節點

<code>  @Test
public void createNode() throws Exception {
String s = curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/baizhi/002", new String("123").getBytes());
System.out.println(s);
}
/<code>

(3)獲取節點

<code> @Test
public void getData() throws Exception {

byte[] bytes = curatorFramework.getData().forPath("/baizhi/002");

System.out.println(new String(bytes));
}
/<code>

(4)更新節點值

<code>@Test
public void setData() {

try {
curatorFramework.setData().forPath("/baizhi/001", new String("123123").getBytes());

System.out.println("更新成功!");
} catch (KeeperException.NoNodeException e) {

System.out.println("更新失敗,該節點不存在!");
} catch (Exception e) {
e.printStackTrace();
}



}
/<code>

(5)獲取孩子節點

<code> @Test
public void getChild() throws Exception {
curatorFramework.getChildren().forPath("/baizhi").forEach((child) -> {
System.out.println(child);
});
}
/<code>

(6)刪除節點

<code>    @Test
public void delNode() throws Exception {
curatorFramework.delete().forPath("/baizhi/002");
}
/<code>


6.4 Watcher接口

1)原生API實現Watch接口

在這裡使用的是原生的Apache的Java API

<code>
<dependency>
\t<groupid>org.apache.zookeeper/<groupid>
\t\t<artifactid>zookeeper/<artifactid>
\t<version>3.4.6/<version>
\t<type>pom/<type>
/<dependency>
/<code>

ZooKeeper Watcher監視使客戶端能夠接收來自ZooKeeper服務器的通知,並在發生時處理這些事件。 ZooKeeper Java API提供了一個名為Watcher的公共接口,客戶端事件處理程序類必須實現該接口才能接收有關來自ZooKeeper服務器的事件通知。 以編程方式,使用這種客戶端的應用程序通過向客戶端註冊回調(callback)對象來處理這些事件。


<code>package test;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.UUID;

/**
* Create by GuoJF on 2019/3/12
*/
public class DataUpdater {

private static String hostPort = "192.168.134.5:2181";
private static String zooDataPath = "/baizhi/gjf";
ZooKeeper zk;

public DataUpdater() throws IOException {
try {
zk = new ZooKeeper(hostPort, 2000, null);
} catch (IOException e) {
e.printStackTrace();
}
}

public void run() throws InterruptedException, KeeperException {
while (true) {
String uuid = UUID.randomUUID().toString();
byte zoo_data[] = uuid.getBytes();
zk.setData(zooDataPath, zoo_data, -1);
try {
Thread.sleep(5000); // Sleep for 5 secs
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DataUpdater dataUpdater = new DataUpdater();
dataUpdater.run();
}
}
/<code>
<code>package test;


import org.apache.zookeeper.*;

/**
* Create by GuoJF on 2019/3/13
*/
public class DataWatcher implements Watcher,Runnable {


ZooKeeper zooKeeper;

public DataWatcher() {

try {
zooKeeper = new ZooKeeper("192.168.134.5:2181",2000,this);

if (zooKeeper.exists("/baizhi/gjf",this)==null) {

zooKeeper.create("/baizhi/gjf", "get".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

}

}catch (Exception e){
e.printStackTrace();
}

}

public static void main(String[] args) {

DataWatcher dataWatcher = new DataWatcher();

dataWatcher.run();

}


@Override
public void process(WatchedEvent event) {

if (event.getType() == Event.EventType.NodeDataChanged){
System.out.println("數據發改變了吧");

try {
byte[] data = zooKeeper.getData("/baizhi/gjf", this, null);

String s = new String(data);


System.out.println(s);

}catch (Exception e){

}

}

}

public void run() {
try {
synchronized (this) {
while (true) {
wait();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}

/<code>

2)ZkClient實現Watcher接口

<code>@Test
public void watcherInterface() throws Exception {
zkClient.subscribeDataChanges("/baizhi/gjf", new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("節點名稱:" + dataPath);
System.out.println("節點名稱:" + data.toString());

}

@Override
public void handleDataDeleted(String dataPath) throws Exception {

}
});


Thread.sleep(Integer.MAX_VALUE);
}

/<code>


3)Curator API實現Watcher接口

ZooKeeper原生的API支持通過註冊Watcher來進行事件監聽,但是Watcher通知是一次性的,因此開發過程中需要反覆註冊Watcher,比較繁瑣。Curator引入了Cache來監聽ZooKeeper服務端的事件。Cache對ZooKeeper事件監聽進行了封裝,能夠自動處理反覆註冊監聽,簡化了ZooKeeper原生API繁瑣的開發過程。

<code>   \t\t <dependency>
<groupid>org.apache.curator/<groupid>
<artifactid>curator-recipes/<artifactid>
<version>2.7.1/<version>
/<dependency>
/<code>


(1)NodeCache

監聽指定的數據變化

<code>        /*
*NodeCache
*
* */

//在註冊監聽器的時候,如果傳入此參數,當事件觸發時,邏輯由線程池處理
ExecutorService pool = Executors.newFixedThreadPool(2);

NodeCache nodeCache = new NodeCache(curatorFramework, "/baizhi/gjf", false);

nodeCache.start(true);

nodeCache.getListenable().addListener(new NodeCacheListener() {

@Override
public void nodeChanged() throws Exception {
System.out.println(new String(nodeCache.getCurrentData().getData()));
}
}, pool);

/<code>


(2)PathChildrenCache

監聽指定節點下所有子節點的數據變化

<code>        /**
* 監聽子節點的變化情況
*/
PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, "/baizhi", true);
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
childrenCache.getListenable().addListener(
new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED: " + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED: " + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED: " + event.getData().getPath());
break;
default:
break;
}
}
},
pool
);

/<code>


七、Zookeeper ACL


7.1 Shell 操作

zookeeper本身提供了ACL機制,表示為scheme: id:permissions,第一個字段表示採用哪一種機制,第二個id表示用戶,permissions表示相關權限(如只讀,讀寫,管理等)。

7.1 .1 scheme :id 介紹

  • world: 它下面只有一個id, 叫anyone, world:anyone代表任何人,zookeeper中對所有人有權限的結點就是屬於world:anyone的
  • auth: 它不需要id, 只要是通過authentication的user都有權限(zookeeper支持通過kerberos來進行authencation, 也支持username/password形式的authentication),使用auth來設置權限的時候,需要在zk裡註冊一個用戶才可以
  • digest: 它對應的id為username:BASE64(SHA1(password)),它需要先通過username:password形式的authentication
  • ip: 它對應的id為客戶機的IP地址,設置的時候可以設置一個ip段,比如ip:192.168.1.0/16, 表示匹配前16個bit的IP段
  • super: 在這種scheme情況下,對應的id擁有超級權限,可以做任何事情(cdrwa)

7.1 .2 permissions

權限ACL簡寫描述CREATEc可以創建子節點DELETEd可以刪除子節點(僅下一級節點)READr可以讀取節點數據及顯示子節點列表WRITEw可以設置節點數據ADMINa可以設置節點訪問控制列表權限

7.1.3 ACL Shell 命令

命令使用方式描述getAclgetAcl <path>讀取ACL權限setAclsetAcl <path> 設置ACL權限addauthaddauth <scheme> <auth>添加認證用戶/<auth>/<scheme>/<path>/<path>

7.1.4 操作

World scheme

其實默認就是Word Scheme


大數據 Zookeepe 筆記大全(加強版)收藏+關注+轉發


語法
<code>setAcl <path> world:anyone:
/<path>/<code>


<code>#隨便創建一個節點
[zk: localhost:2181(CONNECTED) 61] create /baizhiedu 1
Created /baizhiedu
[zk: localhost:2181(CONNECTED) 62] getAcl /baizhiedu
'world,'anyone
: cdrwa
/<code>


<code>#在創建完成後相關節點,還可以通過setAcl的方式設置相關權限
[zk: localhost:2181(CONNECTED) 64] setAcl的方式設置相關權限 /baizhiedu world:anyone:cdrw
cZxid = 0x1c631
ctime = Tue Jul 09 08:37:06 CST 2019
mZxid = 0x1c631
mtime = Tue Jul 09 08:37:06 CST 2019
pZxid = 0x1c631
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0
[zk: localhost:2181(CONNECTED) 67] getAcl /baizhiedu
'world,'anyone
: cdrw
/<code>


IP scheme

對於特定IP適用,其他沒有設置過的IP沒有相關權限

語法
<code>setAcl <path> ip::
/<path>/<code>
<code>[zk: localhost:2181(CONNECTED) 73] setAcl /baizhi01 ip:192.168.123.111:cdrwa
cZxid = 0x1c635
ctime = Tue Jul 09 08:44:14 CST 2019
mZxid = 0x1c635
mtime = Tue Jul 09 08:44:14 CST 2019
pZxid = 0x1c635
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0


[zk: localhost:2181(CONNECTED) 78] getAcl /baizhi01
'ip,'192.168.123.111
: cdrwa

[zk: localhost:2181(CONNECTED) 79] get /baizhi01
Authentication is not valid : /baizhi01
/<code>


Auth scheme

語法


<code>addauth digest <user>:<password> #添加認證用戶 

setAcl <path> auth:<user>:
/<user>/<path>/<password>/<user>/<code>


<code>[zk: localhost:2181(CONNECTED) 81] addauth digest gjf:root
[zk: localhost:2181(CONNECTED) 82] setAcl /baizhi03 auth:gjf:root
cZxid = 0x1c637
ctime = Tue Jul 09 08:47:00 CST 2019
mZxid = 0x1c637
mtime = Tue Jul 09 08:47:00 CST 2019
pZxid = 0x1c637
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0

[zk: localhost:2181(CONNECTED) 95] getAcl /baizhi03
'digest,'gjf:bbYGkKPfBgiZDzcwrmVylqDlXnI=
: cdrwa
/<code>


Digest scheme

語法
<code>setAcl <path> digest:<user>:<password>:
/<password>/<user>/<path>/<code>
計算密文
<code>echo -n <user>:<password> | openssl dgst -binary -sha1 | openssl base64
/<password>/<user>/<code>
<code>[root@GuoJiafeng01 ~]# echo -n gjf:root | openssl dgst -binary -sha1 | openssl base64
bbYGkKPfBgiZDzcwrmVylqDlXnI=

/<code>
<code>[zk: localhost:2181(CONNECTED) 98] create /baizhi04 1 

Created /baizhi04
[zk: localhost:2181(CONNECTED) 99] setAcl /baizhi04 digest:gjf:bbYGkKPfBgiZDzcwrmVylqDlXnI=:a
cZxid = 0x1c641
ctime = Tue Jul 09 08:59:18 CST 2019
mZxid = 0x1c641
mtime = Tue Jul 09 08:59:18 CST 2019
pZxid = 0x1c641
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0
[zk: localhost:2181(CONNECTED) 100] getAcl /baizhi04
'digest,'gjf:bbYGkKPfBgiZDzcwrmVylqDlXnI=
: a

# 當前是沒有權限的
[zk: localhost:2181(CONNECTED) 101] get /baizhi04
Authentication is not valid : /baizhi04

# 在當前session中添加認證用戶
[zk: localhost:2181(CONNECTED) 102] addauth digest gjf:root
#就能獲取到相關的權限了
[zk: localhost:2181(CONNECTED) 107] get /baizhi04
1
cZxid = 0x1c641
ctime = Tue Jul 09 08:59:18 CST 2019
mZxid = 0x1c641
mtime = Tue Jul 09 08:59:18 CST 2019
pZxid = 0x1c641
cversion = 0
dataVersion = 0
aclVersion = 2
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0



/<code>


7.2 Java API


<code> @Before
public void getClient() {

/*
* 重連策略 四種實現
* ExponentialBackoffRetry、RetryNTimes、RetryOneTimes、RetryUntilElapsed
* */

ACLProvider aclProvider = new ACLProvider() {
private List acl ;
@Override
public List getDefaultAcl() {
if(acl ==null){
ArrayList acl = ZooDefs.Ids.CREATOR_ALL_ACL;
acl.clear();
acl.add(new ACL(ZooDefs.Perms.ALL, new Id("digest", "admin:123") ));
this.acl = acl;
}
return acl;
}

@Override
public List getAclForPath(String path) {
return null;
}
};

ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(1000, 1000);

//curatorFramework = CuratorFrameworkFactory.builder().aclProvider(aclProvider).authorization("digest", "admin:123".getBytes()).connectString("192.168.134.99:2181").retryPolicy(backoffRetry).build();
curatorFramework = CuratorFrameworkFactory.newClient("192.168.134.99:2181", backoffRetry);

this.curatorFramework.start();


}

/<code>


八、應用

8.1 分佈式高可用(簡單)

<code>package test.ha;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Create by GuoJF on 2019/5/12
*/
public class App {

public static void main(String[] args) throws Exception {

Integer port = 1233;
ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(1000, 1000);

CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.134.99:2181", backoffRetry);

curatorFramework.start();

Stat stat = curatorFramework.checkExists().forPath("/baizhi/test01");
if (stat == null) {

curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/baizhi/test01", new String("localhost:" + port).getBytes());
soutFunction(port);
} else {

System.out.println("當前節點已經存在,主機服務為:" + new String(curatorFramework.getData().forPath("/baizhi/test01")));

System.out.println("當前主機自動切換為StandyBy狀態");

}




/*
*NodeCache
*
* */

//在註冊監聽器的時候,如果傳入此參數,當事件觸發時,邏輯由線程池處理
ExecutorService pool = Executors.newFixedThreadPool(2);


/**
* 監聽子節點的變化情況
*/
PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, "/baizhi", true);
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
childrenCache.getListenable().addListener(
new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("節點上線: " + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("節點下線: " + event.getData().getPath());
System.out.println("正在上線其他節點,請稍後。");
if (event.getData().getPath().endsWith("/baizhi/test01")) {
try {
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/baizhi/test01", new String("localhost:" + port).getBytes());

System.out.println("當前節點上線成功");
soutFunction(port);
} catch (Exception e) {

if (e instanceof KeeperException.NodeExistsException) {
System.out.println("當前節點上線失敗,已有其他節點上線");

}

}
}
break;
case CHILD_UPDATED:
System.out.println("節點更新: " + event.getData().getPath());
break;
default:
break;
}
}
},
pool
);


Thread.sleep(Integer.MAX_VALUE);
}


public static void soutFunction(Integer port) {


while (true) {

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("localhost:" + port + ",正在提供服務");
}

}

}

/<code>

8.2 分佈式高可用(優雅)

<code>package test.ha;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;

import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Create by GuoJF on 2019/5/12
*/
public class AppPro {


public static void main(String[] args) throws Exception {
final String[] lockip = {null};

final Integer[] port = {1233, 1234, 1235, 1236};


ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(1000, 1000);

CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.123.129:2181", backoffRetry);

curatorFramework.start();


List<string> stringList = curatorFramework.getChildren().forPath("/ha");

System.out.println(stringList.size());


if (stringList.size() == 0) {

curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/ha/test01" + port[0], new String("localhost:" + port[0]).getBytes());
soutFunction(port[0]);
} else {

System.out.println("當前節點已經存在,主機服務為:" + lockip);

System.out.println("當前主機自動切換為StandyBy狀態");

}



/*
*NodeCache
*
* */

//在註冊監聽器的時候,如果傳入此參數,當事件觸發時,邏輯由線程池處理
ExecutorService pool = Executors.newFixedThreadPool(2);


/**
* 監聽子節點的變化情況
*/
PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, "/ha", true);
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
final String[] currentPath = {""};

List<string> childers = curatorFramework.getChildren().forPath("/ha");

Collections.sort(childers);

currentPath[0] = childers.get(0);

childrenCache.getListenable().addListener(
new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
//System.out.println("節點上線: " + event.getData().getPath());
break;
case CHILD_REMOVED:
if (event.getData().getPath().endsWith(currentPath[0])) {
System.out.println("節點下線: " + event.getData().getPath());
System.out.println("正在上線其他節點,請稍後。");
String path = curatorFramework.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/ha/test01", new String("localhost:" + port[0]).getBytes());

List<string> childers = curatorFramework.getChildren().forPath("/ha");


Collections.sort(childers);

currentPath[0] = childers.get(0);
if (path.endsWith(childers.get(0))) {


System.out.println("當前節點上線成功");
soutFunction(port[0]);


} else {
System.out.println("當前節點上線失敗!");
System.out.println("刪除當前節點!");
try {
curatorFramework.delete().forPath((path));
} catch (Exception e) {
e.printStackTrace();
}

System.out.println((path) + "節點已經刪除!");
}

}
break;
case CHILD_UPDATED:
System.out.println("節點更新: " + event.getData().getPath());
break;
default:
break;
}
}
},
pool
);

Thread.sleep(Integer.MAX_VALUE);
}


public static void soutFunction(Integer port) {


while (true) {

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("localhost:" + port + ",正在提供服務");
}

}

}

/<string>/<string>/<string>/<code>


九、集群搭建

9.1 集群配置

1)添加添加配置文件

<code>tickTime=2000
dataDir=/home/zk/data
clientPort=2181
initLimit=5
syncLimit=2
server.1=guojiafeng01:2887:3887
server.2=guojiafeng01:2888:3888
server.3=guojiafeng01:2889:3889
/<code>

server.id=host:port:port

指出了不同的zk的服務器的自身的標示,作為集群中一部分及其應該知道集群中其他的機器,在datadir目錄中創建一個文件名為myid的文件,這個文件僅僅含有一行內容,制定的是自身的id值。比如服務器的id是1就在這個文件寫1,第一個port是保持和主機通信,第二個port是做選舉的。

2)新建myid文件

<code>在上述配置文件中對的data目錄下,創建一個myid文件,在節點1中的myid中寫 1 (就一個數字1),在節點2中寫數字2,依次列推 

/<code>

3)複製配置啟動


9.2 配置文件詳解

1)ZK的最小配置

最小配置是指Zookeeper運行所需的最小配置,Zookeeper只需要配置這些項就可以正常的運行Zookeeper。

clientPort

配置ZK監聽客戶端連接的端口


dataDir

內存數據庫快照存放地址,如果沒有指定事務日誌存放地址(dataLogDir),默認也是存放在這個路徑下,建議兩個地址分開存放到不同的設備上。


tickTime

心跳基本時間單位,毫秒級

2)ZK的高級配置(可選)

高級配置是指有的需要直接通過系統屬性進行設置)

dataLogDir

將事務日誌存儲在該路徑下,比較重要,這個日誌存儲的設備效率會影響ZK的寫吞吐量。


globalOutstandingLimit

(Java system property: zookeeper.globalOutstandingLimit)默認值是1000,限定了所有連接到服務器上但是還沒有返回響應的請求個數(所有客戶端請求的總數,不是連接總數),這個參數是針對單臺服務器而言,設定太大可能會導致內存溢出。


preAllocSize

(Java system property: zookeeper.preAllocSize)默認值64M,以KB為單位,預先分配額定空間用於後續transactionlog 寫入,每當剩餘空間小於4K時,就會又分配64M,如此循環。如果SNAP做得比較頻繁(snapCount比較小的時候),那麼請減少這個值。


snapCount

(Java system property: zookeeper.snapCount)默認值100,000,當transaction每達到snapCount/2+rand.nextInt(snapCount/2)時,就做一次SNAPSHOT,默認情況下是50,000~100,000條transactionlog就會做一次,之所以用隨機數是為了避免所有服務器可能在同一時間做snapshot.


traceFile (Java system property: requestTraceFile)


maxClientCnxns

默認值是10,一個客戶端能夠連接到同一個服務器上的最大連接數,根據IP來區分。如果設置為0,表示沒有任何限制。設置該值一方面是為了防止DoS攻擊。


clientPortAddress

與clientPort匹配,表示某個IP地址,如果服務器有多個網絡接口(多個IP地址),如果沒有設置這個屬性,則clientPort會綁定到所有IP地址上,否則只綁定到該設置的IP地址上。


minSessionTimeout

最小的session time時間,默認值是2個tick time,客戶端設置的session time 如果小於這個值,則會被強制協調為這個最小值。


maxSessionTimeout

最大的session time 時間,默認值是20個tick time. ,客戶端設置的session time 如果大於這個值,則會被強制協調為這個最大值。


3)ZK的集群配置選項

electionAlg

領導選舉算法,默認是3(fast leader election,基於TCP),0表示leader選舉算法(基於UDP),1表示非授權快速選舉算法(基於UDP),2表示授權快速選舉算法(基於UDP),目前1和2算法都沒有應用,不建議使用,0算法未來也可能會被幹掉,只保留3(fast leader election)算法,因此最好直接使用默認就好。


initLimit

tickTime的個數,表示在leader選舉結束後,followers與leader同步需要的時間,如果followers比較多或者說leader的數據非常多時,同步時間相應可能會增加,那麼這個值也需要相應增加。當然,這個值也是follower和observer在開始同步leader的數據時的最大等待時間(setSoTimeout)


syncLimit

tickTime的個數,這時間容易和上面的時間混淆,它也表示follower和observer與leader交互時的最大等待時間,只不過是在與leader同步完畢之後,進入正常請求轉發或ping等消息交互時的超時時間。


leaderServes

(Java system property: zookeeper.leaderServes) 如果該值不是no,則表示該服務器作為leader時是需要接受客戶端連接的。為了獲得更高吞吐量,當服務器數三臺以上時一般建議設置為no。


cnxTimeout

(Java system property: zookeeper.cnxTimeout) 默認值是5000,單位ms 表示leaderelection時打開連接的超時時間,只用在算法3中。


4)ZK安全配置

skipACL

(Java systemproperty: zookeeper.skipACL) 默認值是no,忽略所有ACL檢查,相當於開放了所有數據權限給任何人。


forceSync

(Java systemproperty: zookeeper.forceSync) 默認值是yes, 表示transactionlog在commit時是否立即寫到磁盤上,如果關閉這個選項可能會在斷電時丟失信息。


jute.maxbuffer

(Java system property: jute.maxbuffer)默認值0xfffff,單位是KB,表示節點數據最多1M。如果要設置這個值,必須要在所有服務器上都需要設置。


DigestAuthenticationProvider.superDigest

(Java system property only: zookeeper.DigestAuthenticationProvider.superDigest) 設置這個值是為了確定一個超級用戶,它的值格式為

super:<base64encoded> ,一旦當前連接addAuthInfo超級用戶驗證通過,後續所有操作都不會checkACL./<base64encoded>

十、 一致性算法——Paxos、Raft、ZAB

10.1 CAP理論

分佈式系統的CAP理論:理論首先把分佈式系統中的三個特性進行了如下歸納: ● 一致性(C):在分佈式系統中的所有數據備份,在同一時刻是否同樣的值。(等同於所有節點訪問同一份最新的數據副本)

● 可用性(A):在集群中一部分節點故障後,集群整體是否還能響應客戶端的讀寫請求。(對數據更新具備高可用性)

● 分區容錯性(P):以實際效果而言,分區相當於對通信的時限要求。系統如果不能在時限內達成數據一致性,就意味著發生了分區的情況,必須就當前操作在C和A之間做出選擇。


10.2 什麼是一致性?

有兩種一致性模型,分別是弱一致性和強一致性

1)弱一致性

(1)最終一致性

DNS 域名系統(英文:Domain Name System,縮寫:DNS

在域名解析服務提供商上添加域名解析,一般都是10分鐘以內之後才能通過域名訪問到當前添加的網站,因為你的域名解析ip服務只在當前供應商的服務器上添加,同步到全球或者全國需要一定的時間,在其他地區或者其他DNS服務器就有可能訪問不到當前網站,需要等待一段時間。


2 )強一致性

(1)同步

(2)Paxos

(3)Raft(multi-paxos)

(4)ZAB(multi-paxos)


10.3 Paxos

1)明確問題

一致性算法的為什麼會出現,因為數據不能存在單節點上,但是存在集群中有一致性的問題(網絡、宕機等原因)。

但是有其他的算法比如說主從,但是主從的可用性極低,一旦有一個節點宕機則無法對外提供服務。

所以需要在保持一致性的同時儘可能的提高可用性。

2)基本思想

<code>多數派:
每次寫入都要保證寫入N/2的節點,每次讀保證從何大於N/2個幾點中讀取
多數派加順序存取:
在其基礎上,因為有集群的概念,所以還有一個系統正確性需要保證,所以順序非常重要!
/<code>

3)Basic Paxos

Paxos算法是萊斯利·蘭伯特(Leslie Lamport,就是 LaTeX 中的"La",此人現在在微軟研究院)於1990年提出的一種基於消息傳遞的一致性算法。這個算法被認為是類似算法中最有效的。


(1)角色介紹:

Client:產生議題者或者是請求發起者 ,類似於民眾

Proposer :提議者,接收民眾的建議並提出議案,在衝突時有調節作用,類似於議員

Acceptor:決策者或者是投票者,國會成員或者議員等

Learner:最終決策記錄者 備份,對集群一致性沒有影響

(2)基本流程:

(3)全票通過的場景

(4)部分Accepter通過

但是達到了1/2以上的節點,提案通過,反之則不通過

(5)proposer宕機或者失敗

當前提案不會通過,但是有其他的proposer處理(客戶端需要有重試機制)

(6)活鎖問題的產生

https://www.cnblogs.com/sunnyCx/p/8108366.html

timeout解決


4)Multi Paxos

(1)Basix Paxos問題

<code>實現難
效率低(2輪RPC,來回驗證,效率低)
活鎖
/<code>


(2)區別

<code>Leader:唯一propser,選舉產生
所有的請求都經過leader
/<code>


(3)基本流程

(4)簡化角色


10.4 Raft

http://thesecretlivesofdata.com/raft/

1)角色

(1)Leader

(2)Follwer

(3)Candidate

2)三個場景

(1)Leader Election

(2)Log Repliction

(3)Safety


10.5 ZAB

<code>基本與Raft相同
叫法的區別:zab將某個一個leader成為epoch,而Raft稱之為term
心跳方向為leader向follower。ZAB則相反

/<code>

1)協議原理

ZAB主要包括消息廣播和崩潰恢復兩個過程,進一步可以分為三個階段,分別是發現(Discovery)、同步(Synchronization)、廣播(Broadcast)階段。ZAB的每一個分佈式進程會循環執行這三個階段,稱為主進程週期。

  · 發現,選舉產生PL(prospective leader),PL收集Follower epoch(cepoch),根據Follower的反饋,PL產生newepoch(每次選舉產生新Leader的同時產生新epoch)。

  · 同步,PL補齊相比Follower多數派缺失的狀態、之後各Follower再補齊相比PL缺失的狀態,PL和Follower完成狀態同步後PL變為正式Leader(established leader)。

  · 廣播,Leader處理客戶端的寫操作,並將狀態變更廣播至Follower,Follower多數派通過之後Leader發起將狀態變更落地(deliver/commit)。

  在正常運行過程中,ZAB協議會一直運行於階段三來反覆進行消息廣播流程,如果出現崩潰或其他原因導致Leader缺失,那麼此時ZAB協議會再次進入發現階段,選舉新的Leader。

2)運行狀態

 每個進程都有可能處於如下三種狀態之一

  · LOOKING:Leader選舉階段。

  · FOLLOWING:Follower服務器和Leader服務器保持同步狀態。

  · LEADING:Leader服務器作為主進程領導狀態。


大數據 Zookeepe 筆記大全(加強版)收藏+關注+轉發


分享到:


相關文章: