Spark有两种内存管理模式,静态内存管理(Static MemoryManager)和动态(统一)内存管理(Unified MemoryManager)。动态内存管理从Spark1.6开始引入,在SparkEnv.scala中的源码可以看到,Spark目前默认采用动态内存管理模式,若将spark.memory.useLegacyMode设置为true,则会改为采用静态内存管理。

一、Spark内存管理模式

Spark有两种内存管理模式,静态内存管理(Static MemoryManager)和动态(统一)内存管理(Unified MemoryManager)。动态内存管理从Spark1.6开始引入,在SparkEnv.scala中的源码可以看到,Spark目前默认采用动态内存管理模式,若将Spark.memory.useLegacyMode设置为true,则会改为采用静态内存管理。

  1. //SparkEnv.scala
  2. valuseLegacyMemoryManager=conf.getBoolean("spark.memory.useLegacyMode",false)
  3. valmemoryManager:MemoryManager=
  4. if(useLegacyMemoryManager){
  5. newStaticMemoryManager(conf,numUsableCores)
  6. }else{
  7. UnifiedMemoryManager(conf,numUsableCores)
  8. }

 还不收藏?Spark动态内存管理源码解析!(spark的内存管理) Spark 内存 管理 第1张

二、Spark动态内存管理空间分配

相比于Static MemoryManager模式,Unified MemoryManager模型打破了存储内存和运行内存的界限,使每一个内存区能够动态伸缩,降低OOM的概率。由上图可知,executor JVM内存主要由以下几个区域组成:

(1)Reserved Memory(预留内存):这部分内存预留给系统使用,默认为300MB,可通过spark.testing.reservedMemory进行设置。

  1. //UnifiedMemoryManager.scala
  2. privatevalRESERVED_SYSTEM_MEMORY_BYTES=300*1024*1024

另外,JVM内存的最小值也与reserved Memory有关,即minSystemMemory = reserved Memory*1.5,即默认情况下JVM内存最小值为300MB*1.5=450MB。

  1. //UnifiedMemoryManager.scala
  2. valminSystemMemory=(reservedMemory*1.5).ceil.toLong

(2)Spark Memeoy:分为execution Memory和storage Memory。去除掉reserved Memory,剩下usableMemory的一部分用于execution和storage这两类堆内存,默认是0.6,可通过spark.memory.fraction进行设置。例如:JVM内存是1G,那么用于execution和storage的默认内存为(1024-300)*0.6=434MB。

  1. //UnifiedMemoryManager.scala
  2. valusableMemory=systemMemory-reservedMemory
  3. valmemoryFraction=conf.getDouble("spark.memory.fraction",0.6)
  4. (usableMemory*memoryFraction).toLong

他们的边界由spark.memory.storageFraction设定,默认为0.5。即默认状态下storage Memory和execution Memory为1:1.

  1. //UnifiedMemoryManager.scala
  2. onHeapStorageRegionSize=
  3. (maxMemory*conf.getDouble("spark.memory.storageFraction",0.5)).toLong,
  4. numCores=numCores)

(3)user Memory:剩余内存,用户根据需要使用,默认占usableMemory的(1-0.6)=0.4.

三、内存控制详解

首先我们先来了解一下Spark内存管理实现类之前的关系。

1.MemoryManager主要功能是:(1)记录用了多少StorageMemory和ExecutionMemory;(2)申请Storage、Execution和Unroll Memory;(3)释放Stroage和Execution Memory。

Execution内存用来执行shuffle、joins、sorts和aggegations操作,Storage内存用于缓存和广播数据,每一个JVM中都存在着一个MemoryManager。构造MemoryManager需要指定onHeapStorageMemory和onHeapExecutionMemory参数。

  1. //MemoryManager.scala
  2. private[spark]abstractclassMemoryManager(
  3. conf:SparkConf,
  4. numCores:Int,
  5. onHeapStorageMemory:Long,
  6. onHeapExecutionMemory:Long)extendsLogging{

创建StorageMemoryPool和ExecutionMemoryPool对象,用来创建堆内或堆外的Storage和Execution内存池,管理Storage和Execution的内存分配。

  1. //MemoryManager.scala
  2. @GuardedBy("this")
  3. protectedvalonHeapStorageMemoryPool=newStorageMemoryPool(this,MemoryMode.ON_HEAP)
  4. @GuardedBy("this")
  5. protectedvaloffHeapStorageMemoryPool=newStorageMemoryPool(this,MemoryMode.OFF_HEAP)
  6. @GuardedBy("this")
  7. protectedvalonHeapExecutionMemoryPool=newExecutionMemoryPool(this,MemoryMode.ON_HEAP)
  8. @GuardedBy("this")
  9. protectedvaloffHeapExecutionMemoryPool=newExecutionMemoryPool(this,MemoryMode.OFF_HEAP)

默认情况下,不使用堆外内存,可通过saprk.memory.offHeap.enabled设置,默认堆外内存为0,可使用spark.memory.offHeap.size参数设置。

  1. //Allthecodeyouwilleverneed
  2. finalvaltungstenMemoryMode:MemoryMode={
  3. if(conf.getBoolean("spark.memory.offHeap.enabled",false)){
  4. require(conf.getSizeAsBytes("spark.memory.offHeap.size",0)>0,
  5. "spark.memory.offHeap.sizemustbe>0whenspark.memory.offHeap.enabled==true")
  6. require(Platform.unaligned(),
  7. "NosupportforunalignedUnsafe.Setspark.memory.offHeap.enabledtofalse.")
  8. MemoryMode.OFF_HEAP
  9. }else{
  10. MemoryMode.ON_HEAP
  11. }
  12. }
  1. //MemoryManager.scala
  2. protected[this]valmaxOffHeapMemory=conf.getSizeAsBytes("spark.memory.offHeap.size",0)

释放numBytes字节的Execution内存方法

  1. //MemoryManager.scala
  2. defreleaseExecutionMemory(
  3. numBytes:Long,
  4. taskAttemptId:Long,
  5. memoryMode:MemoryMode):Unit=synchronized{
  6. memoryModematch{
  7. caseMemoryMode.ON_HEAP=>onHeapExecutionMemoryPool.releaseMemory(numBytes,taskAttemptId)
  8. caseMemoryMode.OFF_HEAP=>offHeapExecutionMemoryPool.releaseMemory(numBytes,taskAttemptId)
  9. }
  10. }

释放指定task的所有Execution内存并将该task标记为inactive。

  1. //UnifiedMemoryManager.scala
  2. privatevalRESERVED_SYSTEM_MEMORY_BYTES=300*1024*1024
0

释放numBytes字节的Stoarge内存方法

  1. //UnifiedMemoryManager.scala
  2. privatevalRESERVED_SYSTEM_MEMORY_BYTES=300*1024*1024
1

释放所有Storage内存方法

  1. //UnifiedMemoryManager.scala
  2. privatevalRESERVED_SYSTEM_MEMORY_BYTES=300*1024*1024
2

2.接下来我们了解一下,UnifiedMemoryManager是如何对内存进行控制的?动态内存是如何实现的呢?

UnifiedMemoryManage继承了MemoryManager

  1. //UnifiedMemoryManager.scala
  2. privatevalRESERVED_SYSTEM_MEMORY_BYTES=300*1024*1024
3

重写了maxOnHeapStorageMemory方法,***Storage内存=***内存-***Execution内存。

  1. //UnifiedMemoryManager.scala
  2. privatevalRESERVED_SYSTEM_MEMORY_BYTES=300*1024*1024
4

核心方法acquireStorageMemory:申请Storage内存。

  1. //UnifiedMemoryManager.scala
  2. privatevalRESERVED_SYSTEM_MEMORY_BYTES=300*1024*1024
5

核心方法acquireExecutionMemory:申请Execution内存。

  1. //UnifiedMemoryManager.scala
  2. privatevalRESERVED_SYSTEM_MEMORY_BYTES=300*1024*1024
6

方法***调用了ExecutionMemoryPool的acquireMemory方法,该方法的参数需要两个函数:maybeGrowExecutionPool()和computeMaxExecutionPoolSize()。

每个Task能够使用的内存被限制在pooSize / (2 * numActiveTask) ~ maxPoolSize / numActiveTasks。其中maxPoolSize代表了execution pool的***内存,poolSize表示当前这个pool的大小。

  1. //UnifiedMemoryManager.scala
  2. privatevalRESERVED_SYSTEM_MEMORY_BYTES=300*1024*1024
7

maybeGrowExecutionPool()方法实现了如何动态增加Execution内存区的大小。在每次申请execution内存的同时,execution内存池会进行多次尝试,每次尝试都可能会回收一些存储内存。

  1. //UnifiedMemoryManager.scala
  2. privatevalRESERVED_SYSTEM_MEMORY_BYTES=300*1024*1024
8
转载请说明出处
知优网 » 还不收藏?Spark动态内存管理源码解析!(spark的内存管理)

发表评论

您需要后才能发表评论