Java ActiveMQ:(八)在案例中处理对象消息
1.定义消息对象
- 创建两个Maven项目一个是Producer,一个是Consumer,而后在每个项目中定义Users
- 记得在POM文件中导入ActiveMQ的坐标
import java.io.Serializable;
public class Users implements Serializable {
private int userid;
private String username;
private int userage;
public int getUserid() {
return userid;
}
public void setUserid(int userid) {
this.userid = userid;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public int getUserage() {
return userage;
}
public void setUserage(int userage) {
this.userage = userage;
}
@Override
public String toString() {
return "Users [userid=" + userid + ", username=" + username + ", userage=" + userage + "]";
}
}
2.创建生产者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class HelloWorldProducer {
public void sendHelloWorldActiveMQ(Users users) {
//定义链接工厂
ConnectionFactory connectionFactory = null;
//定义链接对象
Connection connection = null;
//定义会话
Session session = null;
//目的地
Destination destination = null;
//定义消息的发送者
MessageProducer producer = null;
//定义消息
Message message = null;
try {
/**
* userName:访问 ActiveMQ 服务的用户名。用户密码。默认的为 admin。用户名可以通过 jetty-ream.properties 文件进行修改
* password:访问 ActiveMQ 服务的用户名。用户密码。默认的为 admin。用户名可以通过 jetty-ream.properties 文件进行修改
* brokerURL:访问 ActiveMQ 服务的路径地址。 路径结构为:协议名://主机地址:端口号
*/
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.0.110:61616");
//创建连接对象
connection = connectionFactory.createConnection();
//启动连接
connection.start();
/**
* transacted:是否使用事务 可选值为: true|false
* true:使用事务 当设置次变量值。 Session.SESSION_TRANSACTED
* false:不适用事务,设置次变量则 acknowledgeMode 参数必须设置
* acknowledgeMode: * Session.AUTO_ACKNOWLEDGE:自动消息确认机制
* Session.CLIENT_ACKNOWLEDGE:客户端确认 机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客 户端确认消息机制
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
destination = session.createQueue("my-users");
//创建消息的生产者
producer = session.createProducer(destination);
//创建消息对象
message = session.createObjectMessage(users);
//发送消息
producer.send(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
//回收消息发送者资源
if (producer != null) {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
3.定义消息消费者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class HelloWorldConsumer {
public void readHelloWorldActiveMQ() throws JMSException {
// 定义链接工厂
ConnectionFactory connectionFactory = null;
// 定义链接对象
Connection connection = null;
// 定义会话
Session session = null;
// 目的地
Destination destination = null;
// 定义消息的发送者
MessageConsumer consumer = null;
// 定义消息
Message message = null;
try {
/**
* userName:访问 ActiveMQ 服务的用户名。用户密码。默认的为 admin。用户名可以通过 jetty-ream.
* properties 文件进行修改
* password:访问 ActiveMQ 服务的用户名。用户密码。默认的为 admin。用户名可以通过 jetty-ream.
* properties 文件进行修改 brokerURL:访问 ActiveMQ 服务的路径地址。路径结构为:协议名://主机地址: 端口号
*/
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.0.110:61616");
// 创建连接对象
connection = connectionFactory.createConnection();
// 启动连接
connection.start();
/**
* transacted:是否使用事务 可选值为:true|false true:使用事务
* 当设置次变量值。 Session.SESSION_TRANSACTED false:不适用事务,设置次变量
* 则 acknowledgeMode 参数必须设置 acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自动消息确认机制
* Session.CLIENT_ACKNOWLEDGE:客户端确认机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
destination = session.createQueue("my-users");
// 创建消息的消费者
consumer = session.createConsumer(destination);
// 创建消息对象
message = consumer.receive();
//处理消息
ObjectMessage objMessage = (ObjectMessage) message;
Users users = (Users) objMessage.getObject();
System.out.println(users);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 回收消息发送者资源
if (consumer != null) {
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
4.测试
4.1 Producer
public class test {
public static void main(String[] args) {
HelloWorldProducer producer = new HelloWorldProducer();
Users users=new Users();
users.setUserid(1);
users.setUserage(21);
users.setUsername("dqcgm");
producer.sendHelloWorldActiveMQ(users);
}
}
运行结果:

4.2 Consumer
import javax.jms.JMSException;
public class test {
public static void main(String[] args) throws JMSException {
HelloWorldConsumer consumer = new HelloWorldConsumer();
consumer.readHelloWorldActiveMQ();
}
}
运行结果:
