Akka练习


Actor并发模型

Java中的并发开发
Java的并发编程是基于 共享数据 和 加锁 的一种机制。锁的是共享数据
synchronized

Scala中的并发开发
不共享数据。依赖于 消息传递 的一种并发编程模式

如果 Actor A 和 Actor B要相互沟通
1、A要给B传递一个消息,B有一个收件箱,B轮询自己的收件箱
2、如果B看到A的消息,解析A的消息并执行相应操作
3、有可能会回复 A 消息

Actor示例

package day6

import akka.actor.{Actor, ActorSystem, Props}

/**
  * Actor示例
  */
class HelloActor extends Actor{
  override def receive: Receive = {
    case "Hello" => println("Hello Receive")
    case _ => println("aaa")
  }
}

object Demo1 extends App {
  //新建一个ActorSystem
  val system = ActorSystem("HelloSystem")

  //构造函数
  val helloActor = system.actorOf(Props[HelloActor],"helloactor")

  //发消息
  helloActor ! "Hello"
  helloActor ! "Hello2234"
}

建立两个Actor 相互传递消息

package day6

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

/**
  * 建立两个Actor 相互传递消息
  *
  * 定义消息:样本类、区分 消息的不同
  */
//消息的定义
case object PingMessage
case object PongMessage
case object StartMessage
case object StopMessage

class Ping(pong : ActorRef) extends Actor{

  var count = 0
  def incrementAndPing {
    count += 1;
    println("Ping")
  }

  override def receive: Receive = {
    case StartMessage =>
      incrementAndPing
      pong ! PingMessage
    case PongMessage =>
      if(count > 9){
        sender() ! StopMessage
      }else {
        incrementAndPing
        pong ! PingMessage
      }
  }
}

class Pong extends Actor{
  override def receive: Receive = {
    case PingMessage =>
    println("pong")
    //给ping回复消息
    sender ! PongMessage

    case StopMessage =>
      println("pong Stop")
      context.stop(self)
      //context.system.finalize()

  }
}

object Demo2 extends App{
  val system = ActorSystem("PingPongSystem")
  val pong =  system.actorOf(Props[Pong],name="pong")
  val ping = system.actorOf(Props(new Ping(pong)),name="ping")

  ping ! StartMessage
}

AKKA 负责来回传递消息

Scala项目

实现一个主从管理系统

NewAkkaSystem

Worker类
Master类

ActorMessage类
WorkerInfo类

ActorMessage 类:定义消息 5种消息

WorkerInfo.scala

package akka

/**
  * 保存worker的基本信息
  */
class WorkerInfo(val id : String, val workerHost : String, val memory : String, val cores : String) {

  //保存心跳信息
  var lastHeartBeat : Long = System.currentTimeMillis()

  override def toString : String = s"WorkerInfo($id, $workerHost, $memory, $cores)"
}

ActorMessage.scala

package akka

/**
  * 样本类,保存所有信息
  */
//worker ----> master注册节点
case class RegisterWorker(val id : String, val workerHost : String, val memory : String, val cores : String)

//worker ----> master 发送心跳信号
case class HeartBeat(val workerId : String)

//master ----> worker 注册完成 ACK
case class RegisteredWorker(val workerHost : String)

//master ----> master 检查超时节点
case class CheckTimeOutWorker()

//worker ----> worker 提醒自己发送心跳信号
case class SendHeartBeat()

Worker.scala

package akka

import java.util.UUID

import akka.actor._
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

class Worker extends Actor {

  //Worker端持有Master端的引用(代理对象)
  //因为worker会给Master发送信息,所以才要这个对象
  var master : ActorSelection = null

  ////生成一个UUID,作为Worker的标识
  val id = UUID.randomUUID().toString

  //构造方法执行完执行一次
  override def preStart(): Unit = {

    //Worker向MasterActorSystem发送建立连接请求
    master = context.system.actorSelection("akka.tcp://MasterActorSystem@localhost:8881/user/Master")

    //Worker向Master发送注册消息
    master ! RegisterWorker(id, "localhost", "10240", "8")
  }

  //该方法会被反复执行,用于接收消息,通过case class模式匹配接收消息
  override def receive : Receive = {
    //Master向Worker的反馈信息
    case RegisteredWorker(masterURL) => {
      //启动定时任务,向Master发送心跳
      context.system.scheduler.schedule(0 millis, 5000 millis, self, SendHeartBeat)
    }
    case SendHeartBeat => {
      println("worker send hearbeat")
      master ! HeartBeat(id)
    }
  }
}

object Worker extends App{
  val clientPort = 8803

  //创建ActorSystem的必要参数
  val configStr =
    s"""
       |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.port = $clientPort
       """.stripMargin

  val conf = ConfigFactory.parseString(configStr)

  //创建ActorSystem
  val actorSystem = ActorSystem("MasterActorSystem",conf)

  //启动Actor,Master会被实例化,生命周期方法会被调用
  actorSystem.actorOf(Props[Worker],"Worker")
}

Master.scala

package akka

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

class Master extends Actor {

  //保存WorkerId 和 Worker信息的 map
  val idToWorker = new mutable.HashMap[String, WorkerInfo]

  //保存所有worker信息的Set
  val workers = new mutable.HashSet[WorkerInfo]

  //Worker超时时间
  val WORKER_TIMEOUT = 10 * 1000

  //构造方法执行完执行一次
  override def preStart(): Unit = {
    //启动定时器,定时执行
    //设置在5毫秒之后,间隔10秒,给自己发一个CheckOfTimeOutWorker
    context.system.scheduler.schedule(0 millis, 5000 millis, self, CheckTimeOutWorker)
  }

  //该方法会被反复执行,用于接收消息,通过case class模式匹配接收消息
  override def receive: Receive = {
    //Worker向Master发送的注册消息
    case RegisterWorker(id, workerHost, memory, cores) => {
        if(!idToWorker.contains(id)){
          val worker = new WorkerInfo(id,workerHost,memory,cores)
          workers.add(worker)
          idToWorker(id) = worker
          println("nrew register worker: " + worker)
          sender ! RegisteredWorker(worker.id)
        }
      }

    //Worker向Master发送的心跳消息
    case HeartBeat(workerId) => {
      val workerInfo = idToWorker(workerId)
      println("get heartbeat message from: "+ workerInfo)
      workerInfo.lastHeartBeat = System.currentTimeMillis()
    }

    //Master自己向自己发送的定期检查超时Worker的消息
    case CheckTimeOutWorker => {
      //检查超时的worker
      val currentTime = System.currentTimeMillis()
      val toRemove = workers.filter( w => currentTime - w.lastHeartBeat > WORKER_TIMEOUT).toArray

      for(worker <- toRemove){
        workers -= worker
        idToWorker.remove(worker.id)
      }
      println("Worker size: " + workers.size)
    }
  }
}

object Master extends App {
  val host = "localhost"
  val port = 8881

  //创建ActorSystem的必要参数
  val configStr =
    s"""
       |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname = "$host"
       |akka.remote.netty.tcp.port = "$port"
     """.stripMargin

  val conf = ConfigFactory.parseString(configStr)

  //创建ActorSystem
  val actorSystem = ActorSystem("MasterActorSystem",conf)

  //启动Actor Master会被实例化 生命周期的方法会被调用
  actorSystem.actorOf(Props[Master],"Master")
}

文章作者: 谢舟
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 谢舟 !
 上一篇
Spark Core Spark Core
Spark生态圈:Spark Core : RDD(弹性分布式数据集)Spark SQLSpark StreamingSpark MLLib :协同过滤,ALS,逻辑回归等等 –> 机器学习Spark Graphx : 图计算 一、S
2019-03-29
下一篇 
Scala编程 Scala编程
一、Scala函数式编程多范式:面向对象,函数式编程(程序实现起来简单) 举例:WordCountsc 是 SparkContext , 非常重要 一行: var result = sc.textFile("hdfs://xxxx
2019-03-25
  目录