Elasticsearch(GEO)數據寫入和空間檢索

Elasticsearch簡介

什麼是 Elasticsearch?

Elasticsearch 是一個開源的分佈式 RESTful搜索和分析引擎,能夠解決越來越多不同的應用場景。

本文內容

本文主要是介紹了ES GEO數據寫入和空間檢索,ES版本為7.3.1

數據準備

使用 QGIS 的漁網(Fishnet)工具對範圍進行切割,得到網格的 GeoJSON 文件。

新建索引設置映射

def set_mapping(es, index_name="content_engine", doc_type_name="en", my_mapping=None):
    """Drop and recreate ``index_name``, then install a mapping on it.

    Args:
        es: Elasticsearch client; only ``es.indices.delete``,
            ``es.indices.create`` and ``es.indices.put_mapping`` are used.
        index_name: Name of the index to recreate.
        doc_type_name: Document type the mapping is registered under.
        my_mapping: Mapping body to install. When omitted (or empty) the
            default mapping is used: ``location`` as ``geo_shape`` and
            ``id`` as ``long``.

    Returns:
        None. A failure is only reported by printing
        ``"Index creation failed..."``.
    """
    # Previously the argument was overwritten unconditionally, so a mapping
    # supplied by the caller was silently ignored.
    if not my_mapping:
        my_mapping = {
            "properties": {
                "location": {"type": "geo_shape"},
                "id": {"type": "long"},
            }
        }

    # 400/404 are ignored so that a missing index does not abort the reset.
    es.indices.delete(index=index_name, ignore=[400, 404])
    print("delete_index")

    create_index = es.indices.create(index=index_name)
    mapping_index = es.indices.put_mapping(
        index=index_name,
        doc_type=doc_type_name,
        body=my_mapping,
        include_type_name=True,
    )
    print("create_index")

    if create_index["acknowledged"] is not True or mapping_index["acknowledged"] is not True:
        print("Index creation failed...")
/<code>

數據插入

使用 multiprocessing 和 elasticsearch.helpers.bulk 進行數據寫入:每 9500 條為一組,剩下不足 9500 條的為一組,然後多進程並行寫入。分別寫入 4731254 條點和面數據。寫入時使用多核、SSD 和合適的批量大小可以有效加快寫入速度,通過這些手段可以在三分鐘左右寫入四百多萬的點或者面數據。

def mp_worker(features):
    """Bulk-index one batch of actions and return how many succeeded.

    Opens its own client for the call and passes ``features`` to ``bulk``
    with ``raise_on_error=True``.

    NOTE(review): ``ip`` and ``index_name`` are not parameters; they must be
    defined at module level where this function runs -- confirm.
    """
    client = Elasticsearch(hosts=[ip], timeout=5000)
    # bulk() returns (success_count, errors); only the count is reported.
    written, _errors = bulk(client, features, index=index_name, raise_on_error=True)
    return 0 + written

def mp_handler(input_file, index_name, doc_type_name="en", batch_size=9500, processes=4):
    """Bulk-load the features of a GeoJSON file through a pool of workers.

    The file is parsed in full, every feature becomes one bulk action, the
    actions are cut into batches and each batch is handed to ``mp_worker``
    in a worker process.

    Args:
        input_file: Path of a JSON file with a top-level ``"features"`` list;
            every feature must provide ``properties.id`` and
            ``geometry.coordinates``.
        index_name: Written into every action as ``_index``.
        doc_type_name: Written into every action as ``_type``.
        batch_size: Maximum number of actions per batch. Defaults to 9500,
            the value that used to be hard-coded.
        processes: Number of worker processes. Defaults to 4, as before.

    Raises:
        ValueError: If ``batch_size`` is not a positive number.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")

    with open(input_file, 'rb') as f:
        features = json.load(f)["features"]

    # The shape type is always written as "polygon", whatever the feature's
    # own geometry type says.
    actions = [
        {
            "_index": index_name,
            "_type": doc_type_name,
            "_source": {
                "id": feature["properties"]["id"],
                "location": {
                    "type": "polygon",
                    "coordinates": feature["geometry"]["coordinates"],
                },
            },
        }
        for feature in features
    ]
    del features  # release the parsed GeoJSON before the workers start

    total = len(actions)
    batches = [actions[start:start + batch_size] for start in range(0, total, batch_size)]
    del actions
    print('read all %s data ' % total)

    written = 0
    # The context manager shuts the pool down; it used to be left open.
    # imap is fully consumed inside the block, so no task is cut short.
    with multiprocessing.Pool(processes) as pool:
        for result in pool.imap(mp_worker, batches):
            written += result
    print('write all %s data ' % written)
/<code>

GEO(point)查詢距離nkm附近的點和範圍選擇

<code>from elasticsearch import Elasticsearch

from elasticsearch.helpers import scan

import time

# Wall-clock start, so the elapsed time of the whole query can be printed.
starttime = time.time()

_index = "gis_point"
_doc_type = "20190824"
ip = "127.0.0.1:9200"

# "Within n km" selection: keep every document whose `location` is no more
# than 9 km from the reference point.
_center = {
    "lat": 18.1098857850465471,
    "lon": 109.1271036098896730,
}
_body = {
    "query": {
        "bool": {
            "must": {"match_all": {}},
            "filter": {
                "geo_distance": {
                    "distance": "9km",
                    "location": _center,
                },
            },
        },
    },
}

# Bounding-box selection (alternative query, kept for reference).
# NOTE(review): bottom_right.lon (105.12...) is smaller than top_left.lon
# (109.00...); this looks like a typo for 109.12... -- confirm before use.
# _body = {
#     "query": {
#         "geo_bounding_box": {
#             "location": {
#                 "top_left": {
#                     "lat": 18.4748659238899933,
#                     "lon": 109.0007435371629470
#                 },
#                 "bottom_right": {
#                     "lat": 18.1098857850465471,
#                     "lon": 105.1271036098896730
#                 }
#             }
#         }
#     }
# }

es = Elasticsearch(hosts=[ip], timeout=5000)

# scan() streams every matching hit through the scroll API.
scanResp = scan(es, query=_body, scroll="10m", index=_index, timeout="10m")
for resp in scanResp:
    print(resp)

endtime = time.time()
print(endtime - starttime)
/<code>
Elasticsearch(GEO)數據寫入和空間檢索

GEO(shape)範圍選擇

<code>from elasticsearch import Elasticsearch

from elasticsearch.helpers import scan

import time

# Wall-clock start, so the elapsed time of the whole query can be printed.
starttime = time.time()

_index = "gis"
_doc_type = "20190823"
ip = "127.0.0.1:9200"

# Envelope corners are given as [[min_lon, max_lat], [max_lon, min_lat]].
_envelope = {
    "type": "envelope",
    "coordinates": [
        [108.987103609889, 18.474865923889993],
        [109.003537162947, 18.40988578504],
    ],
}

# Keep only shapes whose `location` relates to the envelope as "within".
_body = {
    "query": {
        "bool": {
            "must": {"match_all": {}},
            "filter": {
                "geo_shape": {
                    "location": {
                        "shape": _envelope,
                        "relation": "within",
                    },
                },
            },
        },
    },
}

es = Elasticsearch(hosts=[ip], timeout=5000)

# scan() streams every matching hit through the scroll API.
scanResp = scan(es, query=_body, scroll="1m", index=_index, timeout="1m")
for resp in scanResp:
    print(resp)

endtime = time.time()
print(endtime - starttime)
/<code>
Elasticsearch(GEO)數據寫入和空間檢索

GEO(point)距離聚合

<code>from elasticsearch import Elasticsearch

import time

# Wall-clock start, so the elapsed time of the aggregation can be printed.
starttime = time.time()

_index = "gis_point"
_doc_type = "20190824"
ip = "127.0.0.1:9200"

# Distance aggregation: three rings around the origin.
# NOTE(review): the range bounds carry no explicit unit; presumably metres
# (the Elasticsearch default) -- confirm.
_ranges = [
    {"to": 100000},
    {"from": 100000, "to": 300000},
    {"from": 300000},
]
_body = {
    "aggs": {
        "rings_around_amsterdam": {
            "geo_distance": {
                "field": "location",
                "origin": "18.1098857850465471,109.1271036098896730",
                "ranges": _ranges,
            },
        },
    },
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)

# One bucket per ring.
_buckets = scanResp['aggregations']['rings_around_amsterdam']['buckets']
for i in _buckets:
    print(i)

endtime = time.time()
print(endtime - starttime)
/<code>
Elasticsearch(GEO)數據寫入和空間檢索

中心點聚合

# Centroid aggregation over the `location` field.
# Relies on `Elasticsearch`, `ip` and `_index` being defined beforehand.
_centroid_agg = {"geo_centroid": {"field": "location"}}
_body = {"aggs": {"centroid": _centroid_agg}}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)

print(scanResp['aggregations'])
/<code>
Elasticsearch(GEO)數據寫入和空間檢索

範圍聚合

# Bounds aggregation over the `location` field.
# Relies on `Elasticsearch`, `ip` and `_index` being defined beforehand.
_bounds_agg = {"geo_bounds": {"field": "location"}}
_body = {"aggs": {"viewport": _bounds_agg}}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)

print(scanResp['aggregations']['viewport'])
/<code>
Elasticsearch(GEO)數據寫入和空間檢索

geohash聚合

<code>##低精度聚合,precision代表geohash長度

# Low-precision geohash aggregation; `precision` is the geohash length.
# Relies on `Elasticsearch`, `ip` and `_index` being defined beforehand.
_coarse_grid = {
    "geohash_grid": {
        "field": "location",
        "precision": 3,
    },
}
_body = {"aggregations": {"large-grid": _coarse_grid}}

# High-precision variant: bounding-box filter first, then a geohash grid
# inside it (alternative query, kept for reference).
# NOTE(review): bottom_right's longitude (108.99...) is smaller than
# top_left's (109.00...) -- confirm the corner order before use.
# _body = {
#     "aggregations": {
#         "zoomed-in": {
#             "filter": {
#                 "geo_bounding_box": {
#                     "location": {
#                         "top_left": "18.4748659238899933,109.0007435371629470",
#                         "bottom_right": "18.4698857850465471,108.9971036098896730"
#                     }
#                 }
#             },
#             "aggregations": {
#                 "zoom1": {
#                     "geohash_grid": {
#                         "field": "location",
#                         "precision": 7
#                     }
#                 }
#             }
#         }
#     }
# }

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)

_buckets = scanResp['aggregations']['large-grid']['buckets']
for i in _buckets:
    print(i)

# Bucket path for the high-precision variant:
# for i in scanResp['aggregations']['zoomed-in']['zoom1']['buckets']:
#     print(i)
/<code>
Elasticsearch(GEO)數據寫入和空間檢索

Elasticsearch(GEO)數據寫入和空間檢索

切片聚合

<code># 低精度切片聚合,precision代表級別

# Low-precision map-tile aggregation; `precision` is the tile zoom level.
# Relies on `Elasticsearch`, `ip` and `_index` being defined beforehand.
_coarse_tiles = {
    "geotile_grid": {
        "field": "location",
        "precision": 8,
    },
}
_body = {"aggregations": {"large-grid": _coarse_tiles}}

# High-precision variant: bounding-box filter first, then a tile grid
# inside it (alternative query, kept for reference).
# NOTE(review): bottom_right's longitude (108.99...) is smaller than
# top_left's (109.00...) -- confirm the corner order before use.
# _body = {
#     "aggregations": {
#         "zoomed-in": {
#             "filter": {
#                 "geo_bounding_box": {
#                     "location": {
#                         "top_left": "18.4748659238899933,109.0007435371629470",
#                         "bottom_right": "18.4698857850465471,108.9991036098896730"
#                     }
#                 }
#             },
#             "aggregations": {
#                 "zoom1": {
#                     "geotile_grid": {
#                         "field": "location",
#                         "precision": 18
#                     }
#                 }
#             }
#         }
#     }
# }

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)

_buckets = scanResp['aggregations']['large-grid']['buckets']
for i in _buckets:
    print(i)

# Bucket path for the high-precision variant:
# for i in scanResp['aggregations']['zoomed-in']['zoom1']['buckets']:
#     print(i)
/<code>
Elasticsearch(GEO)數據寫入和空間檢索

Elasticsearch(GEO)數據寫入和空間檢索

Elasticsearch和PostGIS相同功能對比

PostGIS最近點查詢

-- Nearest-point lookup: order the rows of h5 by distance to the reference
-- point and keep the closest one. The `<->` distance operator between
-- `geom` and the point literal had been lost, leaving invalid SQL.
SELECT id, geom, ST_DistanceSphere(geom, 'SRID=4326;POINT(109.1681036098896730 18.1299957850465471)'::geometry)
FROM h5
ORDER BY geom <-> 'SRID=4326;POINT(109.1681036098896730 18.1299957850465471)'::geometry
LIMIT 1
/<code>

Elasticsearch最近點查詢

<code>from elasticsearch import Elasticsearch

import time

# Wall-clock start, so the elapsed time of the search can be printed.
starttime = time.time()

_index = "gis_point"
_doc_type = "20190824"
ip = "127.0.0.1:9200"

# Sort ascending by arc distance (in metres) from the reference point.
_nearest_first = {
    "_geo_distance": {
        "unit": "m",
        "order": "asc",
        "location": [109.1681036098896730, 18.1299957850465471],
        "distance_type": "arc",
        "mode": "min",
        "ignore_unmapped": True,
    },
}

# Match everything, then return only the first (closest) hit.
_body = {
    "sort": [_nearest_first],
    "from": 0,
    "size": 1,
    "query": {
        "bool": {
            "must": {"match_all": {}},
        },
    },
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)

# Only the timing is reported; the response itself is not printed.
endtime = time.time()
print(endtime - starttime)
/<code>

PostGIS範圍查詢

-- Range query: every row of "California" whose geometry intersects the
-- envelope. ST_MakeEnvelope takes (xmin, ymin, xmax, ymax, srid).
SELECT id, geom, fid
FROM public."California"
WHERE ST_Intersects(
    geom,
    ST_MakeEnvelope(-117.987103609889, 33.40988578504, -117.003537162947, 33.494865923889993, 4326)
)
-- Same envelope as a coordinate pair list (this line used to be bare text
-- inside the statement):
-- [-117.987103609889, 33.494865923889993], [-117.003537162947, 33.40988578504]
/<code>

Elasticsearch範圍查詢

<code>from elasticsearch import Elasticsearch

from elasticsearch.helpers import scan

import time

# Wall-clock start, so the elapsed time of the whole scan can be printed.
starttime = time.time()

_index = "gis_california"
ip = "127.0.0.1:9200"

# Envelope corners are given as [[min_lon, max_lat], [max_lon, min_lat]].
_envelope = {
    "type": "envelope",
    "coordinates": [
        [-117.987103609889, 33.494865923889993],
        [-117.003537162947, 33.40988578504],
    ],
}

# Keep every document whose `geom` intersects the envelope.
_body = {
    "query": {
        "bool": {
            "must": {"match_all": {}},
            "filter": {
                "geo_shape": {
                    "geom": {
                        "shape": _envelope,
                        "relation": "INTERSECTS",
                    },
                },
            },
        },
    },
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = scan(es, query=_body, scroll="1m", index=_index, timeout="1m")

# Drain the scan and count the hits; `a` keeps the last hit seen.
i = 0
for i, resp in enumerate(scanResp, start=1):
    a = resp
print(i)

endtime = time.time()
print(endtime - starttime)
/<code> 

兩種場景中PostGIS的性能更好


參考資料:

1.Elasticsearch(GEO)空間檢索查詢

2.Elasticsearch官網

3.PostGIS拆分LineString為segment,point

4.億級“附近的人”,打通“特殊服務”通道

5.PostGIS教程二十二:最近鄰域搜索


分享到:


相關文章: