Common causes of disks filling up on an EMR cluster (or, in general, any Hadoop / Spark cluster)

Written by mannem. Posted in AWS BIG DATA, EMR || Elastic Map Reduce

A full disk can make YARN on EMR report nodes as UNHEALTHY. Customers therefore need to understand how each application, such as Hadoop or Spark, consumes disk space, anticipate when it will run out, and act accordingly. This article covers the most common ways disks on an EMR cluster fill up and recommends actions for each scenario.

Some common factors occupying disk space on EMR:

  • HDFS (/mnt/hdfs/), i.e. dfs.datanode.data.dir, may be occupying the most space.
  • YARN container logs (/mnt/var/log/hadoop-yarn/containers/).
  • Files localized during a Hadoop/Spark job run on the YARN framework (yarn.nodemanager.local-dirs/filecache , ../usercache/filecache , ../usercache//appcache/<app-id>/), where yarn.nodemanager.local-dirs is usually /mnt/yarn/ on a single-disk setup. On instances with multiple (non-root) disks, yarn.nodemanager.local-dirs is a comma-separated list that spans those disks.
  • Spark Application history logs (hdfs:///var/log/spark/apps/)
  • It may also be a combination of all of the above.
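A quick way to see which of the local directories above dominates is to total the file sizes under each one. The following is a minimal sketch, assuming the default single-disk paths named in the list; the helper names `dir_size_bytes` and `report` are made up for illustration, and the Spark history logs are not covered because they live on HDFS rather than on the local disk.

```python
import os

# Local paths taken from the list above; adjust them to your cluster's
# dfs.datanode.data.dir, yarn.nodemanager.log-dirs and yarn.nodemanager.local-dirs.
CANDIDATE_DIRS = [
    "/mnt/hdfs",
    "/mnt/var/log/hadoop-yarn/containers",
    "/mnt/yarn",
]


def dir_size_bytes(root):
    """Sum the sizes of regular files under root (0 if root does not exist)."""
    total = 0
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            try:
                if not os.path.islink(path):
                    total += os.path.getsize(path)
            except OSError:
                # Files can disappear while a job is running; skip them.
                pass
    return total


def report(dirs):
    """Return (path, size_in_bytes) pairs, largest first."""
    sizes = [(d, dir_size_bytes(d)) for d in dirs]
    return sorted(sizes, key=lambda item: item[1], reverse=True)


# Example usage on a cluster node:
# for path, size in report(CANDIDATE_DIRS):
#     print(f"{size / 1024 ** 3:8.2f} GiB  {path}")
```

Run it as a user that can read all of these directories; otherwise files owned by yarn, spark, or hive may be silently skipped and the totals will be too low.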

YARN LOGS:

/mnt/var/log/hadoop-yarn/

If the logs (/var/log is symlinked to /mnt/var/log/) occupy the most space in that list, we can point the yarn.nodemanager.log-dirs setting at multiple mount points (comma-separated). Currently, EMR uses only one mount point for storing YARN container logs.

The container logs on local machines should ideally be deleted by these components, in this order:
1. By the YARN NodeManager after log aggregation (logic altered by the EMR team).
2. By LogPusher after the retention period.
3. By the Instance Controller's DiskSpaceManager (DSM) when its heuristics are satisfied.

1. In YARN, if log aggregation is turned on (with the yarn.log-aggregation-enable config), container logs are copied to HDFS when the Spark application completes, and after aggregation they are expected to be deleted from the local machine by the NodeManager's AppLogAggregatorImpl. On EMR, however, they appear to be kept on the local machines because LogPusher needs them there to push them to S3 (LogPusher cannot push logs from HDFS). To support this, a feature was introduced in the EMR Hadoop branch-2.7.3-amzn (not adopted in open source) by an internal commit.

This commit adds an option to keep the files on local machines after log aggregation. It is managed by the "yarn.log-aggregation.enable-local-cleanup" property in yarn-site.xml on the respective core/task nodes. This property is not public and can only be set on EMR distributions. In the latest EMR AMIs, this option is set to 'false', which means the cleanup WILL NOT take place.

– For the logs to be deleted from local disks, this property needs to be flipped to true through the configurations API when launching the cluster. On a live cluster, yarn-site.xml must be updated on all core/task nodes and the NodeManager (NM) restarted. After the restart, old container logs might still be present (see below).

*** This option might not be recommended, because LogPusher will NOT be able to push those local container logs to the customer's (service's) S3 bucket if it is set to true.
** The only source of container logs will then be the aggregated logs on HDFS, which are not as persistent.
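For readers who still want to flip the property at launch, the configurations API takes a JSON list of classification objects. The sketch below only builds and prints that JSON; it assumes the property belongs under the yarn-site classification (the text places it in yarn-site.xml), and the function name `local_cleanup_configuration` is hypothetical. Whether to set it to true at all is subject to the LogPusher trade-off described above.

```python
import json


def local_cleanup_configuration(enable_cleanup):
    """Build an EMR configurations list that sets the EMR-only cleanup property.

    Assumption: the property is applied through the "yarn-site" classification.
    """
    return [
        {
            "Classification": "yarn-site",
            "Properties": {
                "yarn.log-aggregation.enable-local-cleanup":
                    "true" if enable_cleanup else "false",
            },
        }
    ]


# Print JSON that could be saved to a file and supplied when launching a cluster.
print(json.dumps(local_cleanup_configuration(True), indent=2))
```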

2. Because the logs are not cleaned up locally (LogPusher needs them in the local directories), we appear to rely on the same LogPusher to delete those local files after a retention period of 4 hours, specifically under "/var/log/hadoop-yarn/containers" (see /etc/logpusher/hadoop.config). LogPusher will only delete logs that have not been touched in four hours.

3. The Instance Controller's DiskSpaceManager (DSM) is a kind of fail-safe against disk fill-up: if disk usage goes beyond a certain percentage, DSM marks some files (including local container logs) for deletion. DSM does seem to have trouble deleting log files because of user/permission issues. Ideally it needs to list and delete logs from all users (yarn / spark / hive), not just the hadoop user's logs.
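The four-hour rule from item 2 can be checked by hand when deciding whether LogPusher should already have removed a file. This is a rough sketch, not LogPusher's actual logic: it assumes "touched" can be approximated by the file's modification time, and the function name `split_by_retention` is made up for illustration.

```python
import os
import time

RETENTION_SECONDS = 4 * 60 * 60  # the four-hour retention period described above


def split_by_retention(log_dir, now=None, retention=RETENTION_SECONDS):
    """Split files under log_dir into (eligible, retained) lists.

    eligible: not modified for at least `retention` seconds.
    retained: modified more recently, so still inside the retention window.
    Assumption: modification time stands in for "touched".
    """
    now = time.time() if now is None else now
    eligible, retained = [], []
    for dirpath, _dirnames, filenames in os.walk(log_dir):
        for name in filenames:
            path = os.path.join(dirpath, name)
            try:
                age = now - os.path.getmtime(path)
            except OSError:
                continue  # the file vanished between listing and stat
            if age >= retention:
                eligible.append(path)
            else:
                retained.append(path)
    return sorted(eligible), sorted(retained)


# Example usage on a core/task node:
# old, recent = split_by_retention("/var/log/hadoop-yarn/containers")
# print(len(old), "files past retention;", len(recent), "still being written or recent")
```

A large "eligible" list suggests deletion is not keeping up, while a large "retained" list is what a long-running streaming job would produce.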

Hadoop & Spark Streaming Jobs :

In a streaming application (Hadoop or Spark on YARN), it is reasonable to expect that a log will be touched at least once every four hours for the entire lifetime of the job, so LogPusher never deletes the file. This can fill the disk, which can lead the customer to want to spread logs across multiple mounts. Spreading across multiple mounts is not the best solution: we specifically put logs on one mount to leave space on the customer's cluster for data.

The correct solution here is to implement/configure log rotation for container logs. This way, if we rotate on an hourly basis, we:

  • keep the overall size of each log down
  • give LogPusher a chance to upload and delete old logs
  • save disk space for the customer and
  • avoid having to add unnecessary features to LogPusher

Log rotation for Spark can be enabled in /etc/spark/conf/log4j.properties, rotating to ${spark.yarn.app.container.log.dir}/spark-%d{yyyy-MM-dd-HH-mm-ss}.log

Similarly, log rotation can be configured for Hadoop YARN logs using:
/etc/hadoop/conf/container-log4j.properties
/etc/hadoop/conf/log4j.properties

HDFS DATA:

/mnt/hdfs/
If HDFS is occupying the most space, we might need to monitor the HDFS CloudWatch (CW) metric and trigger an auto-scaling resize accordingly (or resize manually). After the resize, the blocks will NOT be balanced: only new data will go to the node you just added, and old HDFS data blocks will not balance out automatically. You will need to re-balance HDFS so that disk utilization on this node goes below 90%. More details on HDFS re-balancing are explained in 
 
HDFS utilization > 90% doesn’t necessarily mean disk on a particular node will be > 90%. This really depends on HDFS replication and how blocks are spread around.

https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/emr-metricscollected.html

– The customer might need to check the HDFS replication factor. Is it too large?

– Was there a recent scale-down that led to HDFS decommissioning, where blocks are moved to the remaining core nodes, thus filling them up?
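The effect of the replication factor is simple arithmetic: every block is stored once per replica, so raw disk usage is the logical data size multiplied by the replication factor. The sketch below works through that under simplifying assumptions (all node capacity is available to HDFS, and blocks are spread evenly, which the text above notes is not guaranteed); the function names and the 1 TiB figures are illustrative only.

```python
def raw_hdfs_bytes(logical_bytes, replication):
    """Raw disk consumed by HDFS: each block is stored `replication` times."""
    return logical_bytes * replication


def cluster_hdfs_utilization(logical_bytes, replication, node_capacities):
    """Fraction of total DataNode capacity consumed.

    Assumption: node_capacities is HDFS-only space and ignores logs,
    localized files and reserved space on the same disks.
    """
    total = sum(node_capacities)
    if total <= 0:
        raise ValueError("total capacity must be positive")
    return raw_hdfs_bytes(logical_bytes, replication) / total


TIB = 1024 ** 4
nodes = [TIB] * 4  # hypothetical: four core nodes with 1 TiB each

# 1 TiB of logical data at replication 1, 2 and 3.
for replication in (1, 2, 3):
    print(replication, cluster_hdfs_utilization(TIB, replication, nodes))
# prints 0.25, 0.5 and 0.75 respectively
```

The same arithmetic explains the scale-down case: removing a node shrinks the capacity list while the raw bytes stay the same, so utilization on the remaining nodes rises.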

YARN LOCALIZED FILES:

http://hortonworks.com/blog/resource-localization-in-yarn-deep-dive/

/mnt/yarn/

If /mnt/yarn/ (yarn.nodemanager.local-dirs), i.e. the YARN localized files, is filling up, it can happen at different stages of the application.

1. /mnt/yarn/ (yarn.nodemanager.local-dirs)

On EMR, /mnt/yarn/ is configured in yarn-site.xml as yarn.nodemanager.local-dirs. The list of directories in this parameter is used:

–  During a MapReduce job, intermediate data and working files are written to temporary local files. Because this data includes the potentially very large output of map tasks, you need to ensure that the yarn.nodemanager.local-dirs property, which controls the location of local temporary storage for YARN containers, is configured to use disk partitions that are large enough.

– During resource localization by the YARN NM, i.e. when the NM downloads resources from a supported source (such as HDFS, HTTP, and so on) to the NodeManager node's local directory.

– After the job finishes, the Node Managers automatically clean up the localized files immediately by default.

Scenario 1 : /mnt/yarn/usercache/hadoop/appcache/ is occupying the most space.

Localized files rarely fill up volumes. It's usually intermediate data from mappers that fills this up.

Troubleshooting steps  :

1. Confirm the existence of large intermediate output files. In this case, the mapper attempts of a single application are producing many GBs of intermediate data, which is about to fill up the disk on one core node.

2. We can also confirm from the mapper syslogs that this directory is being used for intermediate data (mapreduce.cluster.local.dir).

3. Now,

You can refer to the following NodeManager log during resource localization:

This directory is used by multiple YARN applications and their containers during their lifecycle. First, check whether the application and its containers are still running and currently occupying disk space. If they are running and the corresponding appcache is filling the disk, then your application needs that cache. The NM does not delete any appcache that is in use by running containers, so you will need to provision more space to handle your application's cache. If multiple applications are running and filling up the appcache together, you might need to limit the parallelism of your applications or provision bigger volumes.

If the applications (and their containers) are no longer running and you continue to see the disk filled by appcache, you might need to tune the NM so that its deletion service is triggered sooner. Some NM parameters (in yarn-site.xml) that influence when the NM triggers the deletion service to remove the appcache:

yarn.nodemanager.localizer.cache.cleanup.interval-ms : Interval between cache cleanups.
yarn.nodemanager.localizer.cache.target-size-mb : Target size of the localizer cache in MB, per local directory.
yarn.nodemanager.delete.thread-count : Number of threads used in cleanup.

You can also try  running

(To run that command, make sure that the yarn-site.xml on the machine you run it from defines the yarn.sharedcache.admin.address property (the default is 0.0.0.0:8047). You might even try the master IP instead of 0.0.0.0.)

Another parameter to watch out for is yarn.nodemanager.delete.debug-delay-sec. This is the number of seconds after an application finishes before the NodeManager's DeletionService deletes the application's localized file directory and log directory. It is set to 0 by default, which means the deletion service does not wait. If you set it to a large value, the appcache will not be deleted when the application finishes and will exist until that delay has elapsed.

References :
https://hortonworks.com/blog/resource-localization-in-yarn-deep-dive/
https://hadoop.apache.org/docs/r2.7.3/hadoop-yarn/hadoop-yarn-common/yarn-default.xml

 

Spark’s usercache & SPARK on YARN :

/mnt/yarn/usercache/hadoop/appcache/


Ex:

2.8G  ./mnt/yarn/usercache/hadoop/appcache/application_1474295547515_0187/blockmgr-42cdcd45-fe7d-4f0d-bf4b-5c819d4ef15e
3.5G  ./mnt/yarn/usercache/hadoop/appcache/application_1474295547515_0187/blockmgr-840ac0bf-b0dd-4573-8f74-aa7859d83832

Suppose the usercache directory contains a lot of big folders like blockmgr-b5b55c6f-ef8a-4359-93e4-9935f2390367. These are filling up with blocks from the block manager, which could mean you're persisting a bunch of RDDs to disk, or maybe have a huge shuffle. The first step is to figure out which of those it is, and then avoid the issue by caching in memory or designing the job to avoid huge shuffles. You can consider increasing spark.shuffle.memoryFraction to use more memory for shuffling and spill less (this is a legacy setting; from Spark 1.6 onward, unified memory management uses spark.memory.fraction instead).
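To find out which application owns the large blockmgr directories, du output in the format shown in the example can be totalled per application ID. The sketch below assumes the input contains only leaf entries (as in the example), since feeding it parent directories as well would double count; the function names are hypothetical, and the sizes are converted from du's rounded human-readable values, so totals are approximate.

```python
import re
from collections import defaultdict

UNITS = {"K": 1024, "M": 1024 ** 2, "G": 1024 ** 3, "T": 1024 ** 4}
APP_ID = re.compile(r"application_\d+_\d+")


def parse_size(text):
    """Convert a du -h style size such as '2.8G' or '512' to bytes."""
    text = text.strip()
    suffix = text[-1].upper() if text else ""
    if suffix in UNITS:
        return int(round(float(text[:-1]) * UNITS[suffix]))
    return int(float(text))


def usage_per_application(du_output):
    """Sum sizes per YARN application ID found in each path."""
    totals = defaultdict(int)
    for line in du_output.splitlines():
        parts = line.split(None, 1)
        if len(parts) != 2:
            continue  # skip blank or malformed lines
        match = APP_ID.search(parts[1])
        if match:
            totals[match.group(0)] += parse_size(parts[0])
    return dict(totals)


# The two lines from the example above.
SAMPLE = """\
2.8G  ./mnt/yarn/usercache/hadoop/appcache/application_1474295547515_0187/blockmgr-42cdcd45-fe7d-4f0d-bf4b-5c819d4ef15e
3.5G  ./mnt/yarn/usercache/hadoop/appcache/application_1474295547515_0187/blockmgr-840ac0bf-b0dd-4573-8f74-aa7859d83832
"""

for app_id, size in usage_per_application(SAMPLE).items():
    print(f"{app_id}: {size / 1024 ** 3:.1f} GiB")
```

Once the heaviest application ID is known, its Spark UI or event log can show whether the space comes from persisted RDDs or from shuffle output.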

– In cluster mode, the local directories used by the Spark executors and the Spark driver will be the local directories configured for YARN (Hadoop YARN config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored. In client mode, the Spark executors will use the local directories configured for YARN while the Spark driver will use those defined in spark.local.dir. This is because the Spark driver does not run on the YARN cluster in client mode, only the Spark executors do.

– Access the application cache through yarn.nodemanager.local-dirs on the nodes on which containers are launched. This directory contains the launch script, JARs, and all environment variables used for launching each container.

http://spark.apache.org/docs/latest/running-on-yarn.html

spark.local.dir (default: /tmp): Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks.

NOTE: In Spark 1.0 and later this will be overridden by the SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) environment variables set by the cluster manager, which on EMR seem to point to /mnt/ etc.

http://spark.apache.org/docs/latest/configuration.html
http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence

 

SPARK HISTORY LOGS:

/var/log/spark/apps/

hdfs dfs -du -h /
15.7 M   /apps
0        /tmp
2.1 G    /user
199.3 G  /var

Within /var/log/spark/apps/ there are currently 15,326 files ranging from 100 KB to 30 MB in size.

26.1 M   /var/log/spark/apps/application_1489085921151_9995_1
6.6 M    /var/log/spark/apps/application_1489085921151_9996_1
28.2 M   /var/log/spark/apps/application_1489085921151_9997_1
6.0 M    /var/log/spark/apps/application_1489085921151_9998_1
24.4 M   /var/log/spark/apps/application_1489085921151_9999_1

So why is this happening and how can I get these log files cleaned up once they have been saved to s3?

Those are spark history logs. Those retention settings are separate from YARN container log settings and can be configured to clean up at shorter intervals as defined here:

http://spark.apache.org/docs/latest/monitoring.html

The following spark-defaults configurations might help in cleaning up logs:

spark.history.fs.cleaner.enabled : true
spark.history.fs.cleaner.interval : 1d
spark.history.fs.cleaner.maxAge : 7d

EMR edit software settings: [{"classification":"spark-defaults","properties":{"spark.history.fs.cleaner.maxAge":"7d","spark.history.fs.cleaner.interval":"1d","spark.history.fs.cleaner.enabled":"true"}}]

You can also disable history logs (event logs) if you don't need them; for large files it doesn't work anyway.
To disable them, pass "--conf spark.eventLog.enabled=false" to spark-submit.

However, EMR's Apppusher might need these event logs to display the Spark application logs on the EMR console.

 

Some other factors to consider:

If there is an NM restart or RM restart during localization, there might be stale files in the usercache that the deletion service does not delete, and those files might persist after job completion. So you might sometimes need to delete them manually.
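Before deleting anything by hand, it helps to list which appcache directories belong to applications that are no longer running. The sketch below only reports candidates and deletes nothing; it assumes the caller supplies the set of running application IDs (for example, collected from the output of `yarn application -list`), and the function name `stale_appcache_dirs` is made up for illustration.

```python
import os


def stale_appcache_dirs(appcache_root, running_app_ids):
    """Return appcache directories whose application ID is not running.

    appcache_root: e.g. /mnt/yarn/usercache/hadoop/appcache
    running_app_ids: iterable of IDs such as 'application_1474295547515_0187'
    """
    running = set(running_app_ids)
    try:
        entries = sorted(os.listdir(appcache_root))
    except OSError:
        return []  # the directory does not exist or is not readable
    stale = []
    for name in entries:
        path = os.path.join(appcache_root, name)
        if (name.startswith("application_")
                and os.path.isdir(path)
                and name not in running):
            stale.append(path)
    return stale


# Example usage on a core/task node (the running ID is illustrative):
# for path in stale_appcache_dirs("/mnt/yarn/usercache/hadoop/appcache",
#                                 ["application_1474295547515_0187"]):
#     print("candidate for manual cleanup:", path)
```

Review the list against the ResourceManager before removing anything, since an application that has just been submitted may not yet appear as running.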
