引言
前面,我们已经介绍了kafka入门知识,storm的入门知识,如果你对这些都不熟悉,请参考之前发布的文章,这篇文章主要是实战,没有过多的理论介绍,如果有任何疑问,可以在下面留言,会在第一时间进行回复……文章最后,附有完整的代码实例
一、Kafka环境启动
版本选择:版本很重要,否则搭建环境,会出现很多问题,切记本篇文章只针对以下版本有效,其它版本请自行测试……
Zookeeper:3.4.12
Kafka:0.9.0.1
Storm:1.1.1
1、启动zookeeper环境
zkServer.sh start
2、启动Kafka环境
cd /usr/local/kafka_2.11-0.9.0.1
bin/kafka-server-start.sh -daemon config/server-1.properties &
3、查看进程,看是否启动成功
[root@jikeh ~]# jps
2322 Jps
2052 ConsoleProducer
1556 QuorumPeerMain #zookeeper启动成功
1966 Kafka #kafka启动成功
二、Storm集成Kafka
1、Kafka消费者
1)pom依赖
<properties>
<project.build.sourceencoding>UTF-8
<project.reporting.outputencoding>UTF-8
<java.version>1.8
<maven.compiler.source>1.8
<maven.compiler.target>1.8
<spring-boot.version>1.5.16.RELEASE
<spring-boot-maven-plugin.version>1.5.16.RELEASE
<maven-compiler-plugin.version>3.5.1
<storm.version>1.1.1
<kafka.clients.version>0.9.0.1
<dependencies>
<dependency>
<groupid>org.springframework.boot
<artifactid>spring-boot
<version>${spring-boot.version}
<dependency>
<groupid>org.springframework.boot
<artifactid>spring-boot-starter-web
<version>${spring-boot.version}
<dependency>
<groupid>org.springframework.boot
<artifactid>spring-boot-configuration-processor
<version>${spring-boot.version}
<dependency>
<groupid>org.apache.storm
<artifactid>storm-core
<version>${storm.version}
<exclusions>
<exclusion>
<groupid>org.apache.logging.log4j
<artifactid>log4j-slf4j-impl
<dependency>
<groupid>org.apache.storm
<artifactid>storm-kafka
<version>${storm.version}
<exclusions>
<exclusion>
<groupid>org.apache.kafka
<artifactid>kafka-clients
<exclusion>
<artifactid>slf4j-api
<groupid>org.slf4j
<dependency>
<groupid>org.apache.kafka
<artifactid>kafka-clients
<version>${kafka.clients.version}
<exclusions>
<exclusion>
<artifactid>slf4j-api
<groupid>org.slf4j
<dependency>
<groupid>org.apache.kafka
<artifactid>kafka_2.11
<version>${kafka.clients.version}
<exclusions>
<exclusion>
<groupid>org.apache.kafka
<artifactid>kafka-clients
<exclusion>
<groupid>org.apache.zookeeper
<artifactid>zookeeper
<exclusion>
<groupid>log4j
<artifactid>log4j
<exclusion>
<artifactid>slf4j-api
<groupid>org.slf4j
<exclusion>
<artifactid>slf4j-log4j12
<groupid>org.slf4j
2)Kafka Spout:接收Kafka消息
//这个地方其实就是kafka配置文件里边的zookeeper.connect这个参数,可以去那里拿过来。
//ZkStr 字符串格式是 ip:port(例如:localhost:2181).brokerZkPath 是存储所有 topic 和 partition信息的zk 根路径.默认情况下,Kafka使用 /brokers路径.
String brokerZkStr = "192.168.199.147:2181";
ZkHosts zkHosts = new ZkHosts(brokerZkStr);
String topic = "jikeh";
//汇报offset信息的root路径
String offsetZkRoot = "/" + topic;
//存储该spout id的消费offset信息,譬如以topoName来命名
String offsetZkId = UUID.randomUUID().toString();
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic, offsetZkRoot, offsetZkId);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//kafka.api.OffsetRequest.EarliestTime(): 从topic 初始位置读取消息 (例如,从最老的那个消息开始)
//kafka.api.OffsetRequest.LatestTime(): 从topic尾部开始读取消息 (例如,新写入topic的信息)
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
KafkaSpout spout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout);
3)处理kafka消息
public class KafkaConsumerBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
try {
String message = tuple.getStringByField("str");
// String message = tuple.getString(0);
System.out.println("--->" + message);
this.collector.ack(tuple);
} catch (Exception e) {
this.collector.fail(tuple);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
4)本地模式运行Storm作业
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaConsumerTopology", config, builder.createTopology());
2、Kafka生产者
1)pom
与上面相同
2)数据源
public class MessageSpout extends BaseRichSpout {
private Fields fields = null;
private SpoutOutputCollector collector ;
public MessageSpout(Fields fields){
this.fields = fields;
}
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
}
@Override
public void nextTuple() {
for (int i = 0; i < 5; i++) {
this.collector.emit(new Values("jikeh", "visit--" + i));
}
Utils.sleep(2000);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(this.fields);
}
@Override
public void ack(Object o) {
}
@Override
public void fail(Object o) {
}
}
3)数据处理并写入kafka
//2、写入kafka
//set producer properties.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.199.147:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.RETRIES_CONFIG, 3);
KafkaBolt bolt = new KafkaBolt()
.withProducerProperties(props)
.withTopicSelector(new DefaultTopicSelector(topicName))
// .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "value"))
;
builder.setBolt("bolt", bolt).shuffleGrouping("spout");
代码下载地址:https://gitee.com/jikeh/JiKeHCN-RELEASE.git
项目名:spring-boot-storm-kafka
閱讀更多 極客慧 的文章