Akka 是⼀个基于 Actor 模型的并发框架,由 Scala 语⾔实现。它为构建基于 JVM 的⾼并发、分布式、容错性强、事件驱动的应⽤程序提供了⽀持。在⼤数据处理框架如 Spark 和 Flink 中,Akka 被采⽤来实现进程与节点之间的通信。
为了深⼊理解 Akka 和其并发特点,我们可以从 Actor 的核⼼模型图开始探索。
在所展⽰的模型图中,每个 Actor 表⽰⼀个可调度的轻量级执⾏单元。如图展⽰,当 Actor A 和Actor C 向 Actor B 发送消息时,这些消息会被串⾏地放⼊ Actor B 的 Mailbox 中。随后,Akka 框架的底层调度机制会激活 Actor B,使其接收并处理这些消息。这种基于 Actor 模型的并发框架确保了消息在各个 Actor 间安全、有序地传递,同时也确保了每个 Actor 以串⾏的⽅式处理其收到的所有消息。
采⽤ Akka 框架基于 Actor 模型,开发者⽆需过多地关注底层的并发同步机制,⽽只需专注于定义每个 Actor 的业务逻辑:它需要处理哪些消息以及应当向哪些 Actor 发送何种消息。由于 Actor 模型内的消息传递机制保证了消息在各 Actor 间串⾏处理,这天然地规避了并发情境下的数据⼀致性问题。这意味着在并发单元之间需要频繁交互信息的应⽤场景中,Akka 框架在性能上具有显著优势。更为重要的是,Actor 模型的轻量性能够⽀撑极⼤规模的并发,并能够均匀分布到每个 CPU 核⼼,最⼤化地利⽤硬件资源,从⽽进⼀步提⾼应⽤性能。
Akka 提供了⼀个基于 Actor 模型的并发框架,运⾏在 JVM 上。其⽬标是为开发者打造⼀个具备⾼伸缩性和弹性的响应式并发应⽤平台。
ActorSystem 可以看做是 Actor 的系统⼯⼚或管理者。主要有以下功能:管理调度服务、配置相关参数、⽇志功能
Akka 官⽹展⽰的 Actor 层次结构⽰意图
Akka 有在系统中初始化三个 Actor:
/:被称为根监护⼈。它是所有 Actor 的顶级⽗节点,在整个系统终⽌时,它是最后被停⽌的。
/user:这是⽤⼾⾃定义 Actor 的⽗节点。不要误以为“user”与最终⽤⼾直接相关,实际上它只是路径的⼀部分。当你使⽤ Akka 创建 Actor 时,其路径将始终以 /user/ 开头。
/system系统监护⼈:创建 Actor 时,通常使⽤ system.actorOf() ⽅法,这会在 /user 路径下⽣成⼀个Actor。尽管这只是⽤⼾定义的层级中的顶级 Actor,但我们通常称其为顶级 Actor。
在 Akka 中,每个 Actor 都有⼀个⽗ Actor。要在现有 Actor 树中添加⼀个新的 Actor,可以通过context.actorOf() ⽅法实现。这样,新 Actor 的创建者即成为其⽗ Actor。
在使⽤ system.actorOf() 创建 Actor 时,其实返回的是⼀个 ActorRef 对象。
ActorRef 可以看做是 Actor 的引⽤,是⼀个 Actor 的不可变,可序列化的句柄(handle),它可能不在本地或同⼀个 ActorSystem 中,它是实现⽹络空间位置透明性的关键设计。
ActorRef 最重要功能是⽀持向它所代表的 Actor 发送消息:
ref ! message
public class ServerApp {
public static void main(String[] args) {
// Map<String, Object> map=new HashMap<>();
// map.put("akka.actor.provider", "remote");
// map.put("akka.remote.artery.enabled", true); // 使用 Artery
// map.put("akka.remote.artery.transport", "tcp"); // 可以选择 "tcp" 或 "aeron-udp"
// map.put("akka.remote.artery.canonical.hostname", "127.0.0.1");
// map.put("akka.remote.artery.canonical.port", 2551);
//
Map<String, Object> map=new HashMap<>();
map.put("akka.actor.provider","remote");
map.put("akka.remote.transport","akka.remote.netty.NettyRemoteTransport");
map.put("akka.remote.netty.tcp.hostname","127.0.0.1");
map.put("akka.remote.netty.tcp.port",2551);
Config config = ConfigFactory.parseMap(map).withFallback(ConfigFactory.load());
ActorSystem system = ActorSystem.create("ServerSystem", config);
ActorRef serverActorRef = system.actorOf(Props.create(ServerActor.class), "ServerActor");
System.out.println("ServerActor started at: " + serverActorRef.path().toString());
}
}
public class ClientApp {
public static void main(String[] args) {
Config config = ConfigFactory.parseString(
"akka.actor.provider = remote\n" +
"akka.remote.transport = akka.remote.netty.NettyRemoteTransport\n" +
"akka.remote.netty.tcp.hostname = 127.0.0.1\n" +
"akka.remote.netty.tcp.port = 2550\n"
).withFallback(ConfigFactory.load());
ActorSystem system = ActorSystem.create("ClientSystem", config);
ActorRef clientActorRef = system.actorOf(Props.create(ClientActor.class), "clientActor");
System.out.println("clientActor started at: " + clientActorRef.path().toString());
String remoteHost="127.0.0.1";
int port= 2551;
String serverActorPath = "akka.tcp://ServerSystem@"+remoteHost+":"+port+"/user/ServerActor";
ActorRef serverRef = system.actorSelection(serverActorPath)
.resolveOne(Duration.ofSeconds(3)).toCompletableFuture().join();
serverRef.tell("hello", clientActorRef);
}
}
public class ServerActor extends AbstractActor {
int count = 0;
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("hello", message -> {
System.out.print("received hello from client, count:"+ count++ +" message:"+ message);
getSender().tell("Hello from Acoter Server!",getSelf());
count++;
})
.match(String.class, message -> {
System.out.println("Server received: " + message);
getSender().tell("Server response, count:"+ count +" message:"+ message , getSelf());
count++;
})
.build();
}
}
标签: 开发日记