Query AWS ES cluster by signing http requests with AWS IAM roles (python)

Written by mannem on . Posted in Elasticsearch

The AWS public-facing documentation provides some Python examples for signing HTTP requests with IAM user credentials in order to access other AWS resources; in this case, an AWS ES cluster whose access policy is restricted to those IAM users.

If you wish to restrict access to the ES cluster with IAM roles instead, the signing process is a bit different.

The document (http://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html) seems to cover only IAM users, not IAM roles.


Changing the signed header

Signing requests with IAM roles requires an additional header, the session token, added to the request headers under the name 'x-amz-security-token'.

So, in ESrequest.py, replacing this line:

headers = {'x-amz-date':amzdate, 'Authorization':authorization_header}

with the following should work for signing requests with IAM role credentials:

headers = {'x-amz-date':amzdate, 'Authorization':authorization_header, 'x-amz-security-token':token}

Here, the session 'token' string should be obtained from the corresponding IAM role. (These credential trios for roles are rotated frequently and have an expiration date, so make sure you are using an unexpired token.)


Obtaining ‘token’ string from IAM Roles

If an EC2 instance is assuming a role, you can get it with

curl http://169.254.169.254/latest/meta-data/iam/security-credentials/MyEc2Role
(http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials)

In Python, all of these credentials can be obtained with the 'requests' module and parsed accordingly.
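For example, here is a minimal Python sketch (assuming the instance profile is named MyEc2Role, as in the metadata URL above) that pulls the temporary credentials with 'requests':

# Minimal sketch: fetch temporary role credentials from the EC2 instance
# metadata service. 'MyEc2Role' is the example role name used above.
import requests

CREDS_URL = ('http://169.254.169.254/latest/meta-data/'
             'iam/security-credentials/MyEc2Role')

creds = requests.get(CREDS_URL, timeout=2).json()

access_key = creds['AccessKeyId']
secret_key = creds['SecretAccessKey']
token = creds['Token']              # goes into the 'x-amz-security-token' header
expiration = creds['Expiration']    # credentials rotate; check this before signing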

The token can be obtained similarly from other services that assume IAM roles, such as Lambda.


Find the end-to-end code here : https://github.com/mannem/elasticsearchDev/blob/master/ES_V4Signing_IAMRoles

YARN Log aggregation on EMR Cluster – How to ?

Written by mannem on . Posted in EMR || Elastic Map Reduce


Why Log aggregation ?

User logs of Hadoop jobs serve multiple purposes. First and foremost, they can be used to debug issues while running a MapReduce application: correctness problems with the application itself, race conditions when running on a cluster, and task/job failures due to hardware or platform bugs. Secondly, one can do historical analyses of the logs to see how individual tasks in a job/workflow perform over time. One can even analyze the Hadoop MapReduce user logs using Hadoop MapReduce(!) to determine any performance issues.

More info found here:
https://hortonworks.com/blog/simplifying-user-logs-management-and-access-in-yarn/

Without log aggregation, when you try to access the job history logs from the ResourceManager UI, you see this error:

“Aggregation is not enabled. Try the nodemanager at ip-172-31-41-154.us-west-2.compute.internal:8041”

This article explains how various EMR versions default the YARN log aggregation option. It also shows how to enable YARN log aggregation on EMR AMIs that do not have it enabled by default (both on a live cluster and while launching a cluster).

EMR Log aggregation vs YARN Log aggregation :

EMR has its own log aggregation into an S3 bucket for persistent logging and debugging. The EMR web console provides a feature similar to "yarn logs -applicationId" if you turn on the debugging feature.

YARN log aggregation stores the application container logs in HDFS, whereas EMR's LogPusher (the process that pushes logs to S3 as a persistent option) needs the files on the local file system. The default behavior of YARN is to copy the container logs from the local disks of core nodes to HDFS and then, after aggregation, DELETE those local files on the individual core nodes. Since the files are deleted by the NodeManager on the individual nodes, EMR has no way to save those logs to more persistent storage such as S3. So, historically, EMR had to disable YARN log aggregation so that the container logs stay on the local machines and can be pushed to S3. For Hue's integrated features, such as displaying application logs in the Hue console, when you install Hue on EMR (either AMI 3.x.x or 4.x.x), yarn.log-aggregation-enable will be enabled by default and container logs might then be missing from S3.

This behavior changed in later EMR AMIs such as 4.3.0, where both YARN log aggregation and EMR log aggregation can work together. EMR introduced the patch described in https://issues.apache.org/jira/browse/YARN-3929 into all EMR AMIs from EMR 4.3.0 onwards (Hadoop branch-2.7.3-amzn-0). Note that this patch has not been adopted in open source and is only part of EMR's Hadoop.

With this patch, there is an option to keep the files on the local machines after log aggregation, managed by the "yarn.log-aggregation.enable-local-cleanup" property in yarn-site.xml on the respective core/task nodes. This property is not public and can only be set on EMR distributions. By default it is set to FALSE, which means the cleanup on local machines WILL NOT take place. LogPusher needs these logs on the local machines to push them to S3, and LogPusher is the one responsible for removing those local logs after they are copied over and after a certain retention period (4 hours for containers).

If you want the logs to be deleted from the local disks, you need to flip this property to true with the configurations API while launching the cluster. On a live cluster, yarn-site.xml on all core/task nodes should be updated and the NodeManager restarted. After the restart, old container logs might still be present.

*** This option might not be recommended, because LogPusher will NOT be able to push those local container logs to the customer's (service's) S3 bucket if it is set to true.
*** The only source of container logs will then be the aggregated logs on HDFS, which is not as persistent.
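If you still decide to flip it, here is a minimal sketch of that yarn-site classification, written as the Python structure the configurations API expects (the variable name is just illustrative; a fuller launch sketch appears in the EMR 4.x.x section below):

# Sketch of the yarn-site classification that turns on local cleanup after
# aggregation (not recommended if you still want LogPusher to ship logs to S3).
local_cleanup_config = [
    {
        'Classification': 'yarn-site',
        'Properties': {
            'yarn.log-aggregation.enable-local-cleanup': 'true'
        }
    }
]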


If you decide not to rely on either EMR's LogPusher (which pushes to S3) or the YARN NodeManager (which aggregates logs to HDFS), and instead have your own monitoring solution that uploads logs to, say, Elasticsearch or Splunk, then there are a few things to consider:

1. Disable YARN log aggregation with yarn.log-aggregation-enable = false. This means the YARN NodeManager on core nodes will not push the respective container logs on local disk to centralized HDFS. Note that LogPusher can still delete (and will still try to upload) the container logs to S3 after 4 hours (see /etc/logpusher/hadoop.config).
2. Once log aggregation is disabled, yarn.nodemanager.log.retain-seconds comes into the picture, which deletes logs on local disks after 3 hours by default. This means the NodeManager can try to remove logs even before LogPusher has sent them to S3 and deleted them itself. So, make sure you increase this time so that your custom monitoring application has enough time to send logs to your preferred destination.


EMR 4.3.0 and above / 5.0.0 and above :

YARN log aggregation is enabled by default and the logs will also get pushed to your S3 log bucket.


EMR 4.x.x :

AMI 4.0.0 doesn't support Hue, so yarn.log-aggregation-enable=false is the default.

To enable it on 4.0.0 ->

Step 1: For 4.0.0, you may need to enable it by following the procedure described below for 3.x.x.

As AMI 4.x.x uses upstart, you need to use the following command rather than "sudo service ...":

sudo restart hadoop-mapreduce-historyserver

The /etc/init/ folder has all the service configuration files.

Step 2: /etc/hadoop/conf.empty/yarn-site.xml is the configuration file that needs to be edited.

Step 3: While launching the cluster, you can use the yarn-site configuration classification to make the necessary changes to enable log aggregation.

Configuration that needs to be enabled :

yarn.log-aggregation-enable=true

You can refer to the following page to configure your cluster.
https://docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/emr-configure-apps.html
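For illustration, here is a hedged boto3 (Python) sketch of launching a cluster with this classification; the cluster name, instance types, counts, and IAM roles below are placeholders, not values from this article:

# Sketch: launch an EMR cluster with YARN log aggregation enabled via the
# yarn-site classification. Names, instance types, counts and roles are placeholders.
import boto3

emr = boto3.client('emr', region_name='us-west-2')

emr.run_job_flow(
    Name='log-aggregation-demo',
    ReleaseLabel='emr-4.2.0',
    Instances={
        'MasterInstanceType': 'm3.xlarge',
        'SlaveInstanceType': 'm3.xlarge',
        'InstanceCount': 3,
        'KeepJobFlowAliveWhenNoSteps': True,
    },
    Configurations=[
        {
            'Classification': 'yarn-site',
            'Properties': {'yarn.log-aggregation-enable': 'true'},
        }
    ],
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
)

The same Configurations list can also be supplied through the console or the CLI's --configurations option.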

EMR 4.1.0, 4.2.0 :

If you don't install Hue, you need to turn it on explicitly by setting yarn.log-aggregation-enable=true with EMR's yarn-site configuration classification.

If you turn on yarn.log-aggregation-enable, either explicitly or by installing Hue, application container logs will not be saved to your S3 location. They will stay on HDFS for YARN's log aggregation feature.


EMR 3.x.x :

Here's how you enable log aggregation on a 3.x.x EMR cluster:

Step 1:

Change the yarn-site.xml settings to enable log aggregation (set yarn.log-aggregation-enable to true).

On Master node:
> vim /home/hadoop/conf/yarn-site.xml

Step 2:

Copy this configuration on all nodes.

yarn node -list|sed -n "s/^\(ip[^:]*\):.*/\1/p" | xargs -t -I{} -P10 scp -o StrictHostKeyChecking=no -i ~/MyKeyName.pem /home/hadoop/conf/yarn-site.xml hadoop@{}:/home/hadoop/conf/

Where MyKeyName.pem is the private key for SSH’ing into slaves.

The above command lists all slave nodes and uses scp to copy (replace) the yarn-site.xml present on the master onto the slave nodes.

Step 3:

Restart hadoop HistoryServer daemon on all nodes.

yarn node -list|sed -n "s/^\(ip[^:]*\):.*/\1/p" | xargs -t -I{} -P10 ssh -o StrictHostKeyChecking=no -i ~/MyKeyName.pem hadoop@{} "sudo service hadoop-mapreduce-historyserver restart"




Using DynamoDB as session provider with AWS SDK V3

Written by mannem on . Posted in Dynamo DB

The DynamoDB Session Handler is a custom session handler for PHP that allows developers to use Amazon DynamoDB as a session store. Using DynamoDB for session storage alleviates issues that occur with session handling in a distributed web application by moving sessions off of the local file system and into a shared location. DynamoDB is fast, scalable, easy to set up, and handles replication of your data automatically.

Setting up:

1. Make sure you have PHP >= 5.5.0.
2. Install the AWS PHP SDK (v3) from here: http://docs.aws.amazon.com/aws-sdk-php/v3/guide/getting-started/installation.html
3. Configure the PHP SDK to use any of the credential options mentioned here: http://docs.aws.amazon.com/aws-sdk-php/v3/guide/guide/credentials.html
4. See more details about the DynamoDB-provided session handler here: https://docs.aws.amazon.com/aws-sdk-php/v3/guide/service/dynamodb-session-handler.html
5. Create a DynamoDB table to store session info, with 'id' (String) as the hash key (a sketch follows below).
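For item 5, here is a minimal boto3 (Python) sketch that creates such a table; the table name 'sessions' and the throughput values are illustrative assumptions, not requirements of the session handler:

# Sketch: create a session table with 'id' (String) as the hash key.
# Table name and throughput values are placeholders.
import boto3

dynamodb = boto3.client('dynamodb', region_name='us-west-2')

dynamodb.create_table(
    TableName='sessions',
    KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
    AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}],
    ProvisionedThroughput={'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5},
)

The PHP session handler then just needs to be registered against this table (see the SDK guide linked in item 4).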

End to End PHP code with debug turned on:

> php sessionProvider.php
successfully connected

Now, check the DynamoDB table to see whether the session information was stored successfully.

Here is the example structure(DynamoDB JSON format):



HBase snapshot / export

Written by mannem on . Posted in EMR || Elastic Map Reduce

I observed that exporting large HBase tables with the HBase-provided 'Export' utility is very CPU bound. If you are using default cluster configurations, the mappers may consume 100% CPU and may crash the RegionServer (core node) and your HBase. This article discusses some tuning for your map/reduce HBase export job. It also focuses on an alternate HBase utility called 'ExportSnapshot', which mitigates the problems with the HBase Export utility.

This article touches on the import/export tools that ship with HBase and shows how to use them efficiently.

A small intro to the HBase utilities:

————————————————-
org.apache.hadoop.hbase.mapreduce.Export

Export is a utility that will dump the contents of a table to HDFS as a sequence file.

org.apache.hadoop.hbase.mapreduce.CopyTable

CopyTable is a utility that can copy part or all of a table, either to the same cluster or to another cluster. The target table must first exist.

org.apache.hadoop.hbase.snapshot.ExportSnapshot

The ExportSnapshot tool copies all the data related to a snapshot (hfiles, logs, snapshot metadata) to another cluster. The tool executes a MapReduce job, similar to distcp, to copy files between the two clusters, and since it works at the file-system level, the HBase cluster does not have to be online.
————————————————-

The main difference between exporting a snapshot and copying/exporting a table is that ExportSnapshot operates at the HDFS level.

This means that the Master and RegionServers are not involved in this operation.

Consequently, no unnecessary data caches are created and there is no triggering of additional GC pauses due to the number of objects created during the scan process.

So, when exporting a snapshot, the job is no longer CPU bound.

Please see :

  • http://hbase.apache.org/book.html#ops.snapshots
  • http://blog.cloudera.com/blog/2013/03/introduction-to-apache-hbase-snapshots/
  • http://www.cloudera.com/content/cloudera/en/documentation/core/v5-3-x/topics/cm_bdr_managing_hbase_snapshots.html
Using snapshots, I was able to export my HBase table from a source cluster with the EMR 3.0.0 AMI to a target cluster with the latest (at the time of writing this post) EMR 3.9.0 AMI.

This job was considerably faster than HBase Export.

Prepping the source and target clusters:

————————————————-

—Enabling snapshots on HBase—

You must add the 'hbase.snapshot.enabled' property, with value 'true', to hbase-site.xml on the HBase Master node:

/home/hadoop/hbase/conf/hbase-site.xml

Then stop the HMaster process:

> sudo service hbase-master stop

service-nanny will restart this process again, which loads the new configuration.

—Enabling communication between the source and target clusters—

To transfer a snapshot from the source cluster to the target cluster, the source cluster needs to communicate with the target's NameNode process, which listens on the HDFS default port 9000 (hdfs getconf -confKey fs.default.name).

If both clusters are using the default EMR security groups, they can communicate by default. Otherwise, you may need to adjust the security groups to allow communication over that port.

(Check with telnet to port 9000 from the source cluster's master.)
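If telnet is not handy, a quick Python sketch performs the same check (172.31.42.191 is the example target master IP used below):

# Quick port check from the source master to the target NameNode (port 9000).
# 172.31.42.191 is the example target master IP used later in this post.
import socket

socket.create_connection(('172.31.42.191', 9000), timeout=5).close()
print('port 9000 is reachable')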

————————————————-

—Creating a snapshot on the source cluster—

On the source cluster (EMR 3.0.0):

> hbase shell
hbase(main):001:0> snapshot 'sourceTable', 'snapshotName'

Check that the snapshot is created under HDFS /hbase/.hbase-snapshot/

—Exporting this snapshot (snapshotName) to HDFS /hbase/.hbase-snapshot/ of the target cluster (EMR 3.9.0)—

The target master node's internal IP is 172.31.42.191, listening on HDFS port 9000. Using 16 mappers with 'ExportSnapshot':

> hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot snapshotName -copy-to hdfs://172.31.42.191:9000/hbase -mappers 16

————————————————-

On the target cluster, check that the snapshot was exported:

> hadoop fs -ls /hbase/.hbase-snapshot/

—Cloning this snapshot to an actual HBase table on the target cluster—

> hbase shell
hbase(main):001:0> clone_snapshot 'snapshotName', 'newTableName'

————————————————-

Installing and using Apache Sqoop to export/import large datasets (MySQL, S3) (CSV/TSV..) on an EMR cluster

Written by mannem on . Posted in EMR || Elastic Map Reduce


Oftentimes, the export/import activity may be limited by several performance bottlenecks, so the activity may be faster if a distributed transfer is used instead of a normal transfer. Some of the bottlenecks include read throughput, write throughput, and how the code parses the data (inline or batch, etc.). Apache Sqoop(TM) is a tool designed for efficiently transferring bulk data between Apache Hadoop and structured datastores such as relational databases.

http://sqoop.apache.org/

This guide shows you
> how to install Sqoop, and
> how to export/import MySQL tables (from S3 to RDS) and (from RDS to S3), respectively.

Consider a sample MySQL table in RDS:

> mysql -h myrds.crezaaaruhfx.us-west-2.rds.amazonaws.com -u mannem -p

mysql> describe dailyStockPrices;

mysql> SELECT table_name AS "Table",
    -> round(((data_length + index_length) / 1024 / 1024), 2) "Size in MB"
    -> FROM information_schema.TABLES
    -> WHERE table_schema = "pipetest"
    -> AND table_name = "dailyStockPrices";

> My MySQL table's dataset has a lot of commas in the fields, so I chose the TSV format instead of CSV for import/export.
> If I had used the CSV format, Sqoop would get confused parsing the data.

Sqoop on EMR 4.4.0+ is pre-installed

Starting from EMR AMI version 4.4.0, Sqoop 1.4.6 is available as a sandbox application. It can be installed by simply selecting this option while provisioning the EMR cluster. By default, Sqoop on EMR has MariaDB and PostgreSQL drivers installed. To install an alternate set of JDBC connectors for Sqoop, you need to place them in /usr/lib/sqoop/lib.

http://docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/emr-sandbox.html#emr-sqoop

Sqoop on EMR 3.x.x can be installed with the following script:

Import commands:

Import with Sqoop
This command copies a MySQL table from RDS to S3. The S3 file content type is TSV, and the file name will be in the "part-m-00000" format.
Note that with -m 1, I am using a single mapper task.
sqoop import --connect jdbc:mysql://myrds.crezaaaruhfx.us-west-2.rds.amazonaws.com/pipetest --username mannem --password Password123 --table dailyStockPrices --target-dir s3://mannem/sqoopmatrix -m 1 --fields-terminated-by '\t' --lines-terminated-by '\n'

Check the S3 contents:
hadoop fs -cat s3://mannem/sqoopmatrix/part-m-00000

Sqoop command usage:
http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html

Export commands:

Before exporting, the destination MySQL/PSQL table should already be created with a similar schema.

Export to RDS (MySQL)
This command copies a TSV file from S3 to a MySQL table.
sqoop export --connect jdbc:mysql://myrds.crezaaaruhfx.us-west-2.rds.amazonaws.com/pipetest --username mannem --password Password123 --table dailyStockPrices --input-fields-terminated-by '\t' --input-lines-terminated-by '\n' --export-dir s3://mannem/sqoopmatrix/part-m-00000

Export to Redshift (PSQL)
sqoop export --connect jdbc:redshift://$MYREDSHIFTHOST:5439/mydb --table mysqoopexport --export-dir s3://mybucket/myinputfiles/ --driver com.amazon.redshift.jdbc41.Driver --username master --password Mymasterpass1

Export command with a MariaDB connection string
sqoop export --connect jdbc:mariadb://$HOSTNAME:3306/mydb --table mysqoopexport --export-dir s3://mybucket/myinputfiles/ --driver org.mariadb.jdbc.Driver --username master --password Mymasterpass1

Export using Secure Socket Layer encryption (the connection string is quoted so the shell does not interpret the '&' characters)
sqoop export --connect "jdbc:mariadb://$HOSTNAME:3306/mydb?verifyServerCertificate=false&useSSL=true&requireSSL=true" --table mysqoopexport --export-dir s3://mybucket/myinputfiles/ --driver org.mariadb.jdbc.Driver --username master --password Mymasterpass1

How to retrieve Cluster ID / JobFlow ID from the EMR master node

Written by mannem on . Posted in EMR || Elastic Map Reduce

You can look at /mnt/var/lib/info/ on the master node to find a lot of info about your EMR cluster setup.

More specifically, /mnt/var/lib/info/job-flow.json contains the jobFlowId or cluster ID.

I was able to install a JSON parser on my master node to retrieve the jobFlowId:

# Install JSON parser
# Install pkgs required for jsawk
wget http://pkgs.repoforge.org/js/js-1.60-1.el6.rf.x86_64.rpm
wget http://pkgs.repoforge.org/js/js-devel-1.60-1.el6.rf.x86_64.rpm
sudo yum install -y js-1.60-1.el6.rf.x86_64.rpm
sudo yum install -y js-devel-1.60-1.el6.rf.x86_64.rpm
# Install jsawk
curl -L http://github.com/micha/jsawk/raw/master/jsawk > jsawk
chmod 755 jsawk && sudo mv jsawk /usr/bin/
# Parse and print the job flow ID
jsawk 'return this.jobFlowId' < /mnt/var/lib/info/job-flow.json
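Alternatively, since the file is plain JSON, a short Python sketch with the standard library does the same thing without installing anything:

# Parse job-flow.json with Python's built-in json module instead of jsawk.
import json

with open('/mnt/var/lib/info/job-flow.json') as f:
    print(json.load(f)['jobFlowId'])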

Incremental Load: avoiding data loss

Written by mannem on . Posted in Data Pipelines, Redshift

While copying data from RDS to Redshift:

To avoid data loss, start the 'Incremental copy template' before the 'Full copy'.

A sample implementation could be:

————————————————-
Incremental copy scheduled start time – 1:50 PM

Full copy start time – 2:00 PM
A DB insert – 2:10 PM
Full copy end time – 4:00 PM

A DB insert – 4:05 PM

Incremental copy first run – 4:10 PM
————————————————-

> In the above example, the contents of the first DB insert at 2:10 may or may not be included in the FULL copy.
> The contents of the second insert will not be included in the Full copy.

How do you ensure that these new inserts will show up in the Redshift database?

> As the 'Incremental copy template' uses TIME SERIES scheduling, the actual 'Incremental copy activity' run won't start at the scheduled start time (1:50); rather, it will start at the end of the schedule interval (4:10). All the DB changes between the 'scheduled start date/time' and the 'first run of the actual copy activity' will be copied to Redshift.
> So, the first incremental copy run will copy all new DB inserts between 1:50 PM and 4:10 PM to Redshift. This includes the contents of the two DB inserts that happen during/after the FULL copy activity.