Author Archive

Things to check in EMR Instance state logs

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce


EMR writes an instance-state log on every node every 15 minutes, which captures several important metrics for your EMR nodes, such as memory and disk usage. In this article, I will show you how to recognize the important metrics and how to interpret them when diagnosing an issue.

 

Finding instance-state logs of an EMR cluster :

On EMR nodes, go to /emr/instance-state/

On S3, you will find them at a path like

s3://emr-log-bucket/j-QHD70YCKZWTG/node/i-0d861d80c83e33ec0/daemons/instance-state/
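For example, to pull these logs down from S3 for inspection with the AWS CLI (the bucket, cluster ID and instance ID are the placeholders from the path above; the file name is illustrative):

aws s3 cp s3://emr-log-bucket/j-QHD70YCKZWTG/node/i-0d861d80c83e33ec0/daemons/instance-state/ . --recursive
# The snapshots are gzipped; view one with:
zless instance-state.log-2019-09-24-22-15.gz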

Things to check :

CPU load Avg :

If the load average is higher than the CPU count of that instance type, there could be communication issues between daemons and all sorts of issues with HDFS and shuffles in jobs.

The CPU load average is the average number of processes running or waiting to be executed over the past 1, 5 and 15 minutes. For example, a line like "load average: 1.10, 1.00, 0.46" means:

  • load average over the last 1 minute is 1.10
  • load average over the last 5 minutes is 1.00
  • load average over the last 15 minutes is 0.46

Let's say my EC2 instance type is m5.large, which has 2 vCPUs according to

https://aws.amazon.com/ec2/instance-types/m5/

If my load average is consistently larger than 2, then I should be concerned.
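A quick way to make this comparison on a live node (output is illustrative):

nproc     # number of vCPUs, e.g. 2 on an m5.large
uptime    # ... load average: 1.10, 1.00, 0.46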

TOP :

Check if any processes are occupying a lot of CPU or memory.

Process list (ps) :

Search the process list for processes like 'HRegionServer' to verify that a process is running. Compare with the previous instance-state log to see whether the PID (process ID) of that process changed. If there is a PID change, the process most probably got killed (for example with an OOM) and restarted between those two snapshots.
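A rough way to check this across the downloaded instance-state snapshots (file names are illustrative):

for f in instance-state.log-*.gz; do
  echo "== $f =="
  zgrep HRegionServer "$f"
done
# Compare the PID column between consecutive snapshots; a change usually means the process was restarted.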

vmstat (r and b columns) :

r = processes waiting for run time; b = processes blocked in uninterruptible sleep (usually on I/O). Processes shouldn't stay blocked for long.

dmesg :

Check for OS-level issues. For example, if the OS runs out of memory, you will see the OOM killer terminating important processes.

"free -m" :

Check free memory. Do not rely on this too heavily, as free -m is only recorded every 15 minutes and is not a true representation of memory usage over the entire interval.

"df -h" :

to check disk space.
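To spot-check a live node directly instead of waiting for the next 15-minute snapshot, the same commands can be run over SSH:

free -m    # free/available memory in MB
df -h      # disk usage per filesystem; on EMR nodes the /mnt volumes usually hold HDFS and YARN local data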

EMR Config for Big cluster to create Many Partitions in Hive

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

EMR Configuration :

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html
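A sketch of the kind of hive-site settings one might raise for jobs that create a very large number of partitions (the property values here are illustrative starting points, not recommendations):

[
  {
    "Classification": "hive-site",
    "Properties": {
      "hive.exec.dynamic.partition": "true",
      "hive.exec.dynamic.partition.mode": "nonstrict",
      "hive.exec.max.dynamic.partitions": "20000",
      "hive.exec.max.dynamic.partitions.pernode": "5000",
      "hive.metastore.client.socket.timeout": "1800s"
    }
  }
]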

 

 

 

 

Create Cluster CLI Command :

https://docs.aws.amazon.com/cli/latest/reference/emr/create-cluster.html

In the command below, search for "replace" and substitute your own values, such as your security groups, SSH key, IAM roles, and EMR log bucket.
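A sketch of what such a command might look like; every value starting with replace- is a placeholder for your own security group, subnet, SSH key, IAM role or log bucket:

aws emr create-cluster \
  --name "hive-many-partitions" \
  --release-label emr-5.29.0 \
  --applications Name=Hadoop Name=Hive \
  --ec2-attributes KeyName=replace-ssh-key,SubnetId=replace-subnet-id,InstanceProfile=replace-ec2-instance-profile,EmrManagedMasterSecurityGroup=replace-master-sg,EmrManagedSlaveSecurityGroup=replace-slave-sg \
  --service-role replace-emr-service-role \
  --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m5.xlarge InstanceGroupType=CORE,InstanceCount=4,InstanceType=m5.xlarge \
  --log-uri s3://replace-emr-log-bucket/logs/ \
  --configurations file://hive-config.json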

 


Increase Spark Driver memory using PySpark session from EMR Notebooks

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce
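With the sparkmagic (PySpark) kernels used by EMR Notebooks, driver memory for a new Spark session is typically set with the %%configure cell magic before the session starts. A minimal sketch (the 4g value is just an example; other Livy session settings such as executorMemory can be added the same way):

%%configure -f
{"driverMemory": "4g"}

Run this in the first cell of the notebook; the -f flag forces any existing session to be recreated with the new settings.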

 

 

Spark UI vs. Spark History Server UI, which one to use and why?

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

Is Job Running ?

1. If you have Spark applications running, then you should be using the Spark UI. This UI is hosted by the Spark driver.
– In YARN cluster mode, the driver runs inside the YARN ApplicationMaster, which runs on a (random) core node.
– In YARN client mode, the driver runs on the master node itself.
To access the Spark UI, you should go to the YARN ResourceManager UI first, then navigate to the corresponding Spark application and use the "Application Master" link to access the Spark UI. If you look at the link, it takes you to the ApplicationMaster's web UI at port 20888. This is basically a proxy running on the master node, listening on 20888, which exposes the Spark UI (which itself runs on either a core node or the master node).
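For example, you can also find the proxy/tracking URL from the command line on the master node (the application ID and hostname below are illustrative):

yarn application -list -appStates RUNNING
# The "Tracking-URL" column shows something like:
# http://ip-10-0-0-113.ec2.internal:20888/proxy/application_1569345960040_0007/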

2. You can also access the Spark UI by going directly to the driver's hostname and port.
For example, when I ran spark-submit in cluster mode, it spun up application_1569345960040_0007. In my driver logs I see the messages below:
19/09/24 22:29:15 INFO Utils: Successfully started service 'SparkUI' on port 35395.
19/09/24 22:29:15 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at
http://ip-10-0-0-69.myermdomain.com:35395
Here ip-10-0-0-69.myermdomain.com is one of my core nodes.
So I can go to
http://ip-10-0-0-69.myermdomain.com:35395
This automatically routes me to the master node proxy server listening on port 20888:
http://ip-10-0-0-113.ec2.internal:20888/proxy/application_1569345960040_0007/
Please note that these links are temporary and will only show the UI while the Spark application is running.

Is Job Completed ?

But if you want to see the UI even after the Spark job has completed, you should use the Spark History Server UI directly at http://master-public-dns-name:18080/.
The Spark History Server can also be used for running jobs via the "Show Incomplete Applications" button. It does this by reading the Spark event logs, which are enabled on EMR by default.

Differences between Spark UI and Spark History UI

That said, the Spark History Server has some differences compared to the Spark UI (for running apps, of course). Some that I observed are:
– The Spark UI has a "Kill" button so you can kill Spark stages, while the Spark History Server doesn't.
– The Spark UI has a "SQL" tab which shows more information about spark-sql jobs, while the Spark History Server doesn't.
– The Spark UI can pull up live thread dumps for executors, while the Spark History Server can't.
– The Spark UI gives the most up-to-date info on tasks (like "Total Uptime"), while the Spark History Server UI can lag a bit.

Download and parse Presto Server logs on EMR to find an Exception

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

This article guides you through downloading all Presto server logs from all EMR nodes using the AWS S3 CLI so we can parse them and look for errors. The same approach can be extended to any EMR application logs.

Here’s the script :

– Replace s3://EMR-Log-bucket with the bucket/prefix where you have your EMR logs. Please see this document to locate the path:

https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-manage-view-web-log-files.html#emr-manage-view-web-log-files-s3

– Replace the cluster ID (here I am using EMR cluster ID j-1QZPX8GEC1234).

– Replace "some-absolute-path" with your own absolute path where you want to download the logs (like /mnt/emr-logs/).
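A sketch of such a script, assuming the Presto server logs sit under each node's applications/presto/ prefix (the exact path can vary by EMR release):

#!/bin/bash
BUCKET=s3://EMR-Log-bucket
CLUSTER=j-1QZPX8GEC1234
DEST=/some-absolute-path

# Download the Presto server logs from every node of the cluster
aws s3 cp ${BUCKET}/${CLUSTER}/node/ ${DEST} --recursive \
  --exclude "*" --include "*/applications/presto/*server.log*"

# Decompress and search all downloaded logs for exceptions
find ${DEST} -name "*.gz" -exec gunzip {} \;
grep -R -i "exception" ${DEST} | less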

 

EMRFS Role Mappings integration with LDAP and JupyterHub on EMR

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

This article assumes you have explored the following articles:

https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-emrfs-iam-roles.html

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-jupyterhub-user-impersonation.html

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-jupyterhub-ldap-users.html

This article also expects that you have a working Windows AD server with LDAP enabled.

EMRFS role mapping allows you to use different IAM roles than the default EMR_EC2_DefaultRole when making calls to S3. Using security configurations, we define a mapping from a user, group, or S3 prefix to a particular IAM role. Example:

user1_onGroup1 -> Group1

user1_onGroup1 -> Group2

 

Here, the user and group can be an LDAP user and an LDAP group respectively.
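In a security configuration, such a mapping is expressed roughly like the following (the role ARNs and identifiers are placeholders; see the EMRFS IAM roles doc linked above for the exact schema):

{
  "AuthorizationConfiguration": {
    "EmrFsConfiguration": {
      "RoleMappings": [
        {
          "Role": "arn:aws:iam::123456789012:role/Group1_S3_Role",
          "IdentifierType": "Group",
          "Identifiers": [ "Group1" ]
        },
        {
          "Role": "arn:aws:iam::123456789012:role/User1_S3_Role",
          "IdentifierType": "User",
          "Identifiers": [ "user1_onGroup1" ]
        }
      ]
    }
  }
}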

This article guides you through integrating LDAP with JupyterHub on EMR. After this setup, users on your LDAP server can log in to EMR's JupyterHub to submit YARN jobs. We will also enable user impersonation, so that YARN jobs run as your LDAP user and not as a default user like 'yarn'. If EMRFS role mapping is enabled, the IAM role corresponding to your LDAP user will be used to make calls to S3. This also ensures that if an LDAP user belongs to an LDAP group, the IAM role mapping corresponding to that group is used.

First, spin up an EMR cluster using the following configuration, together with the EMRFS role mappings above in a security configuration. The properties to set are listed below; a sketch of the corresponding configuration JSON follows the list.

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html

 

  • livy.impersonation.enabled=true to enable Livy user impersonation.
  • hadoop.security.group.mapping = org.apache.hadoop.security.LdapGroupsMapping to make sure Hadoop connects directly to an LDAP server to resolve the list of groups instead of using the operating system's group name resolution. If we do not do this, EMRFS role mapping will not work with LDAP groups.
  • hadoop.security.group.mapping.ldap.bind.user, the user that will be used for LDAP searches to retrieve group information.
  • hadoop.security.group.mapping.ldap.bind.password, this user's LDAP password.
  • hadoop.security.group.mapping.ldap.url, the hostname and port of your LDAP server.
  • hadoop.security.group.mapping.ldap.base, the search base for the LDAP connection. This is a distinguished name, and will typically be the root of the LDAP directory.

See https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html#LDAP_Groups_Mapping
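A sketch of the corresponding EMR configuration classifications (hostnames, bind DN and password are placeholders for your own LDAP/AD environment):

[
  {
    "Classification": "livy-conf",
    "Properties": {
      "livy.impersonation.enabled": "true"
    }
  },
  {
    "Classification": "core-site",
    "Properties": {
      "hadoop.security.group.mapping": "org.apache.hadoop.security.LdapGroupsMapping",
      "hadoop.security.group.mapping.ldap.url": "ldap://ad.example.com:389",
      "hadoop.security.group.mapping.ldap.bind.user": "CN=binduser,CN=Users,DC=example,DC=com",
      "hadoop.security.group.mapping.ldap.bind.password": "replace-password",
      "hadoop.security.group.mapping.ldap.base": "DC=example,DC=com"
    }
  }
]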

Now log in to the master node and run the following scripts.

Then log in to the JupyterHub UI using the LDAP user's credentials. Once you submit a Spark job, the job will use the IAM role mapped to this LDAP user to make calls to S3.

 

Some other Considerations

– We can use "hadoop.user.group.static.mapping.overrides" to provide a static mapping so that, for users like hadoop, the mapping defined here is used instead of your LDAP server. Please see https://hadoop.apache.org/docs/r2.8.5/hadoop-project-dist/hadoop-common/core-default.xml

Example :

“hadoop.user.group.static.mapping.overrides” : “hive=hadoop,hive;hdfs=hadoop,hdfs;oozie=users,hadoop,oozie;mapred=hadoop,mapred;yarn=hadoop;”

– We can use "hdfs groups <ldap-user-name>" to test whether org.apache.hadoop.security.LdapGroupsMapping is working. This command contacts the LDAP server configured in "hadoop.security.group.mapping.ldap.url" to get the LDAP group information. If it returns an error, there is likely an issue with the LDAP settings you configured via "hadoop.security.group.mapping.ldap.*".
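For example (user name and group output are illustrative):

$ hdfs groups user1_onGroup1
user1_onGroup1 : Group1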

 

 

Converting Concatenated or Streaming JSON -> newline-delimited JSON for Kinesis Analytics to Athena

Written by mannem on . Posted in Athena, AWS BIG DATA, Kinesis


Athena's supported JSON SerDes (the Hive/HCatalog JsonSerDe and the OpenX SerDe) need JSON records with some kind of delimiter ("newline-delimited JSON") to identify each record, and they do not support concatenated JSON (a JSON stream). However, records emitted from Kinesis Analytics can only be in concatenated JSON format, which cannot be used by Athena. This article guides you through the different options and shows how to convert the JSON with a Firehose transformation on Lambda.

Kinesis Analytics, being a real-time streaming solution, uses concatenated/streaming JSON (instead of the usual newline-delimited JSON) for its JSON output, since it better supports records that contain delimiters like \n within themselves. So records like {"id": 1}{"id": 2} are expected, and a streaming consumer would have to parse the individual records in that case.

https://en.wikipedia.org/wiki/JSON_Streaming

Since Athena's JSON SerDes cannot read concatenated JSON like this, you need an alternative in order to eventually query this data with Athena.

For a pipeline like the following, where you want to do both Kinesis Analytics and run Athena, I see a few options.
Current pipeline: (Kinesis Stream -> Kinesis Analytics(JSON output) -> Firehose -> S3 -> Athena)

————————————————-
1. You can keep the old pipeline, but you need to add a step that converts the concatenated JSON to newline-delimited JSON.

(Kinesis Stream -> Kinesis Analytics(JSON output) -> Firehose -> S3 -> StreamingParser(JSON) ->  Athena )

The easiest way to solve this is to use a Firehose in-line transformation to add a newline character at the end of every record. Alternatively, you could configure a Lambda function triggered by the S3 put to do the same thing (see option 2).

Firehose in-line transformations :

Kinesis Analytics uses the Firehose PutRecordBatch API to emit records to a destination like Firehose. Each JSON record is translated to a Firehose record. You should be able to perform an in-line transformation that appends a newline character to each record by writing a custom Lambda function or editing one of the existing blueprints.

http://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

Sample blog post on transformations : https://aws.amazon.com/blogs/compute/amazon-kinesis-firehose-data-transformation-with-aws-lambda/
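A minimal sketch of such a transformation function in Python, assuming the standard Firehose data-transformation event format (each record's data arrives base64-encoded):

import base64

def lambda_handler(event, context):
    output = []
    for record in event['records']:
        # Decode the concatenated-JSON record emitted by Kinesis Analytics
        payload = base64.b64decode(record['data']).decode('utf-8')
        # Append a newline so each record becomes its own line in S3
        transformed = payload + '\n'
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(transformed.encode('utf-8')).decode('utf-8')
        })
    return {'records': output}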

————————————————-

2. Alternatively, you can keep the old pipeline, but you need to write and run an additional parser, probably a Lambda function triggered by the S3 put, to convert the concatenated JSON to newline-delimited JSON.

(Kinesis Stream -> Kinesis Analytics(JSON output) -> Firehose -> S3 -> StreamingParser(JSON) on Lambda -> Athena)

————————————————-
3. Some customers use Analytics as just a pass-through that delivers Kinesis stream records to Firehose (and on to S3) without writing any additional consumer. Unless Analytics supports delimited JSON output that Athena can process, they would need to eliminate Analytics and replace it with a consumer that has a built-in streaming parser and writes newline-delimited JSON to S3.

(Kinesis Stream -> consumer(can be Lambda) -> Firehose -> S3 -> Athena ) or,
(Kinesis Stream -> consumer(can be Lambda) -> S3 -> Athena )

However, if Analytics supports a newline delimiter for its JSON output in the future, they can keep the old pipeline.
————————————————-

Configuring an EC2 instance as a DNS server using BIND that can only resolve forward lookups but not reverse lookups in an AWS VPC

Written by mannem on . Posted in EC2 Linux

Expectations while using EMRFS

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

Expectations while using EMRFS :

– The hadoop fs -put command updates both S3 and the EMRFS metadata table (the file is created in S3 and an entry is added to the EMRFS table).
Ex) hadoop fs -put ./111.txt s3://mannem/emrfstest1
– The hadoop fs -rm command updates S3 but does not update the EMRFS table (the object is deleted in S3, but its entry remains in the EMRFS table).
Ex) hadoop fs -rm -f s3://mannem/emrfstest1/111.txt
– The hadoop fs -mv command renames the object in S3 (creates the new name and deletes the old one), but only adds the new name to the EMRFS table (the existing entry is not deleted).
Ex) hadoop fs -mv s3://mannem/emrfstest1/emrfs-didnt-work.png s3://mannem/emrfstest1/emrfs-didnt-work_new.png
– Adding files from the S3 console (web UI) adds them to S3 but not to the EMRFS table.
– Deleting files from the S3 console (web UI) deletes them in S3, but they are not deleted from the EMRFS table.
– Renaming a file in the S3 console (web UI) changes the name in S3, but does not rename it in the EMRFS table.
– The EMRFS CLI (for example, emrfs delete or emrfs sync) does not change the actual data in S3. It only adds/deletes entries in the DynamoDB metadata table used by EMRFS (see the example after this list).
– EMRFS uses DynamoDB. On EMR clusters, you can view the table name with the emrfs describe-metadata command. You can also see it on the EMR web console.

 

– In the EMRFS table, the S3 prefix is stored as the hash key and the object name as the range key.
– You can optionally specify the number of retries and the time to wait until the next retry when an exception occurs:
http://docs.aws.amazon.com/emr/latest/ManagementGuide/emrfs-retry-logic.html
– Please refer to http://docs.aws.amazon.com/emr/latest/ManagementGuide/emrfs-files-tracked.html for related information.
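A quick look at the EMRFS CLI on a cluster node (the bucket/prefix is a placeholder):

emrfs describe-metadata                # shows the DynamoDB table backing EMRFS
emrfs diff s3://mannem/emrfstest1/     # compares S3 with the EMRFS metadata
emrfs sync s3://mannem/emrfstest1/     # makes the metadata consistent with S3 again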

EMR vCPU vCore issue

Written by mannem on . Posted in AWS BIG DATA, EMR || Elastic Map Reduce

Several customers get confused when they see that the vCores used by EMR differ from the EC2 vCPUs. This article clarifies why EMR uses vCores this way, some problems that arise with instance fleets, and how to work around them.

When you choose an instance type using the AWS EMR management console, the number of vCPUs shown for each instance type is the number of YARN vCores for that instance type, not the number of EC2 vCPUs.
http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-instance-purchasing-options.html

1. The EMR console picks the yarn.nodemanager.resource.cpu-vcores value for the respective instance type from a fixed mapping predefined by EMR for every instance type/family. For some instance types/families, like M5, EMR set vCores equal to EC2 vCPUs. For some other instance types (like the M4 family), the setting is double the actual EC2 vCPUs.

For example, EMR uses 80 vCores for m4.10xlarge, whereas EC2 reports 40 vCPUs.

2. So the intent here is to report vCore usage at the YARN level, as opposed to the actual EC2 instance level.

3. The discrepancy on the EMR console exists because it tries to represent a cluster's compute power from the YARN perspective. Since EMR clusters run applications according to their YARN settings, this was presumably deemed a better representation of the compute resources than EC2's vCPU count.

4. The reason this is done is to ensure that YARN runs enough containers to max out the CPU as much as possible. When some instance type families were introduced, EMR determined that for a majority of use cases the instance's CPUs would usually be underutilized without doubling this value, because most of the time applications are I/O bound. That is, if vCores were set to the actual number of vCPUs for these instance types, you'd get about one YARN container per actual vCPU, but those containers would spend most of their time blocked on I/O anyway, so you can probably run more containers in order to max out the CPU.

5. Amazon EMR makes an effort to provide optimal default configuration settings for each instance type for the broadest range of use cases (types of applications and engines like MapReduce and Spark). However, you may need to manually adjust these settings for the needs of your application. This value may be changed via the configuration API by setting yarn.nodemanager.resource.cpu-vcores with the "yarn-site" classification for your applications and workloads.
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html

https://hadoop.apache.org/docs/r2.8.3/hadoop-yarn/hadoop-yarn-common/yarn-default.xml

Problems :

Sometimes customers do not want to double the vCores for each vCPU. Some use cases can cause containers to compete for a single hyperthread (vCPU), causing subpar performance, despite other nodes still having idle hyperthreads.

For example, if a customer has:

– 1 m4.10xlarge core node (which doubles 40 vCPUs to 80 vCores) &

– 1 m5.12xlarge task node (with a 1:1 mapping of vCPU to vCore, i.e. 48),

we cannot define "yarn.nodemanager.resource.cpu-vcores" as a single fixed value for this cluster.

– For instance fleets, where multiple instance families can be specified by the customer and fulfilled by EMR to satisfy the target capacity, it is hard to set a single EMR configuration that suits all instance families.

– A similar issue exists for uniform instance groups with different instance families on different core/task groups.

Workarounds :

————————————————-
– I tested the following settings on the "yarn-site" classification. They tell the YARN NodeManager (NM) on each node to use the vCPUs of its underlying EC2 Linux instance as vCores, ignoring the defaults set by EMR. This means an m4.10xlarge node (NM) will only allocate 40 vCores for YARN containers, because there are 40 logical processors. The resulting number of vCores should be confirmed from the ResourceManager UI or with YARN commands like "yarn node -status ip-172-31-44-155.ec2.internal:8041".

https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YarnCommands.html#node

https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html

Please see what these parameters mean here :
https://hadoop.apache.org/docs/r2.8.3/hadoop-yarn/hadoop-yarn-common/yarn-default.xml

JSON :
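A sketch of the kind of yarn-site classification described above; the exact property choice (hardware detection plus counting logical processors as cores) is my assumption based on the description, so verify it against yarn-default.xml for your Hadoop version:

[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.nodemanager.resource.cpu-vcores": "-1",
      "yarn.nodemanager.resource.detect-hardware-capabilities": "true",
      "yarn.nodemanager.resource.count-logical-processors-as-cores": "true"
    }
  }
]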

Note: If you use the above parameters, the AWS EMR console (or the DescribeCluster API call) will still show 80 vCores (double the vCPUs) for m4.10xlarge, because that info is based on the predefined fixed mapping for instance types and is NOT pulled from your live nodes. For instance fleets, EMR will still count 80 vCores toward the capacity calculations used to launch resources that satisfy your target capacity units per fleet. This can cause a discrepancy between the vCore capacity you need and what is actually provisioned and used.

– For this reason, another suggestion is to use the same instance family (like M5) across all instance groups of the instance fleets, and across all fleets, so that you get consistency with a predictable 1:1 vCPU-to-vCore mapping.