在PySpark中使用自定義UDF來計算Haversine距離

提供的腳本用於Spark會話,具有8個內核,4個執行程序,4Gb主內存和4Gb工作內存。

用Haversine計算地理距離

Haversine是一個公式,它採用兩個座標點(緯度和經度)並在一個對象上生成第三個座標點,以計算兩個原始點之間的表面距離,同時考慮對象的曲率。它假定物體的形狀(在我們的例子中是地球)是一個球體。我們知道地球實際上並不是一個完美的球體,但半徑是一個常數可以給出相當準確的結果。

在本例中,我們將生成包含新西蘭所有電臺的haversine距離的元數據。我們將選擇新西蘭。

首先,我們需要導入必須的Python庫。

from pyspark import SparkContext

from pyspark.sql import SparkSession

from pyspark.sql.types import *

from pyspark.sql import functions as F

from pyspark.sql import DataFrameWriter as W

from math import radians, cos, sin, asin, sqrt

spark = (SparkSession.builder
.appName(‘’HDFS_Haversine_Fun”)
.getOrCreate())

接下來,我們需要加載所需的數據。對於此示例,您只需要站點元數據。但是稍後當您想要進行自己的分析時,您可能希望使用所有GHCN文件。Python代碼如下:

input_stations = (spark.read.format(“text”)
.load(“hdfs:///data/ghcnd/stations”))

stations_df = input_stations.select(

F.trim(F.substring(F.col(“value”), 1 , 11–1 + 1 )).alias(“STATION_ID”).cast(StringType()),

F.trim(F.substring(F.col(“value”), 13, 20–13 + 1)).alias(“LATITUDE”).cast(DoubleType()),

F.trim(F.substring(F.col(“value”), 22, 30–22 + 1)).alias(“LONGITUDE”).cast(DoubleType()),

F.trim(F.substring(F.col(“value”), 32, 37–32 + 1)).alias(“ELEVATION”).cast(DoubleType()),

F.trim(F.substring(F.col(“value”), 39, 40–39 + 1)).alias(“STATE_CODE”).cast(StringType()),

F.trim(F.substring(F.col(“value”), 42, 71–42 + 1)).alias(“STATION_NAME”).cast(StringType()),

F.trim(F.substring(F.col(“value”), 73, 75–73 + 1)).alias(“GSNFLAG”).cast(StringType()),

F.trim(F.substring(F.col(“value”), 77, 79–77 + 1)).alias(“HCNFLAG_CRNFLAG”).cast(StringType()),

F.trim(F.substring(F.col(“value”), 81, 85–81 + 1)).alias(“WMOID”).cast(StringType())

)

請注意,您不需要定義Schema,然後將其傳遞到單獨的load語句中,因為您可以使用pyspark.sql.functions將數據解析為具有所需類型的新列。實際上,這比在文本文件中首先定義StructFields要快得多。

讓我們來看看stations_df的前五個觀察結果。

stations_df.show(5)
在PySpark中使用自定義UDF來計算Haversine距離

接下來,我們將添加一個名為COUNTRY_CODE的列,以便我們稍後可以過濾以僅訪問我們最喜歡的國家/地區中的電臺。我們應該總是將udf傳遞給實現結果所需的最少量信息,因為udfs在大型數據集上的計算成本往往很高。

我們可以看到STATION_ID的前兩個字符是國家代碼。所以我們只需要取兩個前兩個字符並將其添加為新列COUNTRY_CODE。

stations_df = stations_df.withColumn(‘COUNTRY_CODE’, stations_df.STATION_ID.substr(1, 2))

完成後,我們可以過濾並選擇所需的列。

nz_stations = (stations_df
.filter(stations_df.COUNTRY_CODE==”NZ”)
.select(“STATION_ID”,“STATION_NAME”,“LATITUDE”, “LONGITUDE”))

現在我們可以開始Haversine函數了!為此,我們創建了一個標準的python函數,其中我們使用地球的半徑為6371km,並返回distance rounded的絕對值為2dp。Python代碼如下:

def get_distance(longit_a, latit_b, longit_b, latit_b):

# Transform to radians

longit_a, latit_b, longit_b, latit_b = map(radians, [longit_a, latit_b, longit_b, latit_b])

dist_longit = longit_b — longit_a

dist_latit = latit_b — latit_a

# Calculate area


area = sin(dist_latit/2)**2 + cos(latit_a) * sin(dist_longit/2)**2

# Calculate the central angle

central_angle = 2 * asin(sqrt(area))

radius = 6371

# Calculate Distance

distance = central_angle * radius

return abs(round(distance, 2))

現在我們已經將它定義為python函數,我們可以創建一個用戶定義的函數來在Spark DataFrame上使用它。用戶定義函數允許我們在python或SQL中創建自定義函數,然後使用它們來操作Spark DataFrame中的列。

轉換為UDF:

udf_get_distance = F.udf(get_distance)

現在,我們需要獲取原始的nz_stations元數據,並將其與自身交叉連接,以支持列操作,並在過程中重命名列。

nz_station_pairs = (nz_stations.crossJoin(nz_stations).toDF(
“STATION_ID_A”, “STATION_NAME_A”, “LATITUDE_A”, “LONGITUDE_A”,

“STATION_ID_B”, “STATION_NAME_A”, “LATITUDE_B”, “LONGITUDE_B”))

然後通過刪除重複的行來清理它。

nz_station_pairs = (nz_station_pairs.filter(
nz_station_pairs.STATION_ID_A != nz_station_pairs.STATION_ID_B))

現在我們可以將我們的udf_get_distance函數應用於nz_station_pairs以添加新列ABS_DISTANCE。

nz_pairs_distance = (nz_station_pairs.withColumn(“ABS_DISTANCE”, udf_get_distance(

nz_station_pairs.LONGITUDE_A, nz_station_pairs.LATITUDE_A,

nz_station_pairs.LONGITUDE_B, nz_station_pairs.LATITUDE_B)
))

注意:在這種情況下,udf不會將輸出作為float返回,而是創建一個新的列,distances 為字符串。我想這是因為withColumn恢復了Spark的默認值,這是一個字符串。

如果您遇到此問題,則可以通過添加新列來修改此值,該列是前一個的複製,但將新列強制轉換為Double Type。您無法更改上一列,因為Spark DataFrames是不可變的。

nz_pairs_distance = nz_pairs_distance.withColumn(“DISTANCE_FLOAT”, nz_pairs_distance.ABS_DISTANCE.cast(DoubleType()))

現在您的分析中包含了haversine距離元數據!

nz_pairs_distance.show(7)
在PySpark中使用自定義UDF來計算Haversine距離

您現在可以將其寫入本地hfs目錄,以用於R或Python中的可視化。

W(nz_pairs_distance).csv(path=”hdfs:///YOUR_DIRECTORY”, mode=’ignore’, header=’true’)

建議

  • 確保在使用UDF時始終使用實現結果所需的最小數據。這是因為如果UDF包含多個列操作,則作業將包含許多任務。
  • 始終檢查原始函數返回的所需對象數據類型是否與UDF的對象數據類型一致。您可以通過將新變量插入終端來輕鬆地在Spark中進行檢查。
  • 您可以使用Spark的DataFrameWriter將特定元數據保存到HDFS目錄,然後用於使用其他工具(例如R,Python等)進行分析。這可以通過以下方式複製到local director :
hdfs dfs -copyToLocal hdfs:///user/YOUR_DIRECTORY/YOUR_FILE.csv


分享到:


相關文章: