我的编程空间,编程开发者的网络收藏夹
学习永远不晚

大数据中Spark任务和集群启动流程是什么样的

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

大数据中Spark任务和集群启动流程是什么样的

这篇文章将为大家详细讲解有关大数据中Spark任务和集群启动流程是什么样的,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

大数据分享Spark任务和集群启动流程

大数据分享Spark任务和集群启动流程,Spark集群启动流程

调用start-all.sh脚本,开始启动Master

Master启动以后,preStart方法调用了一个定时器,定时检查超时的Worker后删除

启动脚本会解析slaves配置文件,找到启动Worker的相应节点.开始启动Worker

Worker服务启动后开始调用preStart方法开始向所有的Master进行注册

Master接收到Worker发送过来的注册信息,Master开始保存注册信息并把自己的URL响应给Worker

Worker接收到Master的URL后并更新,开始调用一个定时器,定时的向Master发送心跳信息

任务提交流程

Driver端会通过spark-submit脚本启动SaparkSubmit进程,此时创建了一个非常重要的对象(SparkContext),开始向Master发送消息

Master接收到发送过来的信息后开始生成任务信息,并把任务信息放到一个对列里

Master把所有有效的Worker过滤出来,按照空闲的资源进行排序

Master开始向有效的Worker通知拿取任务信息并启动相应的Executor

Worker启动Executor并向Driver反向注册

Driver开始把生成的task发送给相应的Executor,Executor开始执行任务

集群启动流程

首先创建Master类

import akka.actor.{Actor, ActorSystem, Props}

import com.typesafe.config.{Config, ConfigFactory}

import scala.collection.mutable

import scala.concurrent.duration._

class Master(val masterHost: String, val masterPort: Int) extends Actor{

// 用来存储Worker的注册信息

val idToWorker = new mutable.HashMap[String, WorkerInfo]()

// 用来存储Worker的信息

val workers = new mutable.HashSet[WorkerInfo]()

// Worker的超时时间间隔

val checkInterval: Long = 15000

// 生命周期方法,在构造器之后,receive方法之前只调用一次

override def preStart(): Unit = {

// 启动一个定时器,用来定时检查超时的Worker

import context.dispatcher

context.system.scheduler.schedule(0 millis, checkInterval millis, self, CheckTimeOutWorker)

}

// 在preStart方法之后,不断的重复调用

override def receive: Receive = {

// Worker -> Master

case RegisterWorker(id, host, port, memory, cores) => {

if (!idToWorker.contains(id)){

val workerInfo = new WorkerInfo(id, host, port, memory, cores)

idToWorker += (id -> workerInfo)

workers += workerInfo

println("a worker registered")

sender ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}" +

s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}")

}

}

case HeartBeat(workerId) => {

// 通过传过来的workerId获取对应的WorkerInfo

val workerInfo: WorkerInfo = idToWorker(workerId)

// 获取当前时间

val currentTime = System.currentTimeMillis()

// 更新最后一次心跳时间

workerInfo.lastHeartbeatTime = currentTime

}

case CheckTimeOutWorker => {

val currentTime = System.currentTimeMillis()

val toRemove: mutable.HashSet[WorkerInfo] =

workers.filter(w => currentTime - w.lastHeartbeatTime > checkInterval)

// 将超时的Worker从idToWorker和workers中移除

toRemove.foreach(deadWorker => {

idToWorker -= deadWorker.id

workers -= deadWorker

})

println(s"num of workers: ${workers.size}")

}

}

}

object Master{

val MASTER_SYSTEM = "MasterSystem"

val MASTER_ACTOR = "Master"

def main(args: Array[String]): Unit = {

val host = args(0)

val port = args(1).toInt

val configStr =

s"""

|akka.actor.provider = "akka.remote.RemoteActorRefProvider"

|akka.remote.netty.tcp.hostname = "$host"

|akka.remote.netty.tcp.port = "$port"

""".stripMargin

// 配置创建Actor需要的配置信息

val config: Config = ConfigFactory.parseString(configStr)

// 创建ActorSystem

val actorSystem: ActorSystem = ActorSystem(MASTER_SYSTEM, config)

// 用actorSystem实例创建Actor

actorSystem.actorOf(Props(new Master(host, port)), MASTER_ACTOR)

actorSystem.awaitTermination()

}

}

创建RemoteMsg特质

trait RemoteMsg extends Serializable{

}

// Master -> self(Master)

case object CheckTimeOutWorker

// Worker -> Master

case class RegisterWorker(id: String, host: String,

port: Int, memory: Int, cores: Int) extends RemoteMsg

// Master -> Worker

case class RegisteredWorker(masterUrl: String) extends RemoteMsg

// Worker -> self

case object SendHeartBeat

// Worker -> Master(HeartBeat)

case class HeartBeat(workerId: String) extends RemoteMsg

创建Worker类

import java.util.UUID

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}

import com.typesafe.config.{Config, ConfigFactory}

import scala.concurrent.duration._

class Worker(val host: String, val port: Int, val masterHost: String,

val masterPort: Int, val memory: Int, val cores: Int) extends Actor{

// 生成一个Worker ID

val workerId = UUID.randomUUID().toString

// 用来存储MasterURL

var masterUrl: String = _

// 心跳时间间隔

val heartBeat_interval: Long = 10000

// master的Actor

var master: ActorSelection = _

override def preStart(){

// 获取Master的Actor

master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}" +

s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}")

master ! RegisterWorker(workerId, host, port, memory, cores)

}

override def receive: Receive = {

// Worker接收到Master发送过来的注册成功的信息(masterUrl)

case RegisteredWorker(masterUrl) => {

this.masterUrl = masterUrl

// 启动一个定时器,定时给Master发送心跳

import context.dispatcher

context.system.scheduler.schedule(0 millis, heartBeat_interval millis, self, SendHeartBeat)

}

case SendHeartBeat => {

// 向Master发送心跳

master ! HeartBeat(workerId)

}

}

}

object Worker{

val WORKER_SYSTEM = "WorkerSystem"

val WORKER_ACTOR = "Worker"

def main(args: Array[String]): Unit = {

val host = args(0)

val port = args(1).toInt

val masterHost = args(2)

val masterPort = args(3).toInt

val memory = args(4).toInt

val cores = args(5).toInt

val configStr =

s"""

|akka.actor.provider = "akka.remote.RemoteActorRefProvider"

|akka.remote.netty.tcp.hostname = "$host"

|akka.remote.netty.tcp.port = "$port"

""".stripMargin

// 配置创建Actor需要的配置信息

val config: Config = ConfigFactory.parseString(configStr)

// 创建ActorSystem

val actorSystem: ActorSystem = ActorSystem(WORKER_SYSTEM, config)

// 用actorSystem实例创建Actor

val worker: ActorRef = actorSystem.actorOf(

Props(new Worker(host, port, masterHost, masterPort, memory, cores)), WORKER_ACTOR)

actorSystem.awaitTermination()

}

}

创建初始化类

class WorkerInfo(val id: String, val host: String, val port: Int,

val memory: Int, val cores: Int) {

// 初始化最后一次心跳的时间

var lastHeartbeatTime: Long = _

}

本地测试需要传入参数:

 大数据中Spark任务和集群启动流程是什么样的

关于大数据中Spark任务和集群启动流程是什么样的就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

大数据中Spark任务和集群启动流程是什么样的

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

大数据中Spark任务和集群启动流程是什么样的

这篇文章将为大家详细讲解有关大数据中Spark任务和集群启动流程是什么样的,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。大数据分享Spark任务和集群启动流程大数据分享Spark任务和集群启
2023-06-02

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录