默认情况下 spark程序的分配都是靠参数设置固定的Executor数量进行资源预分配的,如果用户op在yarn的资源队列里可以申请到200个资源,那它就算跑占用资源很少的程序也能申请到200个核,这是不合理的。
Spark在yarn集群上运行的时候,一方面默认通过num-executors参数设置固定的Executor数量,每个application会独占所有预分配的资源直到整个生命周期的结束。Spark1.2后开始引入动态资源分配(Dynamic Resource Allocation)机制,支持资源弹性分配。
对于已知的业务负载,使用固定的集群资源配置是相对容易的;对于未知的业务负载,使用动态的集群资源分配方式可以满足负载的动态变化,这样集群的资源利用和业务负载的处理效率都会更加灵活。
动态资源分配测试在Spark1.2仅支持Yarn模式,从Spark1.6开始,支持standalone、Yarn、Mesos. 这个特性默认是禁用的。
简单来说,就是基于负载来动态调节Spark应用的资源占用,你的应用会在资源空闲的时候将其释放给集群,而后续用到的时候再重新申请。
其实没有一个固定的方法可以预测一个executor后续是否马上会被分配去执行任务,或者一个新分配的执行器实际上是空闲的,所以我们需要一些试探性的方法,来决定是否申请或移除一个执行器。策略分为请求策略与移除策略:
开启动态分配策略后,application会在task因没有足够资源被挂起的时候去动态申请资源,这种情况意味着该application现有的executor无法满足所有task并行运行。spark一轮一轮的申请资源,当有task挂起或等待spark.dynamicAllocation.schedulerBacklogTimeout(默认1s)时间的时候,会开始动态资源分配;之后会每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout(默认1s)时间申请一次,直到申请到足够的资源。每次申请的资源量是指数增长的,即1,2,4,8等。
之所以采用指数增长,出于两方面考虑:其一,开始申请的少是考虑到可能application会马上得到满足;其次要成倍增加,是为了如果application需要很多资源,而该方式可以在很少次数的申请之后得到满足。
(这段指数增长的策略可以根据实际情况通过修改源码来修改)
当application的executor空闲时间超过 spark.dynamicAllocation.executorIdleTimeout(默认60s)后,就会被回收。
首先需要对YARN进行配置,使其支持Spark的Shuffle Service , 修改每台集群上的yarn-site.xml
修改如下
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle,spark_shuffle</value>
</property>
增加如下
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
<name>spark.shuffle.service.port</name>
<value>7337</value>
</property>
将$SPARK_HOME/yarn/spark-*-yarn-shuffle.jar
拷贝到每台NodeManager的${HADOOP_HOME}/share/hadoop/yarn/lib/
下。重启所有修改配置的节点。