AKKA 邮箱Mailbox

  • 作者:新网
  • 来源:新网
  • 2017-10-21 11:13:26

邮箱Mailbox Akka Mailbox持有发给actor的消息。通常,每一个Actor都与自己的邮箱,但是当使用BalancingPool时,所有的路由都共享一个邮箱实例。 邮箱选择 Actor需要的消息队列类型 特定类型的actor可以用特定类型的消息队...

邮箱Mailbox

Akka Mailbox持有发给actor的消息。通常,每一个Actor都与自己的邮箱,但是当使用BalancingPool时,所有的路由都共享一个邮箱实例。

t01144d31d02dd7ebfb.jpg

邮箱选择

Actor需要的消息队列类型

特定类型的actor可以用特定类型的消息队列,只要这个actor实现了参数化的接口RequiresMessageQueue。这里是一个例子:

import akka.dispatch.BoundedMessageQueueSemantics;import akka.dispatch.RequiresMessageQueue;public class MyBoundedUntypedActor extends MyUntypedActorimplements RequiresMessageQueue<BoundedMessageQueueSemantics> {}

RequiresMessageQueue接口的类型参数需要在配置中映射到一个邮箱,就像这样:

bounded-mailbox {mailbox-type = "akka.dispatch.BoundedMailbox"mailbox-capacity = 1000mailbox-push-timeout-time = 10s}akka.actor.mailbox.requirements {"akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox}

现在每次你创建一个类型为MyBoundedUntypedActor的actor,它都将会尝试获取一个有界邮箱。如果actor在部署时配置了不同的邮箱,可能是直接配置的,也可能是通过带有特定邮箱类型的分发器,那么就会覆写这个映射。

注意

为Actor创建的邮箱中的队列类型用接口中要求的类型进行检查,如果队列没有实现要求的类型,那么actor创建就会失败。

Dispatcher需要的消息队列类型

分发器也需要一个邮箱类型,用于运行中的actor。一个例子就是BalancingDispatcher,它需要一个并发的、线程安全的消息队列。这样的需求可以在分发器配置中进行规划,就像这样:

my-dispatcher {mailbox-requirement = org.example.MyInterface}

给定的需求命名了一个类或者接口,必须保证这个类或者接口是消息队列实现的超类型。万一冲突了,例如如果actor需要一个邮箱类型,但是它不满足这个需求,那么actor创建就会失败。

 

如何选择邮箱类型

当创建actor时,ActorRefProvider首先确定分发器,分发起会执行actor。然后按照如下顺序确定邮箱类型:

如果actor的部署配置部分包含一个mailbox关键字,那么这个mailbox关键字就指定了要使用的邮箱类型;如果actor的Props包含mailbox选择—即调用了withMailbox方法—那么这个方法指定要使用的邮箱类型;如果分发器的配置部分包含一个mailbox-type关键字,那么这部分也将被用于配置邮箱类型;如果actor需要上面描述的邮箱类型,那么这个需求的映射将被用于确定邮箱类型;如果失败了,那么分发器的需求-如果存在-将被会尝试;如果分发器需要后面描述的邮箱类型,那么这个需求的映射将被用于确定邮箱类型;将使用默认的邮箱akka.actor.default-mailbox。

默认邮箱

当按照上述描述的依然没有指定邮箱。那么就会使用默认的邮箱。默认邮箱his一个无界邮箱,是由java.util.concurrent.ConcurrentLinkedQueue支持的。

SingleConsumerOnlyUnboundedMailbox是更高效的邮箱,它可被用于默认邮箱,但是它不能被用于BalancingDispatcher。

将SingleConsumerOnlyUnboundedMailbox配置为默认邮箱:

akka.actor.default-mailbox {mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"}

那些配置会传给Mailbox类型

每一个邮箱类型都继承自MailboxType,它的构造函数有两个参数:ActorSystem.Settings对象和Config对象。后面这个是通过actor系统的配置获取,用邮箱类型的配置路径覆盖它的id关键字,并添加一个默认邮箱配置的回调。

内置的Mailbox实现

Akka自带了很多邮箱实现:

UnboundedMailbox(默认) 默认邮箱由java.util.concurrent.ConcurrentLinkedQueue支持阻塞: No有界: No配置名: "unbounded"或"akka.dispatch.UnboundedMailbox" SingleConsumerOnlyUnboundedMailbox

这个队列可能会或可能不会比默认邮箱更快,取决于你的使用场景—请确保进行过适当的基准测试!

由多生产者-单消费者队列支持,不能用于BalancingDispatcher阻塞: No有界: No配置名:"akka.dispatch.SingleConsumerOnlyUnboundedMailbox" NonBlockingBoundedMailbox 由非常高效的多生产者-单消费者队列支持阻塞: No (将溢出消息丢弃到死信)有界: Yes配置名:"akka.dispatch.NonBlockingBoundedMailbox" UnboundedControlAwareMailbox 优先派送akka.dispatch.ControlMessage消息由两个java.util.concurrent.ConcurrentLinkedQueue支持阻塞: No有界: No配置名:"akka.dispatch.UnboundedControlAwareMailbox" UnboundedPriorityMailbox 由java.util.concurrent.PriorityBlockingQueue支持相同优先级的消息的派送顺序未定义-与UnboundedStablePriorityMailbox相反阻塞: No有界: No配置名:"akka.dispatch.UnboundedPriorityMailbox" UnboundedStablePriorityMailbox 由包装到akka.util.PriorityQueueStabilizer的java.util.concurrent.PriorityBlockingQueue支持相同优先级的消息保证按照FIFO顺序派送- contrast with the UnboundedPriorityMailbox阻塞: No有界: No配置名:"akka.dispatch.UnboundedStablePriorityMailbox"

其它的有界邮箱实现如果达到最大容量,并且配置了non-zero mailbox-push-timeout-time,会阻塞发送者。

注意

下面的邮箱只应该用于mailbox-push-timeout-time为0的情况。

BoundedMailbox 由java.util.concurrent.LinkedBlockingQueue支持阻塞:如果使用non-zero mailbox-push-timeout-time为Yes,否则为No有界: Yes配置名:"bounded"或"akka.dispatch.BoundedMailbox" BoundedPriorityMailbox 由包装到akka.util.BoundedBlockingQueue中java.util.PriorityQueue支持相同优先级的消息的派送顺序未定义-与BoundedStablePriorityMailbox相反阻塞:如果使用了non-zero mailbox-push-timeout-time则为Yes,否则为No有界: Yes配置名:"akka.dispatch.BoundedPriorityMailbox" BoundedStablePriorityMailbox 由包装在akka.util.PriorityQueueStabilizer和akka.util.BoundedBlockingQueue中的java.util.PriorityQueue支持相同优先级的消息的派送顺序为FIFO-与BoundedPriorityMailbox相反阻塞: Yes如果使用了non-zero mailbox-push-timeout-time则为Yes,否则为No有界: Yes配置名:"akka.dispatch.BoundedStablePriorityMailbox" BoundedControlAwareMailbox 优先派送akka.dispatch.ControlMessage消息由两个java.util.concurrent.ConcurrentLinkedQueue支持,如果达到最大容量,则阻塞排队阻塞: Yes如果使用了non-zero mailbox-push-timeout-time则为Yes,否则为No有界: Yes配置名:"akka.dispatch.BoundedControlAwareMailbox"
 
邮箱配置示例

如果创建PriorityMailbox:

importcom.typesafe.config.Config;

importakka.actor.ActorSystem;

importakka.actor.PoisonPill;

importakka.dispatch.PriorityGenerator;

importakka.dispatch.UnboundedPriorityMailbox;

 

publicclassMyPrioMailboxextendsUnboundedPriorityMailbox {

// 用于反射实例化

publicMyPrioMailbox(ActorSystem.Settingssettings, Configconfig) {

//创建一个新的PriorityGenerator,低优先级意味着更重要

super(newPriorityGenerator() {

@Override

publicintgen(Objectmessage) {

if(message.equals("highpriority"))

return0;// 如果可能的话,高优先级的消息应该被优先处理

elseif(message.equals("lowpriority"))

return2;// 如果低优先级的消息应该被最后处理

elseif(message.equals(PoisonPill.getInstance()))

return3;// 当没有剩余时,则为处理PoisonPill

else

return1;// 默认位于高优先级和低优先级

}

});

}

}

然后把它添加到配置中:

prio-dispatcher {

mailbox-type ="docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"

//Other dispatcher configuration goes here

}

下面是使用这个邮箱的例子:

importcom.typesafe.config.Config;

importcom.typesafe.config.ConfigFactory;

 

importakka.actor.ActorRef;

importakka.actor.ActorSystem;

importakka.actor.PoisonPill;

importakka.actor.Props;

importakka.actor.UntypedActor;

importakka.event.Logging;

importakka.event.LoggingAdapter;

 

publicclassDemoextendsUntypedActor {

LoggingAdapterlog= Logging.getLogger(getContext().system(),this);

{

for(Objectmsg:newObject[] {"lowpriority","lowpriority","highpriority","pigdog","pigdog2","pigdog3",

"highpriority", PoisonPill.getInstance() }) {

getSelf().tell(msg, getSelf());

}

}

 

publicvoidonReceive(Objectmessage) {

log.info(message.toString());

}

 

publicstaticvoidmain(String[]args) {

Configconfig= ConfigFactory.parseString("akka.loglevel = DEBUG n"+"akka.actor.debug.lifecycle = on");

// We create a new Actor that just prints out what it processes

ActorSystemsystem= ActorSystem.create("mailbox");

ActorRefmyActor=system.actorOf(Props.create(Demo.class).withDispatcher("prio-dispatcher"),"demo");

system.terminate();

}

}

运行输出:

[INFO] [12/24/2016 23:38:22.364] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] highpriority

[INFO] [12/24/2016 23:38:22.364] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] highpriority

[INFO] [12/24/2016 23:38:22.364] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] pigdog

[INFO] [12/24/2016 23:38:22.364] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] pigdog2

[INFO] [12/24/2016 23:38:22.364] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] pigdog3

[INFO] [12/24/2016 23:38:22.365] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] lowpriority

[INFO] [12/24/2016 23:38:22.365] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] lowpriority

也可以直接配置邮箱类型,就像这样:

prio-mailbox {

mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"

//Other mailbox configuration goes here

}

 

akka.actor.deployment {

/priomailboxactor {

mailbox = prio-mailbox

}

}

然后就可以使用来自部署的邮箱类型:

ActorRef myActor =

system.actorOf(Props.create(MyUntypedActor.class),

"priomailboxactor");

或这样:

ActorRef myActor =

system.actorOf(Props.create(MyUntypedActor.class)

.withMailbox("prio-mailbox"));

创建自己的邮箱类型

一个值得上千次吹嘘的例子:

importakka.actor.ActorRef;

importakka.actor.ActorSystem;

importakka.dispatch.Envelope;

importakka.dispatch.MailboxType;

importakka.dispatch.MessageQueue;

importakka.dispatch.ProducesMessageQueue;

importcom.typesafe.config.Config;

importjava.util.concurrent.ConcurrentLinkedQueue;

importjava.util.Queue;

importscala.Option;

 

publicclassMyUnboundedJMailboximplementsMailboxType, ProducesMessageQueue<MyUnboundedJMailbox.MyMessageQueue> {

 

// This is the MessageQueue implementation

publicstaticclassMyMessageQueueimplementsMessageQueue, MyUnboundedJMessageQueueSemantics {

privatefinalQueue<Envelope>queue=newConcurrentLinkedQueue<Envelope>();

 

// these must be implemented; queue used as example

publicvoidenqueue(ActorRefreceiver, Envelopehandle) {

queue.offer(handle);

}

 

publicEnvelope dequeue() {

returnqueue.poll();

}

 

publicintnumberOfMessages() {

returnqueue.size();

}

 

publicbooleanhasMessages() {

return!queue.isEmpty();

}

 

publicvoidcleanUp(ActorRefowner, MessageQueuedeadLetters) {

for(Envelopehandle:queue) {

deadLetters.enqueue(owner,handle);

}

}

}

 

// This constructor signature must exist, it will be called by Akka

publicMyUnboundedJMailbox(ActorSystem.Settingssettings, Configconfig) {

// put your initialization code here

}

 

// The create method is called to create the MessageQueue

publicMessageQueue create(Option<ActorRef>owner, Option<ActorSystem>system) {

returnnewMyMessageQueue();

}

}

//Marker interface used for mailbox requirements mapping

publicinterfaceMyUnboundedJMessageQueueSemantics {

}

然后只需要将分发器配置或mailbox配置的"mailbox-type"的值指定为你的MailboxType的全限定名。

注意

确保包含一个参数为akka.actor.ActorSystem.Settings和com.typesafe.config.ConfigMake的构造函数,因为这个构造函数会被反射调用,以便构建你的邮箱类型。传入的第二参数config是来自于配置中描述使用了这个邮箱类型的分发器和邮箱设置的那部分。对于每一个分发器和邮箱,邮箱类型只实例化一次。

你也可以使用邮箱类型作为分发器的必要条件,就像这样:

custom-dispatcher {

mailbox-requirement =

"docs.dispatcher.MyUnboundedJMessageQueueSemantics"

}

 

akka.actor.mailbox.requirements {

"docs.dispatcher.MyUnboundedJMessageQueueSemantics" =

custom-dispatcher-mailbox

}

 

custom-dispatcher-mailbox {

mailbox-type = "docs.dispatcher.MyUnboundedJMailbox"

}

或者在你的actor上定义必备条件,就像这样:

importakka.actor.UntypedActor;

importakka.dispatch.RequiresMessageQueue;

 

publicclassMySpecialActorextendsUntypedActorimplementsRequiresMessageQueue<MyUnboundedJMessageQueueSemantics>{

@Override

publicvoidonReceive(Objectarg0)throwsException {

//TODOAuto-generated method stub

}

}

system.actorOf的特殊语义

为了让system.actorOf既同步又是非阻塞的,同时保持返回类型ActorRef(和返回的引用功能齐全的语义),这种情况下需要特殊处理。在这些场景背后,构造了虚拟的actor引用,这些引用被发送到系统的守护actor,守护actor实际创建actor和它的上下文,并将它们放入引用内部。Until that has happened,发送到ActorRef的消息将被本地排队,一旦交换实际的填写,它们将被转移到真正的邮箱。因此,

finalProps props = ...

//这个actor使用MyCustomMailbox,假设它是单例

system.actorOf(props.withDispatcher("myCustomMailbox").tell("bang", sender);

assert(MyCustomMailbox.getInstance().getLastEnqueued().equals("bang"));

可能会失败;你将不得不允许一些时间来匆忙地传入检查并重试。TestKit.awaitCond。

 

  • 相关专题

免责声明:本文内容由互联网用户自发贡献自行上传,本网站不拥有所有权,也不承认相关法律责任。如果您发现本社区中有涉嫌抄袭的内容,请发送邮件至:operations@xinnet.com进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。

相关文章

免费咨询获取折扣