请教java向kafka发消息
java代码如下:publicstaticvoidmain(String[]args){Propertiesproperties=newProperties();prop...
java代码如下:
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("zookeeper.connect", "192.168.60.128:2181");//声明zk
properties.put("serializer.class", StringEncoder.class.getName());
properties.put("metadata.broker.list", "192.168.60.128:9092");// 声明kafka broker
Producer producer = new Producer<Integer, String>(new ProducerConfig(properties));
producer.send(new KeyedMessage<Integer, String>("Qbit_topic", "message " +));
}
在服务器上执行
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Qbit_topic
是能正常发消息的
java代码执行了也没有报错,但是consumer没有收到消息 展开
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("zookeeper.connect", "192.168.60.128:2181");//声明zk
properties.put("serializer.class", StringEncoder.class.getName());
properties.put("metadata.broker.list", "192.168.60.128:9092");// 声明kafka broker
Producer producer = new Producer<Integer, String>(new ProducerConfig(properties));
producer.send(new KeyedMessage<Integer, String>("Qbit_topic", "message " +));
}
在服务器上执行
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Qbit_topic
是能正常发消息的
java代码执行了也没有报错,但是consumer没有收到消息 展开
展开全部
这是我们项目中用到的代码
public class ProducerService {
private static Logger log = Logger.getLogger(ProducerService.class);
private static Producer<String,String> producer = null;
private static String serviceIp = PropertiesUtils.getValue("/epoo.properties","bootstrap.servers");
private static String serviceName = PropertiesUtils.getValue("/epoo.properties","name");
public boolean initProducer(){
Properties props = new Properties();
//dataPlace.getIp()
props.put("bootstrap.servers", serviceIp);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "-1");
producer = new KafkaProducer(props);
try{
List<PartitionInfo> list = producer.partitionsFor(serviceName);
}catch(Exception e){
JOptionPane.showMessageDialog(null, e.getMessage(), "错误", JOptionPane.YES_OPTION);
log.error(e.getMessage());
return false;
}
return true;
}
public void sendData(String mess){
if(producer == null){
initProducer();
}
producer.send(new ProducerRecord<String,String>(serviceName,mess),new Callback() {
@Override
public void onCompletion(RecordMetadata rm, Exception e) {
if(e != null){
e.printStackTrace();
log.error(e.getMessage());
}
System.out.println("发送到服务器的Offset: " + rm.offset() + "-----Topic:" + rm.topic() + "-----partition:" + rm.partition());
}
});
}
public void close(){
if(producer != null){
producer.close();
}
}
}
推荐律师服务:
若未解决您的问题,请您详细描述您的问题,通过百度律临进行免费专业咨询