如何确定Kafka的分区数,key和consumer线程数

 我来答
day忘不掉的痛
2017-07-09 · 知道合伙人数码行家
day忘不掉的痛
知道合伙人数码行家
采纳数:62646 获赞数:223932
本人担任公司网络部总经理多年,有充足的网络经验、互联网相关知识和资讯。

向TA提问 私信TA
展开全部
分区实际上是调优Kafka并行度的最小单元。 对于producer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;
而consumer呢,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费(具体如何确定consumer线程数目我们后面会详细说明)。 所以说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。
匿名用户
2017-07-12
展开全部
public static void consumer(){
Properties props = new Properties();
props.put("zk.connect", "hadoop-2:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "fans_group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> map = new HashMap<String, Integer>();
map.put("fans", 1);
// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(map);
List<KafkaStream<Message>> streams = topicMessageStreams.get("fans");
// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(1);
long startTime = System.currentTimeMillis();
// consume the messages in the threads
for(final KafkaStream<Message> stream: streams) {
executor.submit(new Runnable() {
public void run() {
ConsumerIterator<Message> it = stream.iterator();
while (it.hasNext()){
log.debug(byteBufferToString(it.next().message().payload()));
}
}
});
log.debug("use time="+(System.currentTimeMillis()-startTime));
}
}
已赞过 已踩过<
你对这个回答的评价是?
评论 收起
推荐律师服务: 若未解决您的问题,请您详细描述您的问题,通过百度律临进行免费专业咨询

为你推荐:

下载百度知道APP,抢鲜体验
使用百度知道APP,立即抢鲜体验。你的手机镜头里或许有别人想知道的答案。
扫描二维码下载
×

类别

我们会通过消息、邮箱等方式尽快将举报结果通知您。

说明

0/200

提交
取消

辅 助

模 式