请教java怎么向activemq发送消息
展开全部
一.Prop类(用来读取属性文件,单例)
package com.sitinspring.standardWeblogicJms;
import java.io.FileInputStream;
import java.util.Hashtable;
import java.util.Properties;
import javax.naming.Context;
import javax.naming.InitialContext;
public class Props {
private static final String File_Name = "jmsSetting.properties";
private static Properties propts;
public static void makeProptsInstance() {
propts = new Properties();
try {
propts.load(new FileInputStream(File_Name));
} catch (Exception ex) {
ex.printStackTrace();
}
}
public static String get(String name){
if(propts==null){
makeProptsInstance();
}
return (String)propts.get(name);
}
@SuppressWarnings("unchecked")
public static Context getInitialContext(){
Context context=null;
String jndiFactory=Props.get("jndi.factory");
String providerUrl=Props.get("jndi.provider.url");
Hashtable env=new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, jndiFactory);
env.put(Context.PROVIDER_URL, providerUrl);
try {
context=new InitialContext(env);
} catch (Exception ex) {
ex.printStackTrace();
}
return context;
}
}
二.QueueBase类(QueueComsumer和QueueSupplier的基类,用于归纳一些两类共通的东西)
package com.sitinspring.standardWeblogicJms;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;
public class QueueBase{
protected QueueConnectionFactory queueConnectionFactory;
protected QueueConnection queueConnection;
protected QueueSession queueSession;
protected Queue queue;
public QueueBase(Context context){
try{
String jmsFactory=Props.get("jms.factory");
queueConnectionFactory=(QueueConnectionFactory)context.lookup(jmsFactory);
queueConnection=queueConnectionFactory.createQueueConnection();
queueSession=queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName=Props.get("queue.name");
queue=(Queue)context.lookup(queueName);
}
catch(Exception ex){
ex.printStackTrace();
}
}
}
三.QueueComsumer类(用于接收消息)
package com.sitinspring.standardWeblogicJms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.QueueReceiver;
import javax.jms.TextMessage;
import javax.naming.Context;
public class QueueComsumer extends QueueBase implements MessageListener {
private QueueReceiver queueReceiver;
public QueueComsumer(Context context) {
super(context);
try {
queueReceiver = queueSession.createReceiver(queue);
queueReceiver.setMessageListener(this);
queueConnection.start();
} catch (Exception ex) {
ex.printStackTrace();
}
}
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage txtmsg = (TextMessage) message;
try {
System.out.println("I have received the TextMassage:"
+ txtmsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
public void close() throws JMSException {
queueReceiver.close();
queueSession.close();
queueConnection.close();
}
public static void main(String[] args) {
Context context = Props.getInitialContext();
QueueComsumer queueComsumer = new QueueComsumer(context);
synchronized (queueComsumer) {
while (true) {
try {
queueComsumer.wait();
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
try {
queueComsumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
}
四.QueueSupplier类(用于发送消息)
package com.sitinspring.standardWeblogicJms;
import javax.jms.JMSException;
import javax.jms.QueueSender;
import javax.jms.TextMessage;
import javax.naming.Context;
public class QueueSupplier extends QueueBase {
private QueueSender queueSender;
public QueueSupplier(Context context) {
super(context);
try {
queueSender = queueSession.createSender(queue);
} catch (Exception ex) {
ex.printStackTrace();
}
}
public void sendMsg(String msg) throws JMSException {
TextMessage txtMsg = queueSession.createTextMessage();
txtMsg.setText(msg);
queueConnection.start();
queueSender.send(txtMsg);
}
public void close() throws JMSException {
queueSender.close();
queueSession.close();
queueConnection.close();
}
public static void main(String[] args) {
Context context = Props.getInitialContext();
QueueSupplier queueSupplier = new QueueSupplier(context);
try {
queueSupplier.sendMsg("Hello World");
System.out.println("A message have been sent!");
} catch (JMSException ex) {
ex.printStackTrace();
} finally {
try {
queueSupplier.close();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
}
package com.sitinspring.standardWeblogicJms;
import java.io.FileInputStream;
import java.util.Hashtable;
import java.util.Properties;
import javax.naming.Context;
import javax.naming.InitialContext;
public class Props {
private static final String File_Name = "jmsSetting.properties";
private static Properties propts;
public static void makeProptsInstance() {
propts = new Properties();
try {
propts.load(new FileInputStream(File_Name));
} catch (Exception ex) {
ex.printStackTrace();
}
}
public static String get(String name){
if(propts==null){
makeProptsInstance();
}
return (String)propts.get(name);
}
@SuppressWarnings("unchecked")
public static Context getInitialContext(){
Context context=null;
String jndiFactory=Props.get("jndi.factory");
String providerUrl=Props.get("jndi.provider.url");
Hashtable env=new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, jndiFactory);
env.put(Context.PROVIDER_URL, providerUrl);
try {
context=new InitialContext(env);
} catch (Exception ex) {
ex.printStackTrace();
}
return context;
}
}
二.QueueBase类(QueueComsumer和QueueSupplier的基类,用于归纳一些两类共通的东西)
package com.sitinspring.standardWeblogicJms;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;
public class QueueBase{
protected QueueConnectionFactory queueConnectionFactory;
protected QueueConnection queueConnection;
protected QueueSession queueSession;
protected Queue queue;
public QueueBase(Context context){
try{
String jmsFactory=Props.get("jms.factory");
queueConnectionFactory=(QueueConnectionFactory)context.lookup(jmsFactory);
queueConnection=queueConnectionFactory.createQueueConnection();
queueSession=queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName=Props.get("queue.name");
queue=(Queue)context.lookup(queueName);
}
catch(Exception ex){
ex.printStackTrace();
}
}
}
三.QueueComsumer类(用于接收消息)
package com.sitinspring.standardWeblogicJms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.QueueReceiver;
import javax.jms.TextMessage;
import javax.naming.Context;
public class QueueComsumer extends QueueBase implements MessageListener {
private QueueReceiver queueReceiver;
public QueueComsumer(Context context) {
super(context);
try {
queueReceiver = queueSession.createReceiver(queue);
queueReceiver.setMessageListener(this);
queueConnection.start();
} catch (Exception ex) {
ex.printStackTrace();
}
}
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage txtmsg = (TextMessage) message;
try {
System.out.println("I have received the TextMassage:"
+ txtmsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
public void close() throws JMSException {
queueReceiver.close();
queueSession.close();
queueConnection.close();
}
public static void main(String[] args) {
Context context = Props.getInitialContext();
QueueComsumer queueComsumer = new QueueComsumer(context);
synchronized (queueComsumer) {
while (true) {
try {
queueComsumer.wait();
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
try {
queueComsumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
}
四.QueueSupplier类(用于发送消息)
package com.sitinspring.standardWeblogicJms;
import javax.jms.JMSException;
import javax.jms.QueueSender;
import javax.jms.TextMessage;
import javax.naming.Context;
public class QueueSupplier extends QueueBase {
private QueueSender queueSender;
public QueueSupplier(Context context) {
super(context);
try {
queueSender = queueSession.createSender(queue);
} catch (Exception ex) {
ex.printStackTrace();
}
}
public void sendMsg(String msg) throws JMSException {
TextMessage txtMsg = queueSession.createTextMessage();
txtMsg.setText(msg);
queueConnection.start();
queueSender.send(txtMsg);
}
public void close() throws JMSException {
queueSender.close();
queueSession.close();
queueConnection.close();
}
public static void main(String[] args) {
Context context = Props.getInitialContext();
QueueSupplier queueSupplier = new QueueSupplier(context);
try {
queueSupplier.sendMsg("Hello World");
System.out.println("A message have been sent!");
} catch (JMSException ex) {
ex.printStackTrace();
} finally {
try {
queueSupplier.close();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
}
推荐律师服务:
若未解决您的问题,请您详细描述您的问题,通过百度律临进行免费专业咨询