ActiveMq–学习日记

activeMq 简介

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是1个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,虽然JMS规范出台已是很久的事情了,但是JMS在现今的J2EE利用中间依然扮演着特殊的地位。

特性:

  • 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。利用协议:OpenWire,Stomp REST,WS Notification,XMPP,AMQP完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

  • 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

  • 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5
    resource adaptors的配置,可让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

  • 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

  • 支持通过JDBC和journal提供高速的消息持久化

  • 从设计上保证了高性能的集群,客户端-服务器,点对点

  • 支持Ajax

  • 支持与Axis的整合

  • 可以很容易得调用内嵌JMS provider,进行测试

JMS简介

JMS源于企业利用对消息中间件的需求,使利用程序可以通过消息进行异步处理而互不影响。Sun公司和它的合作火伴设计的JMS API定义了1组公共的利用程序接口和相应语法,使得Java程序能够和其他消息组件进行通讯。JMS有4个组成部份:JMS服务提供者、消息管理对象、消息的生产者消费者和消息本身。

1. JMS服务提供者

JMS服务提供者实现消息队列和通知,同时实现消息管理的API。JMS已是J2EE API的1部份,J2EE服务器都提供JMS服务。

2. 消息管理对象

消息管理对象提供对消息进行操作的API。JMS AP中有两个消息管理对象:创建jms连接使用的工厂(ConnectionFactory)和目的地(Destination),根据消息的消费方式的不同ConnectionFactory可以分为QueueConnectionFactory和TopicConnectionFactory,目的地(Destination)可以分为队列(Queue)和主题(Topic)两种。

3. 消息的生产者消费者

消息的产生由JMS的客户端完成,JMS服务提供者负责管理这些消息,消息的消费者可以接收消息。消息的生产者可以分为――点对点消息发布者(P2P)和主题消息发布者(TopicPublisher)。所以,消息的消费者分为两类:主题消息的定阅者(TopicSubscriber)和点对点消息的接收者(queue receiver)。

4. JMS消息

JMS消息是服务提供者和客户端之间传递信息所使用的信息单元。JMS消息由以下3部份组成:消息头(header)、属性(property)和消息体(body)。

5. 消息标头

消息标头是消息的信封,包括为使消息到达目的地所需要的所有信息,可以直接控制其中1些字段的值,其它值则由JMS提供程序填写。

JMS消息头包括了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息肯定路由。

JMSDestination: 由Send方法设置。指定消息的目的地,由JMS提供程序填写

JMSDeliveryMode: 由Send方法设置。提交消息的模式-延续或非延续。发送消息后JMS提供程序填写该字段。

JMSMessageID: 由Send方法设置。包括消息的唯1标识符。发送进程中由JMS提供程序填写

JMSTimeStamp: 由Send 方法设置。记录消息被传递给send方法的时间。发送进程中由JMS提供程序填写

JMSCorrelationID: 由客户端设置。包括用于将消息连接在1起的ID。客户端1般将其置为所援用消息的ID

JMSReplyTo: 由客户端设置。响应消息的目的地,如果客户端期望得到响应消息,则填写该字段

JMSRedelivered: 由JMS提供程序设置。指出该消息先前被发送过

JMSType: 由客户端设置。包括由客户端提供的消息类型标识符。是不是需要该字段,不同的提供程序有不同要求

JMSExpiration: Send 方法设置。1个根据客户端提供的年龄计算出来的值,如果GMT比该过期值晚,则烧毁消息

JMSPriority: Send 方法设置。包括客户端在发送消息时所设置有限级值

5. 消息属性

消息属性,用来添加删除消息头之外的附加信息。除上面的属性,还可以自定义属性,以便进行消息的选择 。1般通过setXXXProperty方法来定义消息属性,XXX取值为:Boolean、Byte、Double、Float、Int、Long、Object、Short及String。每属性均由字符串名字和相干的值组成 ,例如:

TextMessage msg = tsession.createTextMessage();

msg.setStringProperty(“CUSTOMER_NAME”,”MyCustomer”);

String customer = msg.getStringProperty(“CUSTOMER_NAME”);

其中的”CUSTOMER_NAME”和”MyCustomer”就是消息当中对应的key和value。

6. 消息主体

消息主体包括了消息的核心数据。

JMS 定义了5中消息类型: TextMessage、MapMessage、BytesMessage、

StreamMessage和ObjectMessage

选择最适合的消息类型可使JMS最有效 的处理消息。

  • TextMessage(文本消息)

将数据作为简单字符串寄存在主体中(XML就能够作为字符串发)

TextMessage msg = session.createTextMessage();

msg.setText(text);

有些厂商支持1种XML专用的消息格式,带来了便利,但是否是标准的JMS类型,影响移植性。只自己定义了两个方法setText(String s)、getText()

  • MapMessage(映照消息)

使用1张映照表来寄存其主体内容(参照Jms API)

MapMessage msg = session.createMapMessage();

msg.setString(“CUSTOMER_NAME”,”John”);

msg.setInt(“CUSTOMER_AGE”,12);

String s = msg.getString(“CUSTOMER_NAME”);

int age = msg.getInt(“CUSTOMER_AGE”);

  • BytesMessage(字节消息)

将字节流寄存在消息主体中。合适于以下情况:必须紧缩发送的大量数据、需要与现有

消息格式保持1致等(参照Jms API)

byte[] data;

BytesMessage msg = session.createBytesMessage();

msg.wirte(data);

byte[] msgData = new byte[256];

int bytesRead = msg.readBytes(msgData);

  • StreamMessage(流消息)

用于处理原语类型。这里也支持属性字段和MapMessage所支持的数据类型。使用这类

消息格式时,收发双方事前协商好字段的顺序,以保证写读顺序相同(参照Jms API)

StringMessage msg = session.createStreamMessage();

msg.writeString(“John”);

msg.writeInt(12);

String s = msg.readString();

Int age = msg.readInt();

(PS:个人认为有点像socket的信息收发)

  • ObjectMessage(对象消息)

用于往消息中写入可序列化的对象。

消息中可以寄存1个对象,如果要寄存多个对象,需要建立1个对象集合,然后把这个集合写入消息。

客户端接收到1个ObjectMessage时,是read-only模式。如果1个客户端试图写message,将会抛出MessageNotWriteableException。如果调用了clearBody方法,message既可以读又可以写自己只单独定义了两个方法:getObject()和setObject(Serializable s)

ObjectMessage包括的只是object的1个快照,set以后object的修改对ObjectMessage的body无效 (从两个方法可以看出,这类消息已强迫要你实现java.io. Serializable接口)

Message只读时被set抛出MessageNotWriteableException;

set和get时,如果对象序列化失败抛出MessageFormatException

7. 消息的通讯方式(点对点通讯和发布/定阅方式)

点对点方式(point-to-point)

点对点的消息发送方式主要建立在 Message Queue、Sender、Receiver上,

Message Queue 存贮消息,Sender 发送消息,Receiver接收消息.具体点就是Sender Client发送Message Queue ,而 Receiver Client从Queue中接收消息和”发送消息已接受”到Queue,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任什么时候刻发送信息到Queue,而不需要知道接收客户端是否是在运行。
发布/定阅方式(publish/subscriber Messaging)

发布/定阅方式用于多接收客户真个方式.作为发布定阅的方式,可能存在多个接收客户

端,并且接收端客户端与发送客户端存在时间上的依赖。1个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。

编程模式

消息产生者向JMS发送消息的步骤

  1. 创建连接使用的工厂类JMS ConnectionFactory
  2. 使用管理对象JMS ConnectionFactory建立连接Connection
  3. 使用连接Connection建立会话Session
  4. 使用会话Session和管理对象Destination创建消息生产者MessageSender
  5. 使用消息生产者MessageSender发送消息

消息消费者从JMS接受消息的步骤

  1. 创建连接使用的工厂类JMS ConnectionFactory
  2. 使用管理对象JMS ConnectionFactory建立连接Connection
  3. 使用连接Connection 建立会话Session
  4. 使用会话Session和管理对象Destination创建消息消费者MessageReceiver
  5. 使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver 消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。

ActiveMQ运行

ActiveMQ5.3版本默许启动时,启动了内置的jetty服务器,提供1个demo利用和用于监控ActiveMQ的admin利用。运行%activemq_home%bin/目录下的 activemq.bat , 以后你会看见以下1段话表示启动成功。

打开http://localhost:8161/admin/ ,可以查看消息队列的管理控台,以下截图:
这里写图片描述

点对点通讯模式demo

消息发送者:

package com.ainong.demo.p2pqueue;
import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.alibaba.fastjson.JSONObject;
/**
*
* <b>function:</b> Queue 方式消息发送者
*
* @author dong.gang
*
*/

public class QueueSender {
// tcp 地址
public static final String BROKER_URL = "tcp://localhost:61616";

// 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
public static final String DESTINATION_PAYMENT = "mq.queue.payment";
public static final String DESTINATION_QUERY = "mq.queue.query";

public QueueSender() {

}

/**
*
* <b>function:</b> 发送消息
*
* @throws Exception
*/

public void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception {

for (int i = 0; i < SEND_NUM; i++) {

JSONObject msgJson = new JSONObject();
msgJson.put("seqId", i+1);
msgJson.put("content", "发送第" + (i + 1) + "条消息");

MapMessage map = session.createMapMessage();
map.setString("msg", msgJson.toJSONString());
map.setLong("time", System.currentTimeMillis());
System.out.println(map);
sender.send(map);
}
}

public void run(String mode) throws Exception {

QueueConnection connection = null;
QueueSession session = null;
try {
// 创建链接工厂
QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通过工厂创建1个连接
connection = factory.createQueueConnection();
// 启动连接
connection.start();
// 创建1个session会话
session = connection.createQueueSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
String queueDis = "1".equals(mode) ? DESTINATION_PAYMENT : DESTINATION_QUERY;
// 创建1个消息队列
Queue queue = session.createQueue(queueDis);
// 创建消息发送者
javax.jms.QueueSender sender = session.createSender(queue);
// 设置持久化模式
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, sender);
// 提交会话
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}

public static void main(String[] args) throws Exception {

QueueSender sender = new QueueSender();
// sender.run("1");
sender.run("2");
}
}

消息接收者:

package com.ainong.demo.p2pqueue;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.alibaba.fastjson.JSONObject;
/**
*
* <b>function:</b> 消息接收者
*
* @author donggang
*
*/

public class QueueReceiver {

// tcp 地址
public static final String BROKER_URL = "tcp://localhost:61616";
// 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
public static final String TARGET_PAYMENT = "mq.queue.payment";
public static final String TARGET_QUERY = "mq.queue.query";

public void run(String mode) throws Exception {

QueueConnection connection = null;
QueueSession session = null;
try {
// 创建链接工厂
QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通过工厂创建1个连接
connection = factory.createQueueConnection();
// 启动连接
connection.start();
// 创建1个session会话
session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 创建1个消息队列
String queueTarg = "1".equals(mode) ? TARGET_PAYMENT : TARGET_QUERY;
Queue queue = session.createQueue(queueTarg);

// 创建消息制作者
javax.jms.QueueReceiver receiver = session.createReceiver(queue);
receiver.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
if (msg != null) {
MapMessage map = (MapMessage) msg;
try {
JSONObject jsonObj = JSONObject.parseObject(map.getString("msg"));
if ("3".equals(jsonObj.getString("seqId"))) {
System.out.println(map.getLong("time") + "接收#" + map.getString("msg"));
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 提交会话
session.commit();
// 休眠1s再关闭
Thread.sleep(1000);

} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}

public static void main(String[] args) throws Exception {

QueueReceiver receiver = new QueueReceiver();
// receiver.run("1");
receiver.run("2");
}
}

其实上边的消息接收者已集成了消息监听类,如果我们需要分离业务操作,可以 receiver.setMessageListener()参数中设为我们的业务监听处理类(需要实现MessageListener类):

package com.ainong.demo.p2pqueue;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;

public class QueueMsgListenner implements MessageListener {

@Override
public void onMessage(Message msg) {
if (msg != null) {
MapMessage map = (MapMessage) msg;
try {
System.out.println(map.getLong("time") + "接收#" + map.getString("id000"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}

发布/定阅模式 demo

消息发布者:

package com.ainong.demo.publish_subscrib;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;

import com.alibaba.fastjson.JSONObject;

public class Publisher {
protected static String brokerURL = "tcp://localhost:61616";
protected static transient ConnectionFactory factory;
protected transient Connection connection;
protected transient Session session;
protected transient MessageProducer producer;

public Publisher() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
try {
// 持久化定阅(消息会保存,特定的消费者可以1段时间落后行消费)
connection.setClientID("client1");
connection.start();
} catch (JMSException jmse) {
connection.close();
throw jmse;
}

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(null);
// 发送消息时用使用持久模式(不设置,默许就是持久的)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
}

public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}

protected void sendMessage(JSONObject msg) throws JMSException {
Destination destination = session.createTopic("demo");
Message message = createMessage(msg, session);
System.out.println("消息发送: "
+ ((ActiveMQMapMessage) message).getContentMap()
+ " on destination: " + destination);
producer.send(destination, message);
}

protected Message createMessage(JSONObject msg, Session session)
throws JMSException {

MapMessage message = session.createMapMessage();
message.setString("id", msg.getString("id"));
message.setString("content", msg.getString("content"));
return message;
}

public static void main(String[] args) throws JMSException,
InterruptedException {
Publisher publisher = new Publisher();
for (int i = 0; i < 3; i++) {
JSONObject msgObject = new JSONObject();
msgObject.put("id", "msg00" + i);
msgObject.put("content", "第" + i + "条消息");
publisher.sendMessage(msgObject);
Thread.sleep(3000);
}
publisher.close();
}
}

消息定阅者1(持久化定阅)

package com.ainong.demo.publish_subscrib;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 持久化定阅模式(消息会保存,消费者可以在任意时间消费消息)
*
* @author DG
*
*/

public class Consumer1 {

private static String brokerURL = "tcp://localhost:61616";
private static transient ConnectionFactory factory;
private transient Connection connection;
private transient Session session;

public Consumer1() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.setClientID("client1");
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}

public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}

public Session getSession() {
return session;
}

public static void main(String[] args) throws JMSException {
Consumer1 consumer = new Consumer1();
Topic topic = consumer.getSession().createTopic("demo");
// 普通定阅
// MessageConsumer messageConsumer =
// consumer.getSession().createConsumer(
// destination);
// 持久化定阅
MessageConsumer messageConsumer =consumer.getSession().createDurableSubscriber(topic,"client1"); //持久定阅
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
MapMessage map = (MapMessage) message;
String id = map.getString("id");
String content = map.getString("content");
System.out.println("消费者1,消息接收:id = " + id + ";content = "
+ content);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}

常规定阅者:

package com.ainong.demo.publish_subscrib;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 常规定阅模式(消费者必须在消息生产前就已启动监听消息,否则错过消息以后,消费就会失效,不会被处理)
*
* @author DG
*
*/

public class Consumer2 {

private static String brokerURL = "tcp://localhost:61616";
private static transient ConnectionFactory factory;
private transient Connection connection;
private transient Session session;

public Consumer2() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}

public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}

public Session getSession() {
return session;
}

public static void main(String[] args) throws JMSException {
Consumer2 consumer = new Consumer2();
Destination destination = consumer.getSession().createTopic("demo");
MessageConsumer messageConsumer = consumer.getSession().createConsumer(
destination);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
MapMessage map = (MapMessage) message;
String id = map.getString("id");
String content = map.getString("content");
System.out.println("消费者2:消息接收:id = " + id + ";content = "
+ content);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}

上面只是我初识activemq时的1些心得,附代码源码:
http://download.csdn.net/detail/donggang1992/9561041

波比源码 – 精品源码模版分享 | www.bobi11.com
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需!
7. 如遇到加密压缩包,请使用WINRAR解压,如遇到无法解压的请联系管理员!

波比源码 » ActiveMq–学习日记

发表评论

Hi, 如果你对这款模板有疑问,可以跟我联系哦!

联系站长
赞助VIP 享更多特权,建议使用 QQ 登录
喜欢我嘛?喜欢就按“ctrl+D”收藏我吧!♡