怎么配置sparkstreaming 让他解析kafka中的日志
展开全部
1、KafkaUtils.createDstream
构造函数KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] )
使用receivers接收数据利用Kafka高层消费者api于所receivers接收数据保存spark executors通Spark Streaming启job处理些数据默认丢失启用WAL志该志存储HDFS
A、创建receiverkafka进行定拉取数据sscrdd区kafkatopic区概念故增加特定主体区数仅仅增加receiver消费topic线程数并增加spark并行处理数据数量
B、于同grouptopic使用receivers创建同DStream
C、启用WAL需要设置存储级别即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)
2.KafkaUtils.createDirectStream
区别Receiver接收数据种式定期kafkatopic+partition查询新偏移量再根据偏移量范围每batch面处理数据使用kafka简单消费者api
优点:
A、 简化并行需要kafka输入流该创建kafka区rdd数且kafka并行读取
B、高效种式并需要WALWAL模式需要数据复制两第kafka复制另写wal
C、恰语义(Exactly-once-semantics)传统读取kafka数据通kafka高层api偏移量写入zookeeper存数据丢失能性zookeeperssc偏移量致EOS通实现kafka低层api偏移量仅仅ssc保存checkpoint消除zkssc偏移量致问题缺点使用基于zookeeperkafka监控工具
构造函数KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] )
使用receivers接收数据利用Kafka高层消费者api于所receivers接收数据保存spark executors通Spark Streaming启job处理些数据默认丢失启用WAL志该志存储HDFS
A、创建receiverkafka进行定拉取数据sscrdd区kafkatopic区概念故增加特定主体区数仅仅增加receiver消费topic线程数并增加spark并行处理数据数量
B、于同grouptopic使用receivers创建同DStream
C、启用WAL需要设置存储级别即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)
2.KafkaUtils.createDirectStream
区别Receiver接收数据种式定期kafkatopic+partition查询新偏移量再根据偏移量范围每batch面处理数据使用kafka简单消费者api
优点:
A、 简化并行需要kafka输入流该创建kafka区rdd数且kafka并行读取
B、高效种式并需要WALWAL模式需要数据复制两第kafka复制另写wal
C、恰语义(Exactly-once-semantics)传统读取kafka数据通kafka高层api偏移量写入zookeeper存数据丢失能性zookeeperssc偏移量致EOS通实现kafka低层api偏移量仅仅ssc保存checkpoint消除zkssc偏移量致问题缺点使用基于zookeeperkafka监控工具
推荐律师服务:
若未解决您的问题,请您详细描述您的问题,通过百度律临进行免费专业咨询