akka typed mysql | 您所在的位置:网站首页 › akka是什么 › akka typed mysql |
Akka 中的有类型 Actor 是 Active Objects 模式的一种实现. Smalltalk诞生之时,就已经缺省地将方法调用从同步操作发为异步派发。 有类型 Actor 由两 “部分” 组成, 一个public接口和一个实现, 如果你有 “企业级” Java的开发经验, 这对你应该非常熟悉。 Akka 中的有类型 Actor 是 Active Objects 模式的一种实现. Smalltalk诞生之时,就已经缺省地将方法调用从同步操作发为异步派发。 有类型 Actor 由两 “部分” 组成, 一个public接口和一个实现, 如果你有 “企业级” Java的开发经验, 这对你应该非常熟悉。 对普通actor来说,你拥有一个外部API (public接口的实例) 来将方法调用异步地委托给其实现类的私有实例。 有类型Actor相对于普通Actor的优势在于有类型Actor拥有静态的契约, 你不需要定义你自己的消息, 它的劣势在于对你能做什么和不能做什么进行了一些限制,比如 你不能使用 become/unbecome. 有类型Actor是使用 JDK Proxies 实现的,JDK Proxies提供了非常简单的api来拦截方法调用。 注意 和普通Akka actor一样,有类型actor也一次处理一个消息。 什么时候使用有类型的Actor 有类型的Actor很适合用在连接actor系统和非actor的代码,因为它可以使你能在外部编写正常的OO模式的代码。但切记不可滥用。 工具箱 返回有类型actor扩展 Returns the Typed Actor Extension TypedActorExtension extension = TypedActor.get(system); //system is an instance of ActorSystem 判断一个引用是否是有类型actor代理 Returns whether the reference is a Typed Actor Proxy or not TypedActor.get(system).isTypedActor(someReference); 返回一个外部有类型actor代理所代表的Akka actor Returns the backing Akka Actor behind an external Typed Actor Proxy TypedActor.get(system).getActorRefFor(someReference); 返回当前的ActorContext//Returns the current ActorContext, 此方法仅在一个TypedActor 实现的方法中有效 // method only valid within methods of a TypedActor implementation ActorContext context = TypedActor.context(); 返回当前有类型actor的外部代理//Returns the external proxy of the current Typed Actor, 此方法仅在一个TypedActor 实现的方法中有效// method only valid within methods of a TypedActor implementation Squarer sq = TypedActor.self(); 返回一个有类型Actor扩展的上下文实例//Returns a contextual instance of the Typed Actor Extension 这意味着如果你用它创建其它的有类型actor,它们会成为当前有类型actor的子actor//this means that if you create other Typed Actors with this, //they will become children to the current Typed Actor. TypedActor.get(TypedActor.context()); 具体例子及说明 package practise.akka.typedactors import akka.dispatch.Future import akka.japi.Option /** * 这个就是对外的接口,各函数就是Typed Actor的接口方法 */ public interface Squarer { void squareDontCare(int i); //fire-forget Futuresquare(int i); //non-blocking send-request-reply OptionsquareNowPlease(int i);//blocking send-request-reply int squareNow(int i); //blocking send-request-reply } package practise.akka.typedactors import akka.dispatch.Future import akka.dispatch.Futures import akka.actor.TypedActor import akka.japi.Option import akka.actor.ActorContext import groovy.util.logging.Log4j import akka.actor.ActorRef /** * 这个是接口实现。(实现akka.actor.TypedActor.Receiver接口就能接收actor发来的普通消息(非函数调用消息)。) */ @Log4j class SquarerImpl implements Squarer, akka.actor.TypedActor.Receiver { private String name; public SquarerImpl() { this.name = "default"; } public SquarerImpl(String name) { this.name = name; } public void squareDontCare(int i) { log.debug("squareDontCare,fire-and-forget只接收不返回结果,与ActorRef.tell完全一致----" + i) //可以从线程号看出是异步处理的 int sq = i * i; //Nobody cares :( //返回当前的ActorContext, // 此方法仅在一个TypedActor 实现的方法中有效 ActorContext context = TypedActor.context(); println "context ----" + context //返回当前有类型actor的外部代理, // 此方法仅在一个TypedActor 实现的方法中有效 Squarer mysq = TypedActor.self(); println "--self --" + mysq } public Futuresquare(int i) { log.debug("square send-request-reply Future----" + i) //可以从线程号看出是异步处理的 return Futures.successful(i * i, TypedActor.dispatcher()); } public OptionsquareNowPlease(int i) { log.debug("squareNowPlease send-request-reply Option----" + i) //可以从线程号看出是异步处理的 return Option.some(i * i); } public int squareNow(int i) { log.debug("squareNow send-request-reply result----" + i) //可以从线程号看出是异步处理的 return i * i; } @Override void onReceive(Object o, ActorRef actorRef) { log.debug("TypedActor收到消息----${o}---from:${actorRef}") } } package practise.akka.typedactors import akka.actor.ActorSystem import akka.actor.TypedActor import akka.actor.TypedProps import com.typesafe.config.ConfigFactory import akka.japi.Creator import groovy.util.logging.Log4j import akka.actor.ActorContext /** * 这里创建Typed Actor. */ @Log4j class TypedActorsFactory { ActorSystem system private final String config = """akka { loglevel = "${log?.debugEnabled ?"DEBUG":"INFO"}" actor.provider = "akka.remote.RemoteActorRefProvider" remote.netty.hostname = "127.0.0.1" remote.netty.port = 2552 remote.log-received-messages = on remote.log-sent-messages = on }""" TypedActorsFactory(String sysName) { this.system = ActorSystem.create(sysName, ConfigFactory.parseString(config)) } Squarer getTypedActorDefault() { Squarer mySquarer = TypedActor.get(system).typedActorOf(new TypedProps(Squarer.class, SquarerImpl.class)); //这里创建的是代理类型 return mySquarer } Squarer getTypedActor(String name) { Squarer otherSquarer = TypedActor.get(system).typedActorOf(new TypedProps(Squarer.class, new Creator() { public SquarerImpl create() { return new SquarerImpl(name); } //这里创建的是具体的实现类型 }), name); //这个name是actor的name:akka//sys@host:port/user/name return otherSquarer } } 下面用几个测试用例实验一下 package practise.akka.typedactors import akka.actor.ActorRef import akka.actor.TypedActor import akka.actor.UntypedActorContext import akka.dispatch.Future import com.baoxian.akka.AkkaClientNoReply import com.baoxian.akka.AkkaServerApp class TestTypedActors extends GroovyTestCase { def testTypeActor() { println("----") TypedActorsFactory factory = new TypedActorsFactory("typedServer") // Squarer squarer = factory?.getTypedActorDefault() //创建代理 Squarer squarer = factory?.getTypedActor("serv") //具体实现 squarer?.squareDontCare(10) Future future = squarer?.square(10) AkkaServerApp app = new AkkaServerApp("tmp", "127.0.0.1", 6666, "result") //这是我自己构建的接收器 app.messageProcessor = {msg, UntypedActorContext context -> log.info("结果为" + msg) } app.startup() akka.pattern.Patterns.pipe(future).to(app.serverActor) //Future的返回结果pipe到接收器中了,在log中能看到结果 println "----" + squarer?.squareNowPlease(10)?.get() println "----" + squarer?.squareNow(10) //返回有类型actor扩展 TypedActor.get(factory.system) //返回一个外部有类型actor代理所代表的Akka actor ActorRef actor = TypedActor.get(factory.system).getActorRefFor(squarer); actor.tell("消息") //这个消息将会在SquarerImpl的onReceive方法中接收到 sleep(1000 * 60 * 10) // TypedActor.get(factory.system).stop(squarer); //这将会尽快地异步终止与指定的代理关联的有类型Actor TypedActor.get(factory.system).poisonPill(squarer);//这将会在有类型actor完成所有在当前调用之前对它的调用后异步地终止它 } def testRemoteTypedActor() { AkkaClientNoReply client = new AkkaClientNoReply("akka://[email protected]:2552/user/serv") client.send("远程消息") //这将会在SquarerImpl的onReceive方法中接收到 sleep(1000) client.shutdown() } } 本文原创发布php中文网,转载请注明出处,感谢您的尊重! |
CopyRight 2018-2019 实验室设备网 版权所有 |