package com.pactera.base;
import com.pactera.base.bean.User;
import com.pactera.base.utils.BaseUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFlatMapFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.rdd.RDD;
import org.json4s.JsonAST;
import scala.Function1;
import scala.collection.TraversableOnce;
import scala.math.Ordering;
import java.util.*;
/**
* Spark Core RDD examples: value-type transformations.
*
* @author 張銳
* @create 2020/2/2 14:01
*/
public class BaseApplication {
public static void main(String[] args) {
BaseUtil baseUtil = new BaseUtil();
JavaSparkContext sparkContext = baseUtil.init();
List
JavaRDD
// 1. map(func) example: returns a new RDD formed by passing each input element through func.
JavaRDD
// 2. mapPartitions(func) example: similar to map, but func runs separately on each partition of the RDD.
JavaRDD
List
while (iterator.hasNext()) {
Integer next = iterator.next();
list.add(next);
}
return list.iterator();
});
// Pre-Java 8 (JDK < 1.8) form: the same call written with an anonymous class instead of a lambda.
javaRDD.mapPartitions(new FlatMapFunction
@Override
public Iterator
List
while (integerIterator.hasNext()) {
Integer next = integerIterator.next();
list.add(next);
}
return list.iterator();
}
});
// 3. mapPartitionsWithIndex(func) example: similar to mapPartitions, but func also receives an integer partition index.
javaRDD.mapPartitionsWithIndex((index, integerIterator) -> {
// index is the index of the partition being processed.
List
while (integerIterator.hasNext()) {
Integer next = integerIterator.next();
list.add(next);
}
return list.iterator();
}, true );
javaRDD.mapPartitionsWithIndex(new Function2
@Override
public Iterator
List
while (integerIterator.hasNext()) {
Integer next = integerIterator.next();
list.add(next);
}
return list.iterator();
}
}, true);
// 4. flatMap(func) example: similar to map, but each input element can be mapped to zero or more output elements.
javaRDD.flatMap(value -> Arrays.asList(value).iterator());
JavaRDD
@Override
public Iterator
return Arrays.asList(integer).iterator();
}
});
// 5. glom example: gathers the elements of each partition into an array, producing a new RDD of type RDD[Array[T]].
JavaRDD> glom = javaRDD.glom();
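// 6. groupBy(func) example: groups elements by the value func returns; values that share a key are placed in one iterator.
// The element class must implement the serialization interface.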
List
JavaRDD
Map
JavaPairRDD
// (Web-page footer residue, not part of the program: "Read more articles by 張銳")