博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
(转) RabbitMQ学习之helloword(java)
阅读量:6369 次
发布时间:2019-06-23

本文共 4216 字,大约阅读时间需要 14 分钟。

http://blog.csdn.net/zhu_tianwei/article/details/40835555

amqp-client:

1.依赖jar包

<dependency>

<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>

2.生产者代码Send.

 

[java]   
 
 
  1. package cn.slimsmart.rabbitmq.demo.helloword;  
  2.   
  3. import com.rabbitmq.client.AMQP;  
  4. import com.rabbitmq.client.Channel;  
  5. import com.rabbitmq.client.Connection;  
  6. import com.rabbitmq.client.ConnectionFactory;  
  7.   
  8. public class Send {  
  9.     //消息队列名称  
  10.     private final static String QUEUE_NAME = "helloword";    
  11.   
  12.     public static void main(String[] args) throws Exception {  
  13.           /**  
  14.          * 创建连接连接到MabbitMQ  
  15.          */    
  16.         ConnectionFactory factory = new ConnectionFactory();    
  17.         //设置MabbitMQ所在主机ip或者主机名    
  18.         factory.setHost("192.168.101.174");    
  19.         //指定用户 密码  
  20.         factory.setUsername("admin");  
  21.         factory.setPassword("admin");  
  22.         //指定端口  
  23.         factory.setPort(AMQP.PROTOCOL.PORT);  
  24.         //创建一个连接    
  25.         Connection connection = factory.newConnection();    
  26.         //创建一个频道    
  27.         Channel channel = connection.createChannel();    
  28.         //指定一个队列    
  29.         channel.queueDeclare(QUEUE_NAME, false, false, false, null);    
  30.         //发送的消息    
  31.         String message = "hello world!";    
  32.         //往队列中发出一条消息    
  33.         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());    
  34.         System.out.println("Sent Message:'" + message + "'");    
  35.         //关闭频道和连接    
  36.         channel.close();    
  37.         connection.close();    
  38.     }  
  39.   
  40. }  

 

3.消费者代码Receive.java

 

[java]   
 
 
  1. package cn.slimsmart.rabbitmq.demo.helloword;  
  2.   
  3. import com.rabbitmq.client.AMQP;  
  4. import com.rabbitmq.client.Channel;  
  5. import com.rabbitmq.client.Connection;  
  6. import com.rabbitmq.client.ConnectionFactory;  
  7. import com.rabbitmq.client.QueueingConsumer;  
  8.   
  9. public class Receive {  
  10.       
  11.     //消息队列名称  
  12.     private final static String QUEUE_NAME = "helloword";    
  13.   
  14.     public static void main(String[] args) throws Exception {  
  15.          //打开连接和创建频道,与发送端一样    
  16.         ConnectionFactory factory = new ConnectionFactory();    
  17.         factory.setHost("192.168.101.174");    
  18.         //指定用户 密码  
  19.         factory.setUsername("admin");  
  20.         factory.setPassword("admin");  
  21.         //指定端口  
  22.         factory.setPort(AMQP.PROTOCOL.PORT);  
  23.         //创建一个连接    
  24.         Connection connection = factory.newConnection();    
  25.         //创建一个频道    
  26.         Channel channel = connection.createChannel();    
  27.         //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。    
  28.         channel.queueDeclare(QUEUE_NAME, false, false, false, null);    
  29.             
  30.         //创建队列消费者    
  31.         QueueingConsumer consumer = new QueueingConsumer(channel);    
  32.         //指定消费队列    
  33.         channel.basicConsume(QUEUE_NAME, true, consumer);    
  34.         while (true)    
  35.         {    
  36.             //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)    
  37.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();    
  38.             String message = new String(delivery.getBody());    
  39.             System.out.println("Received Message:'" + message + "'");    
  40.         }    
  41.     }  
  42.   
  43. }  

 

如果运行出现如下异常,可能创建的用户没有访问权限。

 

[java]   
 
 
  1. Exception in thread "main" java.io.IOException  
  2.     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)  
  3.     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)  
  4.     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)  
  5.     at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:388)  
  6.     at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)  
  7.     at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533)  
  8.     at cn.slimsmart.rabbitmq.demo.test.Test.main(Test.java:18)  
  9. Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.net.SocketException: Connection reset  
  10.     at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)  
  11.     at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)  
  12.     at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)  
  13.     at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)  
  14.     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)  
  15.     ... 4 more  
  16. Caused by: java.net.SocketException: Connection reset  
  17.     at java.net.SocketInputStream.read(Unknown Source)  
  18.     at java.net.SocketInputStream.read(Unknown Source)  
  19.     at java.io.BufferedInputStream.fill(Unknown Source)  
  20.     at java.io.BufferedInputStream.read(Unknown Source)  
  21.     at java.io.DataInputStream.readUnsignedByte(Unknown Source)  
  22.     at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)  
  23.     at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)  
  24.     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:515)  

需要对该用户进行授权,登录web控制台后,点击"admin",进入需要授权的用户,在Admin标签页下点击新增的用户"admin",进入授权页面,默认直接点击"set permission"即可

 

实例代码:

你可能感兴趣的文章
syslog:类Unix系统常用的log服务
查看>>
使用Annotation设计持久层
查看>>
深入实践Spring Boot2.4.1 Neo4j依赖配置
查看>>
Zen Cart 如何添加地址栏上的小图标
查看>>
SecureCrt 连接Redhat linux
查看>>
[NHibernate]持久化类(Persistent Classes)
查看>>
如何在Hive中使用Json格式数据
查看>>
linux如何恢复被删除的热文件
查看>>
Eclipse(MyEclipse) 自动补全
查看>>
Struts2中dispatcher与redirect的区别
查看>>
zabbix agentd configure
查看>>
地图点聚合优化方案
查看>>
Google Chrome 快捷方式
查看>>
备考PMP心得体会
查看>>
vue proxy匹配规则
查看>>
线上应用故障排查之一:高CPU占用
查看>>
Extend Volume 操作 - 每天5分钟玩转 OpenStack(56)
查看>>
IronPython教程
查看>>
squid via检测转发循环
查看>>
计算分页
查看>>