EMR Dynamodb Connector : Import data to DynamoDB from S3 : FAQ’s

Written by mannem on . Posted in AWS BIG DATA, Data Pipelines, Dynamo DB, EMR || Elastic Map Reduce


1 Q: How much time does my Import job take?

Certainly, longer than Export job. Here’s how we calculate the Import job duration:

While importing into a table, Dynamodb Storage-Handler library relies on BatchWriteItem operations. A single BatchWriteItem item operation can write upto 16MB which can comprise as many as 25 items(DynamoDB service limit). Each item write is separate from dynamodb metering perspective and hence atleast cost one write IOPS. This means imports require significantly more IOPS and time to complete compared to exports.

Average_item_size = table size/ item count  ->You get this information from original table, not backup.

If Average_item_size < 1kilobytes then IOPS_per_Item = 1

if Average_item_size > 1kilobytes then IOPS_per_Item = round it to the nearest higher kilobyte.

For example,

if Average_item_size = 50 bytes, IOPS_per_Item = 1

if Average_item_size =  3520 bytes, IOPS_per_Item = 4

Time in hours = (item_count * IOPS_per_Item) /  (ProvisionedWriteThroughputAllocatedForJob * 60 * 60)



– If you see Provisined WCU’s(Write capacity units) for your DynamoDB table fully being utilized by this import job, then that’s the fastest you can go.  To speed it up, one parameter that is really important to increase is of course the WCU of the table.

2 Q: How many mappers or Reducers my EMR cluster can run concurrently without being queued up?

To determine the number of parallel mappers, you will need to check this documentation from EMR called Task Configuration where EMR had a predefined mapping set of configurations for every instance type which would determine the number of mappers/reducers.


For example: Let’s say you have 5 m1.xlarge core nodes. According to the default mapred-site.xml configuration values for that instance type from EMR docs, we have

mapreduce.map.memory.mb = 768

yarn.nodemanager.resource.memory-mb = 12288

yarn.scheduler.maximum-allocation-mb = 12288 (same as above)

You can simply divide the later with former setting to get the maximum number of mappers supported by one m1.xlarge node = (12288/768) = 16

So, for the 5 node cluster, it would a max of 16*5 = 80 mappers that can run in parallel (considering a map only job). The same is the case with max parallel Reducers (30) and Applications Masters. You can do similar math for a combination of mappers and reducers and AM’s.

So, If you want to run more concurrent mappers without them being queued up by YARN,

– you can either re-size the cluster and add more nodes or

– If your application doesnt really need default memory set by EMR for mappers, reduce the mapreduce.map.memory.mb(and its heap mapreduce.map.java.opts kaing it ~ 80% of memory.mb) on every node and restart NM.

When spinning up a new cluster, you can change above memory settings with  Configurations API.


3 Q: My cluster can launch 14 mappers in parallel. why does job use only one/few mapper?

Based on,





The number of mappers decided for the job = minimum of these parameters:

1. #splits generated based on files/manifest on s3 (or HDFS ?).

calculation in ImportInputFormat.java: simply generate one split per S3 file and does not group S3 files unless #files > MAX_NUM_SPLITS(100000).

(using emr-dynamodb-tools)

In HIVE, if the data on S3 is NOT in the format of <Text, DynamoDBItemWritable> ,   the splits are calculated differently using CombineHiveInputFormat where multiple(small) input files on s3 are combined to make a split with a max size of  244 MB.

mapreduce.input.fileinputformat.split.maxsize    256000000       file:/etc/hive/conf.dist/hive-site.xml

hive.input.format          org.apache.hadoop.hive.ql.io.CombineHiveInputFormat org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@2e8ab815  programatically

mapred.input.format.class         org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

input.FileInputFormat (FileInputFormat.java:listStatus(283))

io.CombineHiveInputFormat (CombineHiveInputFormat.java:getCombineSplits(451))

2. number of mappers that cluster can launch. mapreduce.job.maps  <- from cluster configuration EMR calculates this.

defaults per node ->  http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hadoop-task-config.html

3. number of mappers that cluster can launch reserving space for YARN AM’s (as determined from TaskCalculator.java)

Examples :

– If I have 1 70MB data file on s3, it will only launch one mapper.

– If I have 2 250MB file, it will do 4 mappers. In this case, If cluster only has current capacity for 1 mapper container, then it will only run one mapper.

– If I have 1 70 MB file on s3, if my cluster do not have any capacity for a mapper after reserving space for AM, then the job will still request one mapper but it will be placed on queue until there’s capacity for one mapper.

Each mapper task will share the (Write capacity configured on DDB *  dynamodb.throughput.write.percent ) equally and using AbstractDynamoDBRecordWriter it will do BatchWrites trying to rate limit to whatever write capacity that task is assigned with.

We can control the max mappers used for an import job number using


And reduce tasks (will there be any? – hive may generate a plan based on Hive Query) using normal mapreduce parameter


4 Q: why are my Provisioned WCU’s of table are not being consumed in my import job?

Always check if dynamodb.throughput.write.percent = 1 on jobconf. The rate limiter only tries to use % WCU’s based on this parameter. If this is low, the import job will use less WCU’s.

Possible bottlenecks


1. Memory :

OutOfMemoryError‘s like

java.lang.OutOfMemoryError: GC overhead limit exceeded on Mappers tasks

Let’s assume that we want to import a large DDB table with millions of items. We provision table with 1000 WCU’s to make import faster. But we use an EMR cluster has just one core/task node which can support one mapper. we already set dynamodb.throughput.write.percent =1 to make it faster.

But, job calculates to use a single mapper task based on above calculations. This mapper is allowed to use all 1000 WCU’s.  A single mapper task is a JVM and has heap space allocated for it.  A mapper task with 400MB can do like 200 WCU’s. Having less memory for a mapper task to accomplish high WCU’s might cause OOM’s and mapper task can get killed and retried. The retries also use same heap space and can get killed eventually failing the entire import job. Out of memory errors often happens along with high CPU usage as well.

So, in these scenarios use a Mapper with high memory and better CPU using

mapreduce.map.java.opts , mapreduce.map.memory.mb.

Default memory for mapper on EMR for a particular instance type is here:



2. Network IO:

In above scnerio, If the core/task node on which mappers task runs , happen to be an ec2 instance type with ‘Low Network Performance’ , then it cannot really acheive the the 1000 WCU to DynamoDB  that it want to do. If you see n/w bandwidth from ec2 metrics ceiling and never increased from that ceiling , then see the following list and use a high n/w performance instance type.



Suggested Parameters to use:

mapreduce.map.speculative = false (will make sure there’s isnt 2 mapper tasks writing data to DDB at same time which would cause 4xx’s and could fail the task and the job.)


5 Q: Can I use Autoscaling on DynamoDB to improve performance of import rather than manually increasing capacity before every import job?

Yes. AbstractDynamoDBRecordWriter will DescribeTable every 5 minutes and gets updated Provisioned Write capacity and will do New writes per second based on updated capacity.

ToDo: Does this work for multiple mappers ?

6 Q: Why do i see Number of reduce tasks is set to 0 since there’s no reduce operator on import job? Where is Identity reducer?

This is set in Hive as hive doesn’t do manifest because its doing INSERT OVERWRITE. It writes everything as new with mapper writing directly to DDB.

7 Q: Checking RetriedReadExceptions and RetriedWriteExceptions ?

The Import jobs are designed to retry exceptions. list of counters printed at the end of each Map Reduce job. Check counters on RM UI or job history file(.jhist) on your EMR cluster.

8 Q: How to check progress of the import job while its running?

From the current library functionality, looks like there’s no Hadoop job counter that specifies items written by the import job. So, we will need to check the live container logs on the RM UI to ‘Total items written per mapper task.

INFO [main] org.apache.hadoop.dynamodb.write.AbstractDynamoDBRecordWriter: New writes per second: 88

2018-02-07 19:06:46,828 INFO [main] org.apache.hadoop.dynamodb.write.AbstractDynamoDBRecordWriter: Total items written: 101041

 Total items written by Import job is roughly = #mappers * (Total items written)

To get exact count, you can add up add up from all mapper’s container logs.

RM UI : https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html

– As long as the job is running, the container logs also can be found on local disks of core/task nodes at /var/log/hadoop-yarn/containers/

9  Q: How to verify items written after import job ended?

Check HIVE counters on Job counters on RM UI or on .jhist file.

 RECORDS_OUT_1_default.ddb2  978,316

 RECORDS_IN  978,316

We can use RM UI usually located at path like




10  Q: Does import job uses LOCAL DISK of EMR nodes?

 Usually no. But, mappers may spill to disk -the intermediate data based on mapreduce.map.sort.spill.percent and memory utilized. but since mapper is writing directly to DynamoDB and by default there’s no reducers involved on HIVE , spills are very less.

Check ‘FILE: Number of bytes written’ Hadoop Job counter for the any files written to disk by the job.

11 Q: If my jobs fails in middle, will DDB items written so far be gone?

No. If the jobs fails for some reason, the items written so far by individual mapper tasks cannot be deleted by the import job.  You will need to delete items manually or re-create the table for import.

Note that when running import job, If an item with the same key exists in the target DynamoDB table, it will be overwritten. If no item with the key exists in the target DynamoDB table, the item is inserted.

12 Q: How to enable WIRE OR DEBUG logging on EMR to check calls to S3 and DDB?

Use the following on appropriate log4j configurations of Hadoop / hive / spark etc on Master node before starting the job. Some configuration paths on EMR master are





log4j.rootCategory=DEBUG, stdout



log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2} – %m%n




Then check container logs

– on RM UI or

– on Local disks of core’s or (/var/log/hadoop-yarn/containers)

– on S3 log bucket configured for the EMR cluster.

13 Q: Why do I see a TEZ container although I do not use TEZ


14 Q: Can you run Import/Export tools on stand-alone EMR cluster without Data Pipelines?

We don’t provide emr-dynamodb-tools module on EMR clusters, because this module is not supported. The same reason there is no documentation for it.


This module was developed by DynamoDb team and uses custom format for import and export operations.

i.e ExportOutputFormat or  ExportManifestOutputFormat to write files to S3


ImportInputFormat for reading that same exported data(or manifests of data) from S3.

15 Q: What are the costs Associated with Import job?

 – DynamoDb table Provisioned WCU’s


– EMR cluster costs(EMR + Ec2 + EBS if configured for EMR)


– Data Pipeline (if the template is used to run import job)


– S3 bucket costs : https://aws.amazon.com/govcloud-us/pricing/s3/

Use this tool to estimate your monthly bill


16 Q. Which regions is import/export available in? We wish to use import export in NA/EU/FE/CN.

(When using Data Pipeline Template of “IMport DynamoDB backup data from s3” )

Note that Data Pipeline’s service is only available in some regions. But a Data Pipeline created in (let’s say us-east-1) can basically run an EMR cluster resource  in any other region  and that cluster can do the  import job from any region’s DynamoDB table to any regions S3 bucket.


– EmrCluster has a region field:


– Data Pipeline Template has ‘myDDBRegion’ paramter which will be passed to EmrActivity used to set the DynamoDB region for the Import job.

– EMR (with EMRFS) should be able to access S3 buckets in any region.

When doing cross region imports, its better to choose EMR cluster close to either DynamoDb’s region or S3 region. B/w DDb and S3 , I think EMR should be closer to service which encounters large latency for data transfer that could impact the performance.

I speculate, EMR should be close to S3. But this is something need to be tested.

17 Q: Can you Import Data from S3 bucket of different Account’s DynamoDB table?

Let’s say DynamoDb Table is in Account A.

The S3 contents need to be imported is on Account B.

1. The best way is to copy that data on Account B to S3 bucket on Account A and run the import job on Account A.

Note: The Data by default will use a manifest file which has Full paths of objects including bucket-names. So, you’ll probably need to edit the bucket names to Account A’s bucket name before running the import job.


2. If your EMR cluster on Account A (Assuming EMR_Ec2_Default role) has necessary permissions on objects of bucket in Account B , then the import job run on Account A EMR cluster might NOT work.


18 Q: Does AWS Data Pipeline spin up nodes based on any particular calculation for import job?

Yes. EDP calculates the nodes based on the number of mappers the job needs. Datapipeline has a  setting for the DynamoDB import/export function, resizeClusterBeforeRunning, that will override your settings on CORE nodes on EmrResource object. It attempts to choose the best options based on an algorithm but this is still a work in progress..


For example , A job might need 5 mappers which it can calculate based on IOPS. Those 5 mappers can work up to 6GB heap and 100,000 items. So, EDP configures each mapper to be 6 GB memory and spins ups nodes with such config based on instance type.

This calculation is done if resize operation on EMR resource object is set to true,.

19 Q. Can I add a Data Pipeline to transform the data between the import and export pipelines?

Data pipelines Import/export template ultimately runs Hive import/export tool on EMR cluster. This tool is built with open source library and packaged JAR’s are included in EMR clusters by Data Pipelines service.



These pre-included tools DOES NOT allow you to transform the data.

For import, they just expect the data on S3 in DynamoDB Input format which is like new line delimited JSON (created with previous Export from similar tool). And it puts that data to S3 as is.

To transform the data, you’ll need to tweak the Pipeline definition so that you run your own HIVE queries on EMR.


Using Hive queries, you can make use of pre-included “org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler” to read / query and transform / write data directly from/to DynamoDB and S3.



An example Pipeline which runs Hive queries to import data to DDB.



Any transformations that you want can be achieved with HIVE DML’s on/before “INSERT OVERWRITE” which does the writing to DDB table.


For export jobs, you can use Hive Queries which support predicate push down so that a HIVE SQL-like SELECT QUERY with a WHERE clause is translated to a equivalent QUERY with a  KeyConditionExpression on  DynamODB QUERY API instead of usual full DynamoDB SCAN on the table.


This means instead of FULL SCAN’s which fetches all your items from DynamoDb table(consuming large RCU’s) , a SELECT Query (with proper WHERE clause and comparision operators on Primary or/and Range Key )  can just translate that condition to  to a  KeyConditionExpression on the DynamoDB query.


A Scan Filter is also being used

– A DynamodDB FilterExpression is never Applied but only KeyConditionExpression is applied in such QUERY call.

– It seems that neither global secondary indexes or local indexes are supported, however scenarios involving a query on a single primary key are recognized pretty well.

– This makes it practical to use a primary key as a method of partitioning your data to avoid EMR queries taking longer over time as the table grows.

– DynamoDB does not support filters on columns of types set

The QUERY for some common scenarios are well explained here:


20 Q. Can we use the import/export in the above use case considering the data in Table B is continually changing? Can we modify the import task to include conditional writes?

I do not think the open source library can handle live data changes. Its designed to work on stable source and destinations.

I don’t think you can modify import task to do conditional writes using the tool. The tool just implements BatchWrites from chunks of fixed data on S3 to DDB table.

If an item with the same key exists in the target DynamoDB table, it will be overwritten(Replaced). If no item with the key exists in the target DynamoDB table, the item is inserted. With batch write you can’t put conditions on individual items thus you can’t prevent it from updating lets say a latest Item.


For live changes, you’ll need to use a different library or the DynamoDB features.

You can create tables that are automatically replicated across two or more AWS Regions, with full support for multi-master writes. This gives you the ability to build fast, massively scaled applications for a global user base without having to manage the replication process. For more information, see Global Tables.

Use DynamoDB Cross-Region Replication  https://github.com/awslabs/dynamodb-cross-region-library

21 Q:  Are there any TPS restrictions in the use of import/export?

TPS is basically limited by

– provisioned Read / Write capacity of DynamoDb

–  Read / Write ratio parameter that you set on the import / export tools,.

– Some other parameters like table size and Avg. item size can impact the performance.

– of course, the import/export uses EMR cluster where it uses Tez engine or MR engine using YARN framework. So, typical bottlenecks on ec2 instance types like memory / IO / CPU can be present and the speed depends on the concurrency of containers that your cluster can support.

22 Q:  Do we need to code the EMR job for the import/export ourselves?


You can always alter the open source implementation, build ,. package the JAR onto EMR to implement custom logics to handle your needs.

– Data Pipeline downloads relatively old import export tools( emr-dynamodb-tools written for hive 2.1.0) to EMR, which based on its AMI version can contain old Connector libraries(emr-dynamodb-hadoop and emr-dynamodb-hive) packaged.

The latest Library could have several improvements like bug fixes , Features ,  latest AWS SDK’s etc to handle large data sets effectively.



Possible customizations that can be done to awslabs/emr-dynamodb-connector library for large imports in terms of TB’s

– infinite retries on ProvisionedThroughputExceededException’s and Internal Server Error’s.


– max burst capabilities.

– log to S3 and continuation on incompatible items,

– basic CW progress monitoring to report totalItemsWritten , totalIOPSConsumed written by AbstractDynamoDBRecordWriter.

– capability to store invalid items in an s3 bucket (as staging). So, they can be later altered and retried.

23 Q: How to Import CSV or XML data on S3 to DynamoDB?

A Hive script is needed for such import job. We have some examples here for Data Pipelines.



24 Q: Do I need to use EMRFS consistent view on EMR for Import job ?

 Consistent view on EMRFS is designed to help you with eventual consistency problem when Reading from S3.  If an Export job just completed and wrote Files to S3 and an Import job is immediately(or within minutes delay) tries to read those files, the import job can hit FileNotFound error and might fail the job.

So, If an Import job  need to read several files which are just written,  its a good idea to have this enabled.  If the Export job did not use the EMRFS consistent view to write to S3 , then the corresponding metadata might not be present on DynamoDB table – that EMR uses for checkpointing.  You can always SYNC the metadata to your checkpointing table by running EMRFS commands.


25 Code snippets:


244 MB

26 Tests:

  – Autoscaling works. AbstractDynamoDBRecordWriter will do describe table every 5 minutes and gets updated Provisioned capacity and will do New writes per second based on updated capacity.

– Will it work for multiple mappers ?

27 Todo

Search DynamoDB tables using Elasticsearch/Kibana via Logstash plugin

Written by mannem on . Posted in Dynamo DB, Elasticsearch

The Logstash plugin for Amazon DynamoDB gives you a nearly real-time view of the data in your DynamoDB table. The Logstash plugin for DynamoDB uses DynamoDB Streams to parse and output data as it is added to a DynamoDB table. After you install and activate the Logstash plugin for DynamoDB, it scans the data in the specified table, and then it starts consuming your updates using Streams and then outputs them to Elasticsearch, or a Logstash output of your choice.

Logstash is a data pipeline service that processes data, parses data, and then outputs it to a selected location in a selected format. Elasticsearch is a distributed, full-text search server. For more information about Logstash and Elasticsearch, go to https://www.elastic.co/products/elasticsearch.

Amazon Elasticsearch Service is a managed service that makes it easy to deploy, operate, and scale Elasticsearch in the AWS Cloud. aws.amazon.com/elasticsearch-service/

This article includes an installation guide that is tested on EC2 instance where all the per-requsites are installed and Logstash is configured so that it connects to Amazon ElasticSearch using the input/Output plugins to start indexing records from DynamoDB. Click here to get all the instructions :

Logstash configuration:

After running a similar command on the shell, Logstash should successfully start and begin indexing the records from your DynamoDB table.

Throughput considerations:





Similar plugins:


Using DynamoDB as session provider with AWS SDK V3

Written by mannem on . Posted in Dynamo DB

The DynamoDB Session Handler is a custom session handler for PHP that allows developers to use Amazon DynamoDB as a session store. Using DynamoDB for session storage alleviates issues that occur with session handling in a distributed web application by moving sessions off of the local file system and into a shared location. DynamoDB is fast, scalable, easy to setup, and handles replication of your data automatically.

Setting up:

1. Make sure you have PHP >= 5.5.0
2. install AWS PHP SDK(v3) from here http://docs.aws.amazon.com/aws-sdk-php/v3/guide/getting-started/installation.html
3. Configure PHP SDK to use any of the credentials options as mentioned here: http://docs.aws.amazon.com/aws-sdk-php/v3/guide/guide/credentials.html
4. See more details about DyanmoDB provided session handler here: https://docs.aws.amazon.com/aws-sdk-php/v3/guide/service/dynamodb-session-handler.html
5. A DynamoDB table to store session info, with ‘id’ (String) as Hash key.

End to End PHP code with debug turned on:

> php sessionProvider.php
successfully connected

Now, check the DynamoDB table if the session information is stored successfully.

Here is the example structure(DynamoDB JSON format):

References :


  • cloudformation












  • snap





    Storage Gatewa




    Cloud Front

  • r53

    Route 53











  • sns







    Cloud Search


    App Stream



  • opsworks



    Cloud Watch


    Elastic Beanstalk


    Code Deploy



  • dynamodb