大数据干货丨激发RDD从HBase中读取、写入和删除


大数据干货丨激发RDD从HBase中读取、写入和删除


目录

  • 安装
  • 初步
  • 关于类型的注记
  • 读HBASE
  • 写信给HBASE
  • 从HBASE删除
  • 大量装载到HBASE
  • 示例项目
  • 安装

    本指南假设您正在使用SBT。使用类似的工具,如Maven或Leiningen,也应该有细微的区别。

    HBASE RDD可以作为SBT中的依赖项添加,其中:

    <code>dependencies += "eu.unicredit" %% "hbase-rdd" % "0.9.0"/<code>

    目前,该项目依赖于以下工件:

    <code>"org.apache.spark" %% "spark-core" % "2.4.3" % "provided","org.apache.hbase" % "hbase-common" % "2.1.0-cdh6.2.0" % "provided","org.apache.hbase" % "hbase-mapreduce" % "2.1.0-cdh6.2.0" % "provided","org.apache.hbase" % "hbase-server" % "2.1.0-cdh6.2.0" % "provided",/<code>

    所有依赖项都显示为provided范围,所以您必须要么在项目中拥有这些依赖项,要么在集群中具有本地可用的相应工件。其中大多数都可以在Cloudera存储库中获得,您可以在下面的行中添加这些存储库:

    <code>resolvers ++= Seq(  "Cloudera repos" at "https://repository.cloudera.com/artifactory/cloudera-repos",  "Cloudera releases" at "https://repository.cloudera.com/artifactory/libs-release")/<code>

    使用

    初步

    首先,添加以下导入以获得必要的关联:

    <code>import unicredit.spark.hbase._/<code>

    然后,您必须给出连接到HBASE的配置参数。这是通过提供unicredit.spark.hbase.HBaseConfig...这可以通过几种方式来实现,即增加普遍性。

    带着hbase-site.xml

    如果你碰巧在类路径上hbase-site.xml使用正确的配置参数,您可以只需

    <code>implicit val config = HBaseConfig()/<code>

    否则,您将不得不以编程方式配置HBASE RDD。

    有一个案例类

    最简单的方法是拥有一个包含两个字符串成员的case类。quorum和rootdir...然后,下面这样的东西就能工作了

    <code>case class Config(  quorum: String,  rootdir: String,  ... // Possibly other parameters)val c = Config(...)implicit val config = HBaseConfig(c)/<code>

    带着地图

    为了自定义更多的参数,可以提供(String, String),就像

    <code>implicit val config = HBaseConfig(  "hbase.rootdir" -> "...",  "hbase.zookeeper.quorum" -> "...",  ...)/<code>

    具有Hadoop配置对象

    最后,可以从现有的org.apache.hadoop.conf.Configuration

    <code>val conf: Configuration = ...implicit val config = HBaseConfig(conf)/<code>

    关于类型的注记

    在HBASE中,每个数据,包括表和列名,都存储为Array[Byte]...为了简单起见,我们假设所有表、列和列的姓氏实际上都是字符串。

    另一方面,单元格的内容可以具有任何类型,这些类型可以转换为Array[Byte]...为了做到这一点,我们定义了两个特征unicredit.spark.hbase:

    <code>trait Reads[A] { def read(data: Array[Byte]): A }trait Writes[A] { def write(data: A): Array[Byte] }/<code>

    读取类型的方法。A从HBASE将需要一个隐式Reads[A]在作用域上,以及写入HBASE的对称方法都需要隐式的。Writes[A].

    默认情况下,我们为String, org.json4s.JValue而那些微不足道的Array[Byte].

    读HBASE

    一些方法被添加到SparkContext为了读到HBASE。

    如果您知道要读取哪些列,则可以使用sc.hbase()...假设列cf1:col1, cf1:col2和cf2:col3表中t1,并将内容序列化为utf-8字符串,则可以这样做。

    <code>val table = "t1"val columns = Map(  "cf1" -> Set("col1", "col2"),  "cf2" -> Set("col3"))val rdd = sc.hbase[String](table, columns)/<code>

    总体而言,sc.hbase[K, Q, V]具有表示单元格的行键、限定符和内容类型的类型参数,并返回RDD[(K, Map[String, Map[Q, V]])]...结果RDD的每个元素都是一个键/值对,其中键是来自HBASE的行键,值是一个嵌套映射,它将列族和列与值关联起来。缺少的列将从映射中省略,因此,例如,可以将上面的列投影到col2列做类似的事情

    <code>rdd.flatMap({ case (k, v) =>  v("cf1") get "col2" map { col =>    k -> col  }  // or equivalently  // Try(k -> v("cf1")("col2")).toOption})/<code>

    你也可以用sc.hbase[K, V](table, columns)其中只有行键和值的类型参数是显式的,而限定符是字符串,或者sc.hbase[V](table, columns),其中行键和限定符的类型是字符串。这些选择适用于所有sc.hbase()和sc.hbaseTS()方法。

    第二种可能是获得整个列族。如果您事先不知道哪个是列名,这将是有用的。您可以使用方法来完成此操作。sc.hbase[A],就像

    <code>val table = "t1"val families = Set("cf1", "cf2")val rdd = sc.hbase[String](table, families)/<code>

    输出,就像sc.hbase[A],是一个RDD[(String, Map[String, Map[String, A]])].

    如果您需要同时读取时间戳,则可以在这两种情况下使用。sc.hbaseTS[K, Q, V]并获得一个RDD[(K, Map[String, Map[Q, (V, Long)]])]...生成的RDD的每个元素都是一个键/值对,其中键是来自HBASE的行键,值是一个嵌套映射,它将列族和列关联到元组(value,时间戳)。

    最后,有一个较低级别的原始访问权限。org.apache.hadoop.hbase.client.Result实例。为了这个,就这么做

    <code>val table = "t1"val rdd = sc.hbase[K](table)/<code>

    的返回值sc.hbase是RDD[(K, Result)]...第一个元素是行键,第二个元素是org.apache.hadoop.hbase.client.Result,所以您可以使用原始的HBASE API来查询它。

    还通过提供自定义过滤器或扫描对象来支持HBASE侧过滤器:

    <code>val filter = new PrefixFilter(Bytes.toBytes("abc"))val table = "t1"val families = Set("cf1", "cf2")val rdd = sc.hbase[String](table, families, filter)/<code>

    写信给HBASE

    为了写入HBASE,在某些类型的RDD上添加了一些方法。

    第一个是与你从HBASE读取的方式平行的。假设你有一个RDD[(K, Map[String, Map[Q, V]])]还有Writes[K], Writes[Q],和Writes[V]在范围内。然后,您可以使用该方法写入HBASE。toHBase,就像

    <code>val table = "t1"val rdd: RDD[(K, Map[String, Map[Q, V]])] = ...rdd.toHBase(table)/<code>

    在只需在单个列系列上编写的情况下,可以使用简化的表单。然后,一个类似的方法可以在RDD[(K, Map[Q, V])],如下所示

    <code>val table = "t1"val cf = "cf1"val rdd: RDD[(K, Map[Q, V])] = ...rdd.toHBase(table, cf)/<code>

    或者,如果您有一组固定的列,如

    <code>val table = "t1"val cf = "cf1"val headers: Seq[Q] = ...val rdd: RDD[(K, Seq[V])] = ...rdd.toHBase(table, cf, headers)/<code>

    哪里headers的列名Seq[V]价值。

    如果需要编写时间戳,可以在RDD中使用元组(V,Long),其中第二个元素表示时间戳,如

    <code>val rdd: RDD[(K, Map[String, Map[Q, (V, Long)]])] = .../<code>

    或者,对于简化的形式,比如

    <code>val rdd: RDD[(K, Map[Q, (V, Long)])] = .../<code>

    或者,用一组固定的列

    <code>val rdd: RDD[(K, Seq[(V, Long)])] = .../<code>

    你可以看看WriteTsvToHBase.scala在……里面HBASE-RDD-示例项目如何从Hdfs到HBase

    从HBASE删除

    为了从HBASE中删除,在某些类型的RDD上添加了一些方法。

    假设你有一个RDD[(K, Map[String, Set[Q])]行键和家庭/一组列的映射。然后可以使用方法从HBASE中删除deleteHBase,就像

    <code>val table = "t1"val rdd: RDD[(K, Map[String, Set[Q])] = ...rdd.deleteHBase(table)/<code>

    在只需从单个列家族中删除的情况下,可以使用简化的表单。然后,一个类似的方法可以在RDD[(K, Set[Q])]行键和一组列,如下所示

    <code>val table = "t1"val cf = "cf1"val rdd: RDD[(K, Set[Q])] = ...rdd.deleteHBase(table, cf)/<code>

    或者,如果要删除一个列族、整个列族或整行的固定列集,则可以在RDD[K]中的行键,可以如下所示

    <code>val table = "t1"val cf = "cf1"val headers: Set[Q] = ...val rdd: RDD[K] = ...rdd.deleteHBase(table, cf, headers)/<code>

    <code>val cfs = Set("cf1", "cf2")rdd.deleteHBase(table, cfs)/<code>

    <code>rdd.deleteHBase(table)/<code>

    如果需要使用时间戳删除,可以在rdd中使用元组(String,long),其中第一个元素是列,第二个元素表示时间戳,如下所示

    <code>val rdd: RDD[(K, Map[String, Set[(Q, Long)]])] = .../<code>

    或者,对于简化的形式,比如

    <code>val rdd: RDD[(K, Set[(Q, Long)])] = .../<code>

    使用HFiles将批量加载到HBASE

    在大量写入HBASE的情况下,直接将对象写入表中可能效率低下,并可能导致HBASE没有响应(例如,它会触发区域分裂)。更好的方法是创建HFiles,而不是调用LoadIncrementalHFiles作业将它们移动到HBASE的文件系统。不幸的是,这种方法相当繁琐,因为它意味着以下步骤:

    1. 确保该表存在并具有区域分割,以便将行均匀地分配到区域中(以获得更好的性能)。
    2. 使用HFileOutputFormat2输出格式,实现并执行映射(并减少)作业,将有序的PUT或KeyValue对象写入HFile文件。还原阶段是通过调用HFileOutputFormat2.figreIncrementalLoad在幕后配置的。
    3. 执行LoadIncrementalHFiles作业,将HFile文件移动到HBASE的文件系统。
    4. 清理临时文件和文件夹

    现在,您可以通过调用toHBaseBulk,就像

    <code>val table = "t1"val rdd: RDD[(K, Map[String, Map[Q, V]])] = ...rdd.toHBaseBulk(table)/<code>

    在只需在单个列系列上写入的情况下,可以使用简化的表单。

    <code>val table = "t1"val cf = "cf1"val rdd: RDD[(K, Map[Q, V])] = ...rdd.toHBaseBulk(table, cf)/<code>

    或者,如果您有一组固定的列,如

    <code>val table = "t1"val cf = "cf1"val headers: Seq[Q] = ...val rdd: RDD[(K, Seq[V])] = ...rdd.toHBaseBulk(table, cf, headers)/<code>

    哪里headers的列名Seq[A]价值。

    如果需要编写时间戳,可以使用元组。(A, Long)在你的RDD,其中第二个元素表示时间戳,如

    <code>val rdd: RDD[(K, Map[String, Map[Q, (V, Long)]])] = .../<code>

    或者,对于简化的形式,比如

    <code>val rdd: RDD[(K, Map[Q, (V, Long)])] = .../<code>

    或者,如果是一组固定的列,如

    <code>val rdd: RDD[(K, Seq[(V, Long)])] = .../<code>

    但是第一步呢?为此,一个Admin对象使用一些帮助方法来挽救。必须通过实例打开到HBASE的连接(如1.0.0版以来所要求的那样)

    <code>  val admin = Admin()/<code>

    然后

    • admin.tableExists(tableName: String, family: String)*检查表是否存在,并相应地返回true或false。如果表tableName存在但列族family不,一个IllegalArgumentException抛出
    • admin.tableExists(tableName: String, families: Set[String])*检查表是否存在,并相应地返回true或false。如果表tableName存在但至少有一个families不,一个IllegalArgumentException抛出
    • admin.snapshot(tableName: String)*创建表的快照tableName,命名<tablename>_yyyyMMddHHmmss(后缀是快照操作的日期和时间)/<tablename>
    • admin.snapshot(tableName: String, snapshotName: String)*创建表的快照tableName,命名为“快照名称”。
    • admin.createTable(tableName: String, family: String, splitKeys: Seq[K])*创建一个表tableName列族family和由已排序的拆分键序列定义的区域。splitKeys
    • admin.createTable(tableName: String, families: Set[String], splitKeys: Seq[K])*创建一个表tableName列族families和由已排序的拆分键序列定义的区域。splitKeys
    • admin.createTable(tableName: String, families: Set[String])*创建一个表tableName列族families
    • admin.createTable(tableName: String, families: String*)*创建一个表tableName列族families
    • admin.disableTable(tableName: String)*禁用表tableName(删除前必须禁用表)
    • admin.deleteTable(tableName: String)*删除表tableName
    • admin.truncateTable(tableName: String, preserveSplits: Boolean)截断表tableName,可选择地保持区域分裂。
    • admin.computeSplits(rdd: RDD[K], regionsCount: Int)*给予RDD键和所需区域的数目(regionsCount),返回一个已排序的拆分键序列,用于createTable()

    最后,必须关闭与HBASE的连接。

    <code>admin.close/<code>

    你可以看看ImportTsvToHFiles.scala在……里面HBASE-RDD-示例项目如何从Hdfs到HBase

    设置每个家庭每个区域的HFiles数

    为了获得最佳的性能,HBASE应该使用每个家庭每个区域1 HFile。另一方面,您使用的HFiles越多,SPark作业中的分区就越多,因此SPark任务运行得更快,占用的内存堆也更少。您可以通过将额外的可选参数传递给toHBaseBulk()方法,numFilesPerRegionPerFamily=其中N(默认值为1)是介于1和hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily参数(默认值为32)。

    <code>rdd.toHBaseBulk(table, numFilesPerRegionPerFamily=32)/<code>

    <code>rdd.toHBaseBulk(table, cf, numFilesPerRegionPerFamily=32)/<code>

    <code>rdd.toHBaseBulk(table, cf, headers, numFilesPerRegionPerFamily=32)/<code>


    分享到:


    相關文章: