Posts Tagged ‘emr’

Things to check in EMR Instance state logs

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce


EMR writes instance state logs every 15 minutes. They help identify several important metrics for your EMR nodes, such as memory and disk usage. In this article, I will show you how to recognize the important metrics and how to interpret them when diagnosing an issue.


Finding Instance State logs of EMR cluster :

On EMR nodes, go to /emr/instance-state/

On S3, you will find them in a path like


Things to check :

CPU load Avg :

If the load average is higher than the CPU count of that instance type, there could be communication issues between daemons and all sorts of issues with HDFS and shuffles in jobs.

CPU load average is the average number of processes being executed or waiting to be executed over the past 1, 5 and 15 minutes. So a load average of 1.10, 1.00, 0.46 means:

  • load average over the last 1 minute is 1.10
  • load average over the last 5 minutes is 1.00
  • load average over the last 15 minutes is 0.46

Let’s say my EC2 instance type is m5.large, which has 2 vCPUs.

If my load average is larger than 2, then I should be concerned.
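The comparison between load average and vCPU count can be scripted. The following is a minimal sketch, assuming the load average appears in an uptime-style line; the sample line is made up for illustration (only the three load figures come from the example above) and the function names are hypothetical.

```python
def parse_load_average(line):
    """Extract the 1, 5 and 15 minute load averages from an uptime-style line."""
    # Everything after "load average:" is a comma-separated list of three numbers.
    _, _, tail = line.partition("load average:")
    one, five, fifteen = (float(x) for x in tail.split(","))
    return one, five, fifteen


def is_overloaded(load, vcpus):
    """A load average above the vCPU count means processes are queuing for CPU."""
    return load > vcpus


# Hypothetical uptime line; not copied from a real instance state log.
sample = " 22:29:15 up 3 days,  1:02,  1 user,  load average: 1.10, 1.00, 0.46"
vcpus = 2  # m5.large

for window, load in zip(("1 min", "5 min", "15 min"), parse_load_average(sample)):
    print(window, load, "overloaded" if is_overloaded(load, vcpus) else "ok")
```

With 2 vCPUs, none of the three windows in this sample crosses the threshold, so this node would not be a concern.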

 TOP :

Check whether any processes are occupying a lot of CPU and memory.

 Process list (ps) :

Search for running processes like ‘HRegionServer’ to verify that a process is running. Check the previous instance state log to see whether the PID (process ID) of that process has changed. If the PID changed, the process most probably got killed by OOM and restarted between the two snapshots.
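The PID comparison between two consecutive snapshots can be sketched as follows. This assumes the process list in the instance state log looks like ps aux output (11 columns, PID in the second column), which may not match the real log layout; both snapshot strings are made-up examples.

```python
def pids_for(ps_output, process_name):
    """Return the set of PIDs whose command line mentions process_name."""
    pids = set()
    for line in ps_output.splitlines():
        # ps aux has 11 columns; the last one (the command) may contain spaces.
        fields = line.split(None, 10)
        if len(fields) < 11 or fields[1] == "PID":
            continue  # skip short lines and the header
        if process_name in fields[10]:
            pids.add(int(fields[1]))
    return pids


def pid_changed(previous_snapshot, current_snapshot, process_name):
    """True if the process is present in both snapshots but under different PIDs."""
    before = pids_for(previous_snapshot, process_name)
    after = pids_for(current_snapshot, process_name)
    return bool(before) and bool(after) and before != after


HEADER = "USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND\n"
# Hypothetical snapshots taken 15 minutes apart.
previous = HEADER + "hbase 4211 2.0 9.1 5000000 750000 ? Sl 10:00 1:23 java -Dproc_regionserver HRegionServer start\n"
current = HEADER + "hbase 9876 1.5 3.2 5000000 260000 ? Sl 10:12 0:04 java -Dproc_regionserver HRegionServer start\n"

print(pid_changed(previous, current, "HRegionServer"))
```

A changed PID only tells you the process restarted; the reason still has to be confirmed from the OS or daemon logs.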


B = blocked process – shouldn’t be blocked.


To see OS issues: for example, if the OS is out of memory, you will see the OS randomly killing important processes.

“free -m”

To check free memory. Do not rely on this too much, as free -m is only recorded every 15 minutes and is not a true representation of memory usage during the entire period.

“df -h”

To check disk space.

EMR Config for Big cluster to create Many Partitions in Hive

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

EMR Configuration :





Create Cluster CLI Command :

In the command below, search for ‘replace’ and substitute the relevant parameters, such as your own security groups, SSH key, IAM roles, EMR log bucket etc.



Increase Spark Driver memory using PySpark session from EMR Notebooks

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce



Spark UI vs. Spark History Server UI , which one to use and why ?

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

Is Job Running ?

1. If you have Spark applications running, then you should be using the Spark UI. This UI is usually hosted on the Spark driver.
– In YARN cluster mode, the driver runs in the YARN Application Master, which runs on a random core node.
– In YARN client mode, the driver runs on the master node itself.
To access the Spark UI, go to the YARN ResourceManager UI first. Then navigate to the corresponding Spark application and use the “Application Master” link to access the Spark UI. If you look at the link, it takes you to the application master’s web UI at port 20888. This is basically a proxy running on the master node, listening on 20888, which makes the Spark UI available (the UI itself runs on either a core node or the master node).

2. You can also access the Spark UI by going directly to the driver hostname and port where it is hosted.
For example, when I ran spark-submit in cluster mode, it spun up application_1569345960040_0007. In my driver logs I see the messages below
19/09/24 22:29:15 INFO Utils: Successfully started service ‘SparkUI’ on port 35395.
19/09/24 22:29:15 INFO SparkUI: Bound SparkUI to, and started at
Where is one of my core node.
So I can go to
This automatically routes me to Master node proxy server listening on port 20888
Please note that these links are temporary and will only show the UI while the Spark application is running.

Is Job Completed ?

But if you want to see the UI even after the Spark job has completed, you should use the Spark History Server UI directly at http://master-public-dns-name:18080/.
The Spark History Server can also be used for running jobs via the “Show Incomplete Applications” button. It does this by using Spark event logs, which are enabled on EMR by default.

Differences between Spark UI and Spark History UI

 However, the Spark History Server has some differences compared to the Spark UI (for running apps, of course). Some of the ones I observed are :
– The Spark UI has a “Kill” button so you can kill Spark stages, while the Spark History Server doesn’t.
– The Spark UI has a “SQL” tab which shows more information about spark-sql jobs, while the Spark History Server doesn’t.
– The Spark UI can pull up live thread dumps for executors, while the Spark History Server can’t.
– The Spark UI gives the most up-to-date info (like “Total Uptime”) on tasks, while there can be a bit of lag in the Spark History Server UI.

Configuring an EC2 instance as DNS server using BIND that can only resolve forward lookups but not reverse lookups in an AWS VPC

Written by mannem on . Posted in EC2 Linux

Common issues of disk going full on EMR Cluster (or In general any Hadoop / Spark cluster)

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

A disk going full can make YARN on EMR UNHEALTHY. So, customers need to identify and proactively predict why each application, like Hadoop or Spark, can occupy disk space, and act accordingly. This article focuses on some of the most common ways disks on an EMR cluster can go full and recommends actions we could take for those specific scenarios.

Some common factors occupying disk space on EMR :

  • HDFS (/mnt/hdfs/), which may be the one occupying the most space.
  • YARN container logs. (/mnt/var/log/hadoop-yarn/containers/)
  • Files localized during a Hadoop/Spark job run using the YARN framework (yarn.nodemanager.local-dirs/filecache , ../usercache/filecache , ../usercache//appcache/<app-id>/), where yarn.nodemanager.local-dirs is usually /mnt/yarn/ on a one-disk setup. With multiple disks per instance (not root), all of them are used through a comma-separated value in yarn.nodemanager.local-dirs.
  • Spark application history logs (hdfs:///var/log/spark/apps/)
  • It may also be a combination of all of the above.



If it’s the logs (/var/log symlinked to /mnt/var/log/) that occupy the most space in that list, we can use multiple mount points for the yarn.nodemanager.log-dirs setting (comma separated). Currently, EMR only uses one mount point for storing YARN container logs.

The container logs on local machines should ideally be deleted by these components, in this order:
1. By the YARN NodeManager after log aggregation. – (Logic altered by EMR team)
2. By LogPusher after the retention period.
3. By the Instance Controller’s DiskSpaceManager (DSM) when its heuristics are satisfied.

In YARN, if log aggregation is turned on (with the yarn.log-aggregation-enable config), container logs are copied to HDFS when the Spark application completes, and after aggregation they are expected to be deleted from the local machine by the NodeManager’s AppLogAggregatorImpl. However, on EMR we keep them on the local machines, because LogPusher needs those logs to push them to S3 (LogPusher cannot push logs from HDFS). So EMR introduced a feature in the EMR Hadoop branch-2.7.3-amzn (not adopted in open source) through an internal commit.

With this commit, we have an option to keep the files on local machines after log aggregation. It is managed by the “yarn.log-aggregation.enable-local-cleanup” property in yarn-site.xml on the respective core/task nodes. This property is not public and can only be set on EMR distributions. In the latest EMR AMIs, this option is set to ‘false’. This means the cleanup WILL NOT take place.

– For the logs to be deleted from local disks, we need to flip it to true with the configurations API while launching the cluster. On a live cluster, yarn-site.xml on all core/task nodes should be updated and the NM should be restarted. After the restart, old container logs might still be present. (Read below)

*** This option might not be recommended, because LogPusher will NOT be able to push those local container logs to the customer’s (service’s) S3 if it is set to true.
** The only source of container logs will then be the aggregated logs on HDFS, which are not so persistent.

2. Since logs are not cleaned up on the local machine because LogPusher needs them in local dirs, we rely on the same LogPusher to delete those local files after a retention period of 4 hours, more particularly under “/var/log/hadoop-yarn/containers” (/etc/logpusher/hadoop.config). LogPusher will only delete logs if they have not been touched in four hours.

3. The Instance Controller’s DiskSpaceManager is a kind of fail-safe to avoid disk fill-up, i.e. if disk usage goes beyond a certain %, DSM will mark some files (including local container logs) for deletion. DSM does seem to have issues deleting the log files because of user/permission issues. Ideally it needs to list and delete logs from all users (yarn / spark / hive) and not just the hadoop user’s logs.
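To see which container logs would currently qualify under the four-hour rule in step 2, a quick check can be scripted. This is only a sketch of the idea, not LogPusher’s actual logic: it assumes “touched” means the file modification time, and the function name is hypothetical.

```python
import os
import time

RETENTION_SECONDS = 4 * 60 * 60  # the four-hour retention period described above


def untouched_logs(root, now=None, retention=RETENTION_SECONDS):
    """Yield files under root whose modification time is older than the retention window."""
    now = time.time() if now is None else now
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            if now - os.path.getmtime(path) > retention:
                yield path


if __name__ == "__main__":
    # Directory taken from the text above; nothing is printed if it does not exist.
    for path in untouched_logs("/var/log/hadoop-yarn/containers"):
        print(path)
```

Logs of a long-running job that keep getting written to never show up in this list, which is why they stay on disk.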

Hadoop & Spark Streaming Jobs :

In a streaming (Hadoop or Spark on YARN) application, it is reasonable to expect that a log will be touched at least once every four hours for the entire lifetime of the streaming job, resulting in LogPusher never deleting the file. This can fill up the disk, which can lead to the customer wanting to spread logs across multiple mounts. Spreading across multiple mounts is not the best solution: we specifically put logs into one mount to leave space on the customer’s cluster for data.

The correct solution here is to implement/configure log rotation for container logs. This way, if we rotate on an hourly basis, we:

  • keep the overall size of each log down
  • give LogPusher a chance to upload and delete old logs
  • save disk space for the customer and
  • avoid having to add unnecessary features to LogPusher

Enabling log rotation for spark using /etc/spark/conf/ to rotate ${}/spark-%d{yyyy-MM-dd-HH-mm-ss}.log

Similarly log rotation can be done for Hadoop YARN logs using


If HDFS is occupying the most space, then we might need to monitor the HDFS CloudWatch metric and trigger an auto-scaling resize accordingly (or resize manually). After the resize, the blocks will NOT be balanced. Only new data will go to the node you just added. Old HDFS data blocks will not balance out automatically. You will need to re-balance HDFS so that disk utilization on this node goes below 90%.
HDFS utilization > 90% doesn’t necessarily mean the disk on a particular node will be > 90%. This really depends on HDFS replication and how blocks are spread around.

– The customer might need to check the replication factor of HDFS. Is it too large?

– Was there a recent scale-down which led to HDFS decommissioning, where blocks are moved to the remaining core nodes, thus filling them up?
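The gap between cluster-wide HDFS utilization and per-node disk utilization is easy to see with a small calculation. The node names and sizes below are invented purely for illustration.

```python
# Hypothetical per-node figures in GB: (capacity, used).
nodes = {
    "node-1": (100, 95),
    "node-2": (100, 60),
    "node-3": (100, 55),
}


def utilization_percent(capacity, used):
    """Disk utilization of a single node, in percent."""
    return 100 * used / capacity


def cluster_utilization_percent(nodes):
    """Overall utilization: total used space over total capacity, in percent."""
    total_capacity = sum(capacity for capacity, _ in nodes.values())
    total_used = sum(used for _, used in nodes.values())
    return 100 * total_used / total_capacity


def nodes_over(nodes, threshold):
    """Names of nodes whose own utilization exceeds the threshold."""
    return [name for name, (capacity, used) in nodes.items()
            if utilization_percent(capacity, used) > threshold]


print(cluster_utilization_percent(nodes))  # overall figure for the cluster
print(nodes_over(nodes, 90))               # nodes that are individually too full
```

Here the cluster as a whole sits at 70% while node-1 is at 95%, so a cluster-level metric alone would not flag the node that is about to fill up.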



If /mnt/yarn/ (yarn.nodemanager.local-dirs), i.e. the YARN localized files, is going full, it can happen at different stages of the application.

1. /mnt/yarn/ (yarn.nodemanager.local-dirs)

On EMR, /mnt/yarn/ is configured in yarn-site.xml with yarn.nodemanager.local-dirs. The list of directories in this parameter is used:

–  During a MapReduce job, intermediate data and working files are written to temporary local files. Because this data includes the potentially very large output of map tasks, you need to ensure that the yarn.nodemanager.local-dirs property, which controls the location of local temporary storage for YARN containers, is configured to use disk partitions that are large enough.

–  During resource localization by the YARN NM, i.e. the NM downloads resources from the supported source (such as HDFS, HTTP, and so on) to the NodeManager node’s local directory.

– After the job finishes, the NodeManagers automatically clean up the localized files immediately by default.

Scenario 1 : /mnt/yarn/usercache/hadoop/appcache/ – occupying more space.

Localized files rarely fill up volumes. It’s usually intermediate data from mappers that fills this up.

Troubleshooting steps  :

1. Confirm the existence of large intermediate output files. In this case, from one single application, many GBs of intermediate data from mapper attempts are about to fill up disk space on one core node.

2. We can also confirm from the mapper syslogs that this directory is being used for intermediate data (mapreduce.cluster.local.dir)

3. Now,

You can refer to the following NodeManager log during resource localization:

This directory will be used by multiple YARN applications and their containers during their lifecycle. First, we need to check whether the application and its containers are still running and currently occupying disk space. If they are running and the corresponding appcache is making the disk go full, then your application needs that cache. The NM doesn’t delete any appcache that is currently being used by running containers. So, you will need to provision more space to handle your application’s cache. If multiple applications are running and filling up your appcache together, then you might need to limit the parallelism of your applications or provision bigger volumes.

If applications (containers) are not running and you continue to see the disk being full with appcache, you might need to tune the NM to trigger the deletion service sooner. Below are some NM parameters (yarn-site.xml) that you might need to configure to change how the NM decides to trigger the deletion service to remove the appcache.

yarn.nodemanager.localizer.cache.cleanup.interval-ms : Interval in between cache cleanups.  : Target size of localizer cache in MB, per local directory.

You can also try  running

(for running that command, you’ll need to make sure that the machine from where you are yarn-site.xml contains yarn.sharedcache.admin.address (the default is property defined. You might even try master IP instead of )

Another parameter to watch out for is yarn.nodemanager.delete.debug-delay-sec. This is the number of seconds after an application finishes before the NodeManager’s DeletionService will delete the application’s localized file directory and log directory. It is set to 0 by default, which means the deletion service does not wait. If you set this to a large value, the appcache will not be deleted when the application finishes and will exist until this time has passed.

References :


Spark’s usercache & SPARK on YARN :



2.8G  ./mnt/yarn/usercache/hadoop/appcache/application_1474295547515_0187/blockmgr-42cdcd45-fe7d-4f0d-bf4b-5c819d4ef15e
3.5G  ./mnt/yarn/usercache/hadoop/appcache/application_1474295547515_0187/blockmgr-840ac0bf-b0dd-4573-8f74-aa7859d83832

Suppose that in the usercache directory there are a lot of big folders like blockmgr-b5b55c6f-ef8a-4359-93e4-9935f2390367.

They are filling up with blocks from the block manager, which could mean you’re persisting a bunch of RDDs to disk, or maybe have a huge shuffle. The first step would be to figure out which of those it may be and avoid the issue by caching in memory or designing to avoid huge shuffles. You can consider upping spark.shuffle.memoryFraction to use more memory for shuffling and spill less.

– In cluster mode, the local directories used by the Spark executors and the Spark driver will be the local directories configured for YARN (Hadoop YARN config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored. In client mode, the Spark executors will use the local directories configured for YARN while the Spark driver will use those defined in spark.local.dir. This is because the Spark driver does not run on the YARN cluster in client mode, only the Spark executors do.

– Access the application cache through yarn.nodemanager.local-dirs on the nodes on which containers are launched. This directory contains the launch script, JARs, and all environment variables used for launching each container.

spark.local.dir :     /tmp     Directory to use for “scratch” space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks.

NOTE: In Spark 1.0 and later this will be overridden by SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) environment variables set by the cluster manager – which seem to be /mnt/ etc, that you were concerned about.




hdfs dfs -du -h /
15.7 M   /apps
0        /tmp
2.1 G    /user
199.3 G  /var

Within /var/log/spark/apps/ there are currently 15,326 files ranging in size from 100 KB to 30 MB.

26.1 M   /var/log/spark/apps/application_1489085921151_9995_1
6.6 M    /var/log/spark/apps/application_1489085921151_9996_1
28.2 M   /var/log/spark/apps/application_1489085921151_9997_1
6.0 M    /var/log/spark/apps/application_1489085921151_9998_1
24.4 M   /var/log/spark/apps/application_1489085921151_9999_1

So why is this happening, and how can I get these log files cleaned up once they have been saved to S3?

Those are Spark history logs. Their retention settings are separate from the YARN container log settings and can be configured to clean up at shorter intervals.

The following spark-defaults configurations might help in cleaning up logs.

spark.history.fs.cleaner.enabled : true
spark.history.fs.cleaner.interval : 1d
spark.history.fs.cleaner.maxAge : 7d

EMR edit software settings: [{"classification":"spark-defaults","properties":{"spark.history.fs.cleaner.maxAge":"7d","spark.history.fs.cleaner.interval":"1d","spark.history.fs.cleaner.enabled":"true"}}]

You can also disable history logs (event logs) if you don’t need them; for large files it doesn’t work anyway.
To disable them, you can use “--conf spark.eventLog.enabled=false” on spark-submit.

But EMR’s Apppusher might need these event logs to display the Spark application logs on the EMR console.


Some other factors to consider:

If there’s an NM restart or RM restart during localization, there might be some stale files in the usercache which are not deleted by the deletion service, and those files might persist after job completion. So, you might need to delete them manually sometimes.


Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

Contains different configurations and procedures to enable logging on different daemons on AWS EMR cluster.
[Please contribute to this article to add additional ways to enable logging]

HBASE on S3 :

This will enable logging of calls made by EMRFS from HBase.

This is important for troubleshooting S3 consistency issues and failures on an HBase on S3 cluster.

Enabling DEBUG on Hive Metastore daemon (its Datastore) on EMR :


Logs at /var/log/hive/user/hive/hive.log


Set use_get_log_api=true in the beeswax section of the hue.ini configuration file.

Hadoop and MR :

Enable GC verbose on Hive Server 2 JVM:

WIRE OR DEBUG logging on EMR to check calls to S3 and DDB for DynamoDb connector library :

Paste the following on log4j configurations of Hadoop / hive / spark etc.


Debug on S3 Calls from EMR HIVE :

These metrics can be obtained from the hive.log when enabling debug logging in aws-java-sdk. To enable this logging, add the following line to '/etc/hive/conf/'. The Configuration API can be used as well.

Enable DEBUG logging for Http Connection pool:

(from spark) by adding the following to /etc/spark/conf/

*Tez overwrites the loglevel options we have passed. Please see the related items.*

Enabling Debug on Hadoop log to log calls by EMRFS :


You can use the same logging config for other applications like Spark/HBase, using the respective log4j config files as appropriate. You can also use an EMR log4j configuration classification like hadoop-log4j or spark-log4j to set those configs while starting the EMR cluster. (See below for a sample JSON for the configuration API.)

DEBUG on EMR Logpusher Logs :

Edit this file on the master / slave nodes manually and restart Logpusher.


(Might need to stop Service-nanny before stopping Logpusher, to properly stop/start Logpusher)

DEBUG on Spark classes :

Use the following EMR config to set DEBUG level for relevant class files.

DEBUG using spark shell:

Execute the following commands after invoking spark-shell to enable DEBUG logging on the respective Spark classes, like Memstore. You can use the same approach if you want to reduce the amount of logging from INFO (the default coming from the Spark conf) to ERROR.

EMRFS CLI command like EMRFS SYNC :


Logs will be on the console output. We might need to redirect them to a file, or do both.

Enable Debug on Boto3 client :

Using AWS Athena to query S3 Server Access Logs.

Written by mannem on . Posted in EMR || Elastic Map Reduce

S3 server access logs can grow very big over time, and it is very hard for a single machine to process, query and analyze all these logs. So, we can use distributed computing to query the logs quickly. Ideally we might think of Apache Hadoop/Hive/Spark/Presto etc. to process these logs. But the simplicity of the serverless AWS Athena service makes it even easier. This article will guide you through using Athena to process your S3 access logs, with example queries and some partitioning considerations which can help you query TBs of logs in just a few seconds.

The server access log files consist of a sequence of new-line delimited log records. Each log record represents one request and consists of space delimited fields. The following is an example log consisting of six log records.

This article is based on the documented log format of S3 access logs. The fields and their format may change, and the DDL query should be changed accordingly.

Sample Data:

Data Format considerations:

At first look, the data format appears simple: a text file with a space field delimiter and newline (\n) delimited records. However, there is a catch in this data format: columns like Time, RequestURI & User-Agent can have spaces in their data ( [06/Feb/2014:00:00:38 +0000] , "GET /gdelt/1980.csv.gz HTTP/1.1" & aws-cli/1.7.36 Python/2.7.9 Linux/3.14.44-32.39.amzn1.x86_64), which will mess up the SerDe parsing.

For the RequestURI & User-Agent fields, the data is enclosed in quotes with spaces inside it. This can be parsed by any SerDe that supports quotes. Unfortunately, Athena does not support SerDes like org.apache.hadoop.hive.serde2.OpenCSVSerde, which does have a quotes feature.

The org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe included in Athena does not support quotes yet.

A custom SerDe called comes with all EMR AMI’s just for parsing these logs. A query like the following would create the table easily. However, this SerDe will not be supported by Athena.

Looking at these limitations, org.apache.hadoop.hive.serde2.RegexSerDe seems like the only feasible option. Writing a RegEx for the log structure was a bit time consuming, but I was able to write it with the help of the ELB log structure example.

I used the following Regex for the S3 server access logs :

([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) \\\"([^ ]*) ([^ ]*) (- |[^ ]*)\\\" (-|[0-9]*) (-|[0-9]*) ([-0-9]*) ([-0-9]*) ([-0-9]*) ([-0-9]*) ([^ ]*) (\"[^\"]*\") ([^ ]*)$

Also found at : , where I removed some extra escape chars like \’s. In DDL , we need to add these escape characters like in following DDL.

([^ ]*) – matches a continuous string. Because number fields like ‘time’ can return a ‘-‘ character, I did not use the ([-0-9]*) regex which is used for numbers. For the same reason, I had to use STRING for all fields in the DDL. We can use Presto string functions to convert the strings appropriately and use comparison operators in DML.

\\[(.*?)\\] or \\\[(.*?)\\\] – is for the Time field. For a string like [06/Feb/2014:00:00:38 +0000], it will match and give a string like 06/Feb/2014:00:00:38 +0000, which is easier for querying between times.

\\\"([^ ]*) ([^ ]*) (- |[^ ]*)\\\" – is for Request-URI field – removes the quotes and splits the "GET /mybucket?versioning HTTP/1.1" into three groups.

(\"[^\"]*\") – is for User-Agent – matches the whole string with quotes.

(-|[0-9]*) – is for HTTPstatus, which is always a number like 200 or 404. If you expect a ‘-‘ string, you can use ([^ ]*) instead.
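Before putting the regex into a DDL, it can help to try it against a sample record locally. The sketch below uses the same pattern with the DDL-level escaping removed, so it can be written as a Python raw string. The log line is a made-up record in the access log layout, and the group numbers in the comments follow the order of the capture groups in the regex above.

```python
import re

# Same pattern as above, without the extra backslashes the DDL needs.
S3_ACCESS_LOG = re.compile(
    r'([^ ]*) ([^ ]*) \[(.*?)\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) '
    r'"([^ ]*) ([^ ]*) (- |[^ ]*)" (-|[0-9]*) (-|[0-9]*) '
    r'([-0-9]*) ([-0-9]*) ([-0-9]*) ([-0-9]*) ([^ ]*) ("[^"]*") ([^ ]*)$'
)

# Hypothetical record; owner and requester IDs are placeholders.
sample = (
    'exampleowner mybucket [06/Feb/2014:00:00:38 +0000] 192.0.2.3 '
    'examplerequester 3E57427F3EXAMPLE REST.GET.VERSIONING - '
    '"GET /mybucket?versioning HTTP/1.1" 200 - 113 - 7 - "-" '
    '"aws-cli/1.7.36 Python/2.7.9 Linux/3.14.44-32.39.amzn1.x86_64" -'
)

match = S3_ACCESS_LOG.match(sample)
if match is None:
    print("record did not match")
else:
    print("time:      ", match.group(3))    # brackets stripped
    print("method:    ", match.group(9))    # first part of Request-URI
    print("status:    ", match.group(12))
    print("user agent:", match.group(19))   # quotes kept, spaces preserved
```

If a record fails to match, RegexSerDe returns NULL for every column of that row, so testing a few real lines this way can save a round trip to Athena.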


Where s3://s3server-accesslogs-bucket/logs/ is the bucket and prefix of your access logs.

Once you create the table, you can query this table for specific information :

Example Queries :

Any field can be set to “-” to indicate that the data was unknown or unavailable, or that the field was not applicable to this request.


1. From time to time, AWS might extend the access log record format by adding new fields to the end of each line. The DDL/RegEx must be written to handle trailing fields that it does not understand.
2. If the S3 directory is huge with a lot of log files, then depending on the query you might encounter Athena’s “Query timeout: 30 minutes”. Some optimizations are described here :

Partitioning / Compression :

Over time, the logs in your target prefix (s3://s3server-accesslogs-bucket/logs/) can grow big. This is especially true if you use the same bucket/prefix for the access logs of other buckets. If you enable logging on multiple source buckets that identify the same target bucket, the target bucket will have access logs for all those source buckets, but each log object will report access log records for a specific source bucket.

Amazon S3 uses the following object key format for the log objects it uploads in the target bucket: TargetPrefixYYYY-mm-DD-HH-MM-SS-UniqueString . So, it basically creates multiple files with a bunch of logs in the S3 bucket.

Querying can be slower if there is a large number of small files in text format.

To speed things up, we need at least one of these options to consolidate these logs.
1. PARTITIONING – queries can work on a subset of files instead of going through all files.
2. CONVERT TO A COLUMNAR STORAGE – like ORC or PARQUET, for faster query performance.

Note that S3 access logs are not partitioned, and they are not organized with prefixes like /year/month/day or /year=2010/month=10/day=11/ etc. which Athena could use to make static or dynamic partitions respectively. Also, access logs are uncompressed, flat text files.

Unfortunately, we cannot enable the above options by just defining a DDL. We need to copy/move the text-based data set with the above options enabled. One option is to use AWS EMR to periodically structure and partition the S3 access logs so that you can query those logs easily with Athena. At this moment, it is not possible to use Athena itself to convert non-partitioned data into partitioned data.

On EMR, we could use either
HIVE : To partition/compress/convert , or
S3-DIST-CP utility : To compress/groupBy

Note that we can use EMR’s Presto / Spark etc. to query the logs, but Athena, being serverless, makes querying much easier without managing anything.

Here’s an example to convert non-partitioned S3 access logs to partitioned S3 access logs on EMR:

Continuing the Partitioning for incoming logs :

Note that the above discussion talks about a one-time creation of all partitions. There are two things to notice:
1. While the above job is running, there could be new incoming logs which will not be partitioned into the destination.
2. Also, after the job is completed, you keep getting new logs.

Which S3 files are missed by the above job really depends on the hive-client’s split calculation on your source S3 bucket. When you run your ‘insert overwrite’ command, the hive-client calculates splits initially by listing all objects inside the S3 prefix. This usually takes minutes and depends on the number of S3 objects. After this split calculation is done, the actual partitioning work is done by mappers / reducers. So, any log files pushed to the source prefix after the split calculation is done will not be included in your job.

When you want to automatically add partitions to the Athena table (Accesslogs_partitionedbyYearMonthDay) as new files with newer request days arrive in your log bucket/prefix, you will need a ‘batch job on schedule’ or a ‘trigger mechanism’ to make sure the new data is also partitioned accordingly.

Batch job on schedule :

Note that our source S3 access logs are dumped into one single directory. Each file name is like 2016-12-05-17-17-06-5D51435906E10586. Since the daily (or more frequent) batch jobs run after the one-time bulk job above, they need to skip the files that were already processed by it. For this, we can rely on the file name itself to get a list of newly generated files and then use an ETL to convert these files to partitioned data. We could also maintain a manifest file listing the files already processed and the files which still need to be processed.
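Picking out the newly generated files from their names could look like the sketch below. It assumes the key format TargetPrefixYYYY-mm-DD-HH-MM-SS-UniqueString; the function names, the example keys (other than the file name quoted above) and the cutoff value are hypothetical. Keep in mind that the timestamp in the name is when the log object was delivered, not the time of each request inside it.

```python
from datetime import datetime

TIMESTAMP_LENGTH = len("YYYY-mm-DD-HH-MM-SS")  # 19 characters


def delivered_at(key, target_prefix=""):
    """Parse the delivery timestamp that leads the log object's file name."""
    name = key[len(target_prefix):]
    return datetime.strptime(name[:TIMESTAMP_LENGTH], "%Y-%m-%d-%H-%M-%S")


def new_log_keys(keys, last_processed, target_prefix=""):
    """Return the keys delivered strictly after the last processed timestamp."""
    return [k for k in keys if delivered_at(k, target_prefix) > last_processed]


keys = [
    "logs/2016-12-05-17-17-06-5D51435906E10586",
    "logs/2016-12-06-09-30-12-0123456789ABCDEF",  # made-up key
]
# Hypothetical cutoff: when the one-time bulk job listed the source prefix.
cutoff = datetime(2016, 12, 5, 23, 59, 59)

print(new_log_keys(keys, cutoff, target_prefix="logs/"))
```

The cutoff would have to be stored somewhere between runs, which is essentially the manifest idea mentioned above in its simplest form.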

Managing all that is beyond the scope of this blog. There are multiple ways to do ETL. One way is to run an AWS Data Pipeline with EMR, on a schedule, and run the Hive queries on the new data.


Trigger based solution :

If you want to run a partition job on every S3 PUT or bunch of PUTs, you can use AWS Lambda, which can trigger a piece of code on every S3 object PUT. You can get metadata about that PUT and handle it in your Lambda code to fire off an ETL etc. to do the partitioning. AWS Lambda comes with the AWS SDK, and you can make all types of AWS API calls with proper permissions.
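A Lambda function for this trigger could start from something like the sketch below. It only extracts the bucket and key from the S3 event notification; what happens next (starting an EMR step, adding a partition, etc.) is left as a placeholder, and the return value is just an illustration, not something S3 or Lambda requires.

```python
from urllib.parse import unquote_plus


def lambda_handler(event, context):
    """Collect the bucket and key of every object named in an S3 event notification."""
    new_objects = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 event notifications.
        key = unquote_plus(record["s3"]["object"]["key"])
        new_objects.append((bucket, key))
    # Placeholder: hand new_objects to whatever ETL does the partitioning.
    return {"new_objects": new_objects}


# Local dry run with a trimmed-down, made-up event.
fake_event = {
    "Records": [
        {"s3": {"bucket": {"name": "s3server-accesslogs-bucket"},
                "object": {"key": "logs/2016-12-05-17-17-06-5D51435906E10586"}}}
    ]
}
print(lambda_handler(fake_event, None))
```

Since access logs arrive as many small objects, firing an ETL per object can get expensive; batching several notifications before starting a job is usually the more practical design.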

Other Input Regex’s :

'^(?!#)([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+[^\(]+[\(]([^\;]+).*\%20([^\/]+)[\/](.*)$'
ELB access logs
'([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*):([0-9]*) ([.0-9]*) ([.0-9]*) ([.0-9]*) (-|[0-9]*) (-|[0-9]*) ([-0-9]*) ([-0-9]*) \\\"([^ ]*) ([^ ]*) (- |[^ ]*)\\\" ?(\"[^\"]*\")? ?([A-Z0-9-]+)? ?([A-Za-z0-9.-]*)?$'
Apache web logs
'([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?'


EMR s3DistCp “–groupBy” Regex examples

Written by mannem on . Posted in EMR || Elastic Map Reduce

Sometimes the regex for S3DistCp groupBy can be confusing. I usually use online regex tools to better work with string matching and grouping.

Here are some examples that I explored so far :

Example 1 :

Example 2 :

s3-dist-cp --src s3://support.elasticmapreduce/training/datasets/gdelt/ --dest hdfs:///gdeltWrongOutput1/ --groupBy '.*(\d{6}).*'

This command would not merge any files, but would copy all files with 6 numbers in their names, like 20130401.export.CSV.gz, to the destination.
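The reason nothing gets merged can be checked locally by simulating the grouping. The sketch below is not S3DistCp itself: it assumes that the regex must match the whole path, that files are grouped by the concatenated text of their capture groups, and that non-matching files are skipped. The second file name and the second pattern are made-up examples.

```python
import re
from collections import defaultdict


def group_by(paths, pattern):
    """Group paths by the concatenated text of the pattern's capture groups."""
    regex = re.compile(pattern)
    groups = defaultdict(list)
    for path in paths:
        m = regex.fullmatch(path)
        if m is None:
            continue  # assumption: non-matching files are not copied
        groups["".join(m.groups())].append(path)
    return dict(groups)


files = ["20130401.export.CSV.gz", "20130402.export.CSV.gz"]

# The greedy leading .* leaves only the LAST six digits for the capture group,
# so each file lands in its own group and nothing is merged.
print(group_by(files, r".*(\d{6}).*"))

# A hypothetical pattern that captures only the year puts both files in one group.
print(group_by(files, r".*(\d{4})\d{4}.*"))
```

Trying a pattern this way before running the real job makes it easier to spot a capture group that is too specific to merge anything.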

Example 3 :

Example 4 :

If you want to concatenate matching files in the root directory and all matching files inside a ‘sample_directory’ into a single file and compress that in gzip format. This concatenates the contents of all matched files and creates one .gz file.

Parsing AWS Cloudtrail logs with EMR Hive / Presto / Spark

Written by mannem on . Posted in cloudtrail, EMR || Elastic Map Reduce


AWS CloudTrail is a web service that records AWS API calls for your account and delivers log files to you. The recorded information includes the identity of the API caller, the time of the API call, the source IP address of the API caller, the request parameters, and the response elements returned by the AWS service.

Cloudtrail usually sends the logs to S3 bucket segregated into Accounts(Regions(data(logFile.json.gz))).

CloudTrail recommends using the aws-cloudtrail-processing-library, but that may be complex if you wish to run ad-hoc queries over a huge number of big log files quickly. If there are many files, it may also be hard to download all the logs to a Linux/Unix node, unzip them and do RegEx matching on all these files. So, we can use a distributed environment like Apache Hive on an AWS EMR cluster to parse/organize all these files using very simple SQL-like commands.

This article guides you through querying your CloudTrail logs using EMR Hive. It also provides some example queries which may be useful in different scenarios. It assumes that you have a running EMR cluster with the Hive application installed and that you have explored it a bit.

Directory and File structure of Cloudtrail logs :

The following Hive queries show how to create a Hive table and reference the CloudTrail S3 bucket. CloudTrail data is processed by the CloudTrailInputFormat implementation, which defines the input data split and key/value records. The CloudTrailLogDeserializer class defined in the SerDe is called to format the data into a record that maps to the columns and data types in a table. Data to be written (such as with an INSERT statement) is translated by the Serializer class defined in the SerDe to the format that the OUTPUTFORMAT class (HiveIgnoreKeyTextOutputFormat) can read.

These classes are part of /usr/share/aws/emr/goodies/lib/ EmrHadoopGoodies-x.jar & /usr/share/aws/emr/goodies/lib/ EmrHadoopGoodies-x.jar files and are automatically included in Hive classpath. Hive can also automatically de-compress the GZ files. All you need to do is to run a query similar to SQL commands. Some sample queries are included.

EMR uses an instance profile role on its nodes to authenticate requests made to the CloudTrail bucket. The default IAM policy on the role EMR_EC2_DefaultRole allows access to all S3 buckets. If your cluster does not have access, you may need to make sure the instance profile/role has access to the necessary S3 CloudTrail bucket.
Do not run any INSERT OVERWRITE on this Hive table. If EMR has write access to the S3 bucket, an insert overwrite may delete all logs from this bucket. Please check the Hive language manual before attempting any commands.
CloudTrail JSON elements are extensive and differ between kinds of requests. This SerDe (which is kind of abandoned by the EMR team) doesn’t include all possible fields in CloudTrail. For example, if you try to query requestparameters, it would give FAILED: SemanticException [Error 10004]: Line 6:28 Invalid table alias or column reference ‘requestparameters’.
If your CloudTrail bucket has a large number of files, Tez’s grouped splits or MR’s input split calculation may take considerable time and memory. Make sure you allocate enough resources to the hive-client or tez-client.


To query those logs with EMR Presto, we will need to copy the necessary jars to Presto’s hive plugin directory on all nodes of the EMR cluster and RESTART presto-server on ALL nodes.

Master / Core :

To be safe : I also ensured the correct ownership on all these copied files –


Check for presto server :

Now, run the queries on the cloudtrail table already created with Hive. The query syntax and functions are different from Hive, and you should use Presto’s functions. Some example queries are provided in AWS Athena’s documentation (Athena uses Presto).

Alternatively, you can use Apache Spark, which has spark-shell, to query CloudTrail logs using the following library :

Here are some instructions on this design :