请教java向kafka发消息

java代码如下:publicstaticvoidmain(String[]args){Propertiesproperties=newProperties();prop... java代码如下:
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("zookeeper.connect", "192.168.60.128:2181");//声明zk
properties.put("serializer.class", StringEncoder.class.getName());
properties.put("metadata.broker.list", "192.168.60.128:9092");// 声明kafka broker
Producer producer = new Producer<Integer, String>(new ProducerConfig(properties));
producer.send(new KeyedMessage<Integer, String>("Qbit_topic", "message " +));
}
在服务器上执行
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Qbit_topic
是能正常发消息的
java代码执行了也没有报错,但是consumer没有收到消息
展开
 我来答
沙漠水族
推荐于2018-04-06 · TA获得超过218个赞
知道答主
回答量:11
采纳率:100%
帮助的人:6.8万
展开全部

这是我们项目中用到的代码

public class ProducerService {
 private static Logger log = Logger.getLogger(ProducerService.class);
 private static  Producer<String,String> producer = null;
 private static String serviceIp = PropertiesUtils.getValue("/epoo.properties","bootstrap.servers");
 private static String serviceName = PropertiesUtils.getValue("/epoo.properties","name");   
    public boolean initProducer(){
        Properties props = new Properties();

       
        //dataPlace.getIp()
        props.put("bootstrap.servers", serviceIp);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "-1");
        producer = new KafkaProducer(props);   
        try{
            List<PartitionInfo> list = producer.partitionsFor(serviceName);
            
        }catch(Exception e){
            JOptionPane.showMessageDialog(null, e.getMessage(), "错误", JOptionPane.YES_OPTION);
            log.error(e.getMessage());
            return false;
        }
        return true;
    }
    
    public  void sendData(String mess){
        if(producer == null){
            initProducer();
        }
        
        producer.send(new ProducerRecord<String,String>(serviceName,mess),new Callback() {
            @Override
            public void onCompletion(RecordMetadata rm, Exception e) {
                if(e != null){
                    e.printStackTrace();
                    log.error(e.getMessage());
                }
                System.out.println("发送到服务器的Offset: " + rm.offset() + "-----Topic:" + rm.topic() + "-----partition:" + rm.partition());
            }
        });
    }
    
    public  void close(){
        if(producer != null){
            producer.close();
        }
    }
}
推荐律师服务: 若未解决您的问题,请您详细描述您的问题,通过百度律临进行免费专业咨询

为你推荐:

下载百度知道APP,抢鲜体验
使用百度知道APP,立即抢鲜体验。你的手机镜头里或许有别人想知道的答案。
扫描二维码下载
×

类别

我们会通过消息、邮箱等方式尽快将举报结果通知您。

说明

0/200

提交
取消

辅 助

模 式