求教,如何使用Python向activeMQ发送ByteMessage类型的消息
1个回答
展开全部
你好,具体代码可以参考下面的:
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
//发送TextMessage
public class SendMessage {
private static final String url = "tcp://localhost:61616";;
private static final String QUEUE_NAME = "choice.queue";
protected String expectedBody = "<hello>world!</hello>";
public void sendMessage() throws JMSException{
Connection connection = null;
try{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage(expectedBody);
message.setStringProperty("headname", "remoteB");
producer.send(message);
}catch(Exception e){
e.printStackTrace();
}finally{
connection.close();
}
}
***************************************************************************************
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
//发送BytesMessage
public class SendMessage {
private String url = "tcp://localhost:61616";
public void sendMessage() throws JMSException{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test.queue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
BytesMessage message = session.createBytesMessage();
byte[] content = getFileByte("d://test.jar");
message.writeBytes(content);
try{
producer.send(message);
System.out.println("successful send message");
}catch(Exception e){
e.printStackTrace();
e.getMessage();
}finally{
session.close();
connection.close();
}
}
private byte[] getFileByte(String filename){
byte[] buffer = null;
FileInputStream fin = null;
try {
File file = new File(filename);
fin = new FileInputStream(file);
buffer = new byte[fin.available()];
fin.read(buffer);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
fin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return buffer;
}
发送完消息后可以访问
http://localhost:8161/admin/queues.jsp
看到相应的queue中是否有消息
适用收取TextMessage消息
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ReceiveMessage {
private static final String url = "tcp://172.16.168.167:61616";
private static final String QUEUE_NAME = "szf.queue";
public void receiveMessage(){
Connection connection = null;
try{
try{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
}catch(Exception e){
// ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// connection = connectionFactory.createConnection();
}
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
consumeMessagesAndClose(connection,session,consumer);
}catch(Exception e){
}
}
protected void consumeMessagesAndClose(Connection connection,
Session session, MessageConsumer consumer) throws JMSException {
for (int i = 0; i < 1;) {
Message message = consumer.receive(1000);
if (message != null) {
i++;
onMessage(message);
}
}
System.out.println("Closing connection");
consumer.close();
session.close();
connection.close();
}
public void onMessage(Message message){
try{
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage)message;
String msg = txtMsg.getText();
System.out.println("Received: " + msg);
}
}catch(Exception e){
e.printStackTrace();
}
}
public static void main(String args[]){
ReceiveMessage rm = new ReceiveMessage();
rm.receiveMessage();
}
}
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
//发送TextMessage
public class SendMessage {
private static final String url = "tcp://localhost:61616";;
private static final String QUEUE_NAME = "choice.queue";
protected String expectedBody = "<hello>world!</hello>";
public void sendMessage() throws JMSException{
Connection connection = null;
try{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage(expectedBody);
message.setStringProperty("headname", "remoteB");
producer.send(message);
}catch(Exception e){
e.printStackTrace();
}finally{
connection.close();
}
}
***************************************************************************************
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
//发送BytesMessage
public class SendMessage {
private String url = "tcp://localhost:61616";
public void sendMessage() throws JMSException{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test.queue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
BytesMessage message = session.createBytesMessage();
byte[] content = getFileByte("d://test.jar");
message.writeBytes(content);
try{
producer.send(message);
System.out.println("successful send message");
}catch(Exception e){
e.printStackTrace();
e.getMessage();
}finally{
session.close();
connection.close();
}
}
private byte[] getFileByte(String filename){
byte[] buffer = null;
FileInputStream fin = null;
try {
File file = new File(filename);
fin = new FileInputStream(file);
buffer = new byte[fin.available()];
fin.read(buffer);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
fin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return buffer;
}
发送完消息后可以访问
http://localhost:8161/admin/queues.jsp
看到相应的queue中是否有消息
适用收取TextMessage消息
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ReceiveMessage {
private static final String url = "tcp://172.16.168.167:61616";
private static final String QUEUE_NAME = "szf.queue";
public void receiveMessage(){
Connection connection = null;
try{
try{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
}catch(Exception e){
// ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// connection = connectionFactory.createConnection();
}
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
consumeMessagesAndClose(connection,session,consumer);
}catch(Exception e){
}
}
protected void consumeMessagesAndClose(Connection connection,
Session session, MessageConsumer consumer) throws JMSException {
for (int i = 0; i < 1;) {
Message message = consumer.receive(1000);
if (message != null) {
i++;
onMessage(message);
}
}
System.out.println("Closing connection");
consumer.close();
session.close();
connection.close();
}
public void onMessage(Message message){
try{
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage)message;
String msg = txtMsg.getText();
System.out.println("Received: " + msg);
}
}catch(Exception e){
e.printStackTrace();
}
}
public static void main(String args[]){
ReceiveMessage rm = new ReceiveMessage();
rm.receiveMessage();
}
}
本回答被提问者和网友采纳
已赞过
已踩过<
评论
收起
你对这个回答的评价是?
威孚半导体技术
2024-08-19 广告
2024-08-19 广告
威孚(苏州)半导体技术有限公司是一家专注生产、研发、销售晶圆传输设备整机模块(EFEM/SORTER)及核心零部件的高科技半导体公司。公司核心团队均拥有多年半导体行业从业经验,其中技术团队成员博士、硕士学历占比80%以上,依托丰富的软件底层...
点击进入详情页
本回答由威孚半导体技术提供
推荐律师服务:
若未解决您的问题,请您详细描述您的问题,通过百度律临进行免费专业咨询