Spark for Java: SparkCore RDD Value類型方法案例

package

com.pactera.base;
import com.pactera.base.bean.User;
import com.pactera.base.utils.BaseUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFlatMapFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import
org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.rdd.RDD;
import org.json4s.JsonAST;
import scala.Function1;
import scala.collection.TraversableOnce;
import scala.math.Ordering;
import java.util.*;
/**
* SparkCore RDD Value類型
*
*
@author 張銳
*
@create
2020/2/2 14:01
*/
public class BaseApplication {
public static void main(String[] args) {
BaseUtil baseUtil = new BaseUtil();
JavaSparkContext sparkContext = baseUtil.init();
List data = Arrays.asList(1, 2, 3, 4, 5, 6);
JavaRDD javaRDD = sparkContext.parallelize(data);
//1. map(func)案例:返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換後組成
JavaRDD mapRDD = javaRDD.map(value -> value * 2);
//2. mapPartitions(func) 案例:類似於map,但獨立地在RDD的每一個分片上運行

JavaRDD mapPartitions = javaRDD.mapPartitions(iterator -> {
List list = new ArrayList<>();
while (iterator.hasNext()) {
Integer next = iterator.next();
list.add(next);
}
return list.iterator();
});
//jdk<1.8
javaRDD.mapPartitions(new FlatMapFunction, Integer>() {
@Override
public Iterator call(Iterator integerIterator) throws Exception {
List list = new
ArrayList<>();
while (integerIterator.hasNext()) {
Integer next = integerIterator.next();
list.add(next);
}
return list.iterator();
}
});
//3. mapPartitionsWithIndex(func) 案例:類似於mapPartitions,但func帶有一個整數參數表示分片的索引值
javaRDD.mapPartitionsWithIndex((index, integerIterator) -> {
//index表示分區的索引值
List list = new ArrayList<>();
while (integerIterator.hasNext()) {
Integer next = integerIterator.next();
list.add(next);
}
return list.iterator();
}, true
);
javaRDD.mapPartitionsWithIndex(new Function2, Iterator>() {
@Override
public Iterator call(Integer integer, Iterator integerIterator) throws Exception {
List list = new ArrayList<>();
while (integerIterator.hasNext()) {
Integer next = integerIterator.next();
list.add(next);
}
return list.iterator();
}
}, true);
//4. flatMap(func) 案例:類似於map,但是每一個輸入元素可以被映射為0或多個輸出元素

javaRDD.flatMap(value -> Arrays.asList(value).iterator());
JavaRDD flatMap = javaRDD.flatMap(new FlatMapFunction() {
@Override
public Iterator call(Integer integer) throws Exception {
return Arrays.asList(integer).iterator();
}
});
//5. glom案例:將每一個分區形成一個數組,形成新的RDD類型時RDD[Array[T]]
JavaRDD> glom = javaRDD.glom();
//6. groupBy(func)案例:分組,按照傳入函數的返回值進行分組。將相同的key對應的值放入一個迭代器
//對象需要實現序列化接口
List users = User.init();
JavaRDD userJavaRDD = sparkContext.parallelize(users);
Map> integerIterableMap = javaRDD.groupBy(value -> value).collectAsMap();

JavaPairRDD> groupBy = userJavaRDD.groupBy(new Function() {
@Override
public Object call(User user) throws Exception {
return user.getAge();
}
});
Map> collectAsMap = groupBy.collectAsMap();
System.out.println(collectAsMap);
//7. filter(func) 案例:過濾。返回一個新的RDD,該RDD由經過func函數計算後返回值為true的輸入元素組成
JavaRDD filter = javaRDD.filter(value -> value > 2);
//8. sample(withReplacement, fraction, seed) 案例:以指定的隨機種子隨機抽樣出數量為fraction的數據,
// withReplacement表示是抽出的數據是否放回,true為有放回的抽樣,false為無放回的抽樣,seed用於指定隨機數生成器種子

JavaRDD sample = javaRDD.sample(true, 3);
//9. distinct([numTasks])) 案例:對源RDD進行去重後返回一個新的RDD。
// 默認情況下,只有8個並行任務來操作,但是可以傳入一個可選的numTasks參數改變它
javaRDD.distinct().collect();
//10. coalesce(numPartitions) 案例:縮減分區數,用於大數據集過濾後,提高小數據集的執行效率。
javaRDD.coalesce(2);
javaRDD.coalesce(2, true);
//11. repartition(numPartitions) 案例:根據分區數,重新通過網絡隨機洗牌所有數據。
javaRDD.repartition(2);
//12. coalesce和repartition的區別:
//1. coalesce重新分區,可以選擇是否進行shuffle過程。由參數shuffle: Boolean = false/true決定。
//2. repartition實際上是調用的coalesce,默認是進行shuffle的

//13. sortBy(func,[ascending], [numTasks]) 案例:使用func先對數據進行處理,按照處理後的數據比較結果排序,默認為正序。
javaRDD.sortBy(value -> value * 2, true, 2);
//14. pipe(command, [envVars]) 案例:管道,針對每個分區,都執行一個shell腳本,返回輸出的RDD。
//注意:腳本需要放在Worker節點可以訪問到的位置
javaRDD.pipe("sh demo.sh");
sparkContext.stop();
}
}


分享到:


相關文章: