EMR Config for Big Cluster to Create Many Partitions in Hive

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

EMR Configuration :






Create Cluster CLI Command :


In the command below, search for "replace" and substitute the relevant values, such as your own security groups, SSH key, IAM roles, and EMR log bucket.



Increase Spark Driver memory using PySpark session from EMR Notebooks

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce



Spark UI vs. Spark History Server UI: which one to use and why?

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

Is the job running?

1. If you have Spark applications running, use the Spark UI. This UI is hosted on the Spark driver.
– In YARN cluster mode, the driver runs inside the YARN ApplicationMaster, which runs on an arbitrary core node.
– In YARN client mode, the driver runs on the master node itself.
To access the Spark UI, go to the YARN ResourceManager UI first. Then navigate to the corresponding Spark application and use the “Application Master” link to open the Spark UI. If you look at the link, it takes you to the ApplicationMaster’s web UI on port 20888. This is a proxy running on the master node and listening on port 20888, which exposes the Spark UI (which itself runs on either a core node or the master node).

2. You can also access the Spark UI by going directly to the driver hostname and port where it is hosted.
For example, when I ran spark-submit in cluster mode, it spun up application_1569345960040_0007. In my driver logs I see the messages below:
19/09/24 22:29:15 INFO Utils: Successfully started service ‘SparkUI’ on port 35395.
19/09/24 22:29:15 INFO SparkUI: Bound SparkUI to, and started at
Here ip-10-0-0-69.myermdomain.com is one of my core nodes.
So I can go to that host on port 35395 in a browser.
This automatically routes me to the proxy server on the master node listening on port 20888.
Please note that these links are temporary and will only show the UI while the Spark application is running.

Is the job completed?

If you want to see the UI after the Spark job has completed, use the Spark History Server UI directly at http://master-public-dns-name:18080/.
The Spark History Server can also be used for running jobs via the “Show Incomplete Applications” button. It does this by reading Spark event logs, which are enabled on EMR by default.

Differences between Spark UI and Spark History UI

The Spark History Server does have some differences compared to the Spark UI (for running applications, of course). Some that I observed:
– The Spark UI has a “Kill” button so you can kill Spark stages, while the Spark History Server doesn’t.
– The Spark UI has a “SQL” tab that shows more information about spark-sql jobs, while the Spark History Server doesn’t.
– The Spark UI can pull up live thread dumps for executors, while the Spark History Server can’t.
– The Spark UI gives the most up-to-date information (like “Total Uptime”) on tasks, while the Spark History Server UI can lag a bit.

Download and parse Presto Server logs on EMR to find an Exception

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

This article shows you how to download all Presto server logs from all EMR nodes using the AWS S3 CLI so we can parse them and look for errors. The same approach extends to any EMR application logs.

Here’s the script :

– Replace s3://EMR-Log-bucket with the bucket/prefix where you keep your EMR logs.
Please see this document to locate the path.

– Replace the cluster ID (here I am using the EMR cluster ID j-1QZPX8GEC1234).

– Replace “some-absolute-path” with the absolute path where you want to download the logs (like /mnt/emr-logs/).
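Once the logs are on local disk, the parsing half can be sketched in a few lines of Python. This is only an illustration, not the script referred to above: the function name find_exceptions and the default pattern are my own choices, and it assumes the logs were already copied down (for example with the AWS CLI) into a directory such as /mnt/emr-logs/. It reads both plain and gzip-compressed files, since EMR compresses older logs.

```python
import gzip
import re
from pathlib import Path


def find_exceptions(log_root, pattern=r"Exception|ERROR"):
    """Yield (file, line_number, text) for every log line matching `pattern`.

    `log_root` is the local directory holding the downloaded logs.
    Both plain-text and gzip-compressed (.gz) files are read.
    """
    regex = re.compile(pattern)
    for path in sorted(Path(log_root).rglob("*")):
        if not path.is_file():
            continue
        # Pick the opener by file extension; decode leniently so a stray
        # byte in a log file does not abort the whole scan.
        opener = gzip.open if path.suffix == ".gz" else open
        with opener(path, "rt", errors="replace") as handle:
            for number, line in enumerate(handle, start=1):
                if regex.search(line):
                    yield str(path), number, line.rstrip("\n")


if __name__ == "__main__":
    # Hypothetical download location; replace with your own absolute path.
    for file_name, number, text in find_exceptions("/mnt/emr-logs/"):
        print(f"{file_name}:{number}: {text}")
```

Narrowing the pattern (for example to a specific Presto exception class name) keeps the output manageable on large clusters.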


EMRFS Role Mappings integration with LDAP JupyterHub EMR

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

This article assumes you have read the following articles:




This article expects you to have a working Windows AD server with LDAP enabled.

EMRFS role mapping allows you to use roles other than the default EMR_EC2_DefaultRole to make calls to S3. Using security configurations, we define a mapping from a user, group, or prefix to a particular IAM role. Example configuration:

user1_onGroup1 -> Group1

user1_onGroup1 -> Group2


Here, the user and group can be an LDAP user and an LDAP group respectively.

This article guides you through integrating LDAP with JupyterHub on EMR. After this setup, users on your LDAP server can log in to EMR’s JupyterHub to submit YARN jobs. We will also enable user impersonation, where YARN jobs are run as your LDAP user and not as a default user like ‘yarn’. If EMRFS role mapping is enabled, the IAM role corresponding to your LDAP user will be used to make calls to S3. This also ensures that if an LDAP user belongs to an LDAP group, the IAM role mapping corresponding to that group will be used.

First, spin up an EMR cluster using the following configuration, together with the EMRFS role mappings above defined in a security configuration.



  • livy.impersonation.enabled=true enables Livy user impersonation.
  • hadoop.security.group.mapping = org.apache.hadoop.security.LdapGroupsMapping makes Hadoop connect directly to an LDAP server to resolve the list of groups instead of using the operating system’s group name resolution. Without this step, EMRFS role mapping will not work with LDAP groups.
  • hadoop.security.group.mapping.ldap.bind.user is the user that will be used to run the LDAP search that retrieves group information.
  • hadoop.security.group.mapping.ldap.bind.password is this user’s LDAP password.
  • hadoop.security.group.mapping.ldap.url is the hostname and port of your LDAP server.
  • hadoop.security.group.mapping.ldap.base configures the search base for the LDAP connection. This is a distinguished name, and will typically be the root of the LDAP directory.

See https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html#LDAP_Groups_Mapping
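As a rough sketch of how the properties listed above could be laid out for the EMR Configurations API, the Python below builds the JSON structure. It is not the exact configuration used for this article. Placing the Hadoop properties under the core-site classification and the Livy property under livy-conf is my assumption, the function name is made up, and every value in the usage example (LDAP URL, bind user, password, search base) is a hypothetical placeholder.

```python
import json


def ldap_role_mapping_configurations(ldap_url, bind_user, bind_password, search_base):
    """Build an EMR `Configurations` list from the properties described above.

    Assumption: Hadoop group-mapping properties go in `core-site` and the
    Livy impersonation flag goes in `livy-conf`.
    """
    return [
        {
            "Classification": "livy-conf",
            "Properties": {"livy.impersonation.enabled": "true"},
        },
        {
            "Classification": "core-site",
            "Properties": {
                "hadoop.security.group.mapping":
                    "org.apache.hadoop.security.LdapGroupsMapping",
                "hadoop.security.group.mapping.ldap.bind.user": bind_user,
                "hadoop.security.group.mapping.ldap.bind.password": bind_password,
                "hadoop.security.group.mapping.ldap.url": ldap_url,
                "hadoop.security.group.mapping.ldap.base": search_base,
            },
        },
    ]


if __name__ == "__main__":
    # All four values below are hypothetical placeholders.
    configs = ldap_role_mapping_configurations(
        ldap_url="ldap://ldap.example.com:389",
        bind_user="binduser@example.com",
        bind_password="change-me",
        search_base="dc=example,dc=com",
    )
    print(json.dumps(configs, indent=2))
```

The printed JSON can be pasted into the software settings box when creating the cluster, or saved to a file and passed to the CLI.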

Now log in to the master node and run the following scripts.

Now log in to the JupyterHub UI using the LDAP user’s credentials. Once you submit a Spark job, the job will use the IAM role mapped to this LDAP user to make calls to S3.


Some other considerations

– We can use “hadoop.user.group.static.mapping.overrides” to provide static mappings, so that a system user such as hadoop uses the mapping defined here rather than your LDAP server. Please see https://hadoop.apache.org/docs/r2.8.5/hadoop-project-dist/hadoop-common/core-default.xml

Example :

“hadoop.user.group.static.mapping.overrides” : “hive=hadoop,hive;hdfs=hadoop,hdfs;oozie=users,hadoop,oozie;mapred=hadoop,mapred;yarn=hadoop;”

– We can use “hdfs groups <ldap-user-name>” to test whether org.apache.hadoop.security.LdapGroupsMapping is working. This command contacts the LDAP server configured in “hadoop.security.group.mapping.ldap.url” to get the LDAP group information. If it returns an error, there may be an issue with the LDAP server settings you supplied via “hadoop.security.group.mapping.ldap.*”.



Expectations while using EMRFS

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

Expectations while using EMRFS :

– The hadoop fs -put command updates both S3 and the EMRFS table (files are created in S3 and also recorded in the EMRFS table).
Ex) hadoop fs -put ./111.txt s3://mannem/emrfstest1
– The hadoop fs -rm command updates S3 but does not update the EMRFS table (the file is deleted in S3, but its entry remains in the EMRFS table).
Ex) hadoop fs -rm -f s3://mannem/emrfstest1/111.txt
– The hadoop fs -mv command renames the object in S3 (it creates the new name after deleting the old one), but only adds the new name to the EMRFS table (the existing entry is not deleted).
Ex) hadoop fs -mv s3://mannem/emrfstest1/emrfs-didnt-work.png s3://mannem/emrfstest1/emrfs-didnt-work_new.png
– Files added from the S3 console (web UI) are added to S3 but not to the EMRFS table.
– Files deleted from the S3 console (web UI) are deleted in S3, but are not deleted from the EMRFS table.
– Renaming a file in the S3 console (web UI) changes the name in S3, but does not rename the entry in the EMRFS table.
– The EMRFS CLI (for example, emrfs delete or emrfs sync) does not change the actual data in S3. It only adds or deletes entries in the DynamoDB metadata table used by EMRFS.
– EMRFS uses DynamoDB. On EMR clusters, you can view the table name with the emrfs describe-metadata command. You can also see it on the EMR web console.


– In the EMRFS table, the S3 prefix is stored in the hash key and the object name is stored in the range key.
– You can optionally specify the number of retries and the time to wait until the next retry when an exception occurs: http://docs.aws.amazon.com/emr/latest/ManagementGuide/emrfs-retry-logic.html
– Please refer to http://docs.aws.amazon.com/emr/latest/ManagementGuide/emrfs-files-tracked.html for related information.
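To make the expectations above easier to reason about, here is a toy Python model. It does not talk to S3 or DynamoDB at all; it just keeps two in-memory sets and applies the rules from the list, so you can see where the EMRFS table and S3 drift apart. The class and method names are invented for this sketch.

```python
class EmrfsModel:
    """Toy model of the rules listed above: one set of S3 keys,
    one set of EMRFS metadata entries. Nothing here calls AWS."""

    def __init__(self):
        self.s3 = set()
        self.metadata = set()

    def hadoop_put(self, key):
        # hadoop fs -put: object created in S3 and recorded in the table.
        self.s3.add(key)
        self.metadata.add(key)

    def hadoop_rm(self, key):
        # hadoop fs -rm: deleted in S3, table entry left behind.
        self.s3.discard(key)

    def hadoop_mv(self, old, new):
        # hadoop fs -mv: renamed in S3; table gains the new name only.
        self.s3.discard(old)
        self.s3.add(new)
        self.metadata.add(new)

    def console_add(self, key):
        # S3 console upload: S3 only.
        self.s3.add(key)

    def console_delete(self, key):
        # S3 console delete: S3 only.
        self.s3.discard(key)

    def console_rename(self, old, new):
        # S3 console rename: S3 only.
        self.s3.discard(old)
        self.s3.add(new)

    def stale_entries(self):
        """Entries still in the EMRFS table whose object is gone from S3."""
        return self.metadata - self.s3

    def untracked_objects(self):
        """Objects in S3 that the EMRFS table does not know about."""
        return self.s3 - self.metadata


if __name__ == "__main__":
    model = EmrfsModel()
    model.hadoop_put("emrfstest1/111.txt")
    model.hadoop_rm("emrfstest1/111.txt")
    model.console_add("emrfstest1/uploaded.csv")
    print("stale:", model.stale_entries())
    print("untracked:", model.untracked_objects())
```

The two helper methods at the end correspond to the kinds of mismatch you would look for when the table and the bucket disagree.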

EMR vCPU vCore issue

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

Several customers are confused when the vCores used by EMR differ from the EC2 vCPU count. This article clarifies why EMR uses vCores, describes some problems that exist with instance fleets, and shows how to work around them.

When you choose an instance type in the AWS EMR Management Console, the number of vCPUs shown for each instance type is the number of YARN vCores for that instance type, not the number of EC2 vCPUs.

1. The EMR console picks the yarn.nodemanager.resource.cpu-vcores value for each instance type from a predefined, fixed mapping that EMR maintains for every instance type and family. For some instance families, like M5, EMR sets vCores equal to the EC2 vCPUs. For some others (like the M4 family), the setting is double the actual EC2 vCPUs.

For example: EMR uses 80 vCores for m4.10xlarge, whereas EC2 reports 40 vCPUs.

2. So the intent seems to be to report vCore usage at the YARN level, as opposed to the actual EC2 instance level.

3. The discrepancy on the EMR console exists because it represents a cluster’s compute power from YARN’s perspective. Since EMR clusters run applications according to their YARN settings, this was presumably judged a better representation of the compute resources than EC2’s vCPU count.

4. The reason this is done is to ensure that YARN runs enough containers to max out the CPU as much as possible. When some instance families were introduced, EMR determined that for a majority of use cases, without doubling this value, the instances’ CPUs would usually be underutilized because applications are I/O bound most of the time. That is, if vCores were set to the actual number of vCPUs for these instance types, you’d get about one YARN container per actual vCPU, but those containers would spend most of their time blocked on I/O anyway, so you can probably run more containers in order to max out the CPU.

5. Amazon EMR makes an effort to provide optimal configuration settings as defaults for each instance type for the broadest range of use cases (types of application and engines like MapReduce and Spark). However, you may need to manually adjust these settings for the needs of your application. This value can be changed through the Configuration API by setting yarn.nodemanager.resource.cpu-vcores in the “yarn-site” classification to suit your applications and workloads.


Problems :

Sometimes customers do not want two vCores for each vCPU. In some use cases, containers end up competing for a single hyperthread (vCPU), causing subpar performance even though other nodes still have idle hyperthreads.

For example, if a customer has

– 1 m4.10xlarge core node (where 40 vCPUs are doubled to 80 vCores) and

– 1 m5.12xlarge task node (with a 1:1 mapping of vCPUs to vCores, i.e. 48)

we cannot define “yarn.nodemanager.resource.cpu-vcores” as a fixed value for this cluster.

– For instance fleets, where multiple instance families can be specified by the customer and fulfilled by EMR to satisfy the target capacity, it’s hard to set a single EMR configuration that encompasses all instance families.

– A similar issue exists for uniform instance groups, with different instance families in different core/task groups.

Workarounds :

– I tested the following settings in the “yarn-site” classification. They tell the YARN NodeManager (NM) on each node to use the vCPUs of its underlying EC2 Linux instance as vCores, ignoring the defaults set by EMR. This means an m4.10xlarge node (NM) will allocate only 40 vCores for YARN containers, because there are 40 logical processors. The correct number of vCores should be confirmed from the ResourceManager UI or with YARN commands like “yarn node -status ip-172-31-44-155.ec2.internal:8041”.
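The exact settings are not reproduced in this copy of the post, so the Python sketch below is a guess rather than the tested configuration. Stock Hadoop has NodeManager properties for hardware detection that produce the behaviour described above, and I am assuming those are the ones meant. The function name is made up, and you should verify the result on your own cluster with the “yarn node -status” command mentioned earlier.

```python
import json


def hardware_vcores_yarn_site():
    """Build a yarn-site classification that lets each NodeManager derive
    its vCores from the instance's logical processors.

    Assumption: these stock YARN properties are the ones meant above;
    the original settings are not reproduced in this article.
    """
    return [
        {
            "Classification": "yarn-site",
            "Properties": {
                # -1 lets the NodeManager detect the value from hardware.
                "yarn.nodemanager.resource.cpu-vcores": "-1",
                "yarn.nodemanager.resource.detect-hardware-capabilities": "true",
                # Count hyperthreads (logical processors) as cores.
                "yarn.nodemanager.resource.count-logical-processors-as-cores": "true",
                # One vCore per detected core, i.e. no doubling.
                "yarn.nodemanager.resource.pcores-vcores-multiplier": "1",
            },
        }
    ]


if __name__ == "__main__":
    print(json.dumps(hardware_vcores_yarn_site(), indent=2))
```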



Please see what these parameters mean here :


Note: If you use the above parameters, the AWS EMR console (or the DescribeCluster API call) will still show 80 vCores (double the vCPUs) for m4.10xlarge, because that information is based on the predefined fixed mapping for each instance type and is NOT pulled from your live nodes. For instance fleets, EMR will still count 80 vCores towards the capacity calculations when launching resources to satisfy your target capacity units per fleet. This can cause a discrepancy between the vCore capacity you need and what is actually provisioned and used.

– For this reason, another suggestion is to use the same instance family (like M5) in all instance groups or instance fleets, so that they have a consistent and predictable 1:1 vCPU-to-vCore mapping.

Common issues of disk going full on EMR Cluster (or In general any Hadoop / Spark cluster)

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

A full disk can make YARN on EMR UNHEALTHY. So customers need to identify and proactively predict how each application, like Hadoop or Spark, can occupy disk space, and act accordingly. This article focuses on some of the most common ways disks on an EMR cluster can fill up and recommends actions for those specific scenarios.

Some common factors occupying disk space on EMR :

  • HDFS (/mnt/hdfs/), i.e. dfs.datanode.data.dir, may be what is occupying the most space.
  • YARN container logs (/mnt/var/log/hadoop-yarn/containers/).
  • Localized files during a Hadoop/Spark job run using the YARN framework (yarn.nodemanager.local-dirs/filecache , ../usercache/filecache , ../usercache//appcache/<app-id>/), where yarn.nodemanager.local-dirs is usually /mnt/yarn/ on a one-disk setup. For instances with multiple (non-root) disks, multiple disks are used via a comma-separated value in yarn.nodemanager.local-dirs.
  • Spark Application history logs (hdfs:///var/log/spark/apps/)
  • It may also be a combination of all of the above.



If it is the logs (/var/log symlinked to /mnt/var/log/) that occupy the most space in that list, we can use multiple mount points for the yarn.nodemanager.log-dirs setting (comma separated). Currently, EMR only uses one mount point for storing YARN container logs.

The container logs on local machines should ideally be deleted by these components, in this order:
1. By the YARN NodeManager after log aggregation (logic altered by the EMR team).
2. By LogPusher after the retention period.
3. By the instance controller’s DiskSpaceManager (DSM) when its heuristics are satisfied.

In YARN, if log aggregation is turned on (with the yarn.log-aggregation-enable config), container logs are copied to HDFS when the Spark application completes, and after aggregation they are expected to be deleted from the local machine by the NodeManager’s AppLogAggregatorImpl. On EMR, however, they are kept on the local machine because LogPusher needs those logs to push them to S3 (LogPusher cannot push logs from HDFS). So EMR introduced a feature in the EMR Hadoop branch-2.7.3-amzn (not adopted in open source) through an internal commit.

With this commit, there is an option to keep the files on local machines after log aggregation. It is managed by the “yarn.log-aggregation.enable-local-cleanup” property in yarn-site.xml on the respective core/task nodes. This property is not public and can only be set on EMR distributions. In the latest EMR AMIs, this option is set to ‘false’, which means the cleanup WILL NOT take place.

– For the logs to be deleted from local disks, we need to flip it to true with the Configurations API while launching the cluster. On a live cluster, yarn-site.xml on every core/task node should be updated and the NM restarted. After the restart, old container logs might still be present. (Read below.)

*** This option might not be recommended, because LogPusher will NOT be able to push those local container logs to the customer’s (service’s) S3 bucket if it is set to true,
** and the only source of container logs will be the aggregated logs on HDFS, which are not as persistent.

2. Because logs are not cleaned up on the local machine (LogPusher needs them in local dirs), we rely on the same LogPusher to delete those local files after a retention period of 4 hours, specifically for “/var/log/hadoop-yarn/containers” (/etc/logpusher/hadoop.config). LogPusher will only delete logs if they have not been touched in four hours.

3. The instance controller’s DiskSpaceManager (DSM) is a kind of fail-safe to avoid the disk filling up: if disk usage goes beyond a certain percentage, DSM marks some files (including local container logs) for deletion. DSM does seem to have issues deleting log files because of user/permission issues. Ideally it needs to list and delete logs from all users (yarn / spark / hive), not just the hadoop user’s logs.

Hadoop & Spark Streaming Jobs :

In a streaming application (Hadoop or Spark on YARN), it is reasonable to expect that a log will be touched at least once every four hours for the entire lifetime of the streaming job, so LogPusher never deletes the file. This can fill the disk, which can lead the customer to want to spread logs across multiple mounts. Spreading across multiple mounts is not the best solution: we specifically put logs on one mount to leave space on the customer’s cluster for data.

The correct solution here is to implement/configure log rotation for container logs. This way, if we rotate on an hourly basis, we:

  • keep the overall size of each log down
  • give LogPusher a chance to upload and delete old logs
  • save disk space for the customer and
  • avoid having to add unnecessary features to LogPusher

Log rotation for Spark can be enabled in /etc/spark/conf/log4j.properties, rotating to ${spark.yarn.app.container.log.dir}/spark-%d{yyyy-MM-dd-HH-mm-ss}.log

Similarly log rotation can be done for Hadoop YARN logs using


If HDFS is occupying the most space, we might need to monitor the HDFS CloudWatch metric and trigger an auto-scaling resize accordingly (or resize manually). After the resize, the blocks will NOT be balanced: only new data will go to the node you just added, and old HDFS data blocks will not balance out automatically. You will need to rebalance HDFS so that disk utilization on this node goes below 90%. More details on HDFS re-balancing are explained in 
HDFS utilization > 90% doesn’t necessarily mean disk on a particular node will be > 90%. This really depends on HDFS replication and how blocks are spread around.


– Customers might need to check the HDFS replication factor. Is it too large?

– Was there a recent scale-down that led to HDFS decommissioning, where blocks are moved to the remaining core nodes, thus filling them up?




If /mnt/yarn/ (yarn.nodemanager.local-dirs), i.e. YARN localized files, is filling up, it can happen at different stages of the application.

1. /mnt/yarn/ (yarn.nodemanager.local-dirs)

On EMR, /mnt/yarn/ is configured in yarn-site.xml as yarn.nodemanager.local-dirs. The directories listed in this parameter are used:

–  During a MapReduce job, intermediate data and working files are written to temporary local files. Because this data includes the potentially very large output of map tasks, you need to ensure that the yarn.nodemanager.local-dirs property, which controls the location of local temporary storage for YARN containers, is configured to use disk partitions that are large enough.

–  During resource localization by YARN NM i.e NM downloads  resources from the supported source (such as HDFS, HTTP, and so on) to the NodeManager node’s local directory.

– After the job finishes, the Node Managers automatically clean up the localized files immediately by default.

Scenario 1 : /mnt/yarn/usercache/hadoop/appcache/ – occupying more space.

Localized files rarely fill up volumes. It’s usually intermediate data from mappers that fills this up.

Troubleshooting steps:

1. Confirm the existence of large intermediate output files. In this case, large amounts (GBs) of intermediate data from the mapper attempts of one single application are about to fill up disk space on one core node.

2. We can also confirm from the mapper syslogs that this directory is being used for intermediate data (mapreduce.cluster.local.dir).

3. Now,

You can refer to the following NodeManager log during resource localization:

This directory is used by multiple YARN applications and their containers during their lifecycle. First, we need to check whether the application and its containers are still running and currently occupying disk space. If they are running and the corresponding appcache is filling the disk, then your application needs that cache. The NM doesn’t delete any appcache that is currently being used by running containers, so you will need to provision more space to handle your application’s cache. If multiple applications are running and filling up the appcache together, you might need to limit the parallelism of your applications or provision bigger volumes.
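To see which applications are holding the space before deciding anything, a small script can rank the appcache directories by size. The Python below is a sketch under the assumption that yarn.nodemanager.local-dirs is /mnt/yarn and that the layout is usercache/<user>/appcache/<application-id>, as in the paths shown in this article. The function names are my own. You can then compare the application IDs it prints against the applications YARN reports as running.

```python
import os
from pathlib import Path


def directory_size(path):
    """Total size in bytes of regular files under `path` (symlinks skipped)."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            file_path = os.path.join(root, name)
            try:
                if not os.path.islink(file_path):
                    total += os.path.getsize(file_path)
            except OSError:
                # A running container may delete a file while we scan.
                pass
    return total


def largest_appcaches(local_dir="/mnt/yarn", top=10):
    """Return [(bytes, user, application_id)], largest first.

    Assumes the layout <local_dir>/usercache/<user>/appcache/<application_id>.
    """
    results = []
    for app_dir in Path(local_dir).glob("usercache/*/appcache/application_*"):
        if app_dir.is_dir():
            user = app_dir.parts[-3]
            results.append((directory_size(app_dir), user, app_dir.name))
    return sorted(results, reverse=True)[:top]


if __name__ == "__main__":
    for size, user, app_id in largest_appcaches():
        print(f"{size / 1024 ** 3:8.2f} GiB  {user:10s} {app_id}")
```

Run it as a user that can read other users’ cache directories, otherwise sizes for those directories will be under-reported.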

If no applications (containers) are running and you continue to see the disk filled with appcache, you might need to tune the NM to trigger the deletion service sooner. Some NM parameters (in yarn-site.xml) that change how the NM decides to trigger the deletion service to remove the appcache:

yarn.nodemanager.localizer.cache.cleanup.interval-ms : Interval in between cache cleanups.
yarn.nodemanager.localizer.cache.target-size-mb  : Target size of localizer cache in MB, per local directory.

You can also try  running

(To run that command, make sure the yarn-site.xml on the machine you run it from has the yarn.sharedcache.admin.address property defined. You might even try the master IP instead of the default.)

Another parameter to watch out for is yarn.nodemanager.delete.debug-delay-sec. This is the number of seconds after an application finishes before the NodeManager’s DeletionService deletes the application’s localized file directory and log directory. It is set to 0 by default, which means the deletion service does not wait. If you set it to a large value, the appcache will not be deleted when the application finishes and will exist until this time has elapsed.

References :


Spark’s usercache & SPARK on YARN :



2.8G  ./mnt/yarn/usercache/hadoop/appcache/application_1474295547515_0187/blockmgr-42cdcd45-fe7d-4f0d-bf4b-5c819d4ef15e
3.5G  ./mnt/yarn/usercache/hadoop/appcache/application_1474295547515_0187/blockmgr-840ac0bf-b0dd-4573-8f74-aa7859d83832

Suppose the usercache directory contains a lot of big folders like blockmgr-b5b55c6f-ef8a-4359-93e4-9935f2390367.

These are filling up with blocks from the block manager, which could mean you’re persisting a bunch of RDDs to disk, or have a huge shuffle. The first step would be to figure out which of those it is and avoid the issue by caching in memory or designing to avoid huge shuffles. You can consider increasing spark.shuffle.memoryFraction to use more memory for shuffling and spill less.

– In cluster mode, the local directories used by the Spark executors and the Spark driver will be the local directories configured for YARN (Hadoop YARN config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored. In client mode, the Spark executors will use the local directories configured for YARN while the Spark driver will use those defined in spark.local.dir. This is because the Spark driver does not run on the YARN cluster in client mode, only the Spark executors do.

– Access the application cache through yarn.nodemanager.local-dirs on the nodes on which containers are launched. This directory contains the launch script, JARs, and all environment variables used for launching each container.


spark.local.dir (default: /tmp): Directory to use for “scratch” space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks.

NOTE: In Spark 1.0 and later this will be overridden by the SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) environment variables set by the cluster manager, which seem to be /mnt/ etc., the locations you were concerned about.





hdfs dfs -du -h /
15.7 M   /apps
0        /tmp
2.1 G    /user
199.3 G  /var

Within /var/log/spark/apps/ there are currently 15,326 files ranging in size from 100 KB to 30 MB.

26.1 M   /var/log/spark/apps/application_1489085921151_9995_1
6.6 M    /var/log/spark/apps/application_1489085921151_9996_1
28.2 M   /var/log/spark/apps/application_1489085921151_9997_1
6.0 M    /var/log/spark/apps/application_1489085921151_9998_1
24.4 M   /var/log/spark/apps/application_1489085921151_9999_1

So why is this happening and how can I get these log files cleaned up once they have been saved to s3?

Those are Spark history logs. Their retention settings are separate from the YARN container log settings and can be configured to clean up at shorter intervals as defined here:


The following spark-defaults configurations might help in cleaning up logs.

spark.history.fs.cleaner.enabled : true
spark.history.fs.cleaner.interval : 1d
spark.history.fs.cleaner.maxAge : 7d

EMR edit software settings: [{"classification":"spark-defaults","properties":{"spark.history.fs.cleaner.maxAge":"7d","spark.history.fs.cleaner.interval":"1d","spark.history.fs.cleaner.enabled":"true"}}]

You can also disable history logs (event logs) if you don’t need them; for large files it doesn’t work anyway.
To disable them, you can use “--conf spark.eventLog.enabled=false” on spark-submit.

But EMR’s AppPusher might need these event logs to display Spark application logs on the EMR console.


Some other factors to consider:

If there’s an NM restart or RM restart during localization, there might be some stale files in the usercache that the deletion service does not delete, and those files might persist after job completion. So you might sometimes need to delete them manually.


Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

Contains different configurations and procedures to enable logging on different daemons on AWS EMR cluster.
[Please contribute to this article to add additional ways to enable logging]

HBASE on S3 :

This will enable logging of the calls made by EMRFS from HBase.

It is important for troubleshooting S3 consistency issues and failures on an HBase on S3 cluster.

Enabling DEBUG on Hive Metastore daemon (its Datastore) on EMR :


Logs at /var/log/hive/user/hive/hive.log


use_get_log_api=true in the beeswax section of the hue.ini configuration file.

Hadoop and MR :

Enable GC verbose on Hive Server 2 JVM:

WIRE OR DEBUG logging on EMR to check calls to S3 and DDB for DynamoDb connector library :

Paste the following on log4j configurations of Hadoop / hive / spark etc.



Debug on S3 Calls from EMR HIVE :

These metrics can be obtained from the hive.log when enabling debug logging in aws-java-sdk. To enable this logging, add the following line to '/etc/hive/conf/hive-log4j.properties'. The Configuration API can be used as well.

Enable DEBUG logging for Http Connection pool:

(from spark) by adding the following to /etc/spark/conf/log4j.properties

*Tez overwrites the loglevel options we have passed. Please see the related items.*

Enabling Debug on Hadoop log to log calls by EMRFS :


You can use the same logging config for other applications like Spark/HBase using the respective log4j config files as appropriate. You can also use EMR log4j configuration classifications like hadoop-log4j or spark-log4j to set these configs while starting the EMR cluster (see below for sample JSON for the Configuration API).


DEBUG on EMR Logpusher Logs :

Edit this file on the master and slave nodes manually and restart LogPusher.


(You might need to stop service-nanny before stopping LogPusher, to properly stop/start LogPusher.)

DEBUG on Spark classes :

Use the following EMR config to set DEBUG level for relevant class files.

DEBUG using spark shell:

Execute the following commands after invoking spark-shell to enable DEBUG logging on respective spark classes like Memstore. You can use the same if you want to reduce the amount of logging from INFO (which is default coming from log4j.properties in the spark conf ) to ERROR.

EMRFS CLI command like EMRFS SYNC :


Logs will be on the console out. We might need to redirect to a File or do both.

Enable Debug on Boto3 client :
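The snippet for this section is not reproduced here, so as a stand-in, here is a minimal sketch that only uses Python’s standard logging module. It relies on boto3 and botocore logging through the loggers named “boto3” and “botocore”. The function name and the log format are my own choices. boto3 also ships its own helper, boto3.set_stream_logger, which does much the same thing.

```python
import logging


def enable_boto_debug_logging(level=logging.DEBUG):
    """Send boto3/botocore log records at `level` and above to stderr.

    Uses only the standard library; boto3 does not need to be imported
    for the logger configuration itself to take effect.
    """
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s [%(levelname)s] %(message)s")
    )
    for name in ("boto3", "botocore"):
        logger = logging.getLogger(name)
        logger.setLevel(level)
        logger.addHandler(handler)
    return handler


if __name__ == "__main__":
    enable_boto_debug_logging()
    # Any boto3 client created after this point logs its requests,
    # retries, and responses at DEBUG level.
```

DEBUG output from botocore is very verbose and can include request details, so turn it on only while troubleshooting.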

EMR Dynamodb Connector : Import data to DynamoDB from S3 : FAQ’s

Written by mannem on . Posted in AWS BIG DATA, Data Pipelines, Dynamo DB, EMR || Elastic Map Reduce


1 Q: How much time does my Import job take?

Certainly longer than an export job. Here’s how we calculate the import job duration:

While importing into a table, the DynamoDB storage-handler library relies on BatchWriteItem operations. A single BatchWriteItem operation can write up to 16 MB, comprising as many as 25 items (DynamoDB service limit). Each item write is metered separately by DynamoDB and hence costs at least one write IOPS. This means imports require significantly more IOPS and time to complete compared to exports.

Average_item_size = table size / item count  -> You get this information from the original table, not the backup.

If Average_item_size < 1 kilobyte then IOPS_per_Item = 1

If Average_item_size > 1 kilobyte then IOPS_per_Item = Average_item_size in kilobytes, rounded up to the next whole kilobyte.

For example,

if Average_item_size = 50 bytes, IOPS_per_Item = 1

if Average_item_size =  3520 bytes, IOPS_per_Item = 4

Time in hours = (item_count * IOPS_per_Item) /  (ProvisionedWriteThroughputAllocatedForJob * 60 * 60)
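The calculation above can be written as a short Python sketch. The function names are my own, and the item count and write capacity in the usage example are made-up numbers; only the two item sizes (50 and 3520 bytes) come from the examples above. I am taking one kilobyte as 1024 bytes.

```python
import math


def iops_per_item(average_item_size_bytes):
    """Write IOPS consumed per item: size in KB rounded up, minimum 1."""
    return max(1, math.ceil(average_item_size_bytes / 1024))


def import_duration_hours(item_count, table_size_bytes, write_capacity_for_job):
    """Estimate import time in hours using the formula above.

    `write_capacity_for_job` is the provisioned write throughput
    (writes per second) allocated to the import job.
    """
    average_item_size = table_size_bytes / item_count
    total_writes = item_count * iops_per_item(average_item_size)
    return total_writes / (write_capacity_for_job * 60 * 60)


if __name__ == "__main__":
    print(iops_per_item(50))    # 1
    print(iops_per_item(3520))  # 4
    # Hypothetical table: 10 million items of 3520 bytes, 1000 WCU for the job.
    hours = import_duration_hours(10_000_000, 3520 * 10_000_000, 1000)
    print(f"{hours:.1f} hours")
```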



– If you see the provisioned WCUs (write capacity units) of your DynamoDB table being fully utilized by this import job, then that’s the fastest you can go. To speed it up, the most important parameter to increase is of course the WCU of the table.

2 Q: How many mappers or reducers can my EMR cluster run concurrently without them being queued up?

To determine the number of parallel mappers, check the EMR documentation called Task Configuration, where EMR has a predefined set of configurations for every instance type that determines the number of mappers/reducers.


For example: let’s say you have 5 m1.xlarge core nodes. According to the default mapred-site.xml configuration values for that instance type from the EMR docs, we have

mapreduce.map.memory.mb = 768

yarn.nodemanager.resource.memory-mb = 12288

yarn.scheduler.maximum-allocation-mb = 12288 (same as above)

You can simply divide the latter by the former to get the maximum number of mappers supported by one m1.xlarge node = (12288/768) = 16

So, for the 5-node cluster, a maximum of 16*5 = 80 mappers can run in parallel (considering a map-only job). The same is the case with max parallel reducers (30) and ApplicationMasters. You can do similar math for a combination of mappers, reducers, and AMs.
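The same arithmetic as a tiny Python sketch, using the m1.xlarge numbers quoted above. The function names are mine, and it only covers the map-only case discussed here (it ignores memory taken by reducers and ApplicationMasters).

```python
def mappers_per_node(node_memory_mb, map_memory_mb):
    """How many map containers fit in one NodeManager's memory."""
    return node_memory_mb // map_memory_mb


def max_parallel_mappers(node_count, node_memory_mb, map_memory_mb):
    """Upper bound on concurrent mappers for a map-only job."""
    return node_count * mappers_per_node(node_memory_mb, map_memory_mb)


if __name__ == "__main__":
    # yarn.nodemanager.resource.memory-mb = 12288, mapreduce.map.memory.mb = 768
    print(mappers_per_node(12288, 768))         # 16
    print(max_parallel_mappers(5, 12288, 768))  # 80
```

Halving mapreduce.map.memory.mb in this model doubles the number of mappers per node, which is the trade-off described in the options that follow.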

So, if you want to run more concurrent mappers without them being queued up by YARN,

– you can either resize the cluster and add more nodes, or

– if your application doesn’t really need the default memory set by EMR for mappers, reduce mapreduce.map.memory.mb (and its heap, mapreduce.map.java.opts, keeping it at ~80% of memory.mb) on every node and restart the NodeManager (NM).

When spinning up a new cluster, you can change the above memory settings with the Configurations API.
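As a rough sketch, the Python below builds a configuration object of the shape used by the EMR Configurations API, using the mapred-site classification and keeping the heap at about 80% of the container memory. The helper name and the 512 MB value are illustrative assumptions, not recommended settings.

```python
import json


def mapred_site_override(map_memory_mb, heap_percent=80):
    """Build a mapred-site override with the heap at ~heap_percent of the container."""
    heap_mb = map_memory_mb * heap_percent // 100
    return [
        {
            "Classification": "mapred-site",
            "Properties": {
                "mapreduce.map.memory.mb": str(map_memory_mb),
                "mapreduce.map.java.opts": f"-Xmx{heap_mb}m",
            },
        }
    ]


if __name__ == "__main__":
    # Illustrative value only; pick a size that suits your job.
    print(json.dumps(mapred_site_override(512), indent=2))
```

The printed JSON can be saved to a file and supplied as the cluster configuration when the cluster is created.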


3 Q: My cluster can launch 14 mappers in parallel. Why does the job use only one or a few mappers?

Based on,





The number of mappers decided for the job = the minimum of these parameters:

1. The number of splits generated based on the files/manifest on S3 (or HDFS ?).

The calculation in ImportInputFormat.java simply generates one split per S3 file and does not group S3 files unless #files > MAX_NUM_SPLITS (100000).

(using emr-dynamodb-tools)

In Hive, if the data on S3 is NOT in the format of <Text, DynamoDBItemWritable>, the splits are calculated differently using CombineHiveInputFormat, where multiple (small) input files on S3 are combined to make a split with a max size of 244 MB (256000000 bytes).

mapreduce.input.fileinputformat.split.maxsize    256000000       file:/etc/hive/conf.dist/hive-site.xml

hive.input.format          org.apache.hadoop.hive.ql.io.CombineHiveInputFormat org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@2e8ab815  programatically

mapred.input.format.class         org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

input.FileInputFormat (FileInputFormat.java:listStatus(283))

io.CombineHiveInputFormat (CombineHiveInputFormat.java:getCombineSplits(451))

2. The number of mappers that the cluster can launch: mapreduce.job.maps  <- EMR calculates this from the cluster configuration.

defaults per node ->  http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hadoop-task-config.html

3. The number of mappers that the cluster can launch after reserving space for YARN AMs (as determined from TaskCalculator.java)

Examples :

– If I have one 70 MB data file on S3, it will launch only one mapper.

– If I have two 250 MB files, it will use 4 mappers. In this case, if the cluster only has current capacity for 1 mapper container, then it will run only one mapper.

– If I have one 70 MB file on S3 and my cluster has no capacity for a mapper after reserving space for the AM, the job will still request one mapper, but it will be queued until there is capacity for one mapper.

Each mapper task gets an equal share of (write capacity configured on DDB * dynamodb.throughput.write.percent), and using AbstractDynamoDBRecordWriter it does BatchWrites, trying to rate limit itself to the write capacity assigned to that task.
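The equal split described above can be written out as a short Python sketch. The function name is made up for this example, and it does not reproduce the connector's actual rate-limiting code; it only shows the arithmetic.

```python
def write_capacity_per_mapper(table_wcu, write_percent, mapper_count):
    """Equal share of (table WCU * dynamodb.throughput.write.percent) per mapper task."""
    return (table_wcu * write_percent) / mapper_count


if __name__ == "__main__":
    # 1000 provisioned WCUs, write percent of 1, four mapper tasks.
    print(write_capacity_per_mapper(1000, 1.0, 4))
```

This also shows why a low dynamodb.throughput.write.percent or a large number of mappers leaves each task with only a small write budget.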

We can control the max number of mappers used for an import job using


and the reduce tasks (will there be any? Hive may generate a plan based on the Hive query) using the normal MapReduce parameter


4 Q: Why are the provisioned WCUs of my table not being consumed by my import job?

Always check that dynamodb.throughput.write.percent = 1 in the jobconf. The rate limiter only tries to use the percentage of WCUs given by this parameter. If it is low, the import job will use fewer WCUs.

Possible bottlenecks


1. Memory :

OutOfMemoryErrors on mapper tasks, such as

java.lang.OutOfMemoryError: GC overhead limit exceeded

Let’s assume that we want to import a large DDB table with millions of items. We provision the table with 1000 WCUs to make the import faster, but we use an EMR cluster that has just one core/task node, which can support one mapper. We have already set dynamodb.throughput.write.percent = 1 to make it faster.

The job then decides to use a single mapper task based on the above calculations. This mapper is allowed to use all 1000 WCUs. A single mapper task is a JVM with heap space allocated to it. A mapper task with 400 MB can do roughly 200 WCUs. Too little memory for a mapper task that has to sustain high WCUs can cause OOMs, and the mapper task can get killed and retried. The retries use the same heap space and can also get killed, eventually failing the entire import job. Out-of-memory errors often occur together with high CPU usage as well.

So, in these scenarios, use a mapper with more memory and better CPU using

mapreduce.map.java.opts , mapreduce.map.memory.mb.

Default memory for mapper on EMR for a particular instance type is here:



2. Network IO:

In the above scenario, if the core/task node on which the mapper task runs happens to be an EC2 instance type with ‘Low Network Performance’, then it cannot really achieve the 1000 WCUs to DynamoDB that it wants. If the network bandwidth in the EC2 metrics hits a ceiling and never rises above it, see the following list and use an instance type with high network performance.



Suggested Parameters to use:

mapreduce.map.speculative = false (makes sure there aren’t 2 mapper tasks writing data to DDB at the same time, which would cause 4xx errors and could fail the task and the job.)


5 Q: Can I use Auto Scaling on DynamoDB to improve import performance rather than manually increasing capacity before every import job?

Yes. AbstractDynamoDBRecordWriter calls DescribeTable every 5 minutes, picks up the updated provisioned write capacity, and adjusts its writes per second based on the updated capacity.

ToDo: Does this work for multiple mappers ?

6 Q: Why do I see “Number of reduce tasks is set to 0 since there’s no reduce operator” on the import job? Where is the identity reducer?

This is set in Hive, as Hive doesn’t use a manifest because it is doing INSERT OVERWRITE. It writes everything as new, with the mappers writing directly to DDB.

7 Q: How do I check RetriedReadExceptions and RetriedWriteExceptions?

The import jobs are designed to retry exceptions. These appear in the list of counters printed at the end of each MapReduce job. Check the counters on the RM UI or in the job history file (.jhist) on your EMR cluster.

8 Q: How do I check the progress of the import job while it’s running?

With the current library functionality, there appears to be no Hadoop job counter that reports the items written by the import job. So, we need to check the live container logs on the RM UI for ‘Total items written’ per mapper task.

INFO [main] org.apache.hadoop.dynamodb.write.AbstractDynamoDBRecordWriter: New writes per second: 88

2018-02-07 19:06:46,828 INFO [main] org.apache.hadoop.dynamodb.write.AbstractDynamoDBRecordWriter: Total items written: 101041

 Total items written by the import job is roughly = #mappers * (Total items written)

To get the exact count, add up the values from all mappers’ container logs.
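Adding up the per-mapper totals can be scripted. The Python sketch below assumes each mapper's container log has already been read into a string, and that ‘Total items written’ is a running total, so the last occurrence in a log is that mapper's count. The function names are hypothetical.

```python
import re

# Matches log lines like the sample shown above.
TOTAL_ITEMS = re.compile(
    r"AbstractDynamoDBRecordWriter: Total items written: (\d+)"
)


def items_written_by_mapper(log_text):
    """Last 'Total items written' value in one mapper's log, or 0 if absent."""
    matches = TOTAL_ITEMS.findall(log_text)
    return int(matches[-1]) if matches else 0


def total_items_written(mapper_logs):
    """Sum the last reported totals across all mapper container logs."""
    return sum(items_written_by_mapper(text) for text in mapper_logs)


if __name__ == "__main__":
    sample = (
        "2018-02-07 19:06:46,828 INFO [main] "
        "org.apache.hadoop.dynamodb.write.AbstractDynamoDBRecordWriter: "
        "Total items written: 101041\n"
    )
    print(total_items_written([sample, sample]))
```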

RM UI : https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html

– As long as the job is running, the container logs also can be found on local disks of core/task nodes at /var/log/hadoop-yarn/containers/

9 Q: How do I verify the items written after the import job has ended?

Check the Hive counters among the job counters on the RM UI or in the .jhist file.

 RECORDS_OUT_1_default.ddb2  978,316

 RECORDS_IN  978,316
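If you copy the counter lines out of the UI, a small Python sketch like the one below can compare them. It assumes each line is a counter name followed by a value with thousands separators, and that matching RECORDS_IN and RECORDS_OUT_* values indicate every record read was written. That interpretation and the function names are assumptions for this example.

```python
def parse_counter(line):
    """Split a counter line such as 'RECORDS_IN  978,316' into (name, value)."""
    name, value = line.split()
    return name, int(value.replace(",", ""))


def records_match(counter_lines):
    """True when every RECORDS_OUT_* counter equals RECORDS_IN."""
    counters = dict(parse_counter(line) for line in counter_lines)
    records_in = counters["RECORDS_IN"]
    outs = [v for k, v in counters.items() if k.startswith("RECORDS_OUT")]
    return bool(outs) and all(v == records_in for v in outs)


if __name__ == "__main__":
    lines = [" RECORDS_OUT_1_default.ddb2  978,316", " RECORDS_IN  978,316"]
    print(records_match(lines))
```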

We can use RM UI usually located at path like




10 Q: Does the import job use the LOCAL DISK of EMR nodes?

 Usually no. Mappers may spill intermediate data to disk based on mapreduce.map.sort.spill.percent and the memory utilized. But since the mapper writes directly to DynamoDB and by default there are no reducers involved in Hive, spills are minimal.

Check the ‘FILE: Number of bytes written’ Hadoop job counter for any files written to disk by the job.

11 Q: If my job fails in the middle, will the DDB items written so far be gone?

No. If the job fails for some reason, the items already written by individual mapper tasks are not deleted by the import job. You will need to delete the items manually or re-create the table for the import.

Note that when running an import job, if an item with the same key exists in the target DynamoDB table, it will be overwritten. If no item with the key exists in the target DynamoDB table, the item is inserted.

12 Q: How to enable WIRE or DEBUG logging on EMR to check calls to S3 and DDB?

Use the following in the appropriate log4j configurations of Hadoop / Hive / Spark etc. on the master node before starting the job. Some configuration paths on the EMR master are





log4j.rootCategory=DEBUG, stdout



log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2} - %m%n




Then check the container logs

– on the RM UI, or

– on the local disks of core nodes (/var/log/hadoop-yarn/containers), or

– in the S3 log bucket configured for the EMR cluster.

13 Q: Why do I see a TEZ container although I do not use TEZ?


14 Q: Can you run the import/export tools on a stand-alone EMR cluster without Data Pipeline?

We don’t provide the emr-dynamodb-tools module on EMR clusters because this module is not supported. For the same reason, there is no documentation for it.


This module was developed by the DynamoDB team and uses custom formats for import and export operations,

i.e. ExportOutputFormat or ExportManifestOutputFormat to write files to S3, and


ImportInputFormat to read that same exported data (or manifests of data) from S3.

15 Q: What are the costs associated with an import job?

 – DynamoDb table Provisioned WCU’s


– EMR cluster costs(EMR + Ec2 + EBS if configured for EMR)


– Data Pipeline (if the template is used to run import job)


– S3 bucket costs : https://aws.amazon.com/govcloud-us/pricing/s3/

Use this tool to estimate your monthly bill


16 Q. In which regions is import/export available? We wish to use import/export in NA/EU/FE/CN.

(When using the Data Pipeline template “Import DynamoDB backup data from S3”)

Note that the Data Pipeline service is only available in some regions. But a Data Pipeline created in one region (let’s say us-east-1) can run an EMR cluster resource in any other region, and that cluster can run the import job between any region’s DynamoDB table and any region’s S3 bucket.


– EmrCluster has a region field:


– The Data Pipeline template has a ‘myDDBRegion’ parameter, which is passed to the EmrActivity and used to set the DynamoDB region for the import job.

– EMR (with EMRFS) should be able to access S3 buckets in any region.

When doing cross-region imports, it’s better to place the EMR cluster close to either the DynamoDB region or the S3 region. Between DDB and S3, I think EMR should be closer to the service whose data-transfer latency would hurt performance the most.

I speculate that EMR should be close to S3, but this needs to be tested.

17 Q: Can you import data from an S3 bucket in a different account than the DynamoDB table?

Let’s say the DynamoDB table is in Account A.

The S3 contents that need to be imported are in Account B.

1. The best way is to copy the data from Account B to an S3 bucket in Account A and run the import job in Account A.

Note: By default, the data uses a manifest file that has the full paths of objects, including bucket names. So, you’ll probably need to change the bucket names to Account A’s bucket name before running the import job.


2. If your EMR cluster on Account A (Assuming EMR_Ec2_Default role) has necessary permissions on objects of bucket in Account B , then the import job run on Account A EMR cluster might NOT work.
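For option 1, the bucket names in the copied manifest can be rewritten with a simple text substitution. The Python sketch below assumes the manifest refers to objects with s3://bucket/key style paths. The function and bucket names are hypothetical, and nothing else about the manifest layout is assumed.

```python
def retarget_manifest(manifest_text, source_bucket, target_bucket):
    """Point every s3://source_bucket/ path in the manifest at target_bucket."""
    # The trailing slash avoids touching buckets that merely share a prefix.
    return manifest_text.replace(
        f"s3://{source_bucket}/", f"s3://{target_bucket}/"
    )


if __name__ == "__main__":
    # Hypothetical manifest line and bucket names, for illustration only.
    line = "s3://account-b-bucket/export/part-0001"
    print(retarget_manifest(line, "account-b-bucket", "account-a-bucket"))
```

Review the rewritten manifest before running the import, since a plain text replacement does not validate that the new paths exist.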


18 Q: Does AWS Data Pipeline spin up nodes based on any particular calculation for the import job?

Yes. EDP calculates the nodes based on the number of mappers the job needs. Data Pipeline has a setting for the DynamoDB import/export function, resizeClusterBeforeRunning, that overrides your CORE node settings on the EmrResource object. It attempts to choose the best options based on an algorithm, but this is still a work in progress.


For example, a job might need 5 mappers, which it can calculate based on IOPS. Those 5 mappers can work with up to 6 GB of heap and 100,000 items. So, EDP configures each mapper with 6 GB of memory and spins up nodes with that configuration based on the instance type.

This calculation is done if the resize operation on the EMR resource object is set to true.

19 Q. Can I add a Data Pipeline to transform the data between the import and export pipelines?

The Data Pipeline import/export template ultimately runs the Hive import/export tool on an EMR cluster. This tool is built from an open source library, and the packaged JARs are included in EMR clusters by the Data Pipeline service.



These pre-included tools DO NOT allow you to transform the data.

For import, they just expect the data on S3 in DynamoDB input format, which is like newline-delimited JSON (created by a previous export from a similar tool), and they put that data into DynamoDB as is.

To transform the data, you’ll need to tweak the pipeline definition so that you run your own Hive queries on EMR.


Using Hive queries, you can make use of pre-included “org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler” to read / query and transform / write data directly from/to DynamoDB and S3.



An example Pipeline which runs Hive queries to import data to DDB.



Any transformations that you want can be achieved with Hive DML on/before the “INSERT OVERWRITE” that does the writing to the DDB table.


For export jobs, you can use Hive queries that support predicate push down, so that a Hive SQL-like SELECT query with a WHERE clause is translated to an equivalent query with a KeyConditionExpression on the DynamoDB Query API instead of the usual full DynamoDB Scan on the table.


This means that instead of a full Scan, which fetches all your items from the DynamoDB table (consuming a large number of RCUs), a SELECT query (with a proper WHERE clause and comparison operators on the primary and/or range key) can translate that condition into a KeyConditionExpression on the DynamoDB query.


A Scan Filter is also being used

– A DynamoDB FilterExpression is never applied; only the KeyConditionExpression is applied in such a Query call.

– It seems that neither global secondary indexes nor local indexes are supported; however, scenarios involving a query on a single primary key are recognized pretty well.

– This makes it practical to use a primary key as a method of partitioning your data to avoid EMR queries taking longer over time as the table grows.

– DynamoDB does not support filters on columns of type set

The QUERY for some common scenarios are well explained here:


20 Q. Can we use the import/export in the above use case considering the data in Table B is continually changing? Can we modify the import task to include conditional writes?

I do not think the open source library can handle live data changes. It’s designed to work on stable sources and destinations.

I don’t think you can modify the import task to do conditional writes using the tool. The tool just implements BatchWrites from chunks of fixed data on S3 to the DDB table.

If an item with the same key exists in the target DynamoDB table, it will be overwritten (replaced). If no item with the key exists in the target DynamoDB table, the item is inserted. With batch writes you can’t put conditions on individual items, so you can’t prevent the import from overwriting, let’s say, a more recent item.


For live changes, you’ll need to use a different library or the DynamoDB features.

You can create tables that are automatically replicated across two or more AWS Regions, with full support for multi-master writes. This gives you the ability to build fast, massively scaled applications for a global user base without having to manage the replication process. For more information, see Global Tables.

Use DynamoDB Cross-Region Replication  https://github.com/awslabs/dynamodb-cross-region-library

21 Q:  Are there any TPS restrictions in the use of import/export?

TPS is basically limited by

– provisioned Read / Write capacity of DynamoDb

–  Read / Write ratio parameter that you set on the import / export tools.

– Some other parameters like table size and Avg. item size can impact the performance.

– of course, the import/export uses EMR cluster where it uses Tez engine or MR engine using YARN framework. So, typical bottlenecks on ec2 instance types like memory / IO / CPU can be present and the speed depends on the concurrency of containers that your cluster can support.

22 Q:  Do we need to code the EMR job for the import/export ourselves?


You can always alter the open source implementation, build and package the JAR onto EMR, and implement custom logic to handle your needs.

– Data Pipeline downloads relatively old import/export tools (emr-dynamodb-tools written for Hive 2.1.0) to EMR, which, depending on its AMI version, can contain old connector libraries (emr-dynamodb-hadoop and emr-dynamodb-hive).

The latest library could have several improvements such as bug fixes, features and the latest AWS SDKs to handle large data sets effectively.



Possible customizations that can be made to the awslabs/emr-dynamodb-connector library for large imports on the order of TBs

– infinite retries on ProvisionedThroughputExceededException’s and Internal Server Error’s.


– max burst capabilities.

– log to S3 and continuation on incompatible items,

– basic CW progress monitoring to report totalItemsWritten , totalIOPSConsumed written by AbstractDynamoDBRecordWriter.

– capability to store invalid items in an s3 bucket (as staging). So, they can be later altered and retried.

23 Q: How to Import CSV or XML data on S3 to DynamoDB?

A Hive script is needed for such import job. We have some examples here for Data Pipelines.



24 Q: Do I need to use EMRFS consistent view on EMR for the import job?

 Consistent view on EMRFS is designed to help with the eventual consistency problem when reading from S3. If an export job has just completed and written files to S3, and an import job tries to read those files immediately (or within minutes), the import job can hit a FileNotFound error and might fail.

So, if an import job needs to read several files that were just written, it’s a good idea to have this enabled. If the export job did not use EMRFS consistent view to write to S3, then the corresponding metadata might not be present in the DynamoDB table that EMR uses for checkpointing. You can always sync the metadata to your checkpointing table by running EMRFS commands.


25 Code snippets:


244 MB

26 Tests:

  – Autoscaling works. AbstractDynamoDBRecordWriter calls DescribeTable every 5 minutes, picks up the updated provisioned capacity, and adjusts its writes per second based on the updated capacity.

– Will it work for multiple mappers ?

27 Todo