A question about Spark only reading data from one partition when consuming from Kafka

Anonymous user
2017-05-02
I first wrote a Kafka producer program and then a Kafka consumer program, and everything worked correctly.
The producer generates 5 records and the consumer reads all 5 of them. I then replaced the Kafka consumer program with a Spark program that reads from Kafka, and after repeating the test many times I found that it only ever reads the data in partition 1, while the data in the other two partitions, 0 and 2, is never read. Could someone please help me out?
I use three virtual machines, slave122, slave123 and slave124, as the Kafka cluster and the ZooKeeper cluster; the producer, the consumer, and the Spark consumer program were all written in MyEclipse.
Software versions: kafka_2.11-0.10.1.0, spark-streaming-kafka-0-10_2.11-2.1.0, zookeeper-3.4.9
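For reference, the standalone consumer mentioned above looked roughly like the following (a minimal sketch rather than the original program; the class name PlainOrderConsumer and the group id "plain-check" are invented here, while the broker list and the "order" topic come from the setup described above). It prints the partition id of every record, which is an easy way to confirm that all three partitions actually contain data:

// Minimal sketch of a plain Kafka 0.10 consumer (hypothetical class name), used only to
// verify that the "order" topic really has data in partitions 0, 1 and 2.
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PlainOrderConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "slave124:9092,slave122:9092,slave123:9092");
        props.put("group.id", "plain-check");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("order"));
            while (true) {
                // poll(long) is the variant available in kafka-clients 0.10.x
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("partition=" + record.partition()
                            + " offset=" + record.offset()
                            + " value=" + record.value());
                }
            }
        }
    }
}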
The main code of the Spark consumer program is as follows:
// Imports required by the snippet below
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

// Kafka consumer configuration
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "slave124:9092,slave122:9092,slave123:9092");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("group.id", "ssgroup");
kafkaParams.put("auto.offset.reset", "earliest"); // "earliest" starts from the beginning of the topic, "latest" from the end
kafkaParams.put("enable.auto.commit", "true");    // note: records polled by the consumer may be committed before the Spark output operation has run, which gives undefined semantics
kafkaParams.put("auto.commit.interval.ms", "5000");

// Create a local StreamingContext with one worker thread and a batch interval of 5 seconds.
// set() returns the SparkConf itself, so the calls can simply be chained.
SparkConf conf = new SparkConf();
conf.setMaster("local[1]")
    .setAppName("streaming word count")
    .setJars(new String[]{"D:\\Workspaces\\MyEclipse 2015\\MyFirstHadoop\\bin\\MyFirstHadoop.jar"});

try {
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
    Collection<String> topics = new HashSet<>(Arrays.asList("order"));

    // Direct stream: one RDD partition per Kafka partition of the subscribed topic
    JavaInputDStream<ConsumerRecord<String, String>> oJInputStream = KafkaUtils.createDirectStream(
            jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

    // For every record, append topic / key / value / partition / offset to a local file,
    // then emit a (key, value) pair.
    JavaPairDStream<String, String> pairs = oJInputStream.mapToPair(
            new PairFunction<ConsumerRecord<String, String>, String, String>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                    try {
                        BufferedWriter oBWriter = new BufferedWriter(new FileWriter(
                                "D:\\Workspaces\\MyEclipse 2015\\MyFirstHadoop\\bin\\mysparkstream\\MyFirstHadoop.out", true));
                        String strLog = "^^^^^^^^^^^ " + System.currentTimeMillis() / 1000
                                + " mapToPair:topic:" + record.topic() + ",key:" + record.key()
                                + ",value:" + record.value() + ",partition id:" + record.partition()
                                + ",offset:" + record.offset() + ".\n";
                        System.out.println(strLog);
                        oBWriter.write(strLog);
                        oBWriter.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    return new Tuple2<>(record.key(), record.value());
                }
            });

    pairs.print();
    jssc.start();            // the streaming job actually starts here
    jssc.awaitTermination();
    jssc.close();
} catch (Exception e) {
    System.out.println("Exception:throw one exception");
    e.printStackTrace();
}
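One way to narrow this down (a diagnostic sketch, assuming the same oJInputStream and jssc as above, and registered before jssc.start()): the direct stream exposes the Kafka offset range consumed in every micro-batch through HasOffsetRanges, so printing those ranges shows directly whether partitions 0 and 2 ever appear in a batch:

import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;

// For each micro-batch, print the offset range consumed from every topic partition.
// If only partition 1 ever shows up here, the stream really is confined to one partition.
oJInputStream.foreachRDD(rdd -> {
    OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    for (OffsetRange r : ranges) {
        System.out.println("batch partition report: topic=" + r.topic()
                + " partition=" + r.partition()
                + " fromOffset=" + r.fromOffset()
                + " untilOffset=" + r.untilOffset());
    }
});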