在PySpark中使用自定义UDF来计算Haversine距离

提供的脚本用于Spark会话,具有8个内核,4个执行程序,4Gb主内存和4Gb工作内存。

用Haversine计算地理距离

Haversine是一个公式,它采用两个坐标点(纬度和经度)并在一个对象上生成第三个坐标点,以计算两个原始点之间的表面距离,同时考虑对象的曲率。它假定物体的形状(在我们的例子中是地球)是一个球体。我们知道地球实际上并不是一个完美的球体,但半径是一个常数可以给出相当准确的结果。

在本例中,我们将生成包含新西兰所有电台的haversine距离的元数据。我们将选择新西兰。

首先,我们需要导入必须的Python库。

from pyspark import SparkContext

from pyspark.sql import SparkSession

from pyspark.sql.types import *

from pyspark.sql import functions as F

from pyspark.sql import DataFrameWriter as W

from math import radians, cos, sin, asin, sqrt

spark = (SparkSession.builder
.appName(‘’HDFS_Haversine_Fun”)
.getOrCreate())

接下来,我们需要加载所需的数据。对于此示例,您只需要站点元数据。但是稍后当您想要进行自己的分析时,您可能希望使用所有GHCN文件。Python代码如下:

input_stations = (spark.read.format(“text”)
.load(“hdfs:///data/ghcnd/stations”))

stations_df = input_stations.select(

F.trim(F.substring(F.col(“value”), 1 , 11–1 + 1 )).alias(“STATION_ID”).cast(StringType()),

F.trim(F.substring(F.col(“value”), 13, 20–13 + 1)).alias(“LATITUDE”).cast(DoubleType()),

F.trim(F.substring(F.col(“value”), 22, 30–22 + 1)).alias(“LONGITUDE”).cast(DoubleType()),

F.trim(F.substring(F.col(“value”), 32, 37–32 + 1)).alias(“ELEVATION”).cast(DoubleType()),

F.trim(F.substring(F.col(“value”), 39, 40–39 + 1)).alias(“STATE_CODE”).cast(StringType()),

F.trim(F.substring(F.col(“value”), 42, 71–42 + 1)).alias(“STATION_NAME”).cast(StringType()),

F.trim(F.substring(F.col(“value”), 73, 75–73 + 1)).alias(“GSNFLAG”).cast(StringType()),

F.trim(F.substring(F.col(“value”), 77, 79–77 + 1)).alias(“HCNFLAG_CRNFLAG”).cast(StringType()),

F.trim(F.substring(F.col(“value”), 81, 85–81 + 1)).alias(“WMOID”).cast(StringType())

)

请注意,您不需要定义Schema,然后将其传递到单独的load语句中,因为您可以使用pyspark.sql.functions将数据解析为具有所需类型的新列。实际上,这比在文本文件中首先定义StructFields要快得多。

让我们来看看stations_df的前五个观察结果。

stations_df.show(5)
在PySpark中使用自定义UDF来计算Haversine距离

接下来,我们将添加一个名为COUNTRY_CODE的列,以便我们稍后可以过滤以仅访问我们最喜欢的国家/地区中的电台。我们应该总是将udf传递给实现结果所需的最少量信息,因为udfs在大型数据集上的计算成本往往很高。

我们可以看到STATION_ID的前两个字符是国家代码。所以我们只需要取两个前两个字符并将其添加为新列COUNTRY_CODE。

stations_df = stations_df.withColumn(‘COUNTRY_CODE’, stations_df.STATION_ID.substr(1, 2))

完成后,我们可以过滤并选择所需的列。

nz_stations = (stations_df
.filter(stations_df.COUNTRY_CODE==”NZ”)
.select(“STATION_ID”,“STATION_NAME”,“LATITUDE”, “LONGITUDE”))

现在我们可以开始Haversine函数了!为此,我们创建了一个标准的python函数,其中我们使用地球的半径为6371km,并返回distance rounded的绝对值为2dp。Python代码如下:

def get_distance(longit_a, latit_b, longit_b, latit_b):

# Transform to radians

longit_a, latit_b, longit_b, latit_b = map(radians, [longit_a, latit_b, longit_b, latit_b])

dist_longit = longit_b — longit_a

dist_latit = latit_b — latit_a

# Calculate area


area = sin(dist_latit/2)**2 + cos(latit_a) * sin(dist_longit/2)**2

# Calculate the central angle

central_angle = 2 * asin(sqrt(area))

radius = 6371

# Calculate Distance

distance = central_angle * radius

return abs(round(distance, 2))

现在我们已经将它定义为python函数,我们可以创建一个用户定义的函数来在Spark DataFrame上使用它。用户定义函数允许我们在python或SQL中创建自定义函数,然后使用它们来操作Spark DataFrame中的列。

转换为UDF:

udf_get_distance = F.udf(get_distance)

现在,我们需要获取原始的nz_stations元数据,并将其与自身交叉连接,以支持列操作,并在过程中重命名列。

nz_station_pairs = (nz_stations.crossJoin(nz_stations).toDF(
“STATION_ID_A”, “STATION_NAME_A”, “LATITUDE_A”, “LONGITUDE_A”,

“STATION_ID_B”, “STATION_NAME_A”, “LATITUDE_B”, “LONGITUDE_B”))

然后通过删除重复的行来清理它。

nz_station_pairs = (nz_station_pairs.filter(
nz_station_pairs.STATION_ID_A != nz_station_pairs.STATION_ID_B))

现在我们可以将我们的udf_get_distance函数应用于nz_station_pairs以添加新列ABS_DISTANCE。

nz_pairs_distance = (nz_station_pairs.withColumn(“ABS_DISTANCE”, udf_get_distance(

nz_station_pairs.LONGITUDE_A, nz_station_pairs.LATITUDE_A,

nz_station_pairs.LONGITUDE_B, nz_station_pairs.LATITUDE_B)
))

注意:在这种情况下,udf不会将输出作为float返回,而是创建一个新的列,distances 为字符串。我想这是因为withColumn恢复了Spark的默认值,这是一个字符串。

如果您遇到此问题,则可以通过添加新列来修改此值,该列是前一个的复制,但将新列强制转换为Double Type。您无法更改上一列,因为Spark DataFrames是不可变的。

nz_pairs_distance = nz_pairs_distance.withColumn(“DISTANCE_FLOAT”, nz_pairs_distance.ABS_DISTANCE.cast(DoubleType()))

现在您的分析中包含了haversine距离元数据!

nz_pairs_distance.show(7)
在PySpark中使用自定义UDF来计算Haversine距离

您现在可以将其写入本地hfs目录,以用于R或Python中的可视化。

W(nz_pairs_distance).csv(path=”hdfs:///YOUR_DIRECTORY”, mode=’ignore’, header=’true’)

建议

  • 确保在使用UDF时始终使用实现结果所需的最小数据。这是因为如果UDF包含多个列操作,则作业将包含许多任务。
  • 始终检查原始函数返回的所需对象数据类型是否与UDF的对象数据类型一致。您可以通过将新变量插入终端来轻松地在Spark中进行检查。
  • 您可以使用Spark的DataFrameWriter将特定元数据保存到HDFS目录,然后用于使用其他工具(例如R,Python等)进行分析。这可以通过以下方式复制到local director :
hdfs dfs -copyToLocal hdfs:///user/YOUR_DIRECTORY/YOUR_FILE.csv


分享到:


相關文章: