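Akka is a high-performance, fault-tolerant distributed framework with good support for Camel. This post builds an Akka Camel demo; the environment is CentOS 7 + IntelliJ + JDK 8. The demo creates one Producer and one Consumer that together implement Redis pub/sub.
1. Create a Maven project and add the dependencies. The pom.xml is as follows: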
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-core</artifactId>
        <version>2.17.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-spring</artifactId>
        <version>2.17.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-spring-redis</artifactId>
        <version>2.17.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-stream</artifactId>
        <version>2.17.0</version>
    </dependency>
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-actor_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-camel_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
</dependencies>
2. Create the MyRedisProducer and MyRedisConsumer classes. They extend Akka's UntypedProducerActor and UntypedConsumerActor respectively, and are used to produce and consume messages. The code is as follows:
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedProducerActor;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by sam on 5/9/16.
 */
public class MyRedisProducer extends UntypedProducerActor {
    @Override
    public void preStart() {
        super.preStart();
    }

    @Override
    public String getEndpointUri() {
        // #connectionFactory and #serializer are looked up in the Camel registry.
        return "spring-redis://localhost:9999?connectionFactory=#connectionFactory&serializer=#serializer";
    }

    @Override
    public void onRouteResponse(Object message) {
        // Called with the reply that comes back from the Camel route.
        System.out.println("response from route:{}" + message);
    }

    @Override
    public Object onTransformOutgoingMessage(Object message) {
        if (message instanceof CamelMessage) {
            // Already a CamelMessage: pass it through unchanged.
            return message;
        } else {
            // Wrap the payload and set the headers that select the Redis command.
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("CamelRedis.Command", "PUBLISH");
            headers.put("CamelRedis.Channel", "testChannel");
            headers.put("CamelRedis.Message", message.toString());
            return new CamelMessage(message, headers);
        }
    }
}
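MyRedisProducer preprocesses each incoming message and sets its headers; the Camel Redis component decides which command to execute from those headers.
The connectionFactory and serializer referenced in getEndpointUri are defined when the actors are created.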
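To make the role of the headers easier to see in isolation, here is a minimal sketch that builds the same three headers used in onTransformOutgoingMessage. The class RedisPublishHeaders and its method forPublish are hypothetical names introduced only for illustration; they are not part of Akka or Camel. Only the header keys and the PUBLISH value come from the code above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of Akka or Camel. */
final class RedisPublishHeaders {
    private RedisPublishHeaders() {
    }

    /** Builds the headers that the demo attaches to every outgoing message. */
    static Map<String, Object> forPublish(String channel, Object payload) {
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put("CamelRedis.Command", "PUBLISH");               // Redis command to run
        headers.put("CamelRedis.Channel", channel);                 // channel to publish on
        headers.put("CamelRedis.Message", String.valueOf(payload)); // text that is published
        return headers;
    }
}
```

With such a helper, the else branch of onTransformOutgoingMessage could be reduced to new CamelMessage(message, RedisPublishHeaders.forPublish("testChannel", message)), which keeps the channel name in one place.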
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;

/**
 * Created by sam on 5/9/16.
 */
public class MyRedisConsumer extends UntypedConsumerActor {
    @Override
    public String getEndpointUri() {
        // Subscribe to testChannel through the same connection factory and serializer.
        return "spring-redis://localhost:9999?connectionFactory=#connectionFactory&serializer=#serializer&channels=testChannel&command=SUBSCRIBE";
    }

    @Override
    public void onReceive(Object o) throws Exception {
        // Print the raw message first, then only its body as a String.
        System.out.println(o);
        if (o instanceof CamelMessage) {
            CamelMessage msg = (CamelMessage) o;
            System.out.println(msg.getBodyAs(String.class, getCamelContext()));
        }
    }
}
In MyRedisConsumer, the received payload is carried in the message body.
3. Create the actors. The code is as follows:
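The producer and consumer endpoint URIs differ only in the trailing channels and command parameters. The sketch below makes that relationship explicit. RedisEndpointUris and its methods are hypothetical names used for illustration; the URI layout itself is copied from the two getEndpointUri methods above.

```java
/** Hypothetical helper for illustration; the URI layout is copied from the demo. */
final class RedisEndpointUris {
    private RedisEndpointUris() {
    }

    /** URI used by the producer: only the registry references are needed. */
    static String producer(String host, int port) {
        return "spring-redis://" + host + ":" + port
                + "?connectionFactory=#connectionFactory&serializer=#serializer";
    }

    /** URI used by the consumer: the producer URI plus channel and SUBSCRIBE command. */
    static String subscriber(String host, int port, String channels) {
        return producer(host, port) + "&channels=" + channels + "&command=SUBSCRIBE";
    }
}
```

Calling RedisEndpointUris.subscriber("localhost", 9999, "testChannel") yields the same string that MyRedisConsumer returns from getEndpointUri.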
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.camel.Camel;
import akka.camel.CamelExtension;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.util.Date;

/**
 * Created by sam on 5/9/16.
 */
public class RedisTest {
    public static void main(String[] args) throws Exception {
        ActorSystem system = ActorSystem.create("redis-actor");
        // The Camel object gives direct access to Camel, e.g. to the CamelContext.
        Camel camel = CamelExtension.get(system);
        PropertyPlaceholderDelegateRegistry delegateRegistry =
                (PropertyPlaceholderDelegateRegistry) camel.context().getRegistry();
        // By default Apache Camel uses a JndiRegistry to register beans.
        JndiRegistry registry = (JndiRegistry) delegateRegistry.getRegistry();

        // Register each bean only if it is missing, so neither one is skipped.
        if (registry.lookup("connectionFactory") == null) {
            JedisConnectionFactory connectionFactory = new JedisConnectionFactory();
            connectionFactory.setHostName("localhost");
            connectionFactory.setPassword("1234567890");
            connectionFactory.setPort(9999);
            // Call this method to initialize the connection factory.
            connectionFactory.afterPropertiesSet();
            registry.bind("connectionFactory", connectionFactory);
        }
        if (registry.lookup("serializer") == null) {
            registry.bind("serializer", new StringRedisSerializer());
        }

        // Create the producer and the consumer.
        ActorRef producer = system.actorOf(Props.create(MyRedisProducer.class), "redisProducer");
        ActorRef consumer = system.actorOf(Props.create(MyRedisConsumer.class), "redisConsumer");

        // Publish the current time once per second until the process is stopped.
        while (true) {
            Thread.sleep(1000);
            producer.tell(new Date().toString(), ActorRef.noSender());
        }
    }
}
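This code first obtains the default JndiRegistry and registers the connectionFactory and serializer beans. Note that to use JndiRegistry you need to add a jndi.properties file to the resources directory, with the following content: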
java.naming.factory.initial = org.apache.camel.util.jndi.CamelInitialContextFactory
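Standard JNDI also reads java.naming.factory.initial from the JVM system properties, so the same setting can in principle be supplied programmatically before the ActorSystem is created. The sketch below shows that idea. JndiSetup is a hypothetical name, and this variant is an assumption that has not been verified against the demo; the jndi.properties file above is the approach the demo actually uses.

```java
import javax.naming.Context;

/** Hypothetical helper; an untested alternative to the jndi.properties file. */
final class JndiSetup {
    static final String CAMEL_FACTORY =
            "org.apache.camel.util.jndi.CamelInitialContextFactory";

    private JndiSetup() {
    }

    /** Sets java.naming.factory.initial unless the JVM already defines it. */
    static void useCamelContextFactoryIfUnset() {
        // Context.INITIAL_CONTEXT_FACTORY is the constant for "java.naming.factory.initial".
        if (System.getProperty(Context.INITIAL_CONTEXT_FACTORY) == null) {
            System.setProperty(Context.INITIAL_CONTEXT_FACTORY, CAMEL_FACTORY);
        }
    }
}
```

If used, JndiSetup.useCamelContextFactoryIfUnset() would have to run as the first statement of main, before ActorSystem.create.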
Finally, the producer sends the messages and the consumer prints them. The output looks like this:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
response from route:{}CamelMessage(null, Map())
CamelMessage(Thu May 12 09:38:04 CST 2016, Map(MessageExchangeId -> ID-localhost-localdomain-33258-1463017082575-0-2, breadcrumbId -> ID-localhost-localdomain-33258-1463017082575-0-1, CamelRedis.Channel -> testChannel, CamelRedis.Pattern -> [B@5910162e))
Thu May 12 09:38:04 CST 2016
response from route:{}CamelMessage(null, Map())
CamelMessage(Thu May 12 09:38:05 CST 2016, Map(MessageExchangeId -> ID-localhost-localdomain-33258-1463017082575-0-4, breadcrumbId -> ID-localhost-localdomain-33258-1463017082575-0-3, CamelRedis.Channel -> testChannel, CamelRedis.Pattern -> [B@4154c5ce))
Thu May 12 09:38:05 CST 2016
response from route:{}CamelMessage(null, Map())
CamelMessage(Thu May 12 09:38:06 CST 2016, Map(MessageExchangeId -> ID-localhost-localdomain-33258-1463017082575-0-6, breadcrumbId -> ID-localhost-localdomain-33258-1463017082575-0-5, CamelRedis.Channel -> testChannel, CamelRedis.Pattern -> [B@2cb03be0))
Thu May 12 09:38:06 CST 2016
Reposted from: https://www.cnblogs.com/zengbiaobiao2016/p/5484502.html
