EMR DynamoDB Connector : Import data to DynamoDB from S3 : FAQs

Written by mannem. Posted in AWS BIG DATA, Data Pipelines, DynamoDB, EMR || Elastic Map Reduce

 

1 Q: How much time does my Import job take?

Certainly longer than an export job. Here's how to estimate the import job duration:

While importing into a table, the DynamoDB storage handler library relies on BatchWriteItem operations. A single BatchWriteItem operation can write up to 16 MB, which can comprise as many as 25 items (a DynamoDB service limit). Each item write is separate from a DynamoDB metering perspective and therefore costs at least one write IOPS. This means imports require significantly more IOPS, and hence more time, to complete compared to exports.

Average_item_size = table_size / item_count  -> take these values from the original table, not the backup.

If Average_item_size < 1 KB, then IOPS_per_Item = 1

If Average_item_size > 1 KB, then IOPS_per_Item = the item size rounded up to the next whole kilobyte.

For example,

if Average_item_size = 50 bytes, IOPS_per_Item = 1

if Average_item_size =  3520 bytes, IOPS_per_Item = 4

Time in hours = (item_count * IOPS_per_Item) /  (ProvisionedWriteThroughputAllocatedForJob * 60 * 60)

https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithItems.html

https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ProvisionedThroughput.html
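A minimal Python sketch of this estimate (the table numbers at the bottom are purely hypothetical):

import math

def import_hours(table_size_bytes, item_count, write_capacity_for_job):
    # Estimate import duration in hours using the formula above.
    avg_item_size = table_size_bytes / item_count
    # one write unit per item, rounded up to the next whole KB for larger items
    iops_per_item = max(1, math.ceil(avg_item_size / 1024.0))
    total_write_units = item_count * iops_per_item
    return total_write_units / (write_capacity_for_job * 60 * 60)

# e.g. 10 million items averaging ~3520 bytes, with 1000 WCUs allocated to the job
print(import_hours(35200000000, 10000000, 1000))   # ~11.1 hours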

– If you see the provisioned WCUs (write capacity units) of your DynamoDB table being fully utilized by this import job, then that's the fastest you can go. To speed it up, the most important parameter to increase is, of course, the WCU of the table.

2 Q: How many mappers or reducers can my EMR cluster run concurrently without them being queued up?

To determine the number of parallel mappers, check the EMR documentation called Task Configuration, where EMR has a predefined set of configurations for every instance type that determines the number of mappers/reducers.

http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hadoop-task-config.html

For example: Let’s say you have 5 m1.xlarge core nodes. According to the default mapred-site.xml configuration values for that instance type from EMR docs, we have

mapreduce.map.memory.mb = 768

yarn.nodemanager.resource.memory-mb = 12288

yarn.scheduler.maximum-allocation-mb = 12288 (same as above)

You can simply divide the latter by the former to get the maximum number of mappers supported by one m1.xlarge node: 12288 / 768 = 16.

So, for the 5-node cluster, a maximum of 16 * 5 = 80 mappers can run in parallel (considering a map-only job). The same math applies to the maximum number of parallel reducers (using mapreduce.reduce.memory.mb) and ApplicationMasters, and you can do similar math for a combination of mappers, reducers and AMs.
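The same capacity math in a few lines of Python (the values are the m1.xlarge defaults quoted above):

nodes = 5
nodemanager_memory_mb = 12288    # yarn.nodemanager.resource.memory-mb
map_container_mb = 768           # mapreduce.map.memory.mb

mappers_per_node = nodemanager_memory_mb // map_container_mb   # 16
max_parallel_mappers = mappers_per_node * nodes                # 80
print(mappers_per_node, max_parallel_mappers)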

So, if you want to run more concurrent mappers without them being queued up by YARN,

– you can either re-size the cluster and add more nodes, or

– if your application doesn't really need the default mapper memory set by EMR, reduce mapreduce.map.memory.mb (and its heap, mapreduce.map.java.opts, keeping it at ~80% of memory.mb) on every node and restart the NodeManagers.

When spinning up a new cluster, you can change the above memory settings with the Configurations API.

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html
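For instance, a minimal boto3 sketch that passes such a configuration when creating a cluster (the cluster name, release label, instance types/counts and the 3072 MB / -Xmx2458m values are placeholders, not recommendations):

import boto3

configurations = [
    {
        "Classification": "mapred-site",
        "Properties": {
            "mapreduce.map.memory.mb": "3072",
            "mapreduce.map.java.opts": "-Xmx2458m",   # ~80% of memory.mb
        },
    }
]

emr = boto3.client("emr")
response = emr.run_job_flow(
    Name="ddb-import-cluster",
    ReleaseLabel="emr-5.11.0",
    Instances={
        "MasterInstanceType": "m4.large",
        "SlaveInstanceType": "m4.xlarge",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    Applications=[{"Name": "Hadoop"}, {"Name": "Hive"}],
    Configurations=configurations,
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)
print(response["JobFlowId"])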

3 Q: My cluster can launch 14 mappers in parallel. Why does my job use only one (or a few) mappers?

Based on the following sources,

https://github.com/awslabs/emr-dynamodb-connector/blob/master/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/write/WriteIopsCalculator.java

https://github.com/awslabs/emr-dynamodb-connector/blob/master/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBUtil.java

https://github.com/awslabs/emr-dynamodb-connector/blob/master/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/util/TaskCalculator.java

https://github.com/awslabs/emr-dynamodb-connector/blob/master/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/importformat/ImportInputFormat.java

The number of mappers decided for the job = minimum of these parameters:

1. The number of splits generated from the files/manifest on S3 (or HDFS?).

The calculation in ImportInputFormat.java simply generates one split per S3 file and does not group S3 files unless #files > MAX_NUM_SPLITS (100000).

(using emr-dynamodb-tools)

In Hive, if the data on S3 is NOT in the <Text, DynamoDBItemWritable> format, the splits are calculated differently using CombineHiveInputFormat, where multiple (small) input files on S3 are combined into a split with a max size of ~244 MB, per the following settings:

mapreduce.input.fileinputformat.split.maxsize    256000000       file:/etc/hive/conf.dist/hive-site.xml

hive.input.format          org.apache.hadoop.hive.ql.io.CombineHiveInputFormat org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@2e8ab815  programatically

mapred.input.format.class         org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

input.FileInputFormat (FileInputFormat.java:listStatus(283))

io.CombineHiveInputFormat (CombineHiveInputFormat.java:getCombineSplits(451))

2. The number of mappers the cluster can launch: mapreduce.job.maps (EMR calculates this from the cluster configuration).

defaults per node ->  http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hadoop-task-config.html

3. The number of mappers the cluster can launch after reserving space for YARN ApplicationMasters (as determined by TaskCalculator.java).

Examples :

– If I have one 70 MB data file on S3, the job launches only one mapper.

– If I have two 250 MB files, the job uses 4 mappers. In this case, if the cluster currently has capacity for only 1 mapper container, then only one mapper runs at a time.

– If I have one 70 MB file on S3 and my cluster does not have any capacity for a mapper after reserving space for the AM, the job will still request one mapper, but it will be queued until there is capacity for it.

Each mapper task gets an equal share of (write capacity configured on the DynamoDB table * dynamodb.throughput.write.percent), and using AbstractDynamoDBRecordWriter it issues BatchWrites rate-limited to whatever write capacity that task is assigned.
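That share, spelled out as a tiny Python sketch (all numbers hypothetical):

table_wcu = 1000                 # provisioned WCUs on the table
write_percent = 1.0              # dynamodb.throughput.write.percent
num_map_tasks = 4                # mappers chosen for the job, as calculated above

per_mapper_writes_per_sec = (table_wcu * write_percent) / num_map_tasks
print(per_mapper_writes_per_sec)   # 250.0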

We can control the maximum number of mappers used for an import job with

dynamodb.max.map.tasks

And reduce tasks (if there are any; Hive may generate a plan with reducers depending on the query) with the usual MapReduce parameter

mapreduce.job.reduces

4 Q: Why are the provisioned WCUs of my table not being consumed by my import job?

Always check whether dynamodb.throughput.write.percent = 1 in the job conf. The rate limiter only tries to use the percentage of WCUs set by this parameter; if it is low, the import job will consume fewer WCUs.

Possible bottlenecks

———————–

1. Memory :

OutOfMemoryErrors like

java.lang.OutOfMemoryError: GC overhead limit exceeded on mapper tasks

Let's assume we want to import a large DynamoDB table with millions of items. We provision the table with 1000 WCUs to make the import faster, but use an EMR cluster that has just one core/task node, which can support one mapper. We have already set dynamodb.throughput.write.percent = 1 to make it faster.

Based on the calculations above, the job ends up with a single mapper task, and that mapper is allowed to use all 1000 WCUs. A mapper task is a JVM with a fixed heap; a mapper task with 400 MB of heap can sustain roughly 200 WCUs. Giving a mapper too little memory while asking it to drive high WCUs can cause OOMs, and the mapper task gets killed and retried. The retries use the same heap space and can also get killed, eventually failing the entire import job. Out-of-memory errors often occur along with high CPU usage as well.

So, in these scenarios, use mappers with more memory (and better CPU) by tuning

mapreduce.map.memory.mb and mapreduce.map.java.opts.

The default mapper memory on EMR for a particular instance type is documented here:

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hadoop-task-config.html

———————-

2. Network IO:

In the above scenario, if the core/task node on which the mapper task runs happens to be an EC2 instance type with 'Low Network Performance', it cannot really achieve the 1000 WCUs to DynamoDB that it wants to. If the network bandwidth in the EC2 metrics hits a ceiling and never rises above it, check the following lists and use an instance type with higher network performance.

https://aws.amazon.com/ec2/instance-types/

https://aws.amazon.com/ec2/previous-generation

Suggested Parameters to use:

mapreduce.map.speculative = false (makes sure there aren't two copies of the same mapper task writing data to DynamoDB at the same time, which would cause 4xx errors and could fail the task and the job).

https://github.com/awslabs/emr-dynamodb-connector/blob/master/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBConstants.java

5 Q: Can I use DynamoDB auto scaling to improve import performance, rather than manually increasing capacity before every import job?

Yes. AbstractDynamoDBRecordWriter calls DescribeTable every 5 minutes, picks up the updated provisioned write capacity, and adjusts its writes per second based on the new capacity.
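You can watch the same value the writer polls with a quick boto3 call (the table name is a placeholder):

import boto3

dynamodb = boto3.client("dynamodb")
# The same information AbstractDynamoDBRecordWriter refreshes every ~5 minutes.
table = dynamodb.describe_table(TableName="my-import-table")["Table"]
print(table["ProvisionedThroughput"]["WriteCapacityUnits"])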

ToDo: Does this work for multiple mappers ?

6 Q: Why do I see 'Number of reduce tasks is set to 0 since there's no reduce operator' on an import job? Where is the identity reducer?

This is set by Hive: it does not produce a manifest here because it is doing an INSERT OVERWRITE. Everything is written as new data, with the mappers writing directly to DynamoDB, so no reduce phase (and no identity reducer) is needed.

7 Q: How do I check RetriedReadExceptions and RetriedWriteExceptions?

Import jobs are designed to retry exceptions. These counters are part of the list of counters printed at the end of each MapReduce job. Check the counters in the RM UI or in the job history file (.jhist) on your EMR cluster.

8 Q: How do I check the progress of the import job while it's running?

With the current library functionality, there appears to be no Hadoop job counter that reports the items written by the import job. So, we need to check the live container logs in the RM UI for the 'Total items written' value of each mapper task.

INFO [main] org.apache.hadoop.dynamodb.write.AbstractDynamoDBRecordWriter: New writes per second: 88

2018-02-07 19:06:46,828 INFO [main] org.apache.hadoop.dynamodb.write.AbstractDynamoDBRecordWriter: Total items written: 101041

Total items written by the import job is roughly = #mappers * (Total items written per mapper)

To get an exact count, you can add up the values from all mappers' container logs.

RM UI : https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html

– As long as the job is running, the container logs can also be found on the local disks of core/task nodes at /var/log/hadoop-yarn/containers/. A rough script for adding them up follows.
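A rough Python sketch that sums the last 'Total items written' line from each container log on a node (the path and log format are assumptions based on the sample lines above):

import glob
import re

total = 0
# Assumes syslog files under the YARN container log directory shown above.
for path in glob.glob("/var/log/hadoop-yarn/containers/*/*/syslog*"):
    last = None
    with open(path, errors="ignore") as f:
        for line in f:
            m = re.search(r"Total items written: (\d+)", line)
            if m:
                last = int(m.group(1))   # keep only the latest value per container
    if last is not None:
        total += last

print("Approximate items written by containers on this node:", total)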

9 Q: How do I verify the items written after the import job has ended?

Check the Hive counters under Job Counters in the RM UI or in the .jhist file.

 RECORDS_OUT_1_default.ddb2  978,316

 RECORDS_IN  978,316

We can use the RM UI / job history server, usually located at a path like

http://ip-172-31-21-216.us-west-2.compute.internal:19888/jobhistory/jobcounters/job_1517959834080_0006

https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html

https://aws.amazon.com/blogs/big-data/securely-access-web-interfaces-on-amazon-emr-launched-in-a-private-subnet/

10 Q: Does the import job use the LOCAL DISK of EMR nodes?

Usually no. Mappers may spill intermediate data to disk, based on mapreduce.map.sort.spill.percent and memory utilization, but since the mappers write directly to DynamoDB and by default no reducers are involved in the Hive job, spills are minimal.

Check the 'FILE: Number of bytes written' Hadoop job counter for any files written to disk by the job.

11 Q: If my job fails in the middle, will the DynamoDB items written so far be gone?

No. If the job fails for some reason, the items already written by individual mapper tasks are not deleted by the import job. You will need to delete those items manually or re-create the table before retrying the import.

Note that when running an import job, if an item with the same key exists in the target DynamoDB table, it will be overwritten; if no item with that key exists, the item is inserted.

12 Q: How do I enable WIRE or DEBUG logging on EMR to check calls to S3 and DynamoDB?

Add the following to the appropriate log4j configuration of Hadoop / Hive / Spark etc. on the master node before starting the job. Some configuration paths on the EMR master are:

/etc/hadoop/conf/log4j.properties

/etc/hadoop/conf/container-log4j.properties

/etc/hive/conf/hive-log4j2.properties

/etc/spark/conf/..

log4j.rootCategory=DEBUG, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender

log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2} – %m%n

log4j.logger.org.apache.hadoop.hive=ALL

log4j.logger.org.apache.dynamodb.preader=ALL

https://github.com/awslabs/emr-dynamodb-connector/blob/master/emr-dynamodb-hive/src/test/resources/log4j.properties

Then check the container logs:

– in the RM UI, or

– on the local disks of core/task nodes (/var/log/hadoop-yarn/containers), or

– in the S3 log bucket configured for the EMR cluster.

13 Q: Why do I see a Tez container although I do not use Tez?

??

14 Q: Can you run the import/export tools on a stand-alone EMR cluster without Data Pipeline?

The emr-dynamodb-tools module is not provided on EMR clusters because it is not supported; for the same reason, there is no documentation for it.

https://github.com/awslabs/emr-dynamodb-connector/tree/master/emr-dynamodb-tools

This module was developed by the DynamoDB team and uses custom formats for import and export operations:

ExportOutputFormat or ExportManifestOutputFormat to write files to S3,

and

ImportInputFormat to read that same exported data (or its manifests) from S3.

15 Q: What are the costs associated with an import job?

– DynamoDB table provisioned WCUs

https://aws.amazon.com/dynamodb/pricing/

– EMR cluster costs (EMR + EC2 + EBS, if configured)

https://aws.amazon.com/emr/pricing/

– Data Pipeline (if the template is used to run import job)

https://aws.amazon.com/datapipeline/pricing/

– S3 bucket costs : https://aws.amazon.com/govcloud-us/pricing/s3/

Use this tool to estimate your monthly bill

http://calculator.s3.amazonaws.com/calc5.html

16 Q: Which regions is import/export available in? We wish to use import/export in NA/EU/FE/CN.

(When using the Data Pipeline template "Import DynamoDB backup data from S3".)

Note that the Data Pipeline service itself is only available in some regions. But a pipeline created in, say, us-east-1 can run an EMR cluster resource in any other region, and that cluster can run the import/export job between any region's S3 bucket and any region's DynamoDB table.

https://aws.amazon.com/about-aws/whats-new/2014/02/20/aws-data-pipeline-now-available-in-four-new-regions/

– EmrCluster has a region field:

https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-manage-region.html

– The Data Pipeline template has a 'myDDBRegion' parameter, which is passed to the EmrActivity to set the DynamoDB region for the import job.

– EMR (with EMRFS) should be able to access S3 buckets in any region.

When doing cross-region imports, it's better to place the EMR cluster close to either the DynamoDB region or the S3 region. Between DynamoDB and S3, I think EMR should be closer to whichever service would otherwise see the larger data-transfer latency, since that is what impacts performance.

I speculate that EMR should be close to S3, but this needs to be tested.

17 Q: Can you import data from an S3 bucket in a different account into a DynamoDB table?

Let's say the DynamoDB table is in Account A.

The S3 contents to be imported are in Account B.

1. The best way is to copy the data from Account B to an S3 bucket in Account A and run the import job in Account A.

Note: by default the exported data uses a manifest file that contains the full paths of the objects, including bucket names. So, you'll probably need to edit the bucket names in the manifest to Account A's bucket name before running the import job (see the sketch below).

https://github.com/awslabs/emr-dynamodb-connector/blob/master/emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/exportformat/ExportManifestOutputFormat.java
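One crude way to do that edit, assuming you have downloaded the manifest file locally (the file name and bucket names are placeholders):

# Rewrite the manifest so its entries point at Account A's bucket instead of Account B's.
src_bucket = "s3://account-b-export-bucket"
dst_bucket = "s3://account-a-import-bucket"

with open("manifest") as f:               # manifest file downloaded from S3
    contents = f.read()

with open("manifest.rewritten", "w") as f:
    f.write(contents.replace(src_bucket, dst_bucket))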

2. Even if your EMR cluster in Account A (assuming the EMR_EC2_DefaultRole) has the necessary permissions on the objects of the bucket in Account B, the import job run on the Account A EMR cluster might NOT work.

https://docs.aws.amazon.com/AmazonS3/latest/dev/example-walkthroughs-managing-access-example2.html

18 Q: Does AWS Data Pipeline spin up nodes based on any particular calculation for the import job?

Yes. Data Pipeline calculates the nodes based on the number of mappers the job needs. It has a setting for the DynamoDB import/export function, resizeClusterBeforeRunning, that overrides your CORE-node settings on the EmrResource object. It attempts to choose the best options based on an algorithm, but this is still a work in progress.

http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-emractivity.html

For example, a job might need 5 mappers, which it calculates based on IOPS. Those 5 mappers might each need up to 6 GB of heap to work through 100,000 items, so Data Pipeline configures each mapper with 6 GB of memory and spins up nodes with such a configuration based on the instance type.

This calculation is done only if the resize option on the EMR resource object is set to true.

19 Q: Can I add a Data Pipeline step to transform the data between the export and import pipelines?

The Data Pipeline import/export templates ultimately run the Hive import/export tool on an EMR cluster. This tool is built from the open source library, and the packaged JARs are placed on EMR clusters by the Data Pipeline service.

https://github.com/awslabs/emr-dynamodb-connector#example-importexport-tool

https://github.com/awslabs/emr-dynamodb-connector/tree/master/emr-dynamodb-tools

These pre-included tools DO NOT allow you to transform the data.

For import, they just expect the data on S3 to be in the DynamoDB input format, which is like newline-delimited JSON (created by a previous export from the same tool), and they write that data to DynamoDB as is.

To transform the data, you'll need to tweak the pipeline definition so that you run your own Hive queries on EMR.

https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-hiveactivity.html

Using Hive queries, you can make use of the pre-included "org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler" to read/query, transform, and write data directly from/to DynamoDB and S3.

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/EMRforDynamoDB.html

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/EMR_Hive_Commands.html

Example pipelines which run Hive queries to import data to DynamoDB:

https://github.com/aws-samples/data-pipeline-samples/tree/master/samples/DynamoDBImportCSV

https://github.com/aws-samples/data-pipeline-samples/blob/master/samples/DynamoDBImport/XMLtoDynamoDBImport.json

Any transformation you want can be achieved with Hive DML on/before the "INSERT OVERWRITE" statement that writes to the DynamoDB table.

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML

For export jobs, you can use Hive queries that support predicate pushdown, so that a Hive SQL-like SELECT query with a WHERE clause is translated to an equivalent DynamoDB Query API call with a KeyConditionExpression, instead of the usual full DynamoDB Scan of the table.

https://github.com/awslabs/emr-dynamodb-connector/blob/master/emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/filter/DynamoDBFilterPushdown.java

This means that instead of a full Scan, which fetches all items from the DynamoDB table (consuming a lot of RCUs), a SELECT query (with a proper WHERE clause and comparison operators on the hash and/or range key) can translate that condition into a KeyConditionExpression on the DynamoDB Query. The boto3 sketch below shows the difference at the DynamoDB level.

https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.html#Query.KeyConditionExpressions
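For reference, the DynamoDB-level difference the pushdown exploits, as a plain boto3 sketch (the table and attribute names are hypothetical):

import boto3
from boto3.dynamodb.conditions import Key

table = boto3.resource("dynamodb").Table("my-table")

# Full scan: reads (and bills RCUs for) every item in the table.
all_items = table.scan()

# Targeted query: the WHERE-clause condition becomes a KeyConditionExpression.
some_items = table.query(
    KeyConditionExpression=Key("customer_id").eq("42")
    & Key("order_date").begins_with("2018-02")
)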

– A scan filter is also used.

– A DynamoDB FilterExpression is never applied; only the KeyConditionExpression is applied in such a Query call.

– It seems that neither global secondary indexes nor local secondary indexes are supported; however, scenarios involving a query on a single primary key are recognized pretty well.

– This makes it practical to use a primary key as a method of partitioning your data, to avoid EMR queries taking longer over time as the table grows.

– DynamoDB does not support filters on columns of set types.

The queries for some common scenarios are well explained here:

http://hipsterdevblog.com/blog/2015/05/30/analysing-dynamodb-index-usage-in-hive-queries/

20 Q: Can we use the import/export in the above use case, considering the data in Table B is continually changing? Can we modify the import task to include conditional writes?

I do not think the open source library can handle live data changes; it's designed to work with stable sources and destinations.

I don't think you can modify the import task to do conditional writes using the tool. The tool just issues BatchWrites from chunks of fixed data on S3 to the DynamoDB table.

If an item with the same key exists in the target DynamoDB table, it will be overwritten (replaced); if no item with that key exists, the item is inserted. With batch writes you can't put conditions on individual items, so you can't prevent them from overwriting, say, a newer item.

https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html

For live changes, you’ll need to use a different library or the DynamoDB features.

You can create tables that are automatically replicated across two or more AWS Regions, with full support for multi-master writes. This gives you the ability to build fast, massively scaled applications for a global user base without having to manage the replication process. For more information, see Global Tables.

Use DynamoDB Cross-Region Replication  https://github.com/awslabs/dynamodb-cross-region-library

21 Q:  Are there any TPS restrictions in the use of import/export?

TPS is basically limited by

– the provisioned read/write capacity of the DynamoDB table,

– the read/write throughput percent parameters that you set on the import/export tools,

– some other factors, like table size and average item size, which can impact performance,

– and, of course, the EMR cluster itself: import/export runs on the Tez or MR engine on the YARN framework, so typical EC2 bottlenecks (memory / IO / CPU) can be present, and speed depends on the container concurrency your cluster can support.

22 Q: Do we need to code the EMR job for the import/export ourselves?

https://github.com/awslabs/emr-dynamodb-connector/

You can always alter the open source implementation, then build and package the JAR onto EMR to implement custom logic for your needs.

– Data Pipeline downloads relatively old import/export tools (emr-dynamodb-tools, written for Hive 2.1.0) to EMR, which, depending on the AMI version, can have old connector libraries (emr-dynamodb-hadoop and emr-dynamodb-hive) packaged with them.

The latest library could have several improvements, like bug fixes, new features, and newer AWS SDKs, to handle large data sets effectively.

https://github.com/awslabs/emr-dynamodb-connector/issues?q=is%3Aissue+is%3Aclosed

https://github.com/awslabs/emr-dynamodb-connector/blob/master/pom.xml

Possible customizations to the awslabs/emr-dynamodb-connector library for large (multi-TB) imports:

– infinite retries on ProvisionedThroughputExceededExceptions and internal server errors (see the sketch after this list),

https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html

– max burst capabilities.

– logging to S3 and continuing past incompatible items,

– basic CloudWatch progress monitoring to report the totalItemsWritten and totalIOPSConsumed tracked by AbstractDynamoDBRecordWriter,

– the capability to store invalid items in an S3 bucket (as staging) so they can later be altered and retried.
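To illustrate the first bullet, a standalone boto3 retry sketch (this is not the connector's Java code; the table name, item shapes and retry limits are placeholders):

import time
import boto3

dynamodb = boto3.client("dynamodb")

def batch_write_with_retries(table_name, items, max_attempts=100):
    # Keep re-sending unprocessed items and back off on throttling / internal errors.
    # Assumes len(items) <= 25 (the BatchWriteItem limit) and DynamoDB-typed item dicts.
    request_items = {table_name: [{"PutRequest": {"Item": i}} for i in items]}
    attempt = 0
    while request_items and attempt < max_attempts:
        try:
            resp = dynamodb.batch_write_item(RequestItems=request_items)
            request_items = resp.get("UnprocessedItems") or {}
        except (dynamodb.exceptions.ProvisionedThroughputExceededException,
                dynamodb.exceptions.InternalServerError):
            pass  # fall through to the backoff below and retry the whole batch
        attempt += 1
        if request_items:
            time.sleep(min(2 ** attempt, 60))   # capped exponential backoff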

23 Q: How do I import CSV or XML data on S3 to DynamoDB?

A Hive script is needed for such an import job. Here are some examples for Data Pipeline:

https://github.com/aws-samples/data-pipeline-samples/tree/master/samples/DynamoDBImportCSV

https://github.com/aws-samples/data-pipeline-samples/blob/master/samples/DynamoDBImport/XMLtoDynamoDBImport.json

24 Q: Do I need to use EMRFS consistent view on EMR for the import job?

EMRFS consistent view is designed to help with the eventual-consistency problem when reading from S3. If an export job just completed and wrote files to S3, and an import job immediately (or within a few minutes) tries to read those files, the import job can hit FileNotFound errors and might fail.

So, if an import job needs to read several files that were just written, it's a good idea to have this enabled. If the export job did not use EMRFS consistent view to write to S3, the corresponding metadata might not be present in the DynamoDB table that EMRFS uses for its checkpointing; you can always sync the metadata into your checkpointing table by running the EMRFS CLI commands.

https://docs.aws.amazon.com/emr/latest/ManagementGuide/emrfs-cli-reference.html

25 Code snippets:

 


26 Tests:

– Autoscaling works: AbstractDynamoDBRecordWriter calls DescribeTable every 5 minutes, picks up the updated provisioned capacity, and adjusts its writes per second based on the new capacity.

– Will it work for multiple mappers?

27 Todo

Querying DynamoDB export data using Athena (with LIST attribute)

Written by mannem. Posted in Athena, AWS BIG DATA

Presto supports multiple array and JSON functions, which you can use to write queries that return the required results. There is no single way to define the CREATE TABLE and the later queries: you can have the CREATE TABLE predefine the schema structure and have your queries refer to the elements you need in that schema, or you can define the attribute as a string and later use functions in your queries to parse that string into the required results.

https://prestodb.io/docs/current/functions/array.html

https://prestodb.io/docs/current/functions/json.html

In addition, Athena's documentation has examples of using most of the Presto functions:

http://docs.aws.amazon.com/athena/latest/ug/querying-arrays.html

http://docs.aws.amazon.com/athena/latest/ug/rows-and-structs.html

http://docs.aws.amazon.com/athena/latest/ug/querying-JSON.html

I was able to use the following CREATE TABLE syntax on DynamoDB items that have a list-of-strings attribute, and to flatten out the list of strings using functions like CAST. Please note that this is not the only way to define the table and the queries; my query might be over-complicating what you are trying to get, and there may be a simpler way. So, it's really important to understand the data types you define and what each query returns, and to use the correct functions, since each function takes one data type and returns another.

Using AWS Athena to query S3 Server Access Logs.

Written by mannem. Posted in EMR || Elastic Map Reduce

S3 server access logs can grow very big over time, and it is very hard for a single machine to process/query/analyze all of them. So, we can use distributed computing to query the logs quickly. We might think of Apache Hadoop/Hive/Spark/Presto etc. to process these logs, but the simplicity of AWS Athena as a serverless model makes it even easier. This article guides you through using Athena to process your S3 access logs with example queries, and covers some partitioning considerations that can help you query TBs of logs in just a few seconds.

The server access log files consist of a sequence of newline-delimited log records. Each log record represents one request and consists of space-delimited fields. The following is an example log consisting of six log records.

This article is based on the access log format specified at https://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html. The fields and their format may change, and the DDL/query should be changed accordingly.

Sample Data:

Data Format considerations:

At first look, the data format appears simple: a text file with a space field delimiter and newline (\n) record delimiter. However, there is a catch in this data format: columns like Time, Request-URI and User-Agent can have spaces in their data ( [06/Feb/2014:00:00:38 +0000] , "GET /gdelt/1980.csv.gz HTTP/1.1" and aws-cli/1.7.36 Python/2.7.9 Linux/3.14.44-32.39.amzn1.x86_64 ), which will mess up the SerDe parsing.

For the Request-URI and User-Agent fields, the data is enclosed in quotes with spaces inside. This can be parsed by any SerDe that supports quotes; unfortunately, Athena does not support such SerDes, like org.apache.hadoop.hive.serde2.OpenCSVSerde, which does have a quotes feature.

The org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe included with Athena does not support quotes yet.

A custom SerDe called com.amazon.emr.hive.serde.s3.S3LogDeserializer comes with all EMR AMIs just for parsing these logs; a query like the following would create the table easily. However, this SerDe is not supported by Athena.

Looking at these limitations, org.apache.hadoop.hive.serde2.RegexSerDe seems like the only feasible option. Writing the regex for this log structure was a bit time-consuming, but I was able to write it with the help of the ELB log structure example and http://regexr.com/.

I used the following Regex for the S3 server access logs :

([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) \\\"([^ ]*) ([^ ]*) (- |[^ ]*)\\\" (-|[0-9]*) (-|[0-9]*) ([-0-9]*) ([-0-9]*) ([-0-9]*) ([-0-9]*) ([^ ]*) (\"[^\"]*\") ([^ ]*)$

Also found at http://regexr.com/3erct, where I removed some extra escape characters like the \'s. In the DDL, we need to add these escape characters back, as in the following DDL.

([^ ]*) – matches a continuous string. Because number fields like 'time' can contain the '-' character, I did not use the ([-0-9]*) regex that is used for numbers. For the same reason, I had to use STRING for all fields in the DDL. We can use Presto string functions for the appropriate conversions and comparison operators in the DML.

\\[(.*?)\\] or \\\[(.*?)\\\] – is for the Time field. For a string like [06/Feb/2014:00:00:38 +0000], it will match and return a string like 05/Dec/2016:16:56:36 +0000, which is easier for querying between times.

\\\"([^ ]*) ([^ ]*) (- |[^ ]*)\\\" – is for the Request-URI field. It removes the quotes and splits "GET /mybucket?versioning HTTP/1.1" into three groups.

(\"[^\"]*\") – is for User-Agent. It matches the whole string including the quotes.

(-|[0-9]*) – is for HTTP status, which is always a number like 200 or 404. If you expect a '-' string, you can use ([^ ]*) instead.

CREATE TABLE DDL :

Where s3://s3server-accesslogs-bucket/logs/ is the bucket and prefix of your access logs.

Once you create the table, you can query this table for specific information :

Example Queries :

Any field can be set to “-” to indicate that the data was unknown or unavailable, or that the field was not applicable to this request.

Considerations:

1. From time to time, AWS might extend the access log record format by adding new fields to the end of each line. The DDL/regex must be written to handle trailing fields that it does not understand.
2. If the S3 directory is huge, with a lot of log files, then based on the query you might hit Athena's "Query timeout: 30 minutes". http://docs.aws.amazon.com/athena/latest/ug/service-limits.html. Some optimizations are covered here: https://aws.amazon.com/blogs/big-data/top-10-performance-tuning-tips-for-amazon-athena/

Partitioning / Compression :

Over time, the logs in your target prefix (s3://s3server-accesslogs-bucket/logs/) can grow big. This is especially true if you use the same bucket/prefix for other buckets' access logs. If you enable logging on multiple source buckets that identify the same target bucket, the target bucket will have access logs for all those source buckets, but each log object will report access log records for a specific source bucket.

Amazon S3 uses the following object key format for the log objects it uploads to the target bucket: TargetPrefixYYYY-mm-DD-HH-MM-SS-UniqueString. So, it basically creates many files, each with a bunch of log records, in the S3 bucket.

Querying can be slow if there are a large number of small files in text format.

To speed things up, we need at least one of these options to consolidate the logs:
1. PARTITIONING – so queries can work on a subset of files instead of going through all of them.
2. CONVERTING to a COLUMNAR STORAGE format – like ORC or Parquet, for faster query performance.
3. ENABLING COMPRESSION – Snappy or GZ.
4. BLOOM FILTERS.

Note that S3 access log data is not partitioned: it is not organized with prefixes like /year/month/day or /year=2010/month=10/day=11/, which Athena could use for static or dynamic partitions respectively. The access logs are also uncompressed, flat text files.

Unfortunately, we cannot enable all of the above options just by defining a DDL; we need to copy/move the text-based dataset with the above options enabled. One option is to use AWS EMR to periodically structure and partition the S3 access logs so that you can query them easily with Athena. At this moment, it is not possible to use Athena itself to convert non-partitioned data into partitioned data.

On EMR, we could use either
HIVE: to partition/compress/convert, or
the S3-DIST-CP utility: to compress/groupBy.

Note that we can also use EMR's Presto / Spark etc. to query the logs, but Athena, being serverless, is much easier to query with, without managing anything.

Here's an example of converting non-partitioned S3 access logs to partitioned S3 access logs on EMR:

Continuing the Partitioning for incoming logs :

Note that the above discussion covers the one-time creation of all partitions. There are two things to notice:
1. While the above job is running, there could be new incoming logs, which will not be partitioned into the destination.
2. After the job is completed, you keep getting new logs.

Which S3 files will be missed by the above job really depends on the hive client's split calculation on your source S3 bucket. When you run your 'INSERT OVERWRITE' command, the hive client calculates splits by first listing all objects inside the S3 prefix; this usually takes minutes and depends on the number of S3 objects. After the split calculation is done, the actual partitioning work is done by the mappers/reducers, so any log files pushed to the source prefix after the split calculation will not be included in the job.

If you want to automatically add partitions to the Athena table (Accesslogs_partitionedbyYearMonthDay) as new files with newer request days arrive in your log bucket/prefix, you will need either a 'batch job on a schedule' or a 'trigger mechanism' to make sure the new data is partitioned as well.

Batch job on schedule :

Note that our source S3 access logs are dumped into one single directory, with each file name like 2016-12-05-17-17-06-5D51435906E10586. Since we run our batch jobs after the above one-time bulk job, in the daily (or more frequent) batches we need to skip the files already processed by the previous one-time job. For this, we can rely on the file names themselves to get a list of newly generated files and then use an ETL job to convert these files into partitioned data. We could also maintain a manifest file listing the files already processed and the files which still need to be processed.

Managing all of that is beyond the scope of this blog. There are multiple ways to do the ETL; one way is to run an AWS Data Pipeline with EMR, on a schedule, and run the Hive queries on the new data.

Trigger based solution :

If you want to run a partition job on every S3 PUT (or a batch of PUTs), you can use AWS Lambda, which can trigger a piece of code on every S3 object PUT. You get metadata about the PUT and can handle it in your Lambda code to fire off an ETL job etc. to do the partitioning. AWS Lambda comes with the AWS SDK, so you can make all types of AWS API calls with the proper permissions.
http://docs.aws.amazon.com/lambda/latest/dg/with-s3.html
http://docs.aws.amazon.com/lambda/latest/dg/with-s3-example.html

Other Input Regex’s :

CloudFront
'^(?!#)([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+[^\(]+[\(]([^\;]+).*\%20([^\/]+)[\/](.*)$'
ELB access logs
'([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*):([0-9]*) ([.0-9]*) ([.0-9]*) ([.0-9]*) (-|[0-9]*) (-|[0-9]*) ([-0-9]*) ([-0-9]*) \\\"([^ ]*) ([^ ]*) (- |[^ ]*)\\\" ?(\"[^\"]*\")? ?([A-Z0-9-]+)? ?([A-Za-z0-9.-]*)?$'
Apache web logs
'([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?'


EMR s3DistCp "--groupBy" Regex examples

Written by mannem. Posted in EMR || Elastic Map Reduce

The regex for S3DistCp --groupBy can sometimes be confusing. I usually use online regex tools like https://regex101.com/ to work out the string matching and grouping.

Here are some examples that I explored so far :

Example 1 :

Example 2 :

s3-dist-cp --src s3://support.elasticmapreduce/training/datasets/gdelt/ --dest hdfs:///gdeltWrongOutput1/ --groupBy '.*(\d{6}).*'

This command would not merge any files, but would copy all files whose names contain at least 6 consecutive digits (like 20130401.export.CSV.gz) to the destination (see the check below).
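The value of the capture group, which s3-dist-cp uses to group files, differs for every input file, so each group contains a single file and nothing gets concatenated. A small Python check mirroring the regex (this only exercises the pattern, not s3-dist-cp itself):

import re

pattern = re.compile(r'.*(\d{6}).*')
for name in ["20130401.export.CSV.gz", "20130402.export.CSV.gz"]:
    print(name, "->", pattern.match(name).group(1))
# 20130401.export.CSV.gz -> 130401
# 20130402.export.CSV.gz -> 130402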

Example 3 :

http://stackoverflow.com/questions/38374107/how-to-emr-s3distcp-groupby-properly

Example 4 :

If you want to concatenate the matching files in the root directory and all matching files inside a 'sample_directory' into a single file, compressed in gzip format, the pattern at http://regexr.com/3ftn9 will concatenate all matched file contents and create one .gz file.

Services in EMR – upstart

Written by mannem. Posted in EMR || Elastic Map Reduce

Service management in EMR 4.x and 5.x is handled by upstart, not the traditional SysVInit scripts.

You can view services by running the below command:

Services can be queried using the upstart commands, for example:

Services can be stopped/started with the following commands:

More upstart commands can be found here : http://upstart.ubuntu.com/cookbook/

Getting stack trace/Heap dump of a process in EMR

Written by mannem. Posted in EMR || Elastic Map Reduce

In the latest EMR AMIs, different applications like Hive and Hadoop are installed under corresponding Unix users.

Example: the HiveServer2 process runs as the hive user.

To take a stack trace or heap dump of such a process, you need to specify the corresponding user who spawned the process.

You can use the following commands:

or

Taskrunner & workergroup threads

Written by mannem. Posted in Data Pipelines

http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-taskrunner-config-options.html

Task Runner Configuration Options :

--tasks : The number of task poll threads to run simultaneously. Optional. The default is 2; however, these threads are just for polling.

Maximum number of Activities that can be run in parallel by Taskrunner :

If you have 10 activities (which may be from different pipelines) currently running on a single Task Runner worker group, new activities cannot be processed by this Task Runner and will wait for the previous activities to finish. In other words, a Task Runner tied to a worker group has a hard limit of 10 activities executing in parallel.

For example :

Suppose 10 activities are to be executed. By default, the poller pulls 2 tasks (activities) per second; in the next second it pulls another 2. So, at the end of 5 seconds, all 10 activities will have been submitted by the poller to the worker group.

Using Kinesis agent to push data to multiple streams on multiple AWS accounts

Written by mannem. Posted in Kinesis

A single Kinesis agent cannot push data to multiple accounts, so we need to run multiple independent agents, one agent for every account.

This post discusses the Kinesis agent and guides you through running multiple agents on an Amazon EC2 instance. It also has some sample scripts to build and run your own agent from source.

When you install the Kinesis agent as outlined in the following documentation, it runs as a service.

http://docs.aws.amazon.com/streams/latest/dev/writing-with-agents.html#agent-config-settings

The service executes the Java class "com.amazon.kinesis.streaming.agent.Agent" with a command like:

Running help on this command (or looking at AgentOptions) shows the options you can specify for the agent, so you can have multiple agents, each running with a different configuration. Currently (Oct 2016), each agent can only have one authentication configuration, so each agent process can only write to streams in just one account; with separate config files you can send data to different accounts. The authentication for each agent can use IAM user credentials or an assumed role.

Example agent configurations and how to run them :


Building Kinesis Agent :

Instead of running agent as service , If you wish to build and use your own Agent , you can use https://github.com/awslabs/amazon-kinesis-agent .

A sample script to do so.

Parsing AWS Cloudtrail logs with EMR Hive / Presto / Spark

Written by mannem. Posted in cloudtrail, EMR || Elastic Map Reduce


AWS CloudTrail is a web service that records AWS API calls for your account and delivers log files to you. The recorded information includes the identity of the API caller, the time of the API call, the source IP address of the API caller, the request parameters, and the response elements returned by the AWS service.

CloudTrail usually delivers the logs to an S3 bucket, segregated by account, region and date (logFile.json.gz).

CloudTrail recommends using the aws-cloudtrail-processing-library, but it may be complex if you wish to run fast ad-hoc queries over a huge number of big log files. If there are many files, it can also be hard to download all logs to a Linux/Unix node, unzip them, and do regex matching on all of them. So, we can use a distributed environment like Apache Hive on an AWS EMR cluster to parse/organize all these files using very simple SQL-like commands.

This article guides you through querying your CloudTrail logs using EMR Hive. It also provides some example queries which may be useful in different scenarios. It assumes that you have a running EMR cluster with the Hive application installed and that you have explored it a bit.

Directory and File structure of Cloudtrail logs :



The following Hive queries show how to create a Hive table that references the CloudTrail S3 bucket. CloudTrail data is processed by the CloudTrailInputFormat implementation, which defines the input splits and key/value records. The CloudTrailLogDeserializer class defined in the SerDe is called to format the data into records that map to the columns and data types of the table. Data to be written (such as via an INSERT statement) is translated by the Serializer class defined in the SerDe into the format that the OUTPUTFORMAT class (HiveIgnoreKeyTextOutputFormat) can write.

These classes are part of the /usr/share/aws/emr/goodies/lib/EmrHadoopGoodies-x.jar files and are automatically included in the Hive classpath. Hive can also automatically decompress the GZ files. All you need to do is run queries similar to SQL commands; some sample queries are included.

– EMR uses an instance profile role on its nodes to authenticate requests made to the CloudTrail bucket. The default IAM policy on the EMR_EC2_DefaultRole allows access to all S3 buckets. If your cluster does not have access, you may need to make sure the instance profile/role has access to the necessary S3 CloudTrail bucket.
– Do not run any INSERT OVERWRITE on this Hive table. If EMR has write access to the S3 bucket, an INSERT OVERWRITE may delete all logs from the bucket. Please check the Hive language manual before attempting any commands.
– CloudTrail JSON elements are extensive and differ between kinds of requests. This SerDe (which is somewhat abandoned by the EMR team) doesn't include all possible fields in CloudTrail. For example, if you try to query requestparameters, it would give FAILED: SemanticException [Error 10004]: Line 6:28 Invalid table alias or column reference 'requestparameters'.
– If your CloudTrail bucket has a large number of files, Tez's grouped splits or MR's input split calculation may take considerable time and memory. Make sure you allocate enough resources to the hive-client or tez-client.

TEZ: https://cwiki.apache.org/confluence/display/TEZ/How+initial+task+parallelism+works



To query these logs with EMR Presto, we need to copy the necessary JARs into Presto's Hive plugin directory on all nodes of the EMR cluster and RESTART presto-server on ALL nodes.

Master / Core :

To be safe, I also ensured the correct ownership on all the copied files, e.g.:

RESTART:

Check for presto server :

Now, run the queries on the cloudtrail table already created with Hive. The query syntax and functions are different from Hive, and you should use Presto's functions. Some example queries are provided in AWS Athena's documentation (Athena uses Presto): http://docs.aws.amazon.com/athena/latest/ug/cloudtrail-logs.html



Alternatively, you can use Apache Spark, which has spark-shell, to query CloudTrail logs using the following library: https://github.com/awslabs/timely-security-analytics

Here are some instructions on this design:



HiveServer2 crashing with OutOfMemoryError (OOM)?

Written by mannem. Posted in EMR || Elastic Map Reduce


Oftentimes HiveServer2 can be a single point of failure, and it can easily crash with OOM. If HiveServer2 restarts now and then, it is likely due to OOM, where it is set to be killed and re-spawned. We need to check the JVM options to see how this process behaves on OOM, analyze thread/heap dumps, and find the underlying issue to mitigate such problems.

This post explains how to identify an OOM issue on HiveServer2, how to analyze thread/heap dumps, and how to mitigate the issue. Some of the discussion is generic to EMR.

PART 1 : How to identify OOM issue on HiveServer2

Default JVM options of HS2 on EMR 4.7.2 AMI :

Some observations from above JVM:

1. Heap space for the HiveServer2 JVM is 1000 MB, and this option is sourced from hive-env.sh.
2. Hive logging options like (-Dhive.log.dir=/var/log/hive -Dhive.log.file=hive-server2.log -Dhive.log.threshold=INFO -server) are sourced from /etc/init/hive-server2.conf.
3. All other options, starting from '-server', are set by EMR.
4. If this JVM hits an OutOfMemoryError, the -XX:OnOutOfMemoryError=kill -9 %p option will kill HS2 on OOM.

If for some reason HiveServer2 goes OOM, it simply gets killed and there is no way to find out why; hive-server2.log and hive-server2.out may not show the OOM at all. You can check hive-server2.log to verify that HiveServer2 was restarted. So, we tweak the JVM options of HS2 to include GC verbose logging and to dump the heap on OutOfMemoryError.

With the following settings, if the HS2 process does go OOM, it will display an error like the following in /var/log/hive/hive-server2.out. This message means that, for some reason, the garbage collector of this HS2 JVM is taking an excessive amount of time and recovering very little memory on each run.

An example script to enable GC verbose logging on the HS2 JVM can look like the following, on EMR 4.x.x and 5.x.x:



PART 2 : How to analyze Thread/Heap dumps

With the above options, if HS2 fails with OOM, it will log the error to /var/log/hive/hive-server2.out. It will also dump the heap to a file like /var/log/hive/java_pid8548.hprof. There are multiple ways to open and analyze this file; I found the Eclipse Memory Analyzer tool helpful in identifying memory leaks and exploring thread dumps across Java classes and sub-classes.




PART 3 : How to Mitigate this issue

1. Tweaking hive-site.xml by checking stack trace:

The best way to mitigate OOMs is to check the stack trace of the HiveServer2 process and see if there are any leaks. It is also a good idea to check the top consumers and act accordingly. For each of those threads, see if any of the hive-site.xml settings would reduce its memory consumption. Sometimes you may not have control over any of the thread specifics, in which case you may need to further increase the heap space.

For example, in the stack trace,

– if you see multiple threads getting blocked etc., you can edit a setting like hive.server2.thrift.http.max.worker.threads.

2. Mitigating OOM obviously involves increasing the heap space for HiveServer2.

The heap space is defined in hive-env.sh using an environment variable. It's important to identify the memory requirements of the 3 Hive services (HiveServer2, Hive metastore, Hive clients) in advance, using load/concurrency tests, before moving to PROD. If you observe any OOMs, you may need to increase the memory accordingly.

Identifying hive services and increasing memory :

If the string 'export HADOOP_HEAPSIZE=2048' is present in hive-env.sh, it applies to all 3 Hive services once they are restarted. So, you can use if-statements to provide different settings (HADOOP_HEAPSIZE and HADOOP_OPTS) for each of these Hive services.

Example contents of hive-env.sh :

3. High availability and load balancing of HiveServer2:

Increasing heap space for HS2 may help. However, as per Cloudera, beyond a certain memory limit there is a possibility that you continue to hit OOM no matter how much you increase the memory for HS2. In that case, "Cloudera recommends splitting HiveServer2 into multiple instances and load balancing once you start allocating >12 GB to HiveServer2. The objective is to size to reduce impact of Java garbage collection on active processing by the service."

You can check out the limitations here: https://www.cloudera.com/documentation/enterprise/5-5-x/topics/cdh_ig_hive_install.html#concept_alp_4kl_3q

4. Watch out for any bugs in the LockManager :

https://hive.apache.org/javadocs/r2.0.0/api/org/apache/hadoop/hive/ql/lockmgr/

If you are using Hive's table lock manager service (by setting hive.support.concurrency to true), check whether there are issues with the lock manager, which is responsible for maintaining locks for concurrent-user support. The lock manager can be ZooKeeperHiveLockManager based, or it can be EmbeddedLockManager, a shared lock manager for a dedicated Hive server where all locks are managed in memory. If the issue seems to be with these lock managers, you may need to edit their configs.

5. Check for generic version issues :

Check the Apache JIRA issues for hive-server2.

6. GC Tuning :

If the HS2 JVM is spending too much time on GC, you might consider some GC tuning, as discussed here: http://www.cubrid.org/blog/dev-platform/how-to-tune-java-garbage-collection/


