Java ActiveMQ:(八)在案例中处理对象消息

1.定义消息对象

  • 创建两个Maven项目一个是Producer,一个是Consumer,而后在每个项目中定义Users
  • 记得在POM文件中导入ActiveMQ的坐标
import java.io.Serializable;

public class Users implements Serializable {
    private int userid;
    private String username;
    private int userage;

    public int getUserid() {
        return userid;
    }

    public void setUserid(int userid) {
        this.userid = userid;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public int getUserage() {
        return userage;
    }

    public void setUserage(int userage) {
        this.userage = userage;
    }

    @Override
    public String toString() {
        return "Users [userid=" + userid + ", username=" + username + ", userage=" + userage + "]";
    }
}

2.创建生产者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class HelloWorldProducer {
    public void sendHelloWorldActiveMQ(Users users) {
        //定义链接工厂
        ConnectionFactory connectionFactory = null;
        //定义链接对象
        Connection connection = null;
        //定义会话
        Session session = null;
        //目的地
        Destination destination = null;
        //定义消息的发送者
        MessageProducer producer = null;
        //定义消息
        Message message = null;
        try {
            /**
             * userName:访问 ActiveMQ 服务的用户名。用户密码。默认的为 admin。用户名可以通过 jetty-ream.properties 文件进行修改
             * password:访问 ActiveMQ 服务的用户名。用户密码。默认的为 admin。用户名可以通过 jetty-ream.properties 文件进行修改
             * brokerURL:访问 ActiveMQ 服务的路径地址。 路径结构为:协议名://主机地址:端口号
             */
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.0.110:61616");
            //创建连接对象
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            /**
             * transacted:是否使用事务 可选值为: true|false
             * true:使用事务 当设置次变量值。 Session.SESSION_TRANSACTED
             * false:不适用事务,设置次变量则 acknowledgeMode 参数必须设置
             * acknowledgeMode: * Session.AUTO_ACKNOWLEDGE:自动消息确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认 机制
             * Session.DUPS_OK_ACKNOWLEDGE:有副本的客 户端确认消息机制
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
            destination = session.createQueue("my-users");
            //创建消息的生产者
            producer = session.createProducer(destination);
            //创建消息对象
            message = session.createObjectMessage(users);
            //发送消息
            producer.send(message);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //回收消息发送者资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

3.定义消息消费者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class HelloWorldConsumer {
    public void readHelloWorldActiveMQ() throws JMSException {
        // 定义链接工厂
        ConnectionFactory connectionFactory = null;
        // 定义链接对象
        Connection connection = null;
        // 定义会话
        Session session = null;
        // 目的地
        Destination destination = null;
        // 定义消息的发送者
        MessageConsumer consumer = null;
        // 定义消息
        Message message = null;
        try {
            /**
             * userName:访问 ActiveMQ 服务的用户名。用户密码。默认的为 admin。用户名可以通过 jetty-ream.
             * properties 文件进行修改
             * password:访问 ActiveMQ 服务的用户名。用户密码。默认的为 admin。用户名可以通过 jetty-ream.
             * properties 文件进行修改 brokerURL:访问 ActiveMQ 服务的路径地址。路径结构为:协议名://主机地址: 端口号
             */
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.0.110:61616");
            // 创建连接对象
            connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            /**
             * transacted:是否使用事务 可选值为:true|false true:使用事务
             * 当设置次变量值。 Session.SESSION_TRANSACTED false:不适用事务,设置次变量
             * 则 acknowledgeMode 参数必须设置 acknowledgeMode:
             * Session.AUTO_ACKNOWLEDGE:自动消息确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认机制
             * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
            destination = session.createQueue("my-users");
            // 创建消息的消费者
            consumer = session.createConsumer(destination);
            // 创建消息对象
            message = consumer.receive();
            //处理消息
            ObjectMessage objMessage = (ObjectMessage) message;
            Users users = (Users) objMessage.getObject();
            System.out.println(users);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 回收消息发送者资源
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

4.测试

4.1 Producer

public class test {
    public static void main(String[] args) {
        HelloWorldProducer producer = new HelloWorldProducer();
        Users users=new Users();
        users.setUserid(1);
        users.setUserage(21);
        users.setUsername("dqcgm");
        producer.sendHelloWorldActiveMQ(users);
    }
}

运行结果:
设置

4.2 Consumer

import javax.jms.JMSException;

public class test {
    public static void main(String[] args) throws JMSException {
        HelloWorldConsumer consumer = new HelloWorldConsumer();
        consumer.readHelloWorldActiveMQ();
    }
}

运行结果:
设置