Spark 網頁式 開發 (三)

上篇我們講述到了 設計了存儲代碼的表,這篇講下 如何動態加載文本代碼。

在此之前,先給大家看下 Janino 的使用。

Janino 是一個超小型,超快的Java編譯器。



<code>    import scala.collection.JavaConverters._
val ee = CompilerFactoryFactory.getDefaultCompilerFactory().newExpressionEvaluator()
ee.setExpressionType(classOf[Boolean])
ee.setParameters(Array[String]("total", "count"), Array[Class[_]](classOf[Double], classOf[String]))
ee.cook("count.equals(\"3\") || total >= 200.0 ");

val res = ee.evaluate(List(120, "3").asJava.toArray());
println("Result = " + res);/<code>

這是一個非常基本的例子,

運行的代碼就是 count.equals("3") || total >= 200.0

首先定義了一個執行器ee

然後定義了返回類型是Boolean,參數的名稱 和 參數的類型,

接著設置了表達式 最後設定了 參數的值,就輸出結果了,

因為我們設定的total是120,count是字符串3,所以返回的結果自然就是

Result = true

這雖然是一個小小的例子,但是折射出來的意義就是 我們可以做到解析字符串形式的代碼,並將它翻譯成運行代碼去執行。

那麼接下來 我們就是開始準備設計 運行這個spark的job的抽象類 job_BaseRunClass。

然後 實現main方法。

<code>def main(args: Array[String]): Unit = {

//這裡的params就是外界傳入的參數
val params = if (args == null || args.length == 0) Map[String, String]()
else {
args(0).split(";").map(row => row.split("="))
.filter(_.size == 2).map(row => (row(0) -> row(1))).toMap
}

\t//這裡判斷下,如果運行的類是 com.oasis.spark.job_CodeGenGenerate 才會去執行讀取動態代碼
\t//反之 依舊維持原狀,保證了兼容性
if (params.getOrElse("mainClass", "") == "com.oasis.spark.job_CodeGenGenerate") {

\t\t\t//這裡是打開數據庫連接,用於獲取數據庫中存儲的代碼
val conn = getConn()()
require(params.contains("projectName"))
require(params.contains("jobName"))



val projectName =params.getOrElse("projectName", "")


val pre = conn.prepareStatement(s"select source_code,email_address from code_source where project_name='${projectName}' and job_name='${params.getOrElse("link_jobName", params.get("jobName").get)}' ")
val result = pre.executeQuery()
var sourceCode = ""

if (result.next()) {
//這裡我們就拿到了自己存儲在數據庫中的代碼了
sourceCode = result.getString(1)
receivers = if (result.getString(2) == null || result.getString(2).isEmpty) {
""
} else {

result.getString(2).replace("\\n", "").replace("\\r", "")
}


}
result.close()
pre.close()
conn.close()

val jobName = params.getOrElse("jobName", "XX")
\t\t//那麼拿到代碼之後肯定是去編譯這個代碼,這個邏輯在getCodeGenObject中。
getCodeGenObject(jobName, sourceCode +
"""
|
|private class JavaUtil {
| public Seq getSeq(T... values) {
| return (Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(values));
| }
|}
""".stripMargin,
params.getOrElse("printCode", "false").toBoolean).runJob(
if (params.contains("receivers")) {
println("read config email:" + params.getOrElse("receivers", null))
params
} else {
println("read db email:" + receivers)
params.updated("receivers", receivers)
})

} else {
runJob(params)
}
}

/<code>

接下來 看下getCodeGenObject 這個方法,如何去編譯的。

<code>private final def getCodeGenObject(jobName: String, codeBody: String, printCode: Boolean) = {
\t\t//這裡定義了類的執行器,敲黑板,重點來了
val evaluator = new ClassBodyEvaluator()

evaluator.setDebuggingInformation(true, true, true);
evaluator.setParentClassLoader(getClass.getClassLoader)
//設置類名
evaluator.setClassName(s"com.oasis.spark.${jobName}")

//設置預先加載的類,相當於在代碼裡寫import 。。。。。
evaluator.setDefaultImports(
Array(
classOf[Platform].getName,
classOf[InternalRow].getName,
classOf[UnsafeRow].getName,
classOf[UTF8String].getName,
classOf[Decimal].getName,
classOf[CalendarInterval].getName,
classOf[ArrayData].getName,
classOf[UnsafeArrayData].getName,
classOf[MapData].getName,
classOf[UnsafeMapData].getName,
classOf[Expression].getName,
classOf[TaskContext].getName,
classOf[TaskKilledException].getName,
classOf[InputMetrics].getName,

"java.io.IOException",


"scala.Tuple2",
"scala.collection.mutable.ListBuffer",
"scala.collection.immutable.Seq",
"scala.collection.JavaConverters",
"scala.collection.JavaConversions",
"scala.collection.JavaConverters$",
"scala.collection.immutable.Map",
"scala.Option",
"scala.collection.immutable.Seq$",
"scala.Predef$",
"scala.reflect.ClassTag$",


"java.util.HashMap",
"java.util.ArrayList",
"java.util.List",
"java.util.Arrays",
"java.util.Calendar",
"java.text.ParseException",

"org.apache.hadoop.fs.Path",
"org.apache.hadoop.fs.FileSystem",

"org.apache.hadoop.conf.Configuration",

"org.apache.spark.sql.Encoders",
"org.apache.spark.sql.SaveMode",
"org.apache.spark.sql.Column",
"org.apache.spark.sql.SparkSession",
"org.apache.spark.sql.Dataset",
"org.apache.spark.sql.Row",
"org.apache.spark.sql.Row$",
"org.apache.spark.sql.expressions.Window",
"org.apache.spark.sql.functions",
"org.apache.spark.sql.types.DataTypes",
"org.apache.spark.sql.api.java.UDF1")
)
\t\t//設置繼承的類
evaluator.setExtendedClass(classOf[job_BaseRunClass])
evaluator.cook("generated.java", codeBody)
\t\t//最後實現類 job_BaseRunClass 這個抽象類,相當於 job_BaseRunClass 的子類了
\t\t//將這個類實例化出來,變成了一個對象
evaluator.getClazz().newInstance().asInstanceOf[job_BaseRunClass]
}
/<code>

得到是實例化出來的對象後,調用runJob 開啟了spark運行的旅程。

<code>  private final def runJob(params: Map[String, String]) = {
//獲取SparkSession 這個對象
val spark = getCommonSpark(params = params)
\t\t//設置一些內部處理
runPre(params)
\t\t//程序處理
execute(params.getOrElse("tableName", params.getOrElse("mainTable", null)), spark)
}
/<code>

這樣總體的流程就結束了。

而我們在網頁端的代碼編寫就是兩處


Spark 網頁式 開發 (三)

實現 runPre 方法 和 initWriteDBEntities 這兩個方法就可以了。

下次業務說要修改這個job的邏輯,直接在這個網頁上修改,那麼下次job運行的時候就會去讀取這個最新的代碼,然後編譯加載了。

完美的實現了 不用重新編譯代碼、生成jar包的操作了


分享到:


相關文章: