Elasticsearch簡介
什麼是 Elasticsearch?
Elasticsearch 是一個開源的分佈式 RESTful 搜索和分析引擎，能夠解決越來越多不同的應用場景。
本文內容
本文主要介紹 Elasticsearch（ES）GEO 數據的寫入與空間檢索，使用的 ES 版本為 7.3.1。
數據準備
使用 QGIS 的漁網工具對範圍進行切割，得到網格的 GeoJSON 文件。
新建索引並設置映射
def set_mapping(es, index_name="content_engine", doc_type_name="en", my_mapping=None):
    """Drop and recreate ``index_name``, then apply a mapping to it.

    Any existing index with the same name is deleted first; 400/404 errors
    from the delete call are ignored, so a missing index is not an error.
    The index is then created and ``my_mapping`` is attached to
    ``doc_type_name`` through the typed put-mapping API
    (``include_type_name=True``).

    Args:
        es: Elasticsearch client.
        index_name: Name of the index to (re)create.
        doc_type_name: Mapping type the mapping is attached to.
        my_mapping: Mapping body to apply. When omitted (or empty), a default
            mapping with ``location`` as ``geo_shape`` and ``id`` as ``long``
            is used. Previously this argument was silently overwritten.

    Prints "Index creation failed..." when either the create or the
    put-mapping response is not acknowledged.
    """
    if not my_mapping:
        # A fresh dict per call avoids the shared-mutable-default pitfall.
        my_mapping = {
            "properties": {
                "location": {"type": "geo_shape"},
                "id": {"type": "long"},
            }
        }
    # Ignore 400 and 404 so deleting a non-existent index does not fail.
    es.indices.delete(index=index_name, ignore=[400, 404])
    print("delete_index")
    # The index was just deleted, so create() should not hit an
    # "index already exists" error here.
    create_index = es.indices.create(index=index_name)
    mapping_index = es.indices.put_mapping(
        index=index_name,
        doc_type=doc_type_name,
        body=my_mapping,
        include_type_name=True,
    )
    print("create_index")
    # .get() treats a response without "acknowledged" as a failure instead
    # of raising KeyError.
    if (create_index.get("acknowledged") is not True
            or mapping_index.get("acknowledged") is not True):
        print("Index creation failed...")
數據插入
使用 multiprocessing 和 elasticsearch.helpers.bulk 進行數據寫入：每 9500 條（約一萬條）數據為一組，剩餘不足一組的數據單獨成為一組，然後用進程池（4 個進程）並行寫入。分別寫入了 4731254 條點數據和面數據。寫入時利用多核 CPU、SSD 以及合適的批量大小可以有效加快寫入速度，通過這些手段可以在三分鐘左右寫入四百多萬條點或面數據。
def mp_worker(features):
    """Bulk-index one batch of actions and return how many succeeded.

    Runs inside a pool worker, so it opens its own client. ``ip`` is a
    module-level host setting that must be defined by the surrounding
    script. Every action built by ``mp_handler`` already carries ``_index``,
    so no default index is passed to ``bulk`` (the original referenced an
    undefined global ``index_name`` here).
    """
    es = Elasticsearch(hosts=[ip], timeout=5000)
    success, _ = bulk(es, features, raise_on_error=True)
    return success


def mp_handler(input_file, index_name, doc_type_name="en", chunk_size=9500, processes=4):
    """Load a GeoJSON FeatureCollection and bulk-index it in parallel.

    Each feature becomes one document with ``id`` (from
    ``feature["properties"]["id"]``) and a ``location`` geo shape built from
    the feature's own geometry type and coordinates. Documents are split
    into batches of ``chunk_size`` (the last batch may be smaller) and sent
    by a pool of ``processes`` workers via ``mp_worker``.

    Args:
        input_file: Path to a GeoJSON file containing a ``features`` list.
        index_name: Target index for every action.
        doc_type_name: Mapping type written as ``_type``.
        chunk_size: Number of actions per bulk request (must be >= 1).
        processes: Number of worker processes.

    Returns:
        Total number of documents reported as indexed by the workers.
    """
    with open(input_file, 'rb') as f:
        features = json.load(f)["features"]

    actions = [
        {
            "_index": index_name,
            "_type": doc_type_name,
            "_source": {
                "id": feature["properties"]["id"],
                "location": {
                    # Use the GeoJSON geometry type (Point, Polygon, ...)
                    # instead of hard-coding "polygon", so point files load too.
                    "type": feature["geometry"]["type"],
                    "coordinates": feature["geometry"]["coordinates"],
                },
            },
        }
        for feature in features
    ]
    # Release the raw GeoJSON before the worker processes are started.
    del features

    count = len(actions)
    batches = [actions[i:i + chunk_size] for i in range(0, count, chunk_size)]
    del actions
    print('read all %s data ' % count)

    # The context manager guarantees the pool is shut down; imap is fully
    # consumed by sum() before the block exits.
    with multiprocessing.Pool(processes) as pool:
        written = sum(pool.imap(mp_worker, batches))
    print('write all %s data ' % written)
    return written
GEO(point)：查詢距離某點 n km 範圍內的點，以及矩形範圍選擇
import time

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

starttime = time.time()
_index = "gis_point"
_doc_type = "20190824"  # not referenced by the searches below
ip = "127.0.0.1:9200"

# Points within n km (here 9 km) of a centre point.
_body = {
    "query": {
        "bool": {
            "must": {"match_all": {}},
            "filter": {
                "geo_distance": {
                    "distance": "9km",
                    "location": {
                        "lat": 18.1098857850465471,
                        "lon": 109.1271036098896730,
                    },
                }
            },
        }
    }
}

# Bounding-box selection. To run it, pass `_bbox_body` to scan() instead.
# bottom_right.lon must be east of top_left.lon; the original example used
# 105.127..., which Elasticsearch interprets as a box crossing the 180th
# meridian (covering almost every longitude).
_bbox_body = {
    "query": {
        "geo_bounding_box": {
            "location": {
                "top_left": {
                    "lat": 18.4748659238899933,
                    "lon": 109.0007435371629470,
                },
                "bottom_right": {
                    "lat": 18.1098857850465471,
                    "lon": 109.1271036098896730,
                },
            }
        }
    }
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = scan(es, query=_body, scroll="10m", index=_index, timeout="10m")
for resp in scanResp:
    print(resp)
endtime = time.time()
print(endtime - starttime)
GEO(shape)範圍選擇
import time

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

starttime = time.time()
_index = "gis"
_doc_type = "20190823"
ip = "127.0.0.1:9200"

# Envelope corners are given as [[min_lon, max_lat], [max_lon, min_lat]];
# only shapes lying completely inside the envelope ("within") match.
upper_left_corner = [108.987103609889, 18.474865923889993]
lower_right_corner = [109.003537162947, 18.40988578504]
envelope_filter = {
    "geo_shape": {
        "location": {
            "shape": {
                "type": "envelope",
                "coordinates": [upper_left_corner, lower_right_corner],
            },
            "relation": "within",
        }
    }
}
_body = {
    "query": {
        "bool": {
            "must": {"match_all": {}},
            "filter": envelope_filter,
        }
    }
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = scan(es, query=_body, scroll="1m", index=_index, timeout="1m")
for hit in scanResp:
    print(hit)
endtime = time.time()
print(endtime - starttime)
GEO(point)距離聚合
import time

from elasticsearch import Elasticsearch

starttime = time.time()
_index = "gis_point"
_doc_type = "20190824"  # not referenced by the search below
ip = "127.0.0.1:9200"

# Distance-ring aggregation around an origin given as a "lat,lon" string.
# Range bounds are in metres (the aggregation's default unit):
# < 100 km, 100-300 km, and >= 300 km.
_body = {
    "size": 0,  # only the buckets are printed, so skip fetching hits
    "aggs": {
        "rings_around_amsterdam": {
            "geo_distance": {
                "field": "location",
                "origin": "18.1098857850465471,109.1271036098896730",
                "ranges": [
                    {"to": 100000},
                    {"from": 100000, "to": 300000},
                    {"from": 300000},
                ],
            }
        }
    },
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
for i in scanResp['aggregations']['rings_around_amsterdam']['buckets']:
    print(i)
endtime = time.time()
print(endtime - starttime)
中心點聚合
# Centroid of all `location` values in the index. Relies on `Elasticsearch`,
# `ip` and `_index` being defined earlier in the script.
_body = {
    "size": 0,  # aggregation only; skip returning hits
    "aggs": {
        "centroid": {
            "geo_centroid": {"field": "location"},
        }
    },
}
es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
print(scanResp['aggregations'])
範圍聚合
# Bounding box ("viewport") enclosing all `location` values. Relies on
# `Elasticsearch`, `ip` and `_index` being defined earlier in the script.
_body = {
    "size": 0,  # aggregation only; skip returning hits
    "aggs": {
        "viewport": {
            "geo_bounds": {"field": "location"},
        }
    },
}
es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
print(scanResp['aggregations']['viewport'])
geohash聚合
# Low-precision geohash grid aggregation: `precision` is the geohash length,
# so 3 gives coarse cells. Relies on `Elasticsearch`, `ip` and `_index` being
# defined earlier in the script.
_body = {
    "size": 0,  # only the buckets are used, so skip fetching hits
    "aggregations": {
        "large-grid": {
            "geohash_grid": {
                "field": "location",
                "precision": 3,
            }
        }
    },
}

# High-precision variant: restrict to a small bounding box, then build a
# precision-7 geohash grid inside it. To run it, search with `_zoomed_body`
# and iterate scanResp['aggregations']['zoomed-in']['zoom1']['buckets'].
# top_left.lon must be west of bottom_right.lon; the original example had
# them reversed, which Elasticsearch treats as a box crossing the 180th
# meridian (covering almost every longitude).
_zoomed_body = {
    "size": 0,
    "aggregations": {
        "zoomed-in": {
            "filter": {
                "geo_bounding_box": {
                    "location": {
                        "top_left": "18.4748659238899933,108.9971036098896730",
                        "bottom_right": "18.4698857850465471,109.0007435371629470",
                    }
                }
            },
            "aggregations": {
                "zoom1": {
                    "geohash_grid": {
                        "field": "location",
                        "precision": 7,
                    }
                }
            },
        }
    },
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
for i in scanResp['aggregations']['large-grid']['buckets']:
    print(i)
切片聚合
# Low-precision geotile grid aggregation: `precision` is the map-tile zoom
# level, so 8 gives coarse tiles. Relies on `Elasticsearch`, `ip` and `_index`
# being defined earlier in the script.
_body = {
    "size": 0,  # only the buckets are used, so skip fetching hits
    "aggregations": {
        "large-grid": {
            "geotile_grid": {
                "field": "location",
                "precision": 8,
            }
        }
    },
}

# High-precision variant: restrict to a small bounding box, then tile it at
# zoom 18. To run it, search with `_zoomed_body` and iterate
# scanResp['aggregations']['zoomed-in']['zoom1']['buckets'].
# top_left.lon must be west of bottom_right.lon; the original example had
# them reversed, which Elasticsearch treats as a box crossing the 180th
# meridian (covering almost every longitude).
_zoomed_body = {
    "size": 0,
    "aggregations": {
        "zoomed-in": {
            "filter": {
                "geo_bounding_box": {
                    "location": {
                        "top_left": "18.4748659238899933,108.9991036098896730",
                        "bottom_right": "18.4698857850465471,109.0007435371629470",
                    }
                }
            },
            "aggregations": {
                "zoom1": {
                    "geotile_grid": {
                        "field": "location",
                        "precision": 18,
                    }
                }
            },
        }
    },
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
for i in scanResp['aggregations']['large-grid']['buckets']:
    print(i)
Elasticsearch和PostGIS相同功能對比
PostGIS最近點查詢
-- Nearest feature to a query point: ORDER BY the KNN distance operator <->
-- (which can use a GiST index on geom, if one exists) and LIMIT 1.
-- ST_DistanceSphere reports the spherical distance in metres.
SELECT id,
       geom,
       ST_DistanceSphere(geom, 'SRID=4326;POINT(109.1681036098896730 18.1299957850465471)'::geometry)
FROM h5
ORDER BY geom <-> 'SRID=4326;POINT(109.1681036098896730 18.1299957850465471)'::geometry
LIMIT 1;
Elasticsearch最近點查詢
import time

from elasticsearch import Elasticsearch

starttime = time.time()
_index = "gis_point"
_doc_type = "20190824"
ip = "127.0.0.1:9200"

# Query point in [lon, lat] order (Elasticsearch's array form for geo points).
query_point = [109.1681036098896730, 18.1299957850465471]

# Sort every document by arc distance (metres) to the query point, nearest first.
nearest_sort = {
    "_geo_distance": {
        "unit": "m",
        "order": "asc",
        "location": query_point,
        "distance_type": "arc",
        "mode": "min",
        "ignore_unmapped": True,
    }
}

# Keep only the single closest hit.
_body = {
    "sort": [nearest_sort],
    "from": 0,
    "size": 1,
    "query": {"bool": {"must": {"match_all": {}}}},
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
endtime = time.time()
print(endtime - starttime)
PostGIS範圍查詢
-- Features intersecting an envelope whose corners are
--   [min_lon, max_lat] = [-117.987103609889, 33.494865923889993]
--   [max_lon, min_lat] = [-117.003537162947, 33.40988578504]
-- ST_MakeEnvelope takes (xmin, ymin, xmax, ymax, srid).
SELECT id, geom, fid
FROM public."California"
WHERE ST_Intersects(
    geom,
    ST_MakeEnvelope(-117.987103609889, 33.40988578504,
                    -117.003537162947, 33.494865923889993, 4326)
);
Elasticsearch範圍查詢
import time

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

starttime = time.time()
_index = "gis_california"
ip = "127.0.0.1:9200"

# Envelope format: [[min_lon, max_lat], [max_lon, min_lat]].
# Matches every shape in `geom` that intersects the envelope.
_body = {
    "query": {
        "bool": {
            "must": {"match_all": {}},
            "filter": {
                "geo_shape": {
                    "geom": {
                        "shape": {
                            "type": "envelope",
                            "coordinates": [
                                [-117.987103609889, 33.494865923889993],
                                [-117.003537162947, 33.40988578504],
                            ],
                        },
                        "relation": "INTERSECTS",
                    }
                }
            },
        }
    }
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = scan(es, query=_body, scroll="1m", index=_index, timeout="1m")
# Only the number of matching documents is reported; the hits are consumed
# (so the full scroll is timed) but not kept.
i = sum(1 for _ in scanResp)
print(i)
endtime = time.time()
print(endtime - starttime)
在以上兩種場景中，PostGIS 的性能都更好。
參考資料:
1.Elasticsearch(GEO)空間檢索查詢
2.Elasticsearch官網
3.PostGIS拆分LineString為segment,point
4.億級“附近的人”,打通“特殊服務”通道
5.PostGIS教程二十二:最近鄰域搜索