Spark内核解析1


概述

Spark 内核泛指 Spark 的核心运行机制,包括 Spark 核心组件的运行机制、Spark 任务调度机制、Spark 内存管理机制、Spark 核心功能的运行原理。

核心组件

Driver

Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。Driver 在 Spark 作业执行时主要负责:

  1. 将用户程序转化为任务(job);

  2. 在 Executor 之间调度任务(task);

  3. 跟踪 Executor 的执行情况;

  4. 通过 UI 展示查询运行情况;

Executor

Spark Executor 节点是一个 JVM 进程,负责在 Spark 作业中运行具体任务,任务彼此之间相互独立。Spark 应用启动时, Executor 节点被同时启动, 并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行, 会将出错节点上的任务调度到其他 Executor 节点上继续运行。

Executor 有两个核心功能:

  1. 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程;

  2. 它们通过自身的块管理器( Block Manager)为用户程序中要求缓存的 RDD提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

运行流程

Spark 通用运行流程, 不论 Spark 以何种模式进行部署, 任务提交后, 都会先启动 Driver 进程,随后 Driver 进程向集群管理器注册应用程序,之后集群管理器根据此任务的配置文件分配 Executor 并启动,当 Driver 所需的资源全部满足后,

Driver 开始执行 main 函数, Spark 查询为懒执行, 当执行到 action 算子时开始反向推算,根据宽依赖进行 stage 的划分,随后每一个 stage 对应一个 taskset,taskset 中有多个 task,根据本地化原则, task 会被分发到指定的 Executor 去执行,在任务执行的过程中, Executor 也会不断与 Driver 进行通信,报告任务运行情况。

部署模式

Standalone

Standalone 集群有四个重要组成部分, 分别是:

Driver: 是一个进程,我们编写的 Spark 应用程序就运行在 Driver 上, 由Driver 进程执行;

Master:是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责;

Worker:是一个进程,一个 Worker 运行在集群中的一台服务器上,主要负责两个职责,一个是用自己的内存存储 RDD 的某个或某些 partition;另一个是启动其他进程和线程(Executor) ,对 RDD 上的 partition 进行并行的处理和计算。

Executor:是一个进程, 一个 Worker 上可以运行多个 Executor, Executor 通过启动多个线程( task)来执行对 RDD 的 partition 进行并行计算,也就是执行我们对 RDD 定义的例如 map、flatMap、reduce 等算子操作。

Standalone Client

Standalone Client 模式下,Driver 在任务提交的本地机器上运行,Driver 启动后向 Master 注册应用程序,Master 根据 submit 脚本的资源需求找到内部资源至少可以启动一个 Executor 的所有 Worker,然后在这些 Worker 之间分配 Executor,Worker 上的 Executor 启动后会向 Driver 反向注册,所有的 Executor 注册完成后,Driver 开始执行 main 函数,之后执行到 Action 算子时,开始划分 stage,每个 stage 生成对应的 taskSet,之后将 task 分发到各个 Executor 上执行。

Standalone Cluster

Standalone Cluster 模式下,任务提交后,Master 会找到一个 Worker 启动 Driver进程, Driver 启动后向 Master 注册应用程序, Master 根据 submit 脚本的资源需求找到内部资源至少可以启动一个 Executor 的所有 Worker,然后在这些 Worker 之间分配 Executor,Worker 上的 Executor 启动后会向 Driver 反向注册,所有的 Executor 注册完成后,Driver 开始执行 main 函数,之后执行到 Action 算子时,开始划分 stage,每个 stage 生成对应的 taskSet,之后将 task 分发到各个 Executor 上执行。

注意, Standalone 的两种模式下( client/Cluster) , Master 在接到 Driver 注册

Spark 应用程序的请求后,会获取其所管理的剩余资源能够启动一个 Executor 的所有 Worker, 然后在这些 Worker 之间分发 Executor, 此时的分发只考虑 Worker 上的资源是否足够使用,直到当前应用程序所需的所有 Executor 都分配完毕, Executor 反向注册完毕后,Driver 开始执行 main 程序。

YARN Client

在 YARN Client 模式下,Driver 在任务提交的本地机器上运行,Driver 启动后会和 ResourceManager 通讯申请启动 ApplicationMaster, 随后 ResourceManager 分配 container , 在 合 适 的 NodeManager 上启动 ApplicationMaster ,此时的

ApplicationMaster 的功能相当于一个 ExecutorLaucher, 只负责向 ResourceManager 申请 Executor 内存。

ResourceManager 接到 ApplicationMaster 的资源申请后会分配 container,然后ApplicationMaster 在资源分配指定的 NodeManager 上启动 Executor 进程, Executor 进程启动后会向 Driver 反向注册, Executor 全部注册完成后 Driver 开始执行 main 函数,之后执行到 Action 算子时,触发一个 job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 taskSet,之后将 task 分发到各个 Executor 上执行。

YARN Cluster

在 YARN Cluster 模式下, 任务提交后会和 ResourceManager 通讯申请启动ApplicationMaster, 随后 ResourceManager 分配 container,在合适的 NodeManager上启动 ApplicationMaster,此时的 ApplicationMaster 就是 Driver。

Driver 启动后向 ResourceManager 申请 Executor 内存, ResourceManager 接到ApplicationMaster 的资源申请后会分配 container,然后在合适的 NodeManager 上启动 Executor 进程,Executor 进程启动后会向 Driver 反向注册, Executor 全部注册完成后 Driver 开始执行 main 函数,之后执行到 Action 算子时,触发一个 job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 taskSet,之后将 task 分发到各个Executor 上执行。

通讯架构

Spark2.x 版本使用 Netty 通讯框架作为内部通讯组件。spark 基于 netty 新的 rpc框架借鉴了 Akka 的中的设计, 它是基于Actor 模型。

Scala里面处理通信采用Actor架构,Actor架构其实就是一个邮局模型, AKKA为给予Actor模型的工程实现。Akka不同版本之间无法通信,存在兼容性问题。用户使用Akka与Spark中的Akka存在冲突。Spark对Akka没有自身维护,需要新功能时只能等待新版本,比较牵制Spark发展。因此在Spark2中已经抛弃了Akka。

Spark早期版本中采用Akka作为内部通信部件。
Spark1.3中引入Netty通信框架,为了解决Shuffle的大数据传输问题使用
Spark1.6中Akka和Netty可以配置使用。Netty完全实现了Akka在Spark中的功能。
Spark2系列中,Spark抛弃Akka,使用Netty。

Spark 通讯框架中各个组件( Client/Master/Worker)可以认为是一个个独立的实体,各个实体之间通过消息来进行通信。具体各个组件之间的关系图如下:

Endpoint( Client/Master/Worker)有 1 个 InBox 和 N 个 OutBox( N>=1,N 取决于当前 Endpoint 与多少其他的 Endpoint 进行通信, 一个与其通讯的其他 Endpoint 对应一个 OutBox), Endpoint 接收到的消息被写入 InBox, 发送出去的消息写入OutBox 并被发送到其他 Endpoint 的 InBox 中。

  1. RpcEndpoint:RPC 端点,Spark 针对每个节点( Client/Master/Worker)都称之为一个 Rpc 端点,且都实现 RpcEndpoint 接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用 Dispatcher;

  2. RpcEnv: RPC 上下文环境, 每个 RPC 端点运行时依赖的上下文环境称为RpcEnv;

  3. Dispatcher:消息分发器,针对于 RPC 端点需要发送消息或者从远程 RPC 接收到的消息,分发至对应的指令收件箱/发件箱。如果指令接收方是自己则存入收件箱,如果指令接收方不是自己,则放入发件箱;

  4. Inbox:指令消息收件箱,一个本地 RpcEndpoint 对应一个收件箱,Dispatcher 在每次向 Inbox 存入消息时, 都将对应 EndpointData 加入内部 ReceiverQueue 中, 另外 Dispatcher 创建时会启动一个单独线程进行轮询 ReceiverQueue,进行收件箱消息消费;

  5. RpcEndpointRef:RpcEndpointRef 是对远程 RpcEndpoint 的一个引用。当我们需要向一个具体的 RpcEndpoint 发送消息时,一般我们需要获取到该 RpcEndpoint 的引用,然后通过该应用发送消息。

  6. OutBox : 指令消息发件箱 , 对于当前 RpcEndpoint 来说 , 一个目标RpcEndpoint 对应一个发件箱, 如果向多个目标 RpcEndpoint 发送信息, 则有多个OutBox。当消息放入 Outbox 后,紧接着通过 TransportClient 将消息发送出去。消息放入发件箱以及发送过程是在同一个线程中进行;

  7. RpcAddress: 表示远程的 RpcEndpointRef 的地址, Host + Port。

  8. TransportClient:Netty 通信客户端,一个 OutBox 对应一个 TransportClient,TransportClient 不断轮询 OutBox,根据 OutBox 消息的 receiver 信息,请求对应的远程 TransportServer;

  9. TransportServer : Netty通信服务端 , 一 个 RpcEndpoint 对应一个TransportServer,接受远程消息后调用Dispatcher 分发消息至对应收发件箱;

RpcEndPoint就代表一个通信端点, 一个端点就有一个inbox, 一个 transportServer 一个 Dispatcher, 根据你通信的其他端点的数目,就有多个Outbox, 一个outbox有一个 transportClient, transportClient主要负责和 transportServer来通信。

在我们的传统认知中,多个端点要通信,中间要有一个节点类似于总的路由,节点之间的通信靠中间的“路由”,而 Spark没有中间的这个“路由”,如果中间的“路由”存在一定会存在瓶颈问题。Spark很巧妙的把中间的“路由”拆分到各个节点上。

高层视图

private[spark] trait RpcEndpoint {

  /**
   * The [[RpcEnv]] that this [[RpcEndpoint]] is registered to.
   */
  val rpcEnv: RpcEnv
   ....
}

 /**
   * Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply)]]. If receiving a
   * unmatched message, [[SparkException]] will be thrown and sent to `onError`.
   */
  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }

 /**
   * Process messages from [[RpcEndpointRef.ask]]. If receiving a unmatched message,
   * [[SparkException]] will be thrown and sent to `onError`.
   */
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
  }

  /**
   * Invoked before [[RpcEndpoint]] starts to handle any message.
   */
  def onStart(): Unit = {
    // By default, do nothing.
  }

RpcEndpoint 注意三个方法,

1、receive 改方法被子类实现,用于接收其他节点发送的消息。
2、receiveAndReply 该方法被子类实现,用于接收并回复其他节点发送的消息。
3、onStart 该方法被子类实现,该方法在端口启动的时候自动调用。

我们查看以下RpcEnv的实现发现实现是NettyRpcEnv

private[netty] class NettyRpcEnv(
    val conf: SparkConf,
    javaSerializerInstance: JavaSerializerInstance,
    host: String,
    securityManager: SecurityManager) extends RpcEnv(conf) with Logging {

  private[netty] val transportConf = SparkTransportConf.fromSparkConf(
    conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
    "rpc",
    conf.getInt("spark.rpc.io.threads", 0))

  // 设置一个消息分发器
  private val dispatcher: Dispatcher = new Dispatcher(this)

  private val streamManager = new NettyStreamManager(this)

  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

  private def createClientBootstraps(): java.util.List[TransportClientBootstrap] = {
    if (securityManager.isAuthenticationEnabled()) {
      java.util.Arrays.asList(new SaslClientBootstrap(transportConf, "", securityManager,
        securityManager.isSaslEncryptionEnabled()))
    } else {
      java.util.Collections.emptyList[TransportClientBootstrap]
    }
  }

  private val clientFactory = transportContext.createClientFactory(createClientBootstraps())

  /**
   * A separate client factory for file downloads. This avoids using the same RPC handler as
   * the main RPC context, so that events caused by these clients are kept isolated from the
   * main RPC traffic.
   *
   * It also allows for different configuration of certain properties, such as the number of
   * connections per peer.
   */
  @volatile private var fileDownloadFactory: TransportClientFactory = _

  val timeoutScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout")

  // Because TransportClientFactory.createClient is blocking, we need to run it in this thread pool
  // to implement non-blocking send/ask.
  // TODO: a non-blocking TransportClientFactory.createClient in future
  private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
    "netty-rpc-connection",
    conf.getInt("spark.rpc.connect.threads", 64))

  @volatile private var server: TransportServer = _

  private val stopped = new AtomicBoolean(false)

  /**
   * A map for [[RpcAddress]] and [[Outbox]]. When we are connecting to a remote [[RpcAddress]],
   * we just put messages to its [[Outbox]] to implement a non-blocking `send` method.
   */
    // 多个地址对应的发件箱
  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()

  /**
   * Remove the address's Outbox and stop it.
   */
  private[netty] def removeOutbox(address: RpcAddress): Unit = {
    val outbox = outboxes.remove(address)
    if (outbox != null) {
      outbox.stop()
    }
  }
  // 启动TransportServer来接收远程消息
  def startServer(bindAddress: String, port: Int): Unit = {
    val bootstraps: java.util.List[TransportServerBootstrap] =
      if (securityManager.isAuthenticationEnabled()) {
        java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
      } else {
        java.util.Collections.emptyList()
      }
    server = transportContext.createServer(bindAddress, port, bootstraps)
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
  }

  @Nullable
  override lazy val address: RpcAddress = {
    if (server != null) RpcAddress(host, server.getPort()) else null
  }

  // 注册当前端点
  override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    dispatcher.registerRpcEndpoint(name, endpoint)
  }
.... 

我们似乎没有看到Inbox在哪里点击Dispatcher

  private class EndpointData(
      val name: String,
      val endpoint: RpcEndpoint,
      val ref: NettyRpcEndpointRef) {
    val inbox = new Inbox(ref, endpoint)
  }

启动脚本

start-all.sh

# Start all spark daemons.
# Starts the master on this node.
# Starts a worker on each node specified in conf/slaves

if [ -z "${SPARK_HOME}" ]; then  #如果没有发现Spark环境变量
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" # 获得当前的目录把当前目录设置为SPARK_HOME
fi

# Load the Spark configuration
. "${SPARK_HOME}/sbin/spark-config.sh" #加载 spark-config.sh配置

# Start Master
"${SPARK_HOME}/sbin"/start-master.sh

# Start Workers
"${SPARK_HOME}/sbin"/start-slaves.sh

spark-config.sh

# included in all the spark scripts with source command
# should not be executable directly
# also should not be passed any arguments, since we need original $*

# symlink and absolute path should rely on SPARK_HOME to resolve
if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}" #设置 SPARK_CONF_DIR 目录
# Add the PySpark classes to the PYTHONPATH:
if [ -z "${PYSPARK_PYTHONPATH_SET}" ]; then
  export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
  export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}"
  export PYSPARK_PYTHONPATH_SET=1
fi
export JAVA_HOME=/opt/module/jdk1.8.0_162

start-master.sh

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Starts the master on the machine this script is executed on.

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
CLASS="org.apache.spark.deploy.master.Master"  #调用Master

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  echo "Usage: ./sbin/start-master.sh [options]"
  pattern="Usage:"
  pattern+="\|Using Spark's default log4j profile:"
  pattern+="\|Registered signal handlers for"

  "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
  exit 1
fi

ORIGINAL_ARGS="$@"

. "${SPARK_HOME}/sbin/spark-config.sh"  

. "${SPARK_HOME}/bin/load-spark-env.sh" #加载环境变量

if [ "$SPARK_MASTER_PORT" = "" ]; then # 如果没有端口 默认7077
  SPARK_MASTER_PORT=7077
fi

if [ "$SPARK_MASTER_HOST" = "" ]; then
  case `uname` in
      (SunOS)           # 如果没有设置HOST 则把/usr/sbin/check-hostname作为主机名
          SPARK_MASTER_HOST="`/usr/sbin/check-hostname | awk '{print $NF}'`"
          ;;
      (*)
          SPARK_MASTER_HOST="`hostname -f`"
          ;;
  esac
fi

if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then
  SPARK_MASTER_WEBUI_PORT=8080
fi

"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
  --host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
  $ORIGINAL_ARGS
  def main(argStrings: Array[String]) {
    // 1、初始化log对象
    Utils.initDaemon(log)
    // 2、加载SparkConf
    val conf = new SparkConf
    // 3、解析Master启动参数
    val args = new MasterArguments(argStrings, conf)
    // 4、启动RPC框架端点
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }

start-slaves.sh

# Starts a slave instance on each machine specified in the conf/slaves file.

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" #获取当前的目录
fi

. "${SPARK_HOME}/sbin/spark-config.sh"
. "${SPARK_HOME}/bin/load-spark-env.sh" #加载配置

# Find the port number for the master
if [ "$SPARK_MASTER_PORT" = "" ]; then
  SPARK_MASTER_PORT=7077
fi

if [ "$SPARK_MASTER_HOST" = "" ]; then
  case `uname` in
      (SunOS)
          SPARK_MASTER_HOST="`/usr/sbin/check-hostname | awk '{print $NF}'`"
          ;;
      (*)
          SPARK_MASTER_HOST="`hostname -f`"
          ;;
  esac
fi

# Launch the slaves 调用了start-slave.sh
"${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-slave.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"

start-slave.sh

# Starts a slave on the machine this script is executed on.
#
# Environment Variables
#
#   SPARK_WORKER_INSTANCES  The number of worker instances to run on this
#                           slave.  Default is 1.
#   SPARK_WORKER_PORT       The base port number for the first worker. If set,
#                           subsequent workers will increment this number.  If
#                           unset, Spark will find a valid port number, but
#                           with no guarantee of a predictable pattern.
#   SPARK_WORKER_WEBUI_PORT The base port for the web interface of the first
#                           worker.  Subsequent workers will increment this
#                           number.  Default is 8081.

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
CLASS="org.apache.spark.deploy.worker.Worker"

if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  echo "Usage: ./sbin/start-slave.sh [options] "
  pattern="Usage:"
  pattern+="\|Using Spark's default log4j profile:"
  pattern+="\|Registered signal handlers for"

  "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
  exit 1
fi

. "${SPARK_HOME}/sbin/spark-config.sh"

. "${SPARK_HOME}/bin/load-spark-env.sh"

# First argument should be the master; we need to store it aside because we may
# need to insert arguments between it and the other arguments
MASTER=$1
shift

# Determine desired worker port
if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then
  SPARK_WORKER_WEBUI_PORT=8081
fi

# Start up the appropriate number of workers on this machine.
# quick local function to start a worker
function start_instance {
  WORKER_NUM=$1
  shift

  if [ "$SPARK_WORKER_PORT" = "" ]; then
    PORT_FLAG=
    PORT_NUM=
  else
    PORT_FLAG="--port"
    PORT_NUM=$(( $SPARK_WORKER_PORT + $WORKER_NUM - 1 ))
  fi
  WEBUI_PORT=$(( $SPARK_WORKER_WEBUI_PORT + $WORKER_NUM - 1 ))
   #调用org.apache.spark.deploy.worker.Worker
  "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \
     --webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@" 
}

if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
  start_instance 1 "$@"
else
  for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
    start_instance $(( 1 + $i )) "$@"
  done
fi

workerMain方法

def main(argStrings: Array[String]) {
    Utils.initDaemon(log)
    val conf = new SparkConf
    val args = new WorkerArguments(argStrings, conf)
    val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir, conf = conf)
    rpcEnv.awaitTermination()
  }

任务提交

spark-submit

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

spark-class

```shell
if [ -z “${SPARK_HOME}” ]; then
source “$(dirname “$0”)”/find-spark-home
fi

. “${SPARK_HOME}”/bin/load-spark-env.sh

Find the java binary

if [ -n “${JAVA_HOME}” ]; then
RUNNER=”${JAVA_HOME}/bin/java”
else
if [ “$(command -v java)” ]; then
RUNNER=”java”
else
echo “JAVA_HOME is not set” >&2
exit 1
fi
fi

Find Spark jars.

if [ -d “${SPARK_HOME}/jars” ]; then
SPARK_JARS_DIR=”${SPARK_HOME}/jars”
else
SPARK_JARS_DIR=”${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars”
fi

if [ ! -d “$SPARK_JARS_DIR” ] && [ -z “$SPARK_TESTING$SPARK_SQL_TESTING” ]; then
echo “Failed to find Spark jars directory ($SPARK_JARS_DIR).” 1>&2
echo “You need to build Spark with the target "package" before running this program.” 1>&2
exit 1
else
LAUNCH_CLASSPATH=”$SPARK_JARS_DIR/*”
fi

Add the launcher build dir to the classpath if requested.

if [ -n “$SPARK_PREPEND_CLASSES” ]; then
LAUNCH_CLASSPATH=”${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH”
fi

For tests

if [[ -n “$SPARK_TESTING” ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
fi

The launcher library will print arguments separated by a NULL character, to allow arguments with

characters that would be otherwise interpreted by the shell. Read that in a while loop, populating

an array that will be used to exec the final command.

The exit code of the launcher is appended to the output, so the parent shell removes it from the

command array and checks the value to see if the launcher succeeded.

build_command() {
“$RUNNER” -Xmx128m -cp “$LAUNCH_CLASSPATH” org.apache.spark.launcher.Main “$@”
printf “%d\0” $?
}

CMD=()
while IFS= read -d ‘’ -r ARG; do
CMD+=(“$ARG”)
done < <(build_command “$@”)

COUNT=$


文章作者: 清风笑丶
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 清风笑丶 !
 上一篇
Spark内核解析2 Spark内核解析2
Master启动时首先创一个RpcEnv对象,负责管理所有通信逻辑Master通过RpcEnv对象创建一个Endpoint,Master就是一个Endpoint,Worker可以与其进行通信Worker启动时也是创一个RpcEnv对象Wo
2019-06-09
下一篇 
Spark之GraphX Spark之GraphX
GraphX 是 Spark 图表和图形并行计算的新组件。GraphX 延伸 Spark RDD 通过引入新的图形的抽象:计算与连接到每个顶点和边缘性的向量。以支持图形计算,GraphX 公开了一组基本的操作符(例如 subgraph,
2019-06-08
  目录