用實戰演練如何通過布隆過濾器防止緩存擊穿

來源:https://mp.weixin.qq.com/mp/profile_ext?action=home&__biz=MzI0MzQyMTYzOQ==&scene=161#wechat_redirect

為什麼引入

我們的業務中經常會遇到穿庫的問題,通常可以通過緩存解決。如果數據維度比較多,結果數據集合比較大時,緩存的效果就不明顯了。

因此為了解決穿庫的問題,我們引入Bloom Filter。

適合的場景

  • 數據庫防止穿庫 Google Bigtable,Apache HBase和Apache Cassandra以及Postgresql 使用BloomFilter來減少不存在的行或列的磁盤查找。避免代價高昂的磁盤查找會大大提高數據庫查詢操作的性能。如同一開始的業務場景。如果數據量較大,不方便放在緩存中。需要對請求做攔截防止穿庫。
  • 緩存宕機 緩存宕機的場景,使用布隆過濾器會造成一定程度的誤判。原因是除了Bloom Filter 本身有誤判率,宕機之前的緩存不一定能覆蓋到所有DB中的數據,當宕機後用戶請求了一個以前從未請求的數據,這個時候就會產生誤判。當然,緩存宕機時使用布隆過濾器作為應急的方式,這種情況應該也是可以忍受的。
  • WEB攔截器 相同請求攔截防止被攻擊。用戶第一次請求,將請求參數放入BloomFilter中,當第二次請求時,先判斷請求參數是否被BloomFilter命中。可以提高緩存命中率
  • 惡意地址檢測 chrome 瀏覽器檢查是否是惡意地址。首先針對本地BloomFilter檢查任何URL,並且僅當BloomFilter返回肯定結果時才對所執行的URL進行全面檢查(並且用戶警告,如果它也返回肯定結果)。
  • 比特幣加速 bitcoin 使用BloomFilter來加速錢包同步。

開源項目地址:https://github.com/luw2007/bloomfilter

我們先看看一般業務緩存流程:

用實戰演練如何通過布隆過濾器防止緩存擊穿

先查詢緩存,緩存不命中再查詢數據庫。然後將查詢結果放在緩存中即使數據不存在,也需要創建一個緩存,用來防止穿庫。

這裡需要區分一下數據是否存在。如果數據不存在,緩存時間可以設置相對較短,防止因為主從同步等問題,導致問題被放大。

這個流程中存在薄弱的問題是,當用戶量太大時,我們會緩存大量數據空數據,並且一旦來一波冷用戶,會造成雪崩效應。

對於這種情況,我們產生第二個版本流程:redis過濾冷用戶緩存流程

用實戰演練如何通過布隆過濾器防止緩存擊穿

我們將數據庫裡面中命中的用戶放在redis的set類型中,設置不過期。這樣相當把redis當作數據庫的索引,只要查詢redis,就可以知道是否數據存在。

redis中不存在就可以直接返回結果。如果存在就按照上面提到一般業務緩存流程處理。

聰明的你肯定會想到更多的問題:

  1. redis本身可以做緩存,為什麼不直接返回數據呢?
  2. 如果數據量比較大,單個set,會有性能問題?
  3. 業務不重要,將全量數據放在redis中,佔用服務器大量內存。投入產出不成比例?

問題1需要區分業務場景,結果數據少,我們是可以直接使用redis作為緩存,直接返回數據。結果比較大就不太適合用redis存放了。比如ugc內容,一個評論裡面可能存在上萬字,業務字段多。

redis使用有很多技巧。bigkey 危害比較大,無論是擴容或縮容帶來的內存申請釋放, 還是查詢命令使用不當導致大量數據返回,都會影響redis的穩定。這裡就不細談原因及危害了。

解決bigkey 方法很簡單。我們可以使用hash函數來分桶,將數據分散到多個key中。減少單個key的大小,同時不影響查詢效率。

問題3是redis存儲佔用內存太大。因此我們需要減少內存使用。重新思考一下引入redis的目的。redis像一個集合,整個業務就是驗證請求的參數是否在集合中。

用實戰演練如何通過布隆過濾器防止緩存擊穿

這個結構就像洗澡的時候用的雙向閥門:左邊熱水,右邊冷水。

大部分的編程語言都內置了filter。拿python舉例,filter函數用於過濾序列, 過濾掉不符合條件的元素,返回由符合條件元素組成的列表。

我們看個例子:

<code>$ python2
Python 2.7.10 (default, Oct  6 2017, 22:29:07)
[GCC 4.2.1 Compatible Apple LLVM 9.0.0 (clang-900.0.31)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> s = {2, 4}
>>> filter(lambda x:x in s, [0, 1, 2])
[2]/<code>

集合s中存在 2,4兩個數字,我們需要查詢 0,1,2 那些在集合s中。 lambda x:x in s構造一個匿名函數,判斷入參x是否在集合s中。過濾器filter依次對列表中的數字執行匿名函數。最終返回列表[2]。

redis中實現set用了兩種結構:intset和hash table。非數字或者大量數字時都會退化成hash table。那麼是否好的算法可以節省hash table的大小呢?

其實早在1970年由Burton Howard Bloom提出的布隆過濾器(英語:Bloom Filter)。它實際上是一個很長的二進制向量和一系列隨機映射函數。布隆過濾器可以用於檢索一個元素是否在一個集合中。

它的優點是空間效率和查詢時間都遠遠超過一般的算法, 缺點是有一定的誤識別率和刪除困難。

BloomFilter原理

我們常見的將業務字段拼接之後md5,放在一個集合中。md5生成一個固定長度的128bit的串。如果我們用bitmap來表示,則需要

<code>2**128 = 340282366920938463463374607431768211456 bit/<code>

判斷一個值在不在,就變成在這個bitmap中判斷所在位是否為1。但是我們全世界的機器存儲空間也無法存儲下載。因此我們只能分配有限的空間來存儲。比如:

用實戰演練如何通過布隆過濾器防止緩存擊穿

當只有一個hash函數時:很容易發生衝突。

用實戰演練如何通過布隆過濾器防止緩存擊穿

可以看到上面1和2的hash結果都是7,發生衝突。如果增加hash函數,會發生什麼情況?

用實戰演練如何通過布隆過濾器防止緩存擊穿

我們使用更多的hash函數和更大的數據集合來測試。得到下面這張表

用實戰演練如何通過布隆過濾器防止緩存擊穿

由此可以看到當增加hash方法能夠有效的降低碰撞機率。比較好的數據如下:

用實戰演練如何通過布隆過濾器防止緩存擊穿

但是增加了hash方法之後,會降低空間的使用效率。當集合佔用總體空間達到25%的時候, 增加hash 的效果已經不明顯

用實戰演練如何通過布隆過濾器防止緩存擊穿

上面的使用多個hash方法來降低碰撞就是BloomFilter的核心思想。

算法優點:

  • 數據空間小,不用存儲數據本身。

算法本身缺點:

  • 元素可以添加到集合中,但不能被刪除。
  • 匹配結果只能是“絕對不在集合中”,並不能保證匹配成功的值已經在集合中。
  • 當集合快滿時,即接近預估最大容量時,誤報的概率會變大。
  • 數據佔用空間放大。一般來說,對於1%的誤報概率,每個元素少於10比特,與集合中的元素的大小或數量無關。查詢過程變慢,hash函數增多,導致每次匹配過程,需要查找多個位(hash個數)來確認是否存在。

對於BloomFilter的優點來說,缺點都可以忽略。畢竟只需要kN的存儲空間就能存儲N個元素。空間效率十分優秀。

如何使用BloomFilter

BloomFilter 需要一個大的bitmap來存儲。鑑於目前公司現狀,最好的存儲容器是redis。從github topics: bloom-filter中經過簡單的調研。

redis集成BloomFilter方案:

  • 原生python 調用setbit 構造 BloomFilter
  • lua腳本
  • Rebloom - Bloom Filter Module for Redis (注:redis Module在redis4.0引入)
  • 使用hiredis 調用redis pyreBloom

原生python 方法太慢,lua腳本和module 部署比較麻煩。於是我們推薦使用pyreBloom,底層使用。

<code>pyreBloom:master λ ls
Makefile      bloom.h       bloom.pxd     murmur.c      pyreBloom.pyx
bloom.c       bloom.o       main.c        pyreBloom.c/<code>

從文件命名上可以看到bloom 使用c編寫。pyreBloom 使用cython編寫。

bloom.h 裡面實現BloomFilter的核心邏輯,完成與redis server的交互;hash函數;添加,檢查和刪除方法的實現。

用實戰演練如何通過布隆過濾器防止緩存擊穿

pyreBloom.pyx

import math

import random

cimport bloom

class pyreBloomException(Exception):

'''Some sort of exception has happened internally'''

pass

cdef class pyreBloom(object):

cdef bloom.pyrebloomctxt context

cdef bytes key

property bits:

def __get__(self):

return self.context.bits

property hashes:

def __get__(self):

return self.context.hashes

def __cinit__(self, key, capacity, error, host='127.0.0.1', port=6379,

password='', db=0):

self.key = key

if bloom.init_pyrebloom(&self.context, self.key, capacity,

error, host, port, password, db):

raise pyreBloomException(self.context.ctxt.errstr)

def __dealloc__(self):

bloom.free_pyrebloom(&self.context)

def delete(self):

bloom.delete(&self.context)

def put(self, value):

if getattr(value, '__iter__', False):

r = [bloom.add(&self.context, v, len(v)) for v in value]

r = bloom.add_complete(&self.context, len(value))

else:

bloom.add(&self.context, value, len(value))

r = bloom.add_complete(&self.context, 1)

if r < 0:

raise pyreBloomException(self.context.ctxt.errstr)

return r

def add(self, value):

return self.put(value)

def extend(self, values):

return self.put(values)

def contains(self, value):

#If the object is 'iterable'...

if getattr(value, '__iter__', False):

r = [bloom.check(&self.context, v, len(v)) for v in value]

r = [bloom.check_next(&self.context) for i in range(len(value))]

if (min(r) < 0):

raise pyreBloomException(self.context.ctxt.errstr)

return [v for v, included in zip(value, r) if included]

else:

bloom.check(&self.context, value, len(value))

r = bloom.check_next(&self.context)

if (r < 0):

raise pyreBloomException(self.context.ctxt.errstr)

return bool(r)

def __contains__(self, value):

return self.contains(value)

def keys(self):

'''Return a list of the keys used in this bloom filter'''

return [self.context.keys[i] for i in range(self.context.num_keys)]

原生pyreBloom方法:

cdef class pyreBloom(object):cdef bloom.pyrebloomctxt contextcdef bytesproperty bits:property hashes:// 使用的hash方法數def delete(self):// 刪除,會在redis中刪除def put(self, value):// 添加 底層方法, 不建議直接調用def add(self, value):// 添加單個元素,調用put方法def extend(self, values):// 添加一組元素,調用put方法def contains(self, value):// 檢查是否存在,當`value`可以迭代時,返回`[value]`, 否則返回`bool`def keys(self):// 在redis中存儲的key列表

由於pyreBloom使用hiredis庫,本身沒有重連等邏輯,於是做了簡單的封裝。

#coding=utf-8

'''

bloom filter 基礎模塊

可用方法:

extend, keys, contains, add, put, hashes, bits, delete

使用方法:

>>> class TestModel(BaseModel):

... PREFIX = "bf:test"

>>> t = TestModel()

>>> t.add('hello')

1

>>> t.extend(['hi', 'world'])

2

>>> t.contains('hi')

True

>>> t.delete()

'''

import logging

from six import PY3 as IS_PY3

from pyreBloom import pyreBloom, pyreBloomException

from BloomFilter.utils import force_utf8

class BaseModel(object):

'''

bloom filter 基礎模塊

參數:

SLOT: 可用方法類型

PREFIX: redis前綴

BF_SIZE: 存儲最大值

BF_ERROR: 允許的出錯率

RETRIES: 連接重試次數

host: redis 服務器IP

port: redis 服務器端口

db: redis 服務器DB

_bf_conn: 內部保存`pyreBloom`實例

'''

SLOT = {'add', 'contains', 'extend', 'keys', 'put', 'delete',

'bits', 'hashes'}

PREFIX = ""

BF_SIZE = 100000

BF_ERROR = 0.01

RETRIES = 2

def __init__(self, redis=None):

'''

初始化redis配置

:param redis: redis 配置

'''

#這裡初始化防止類靜態變量多個繼承類複用,導致數據被汙染

self._bf_conn = None

self._conf = {

'host': '127.0.0.1', 'password': '',

'port': 6379, 'db': 0

}

if redis:

for k, v in redis.items():

if k in self._conf:

self._conf[k] = redis[k]

self._conf = force_utf8(self._conf)

@property

def bf_conn(self):

'''

初始化pyreBloom

'''

if not self._bf_conn:

prefix = force_utf8(self.PREFIX)

logging.debug(

'pyreBloom connect: redis://%s:%s/%s, (%s%s%s)',

self._conf['host'], self._conf['port'], self._conf['db'],

prefix, self.BF_SIZE, self.BF_ERROR,

)

self._bf_conn = pyreBloom(

prefix, self.BF_SIZE, self.BF_ERROR, **self._conf)

return self._bf_conn

def __getattr__(self, method):

'''調用pyrebloom方法

沒有枚舉的方法將從`pyreBloom`中獲取

:param method:

:return: pyreBloom.{method}

'''

#只提供內部方法

if method not in self.SLOT:

raise NotImplementedError()

#捕獲`pyreBloom`的異常, 打印必要的日誌

def catch_error(*a, **kwargs):

'''多次重試服務'''

args = force_utf8(a)

kwargs = force_utf8(kwargs)

for _ in range(self.RETRIES):

try:

func = getattr(self.bf_conn, method)

res = func(*args, **kwargs)

#python3 返回值和python2返回值不相同,

#手工處理返回類型

if method == 'contains' and IS_PY3:

if isinstance(res, list):

return [i.decode('utf8') for i in res]

return res

except pyreBloomException as error:

logging.warn(

'pyreBloom Error:%s%s', method, str(error))

self.reconnect()

if _ == self.RETRIES:

logging.error('pyreBloom Error')

raise error

return catch_error

def __contains__(self, item):

'''跳轉__contains__方法

:param item: 查詢元素列表/單個元素

:type item: list/basestring

:return: [bool...]/bool

'''

return self.contains(item)

def reconnect(self):

'''

重新連接bloom

`pyreBloom` 連接使用c driver,沒有提供timeout參數,使用了內置的timeout

同時為了保證服務的可靠性,增加了多次重試機制。

struct timeval timeout = { 1, 5000 };

ctxt->ctxt = redisConnectWithTimeout(host, port, timeout);

del self._bf_conn 會調用`pyreBloom`內置的C的del方法,會關閉redis連接

'''

if self._bf_conn:

logging.debug('pyreBloom reconnect')

del self._bf_conn

self._bf_conn = None

_ = self.bf_conn

進階:計數過濾器(Counting Filter)

提供了一種在BloomFilter上實現刪除操作的方法,而無需重新重新創建過濾器。在計數濾波器中,陣列位置(桶)從單個位擴展為n位計數器。實際上,常規布隆過濾器可以被視為計數過濾器,其桶大小為一位。

插入操作被擴展為遞增桶的值,並且查找操作檢查每個所需的桶是否為非零。然後,刪除操作包括遞減每個桶的值。

存儲桶的算術溢出是一個問題,並且存儲桶應該足夠大以使這種情況很少見。如果確實發生,則增量和減量操作必須將存儲區設置為最大可能值,以便保留BloomFilter的屬性。

計數器的大小通常為3或4位。因此,計算布隆過濾器的空間比靜態布隆過濾器多3到4倍。相比之下, Pagh,Pagh和Rao(2005)以及Fan等人的數據結構。(2014)也允許刪除但使用比靜態BloomFilter更少的空間。

計數過濾器的另一個問題是可擴展性有限。由於無法擴展計數布隆過濾器表,因此必須事先知道要同時存儲在過濾器中的最大鍵數。一旦超過表的設計容量,隨著插入更多密鑰,誤報率將迅速增長。

Bonomi等人。(2006)引入了一種基於d-left散列的數據結構,它在功能上是等效的,但使用的空間大約是計算BloomFilter的一半。此數據結構中不會出現可伸縮性問題。一旦超出設計容量,就可以將密鑰重新插入到雙倍大小的新哈希表中。

Putze,Sanders和Singler(2007)的節省空間的變體也可用於通過支持插入和刪除來實現計數過濾器。

Rottenstreich,Kanizo和Keslassy(2012)引入了一種基於變量增量的新通用方法,該方法顯著提高了計算布隆過濾器及其變體的誤報概率,同時仍支持刪除。

與計數布隆過濾器不同,在每個元素插入時,散列計數器以散列變量增量而不是單位增量遞增。要查詢元素,需要考慮計數器的確切值,而不僅僅是它們的正面性。如果由計數器值表示的總和不能由查詢元素的相應變量增量組成,則可以將否定答案返回給查詢。


分享到:


相關文章: