kafka怎么发布订阅 怎么在java中实现
展开全部
这是我们项目中用到的代码
public class ProducerService {
private static Logger log = Logger.getLogger(ProducerService.class);
private static Producer<String,String> producer = null;
private static String serviceIp = PropertiesUtils.getValue("/epoo.properties","bootstrap.servers");
private static String serviceName = PropertiesUtils.getValue("/epoo.properties","name");
public boolean initProducer(){
Properties props = new Properties();
//dataPlace.getIp()
props.put("bootstrap.servers", serviceIp);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "-1");
producer = new KafkaProducer(props);
try{
List<PartitionInfo> list = producer.partitionsFor(serviceName);
}catch(Exception e){
JOptionPane.showMessageDialog(null, e.getMessage(), "错误", JOptionPane.YES_OPTION);
log.error(e.getMessage());
return false;
}
return true;
}
public void sendData(String mess){
if(producer == null){
initProducer();
}
producer.send(new ProducerRecord<String,String>(serviceName,mess),new Callback() {
@Override
public void onCompletion(RecordMetadata rm, Exception e) {
if(e != null){
e.printStackTrace();
log.error(e.getMessage());
}
System.out.println("发送到服务器的Offset: " + rm.offset() + "-----Topic:" + rm.topic() + "-----partition:" + rm.partition());
}
});
}
public void close(){
if(producer != null){
producer.close();
}
}
}
推荐律师服务:
若未解决您的问题,请您详细描述您的问题,通过百度律临进行免费专业咨询