Author Archive

All about AWS Data-Pipelines Taskrunner

Written by mannem. Posted in Data Pipelines

How does Data-Pipeline install the Task Runner on an EC2 instance?

Data-Pipeline launches an EC2 instance on your behalf using the following user-data script.

————————————————-

————————————————-

> It downloads a script called remote-runner-install, which installs the Task Runner with options passed from the Data-Pipelines service.

Here's what the script looks like:

This script, in turn, passes all of its arguments to aws-datapipeline-taskrunner-v2.sh, which runs the Task Runner by invoking the actual TaskRunner jar.


As you can see, just like installing the Task Runner on an existing resource[1], Data-Pipelines runs the command

java -cp "$TASKRUNNER_CLASSPATH" amazonaws.datapipeline.taskrunner.Main \
--workerGroup "$WORKER_GROUP" --endpoint "$ENDPOINT" --region "$REGION" --logUri "$LOG_URI" --taskrunnerId "$TASKRUNNER_ID"

passing all the required arguments to the Task Runner.

The Task Runner process polls the AWS Data Pipeline service for tasks and then performs them.
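For comparison, here is a hedged sketch of how one might start the Task Runner manually on an existing resource (following the pattern in the AWS Data Pipeline documentation); the jar/credentials paths, worker group, region, and log bucket are placeholders:

# Hypothetical manual Task Runner launch on an existing EC2 instance or on-premise host
java -jar ~/TaskRunner-1.0.jar \
  --config ~/credentials.json \
  --workerGroup=myWorkerGroup \
  --region=us-east-1 \
  --logUri=s3://mybucket/taskrunner-logs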

Task Lifecycle

Using a single hive warehouse for all EMR(Hadoop) clusters

Written by mannem. Posted in EMR || Elastic Map Reduce

Because EMR/Hadoop clusters are transient, tracking databases and tables across clusters can be difficult. Instead of having a different warehouse directory on each cluster, you can use a single permanent Hive warehouse across all EMR clusters. S3 is a great choice for this: it is persistent storage with a robust architecture that provides redundancy and read-after-write consistency.

For each cluster:

This can be configured using the hive.metastore.warehouse.dir property in hive-site.xml.

You may need to update this setting on all nodes.

For a single Hive session:

This can be configured using a command like set hive.metastore.warehouse.dir=s3n://bucket/hive_warehouse;

or by initializing the Hive CLI with the option -hiveconf hive.metastore.warehouse.dir=s3n://bucket/hive_warehouse

Note that with the above configuration, all default databases and tables are stored on S3 under a path like s3://bucket/hive_warehouse/myHiveDatabase.db/
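A minimal sketch tying these options together (the bucket name is a placeholder):

# Start the Hive CLI with the shared S3 warehouse (applies to this invocation only)
hive -hiveconf hive.metastore.warehouse.dir=s3n://my-bucket/hive_warehouse

# Or, inside an already running Hive session:
#   SET hive.metastore.warehouse.dir=s3n://my-bucket/hive_warehouse;

# A database created afterwards lands under the shared warehouse, e.g.
#   CREATE DATABASE myHiveDatabase;   -- stored under s3://my-bucket/hive_warehouse/myHiveDatabase.db/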

Running complex queries on redshift with Data-pipelines

Written by mannem. Posted in Data Pipelines, Redshift

Sometimes AWS Data-Pipelines SqlActivity may not support complex queries. This is because SqlActivity passes the script to a JDBC executeStatement (prepared statement) call, and the script is expected to be idempotent. Here is an alternative way to run psql/SQL commands using Data-Pipelines.

Suppose you have the following psql command,

select 'insert into event_data_' ||to_char(current_date,'yyyymm')|| '_v2 select * from stage_event_data_v2 where event_time::date >= '''||to_char(current_date, 'YYYY-MM')||'-01'' and event_time::date <= '''||last_day(current_date)||''';';

and it should output,

insert into event_data_201511_v2 select * from stage_event_data_v2 where event_time::date >= '2015-11-01' and event_time::date <= '2015-11-30';

This is a valid command in psql and can be successfully executed from workbenches and the psql shell.

But with Data-pipelines, executing the above command will throw an error:

ERROR processing query/statement. Error: Parsing failed

This is because the script appears to change (it is not idempotent) when it is executed.

If you have complex Redshift commands and wish to perform operations against Redshift that involve custom logic, you can instead write a program in your favorite language and run it
using ShellCommandActivity. This is a perfectly valid way of interacting with Redshift.

There are several ways to do this. I am including a shell script and its Data-pipeline template as a reference here.

Sample shell command:

Sample Data-pipelines template:


-------------------------------------------------
Some info on the script and Data-pipeline:

1. The script takes 2 arguments (Arg 1 is the SQL script that you need to execute, Arg 2 is the Redshift password). These arguments are provided in the Data-pipeline ShellCommandActivity object using the scriptArgument field.

2. The script writes its result to v2.sql (using the -t, tuples-only, option) and uploads it to an S3 bucket so that you can run the generated script later (see the sketch after this block).

3. The Data-pipeline template uses the *myRedshiftPass parameter id to hide the password from Data-Pipelines.
-------------------------------------------------
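For reference, here is a hedged sketch of what such a wrapper script could look like (the Redshift endpoint, database, user, and bucket are placeholders, and this is not the exact script referenced in the template above):

#!/bin/bash
# Arg 1: SQL script to execute; Arg 2: Redshift password (passed in via scriptArgument)
SQL_SCRIPT="$1"
export PGPASSWORD="$2"

RS_HOST=mycluster.xxxxxxxxxxxx.us-east-1.redshift.amazonaws.com   # placeholder endpoint
RS_DB=mydb
RS_USER=master

# Run the query with -t (tuples only) so the generated output is itself runnable SQL
psql -h "$RS_HOST" -p 5439 -U "$RS_USER" -d "$RS_DB" -t -f "$SQL_SCRIPT" > v2.sql

# Upload the generated SQL to S3 so it can be executed later
aws s3 cp v2.sql s3://mybucket/scripts/v2.sql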

Query AWS ES cluster by signing http requests with AWS IAM roles (python)

Written by mannem. Posted in Elasticsearch

The AWS public-facing documentation provides some Python examples for signing HTTP requests with IAM users' credentials in order to access other AWS resources; in this case, an AWS ES cluster whose access policies are restricted to those IAM users.

If you wish to restrict access to the ES cluster with IAM roles instead, the signing process is a bit different.

The document (http://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html) seems to cover only IAM users, not IAM roles.


Changing the signed header

Signing requests with IAM roles requires an additional 'session token' header, added to the request with the header name 'x-amz-security-token'.

So, in ESrequest.py, replacing this line:

headers = {'x-amz-date':amzdate, 'Authorization':authorization_header}

with the following should work for signing requests with an IAM role's credentials:

headers = {'x-amz-date':amzdate, 'Authorization':authorization_header, 'x-amz-security-token':token}

Here, the session 'token' string should be obtained from the corresponding IAM role. (The credential trio for a role is rotated frequently and has an expiration date, so make sure you are using an unexpired token.)


Obtaining ‘token’ string from IAM Roles

If an EC2 instance is assuming a role, you can get it with

curl http://169.254.169.254/latest/meta-data/iam/security-credentials/MyEc2Role
(http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials)

In Python, all of these credentials can be obtained with the 'requests' module and parsed accordingly.

The token can be obtained similarly from other services that assume IAM roles, such as Lambda.
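For example, here is a hedged sketch of fetching the role's session token on an EC2 instance (the role name is a placeholder; the JSON field names come from the instance metadata response):

# Retrieve the temporary credentials for the instance's IAM role
ROLE_NAME=MyEc2Role
CREDS=$(curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/$ROLE_NAME)

# The JSON contains AccessKeyId, SecretAccessKey, Token and Expiration.
# Extract the session token to send as the 'x-amz-security-token' header.
TOKEN=$(echo "$CREDS" | python -c "import sys, json; print(json.load(sys.stdin)['Token'])")
echo "$TOKEN"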


Find the end-to-end code here : https://github.com/mannem/elasticsearchDev/blob/master/ES_V4Signing_IAMRoles

UK police data

Written by mannem. Posted in Data-sets

data.police.uk provides a complete snapshot of crime, outcome, and stop and search data, as held by the Home Office at a particular point in history.

The actual data is located on S3 in the bucket policeuk-data and can be accessed with a URL similar to
https://policeuk-data.s3.amazonaws.com/archive/20yy-mm.zip (where yy and mm are the year and month, replaced accordingly).

The Structure:

All files are organized by YEAR and MONTH.

Each month has a ZIP file containing the CSV files.

The January 2015 file 2015-01.zip contains data for all months from 2010-12 through 2015-01.
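For example, fetching one monthly archive with the URL pattern above (the month is chosen arbitrarily):

# Download and unpack the January 2015 archive
curl -O https://policeuk-data.s3.amazonaws.com/archive/2015-01.zip
unzip 2015-01.zip -d policeuk-2015-01
ls policeuk-2015-01    # expect the data organized by month, with CSV files inside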

Contents of a sample file:


The columns in the CSV files are as follows:

Field: Meaning
Reported by: The force that provided the data about the crime.
Falls within: At present, also the force that provided the data about the crime. This is currently being looked into and is likely to change in the near future.
Longitude and Latitude: The anonymised coordinates of the crime. See Location Anonymisation for more information.
LSOA code and LSOA name: References to the Lower Layer Super Output Area that the anonymised point falls into, according to the LSOA boundaries provided by the Office for National Statistics.
Crime type: One of the crime types listed in the Police.UK FAQ.
Last outcome category: A reference to whichever of the outcomes associated with the crime occurred most recently. For example, this crime's 'Last outcome category' would be 'Offender fined'.
Context: A field provided for forces to provide additional human-readable data about individual crimes. Currently, for newly added CSVs, this is always empty.

The Challenge:

  • The given data contains some inbuilt errors in the Easting, Northing, and Crime_type fields.
  • The data is in CSV format, with commas in the data itself.
  • The CSV files contain column headers, i.e. the first record in each CSV file is a header record containing the column (field) names.

What is unique?

  • The same data can be accessed over an API. The API is implemented as a standard JSON web service using HTTP GET and POST requests. Full request and response examples are provided in the documentation.
  • The response contains the ID of the crime, which may be unique and can be used as a hash key when storing and querying in NoSQL.
  • The JSON response can also be used as an index document for Elasticsearch.

Example API call via REST: https://data.police.uk/api/crimes-street/all-crime?lat=52.629729&lng=-1.131592&date=2013-01

Example Response:

More details on API access can be found here: data.police.uk/docs/


YARN Log aggregation on EMR Cluster – How to ?

Written by mannem. Posted in EMR || Elastic Map Reduce


Why Log aggregation ?

User logs of Hadoop jobs serve multiple purposes. First and foremost, they can be used to debug issues while running a MapReduce application – correctness problems with the application itself, race conditions when running on a cluster, and debugging task/job failures due to hardware or platform bugs. Secondly, one can do historical analyses of the logs to see how individual tasks in job/workflow perform over time. One can even analyze the Hadoop MapReduce user-logs using Hadoop MapReduce(!) to determine any performance issues.

More info found here:
https://hortonworks.com/blog/simplifying-user-logs-management-and-access-in-yarn/

Without log aggregation, when you try to access the job history logs from the ResourceManager UI, you see this error:

“Aggregation is not enabled. Try the nodemanager at ip-172-31-41-154.us-west-2.compute.internal:8041”

This article explains how various EMR versions default the YARN log aggregation option. It also shows how to enable YARN log aggregation on EMR AMIs that do not have it by default (both on a live cluster and while launching a cluster).

EMR Log aggregation vs YARN Log aggregation :

EMR has its own log aggregation into an S3 bucket for persistent logging and debugging. The EMR web console provides a feature similar to "yarn logs -applicationId" if you turn on the debugging feature.

YARN log aggregation stores the application container logs in HDFS, whereas EMR's LogPusher (the process that pushes logs to S3 as the persistent option) needs the files on the local file system. The default behavior of YARN is to copy the container logs from the local disks of the core nodes to HDFS and then, after aggregation, DELETE those local files on the individual core nodes. Since they are deleted by the NodeManager on each node, EMR has no way to save those logs to more persistent storage such as S3. So, historically, EMR had to disable YARN log aggregation so that the container logs stayed on the local machines and could be pushed to S3. Because of Hue's integrated features, such as displaying application logs in the Hue console, when you install Hue on EMR (either AMI 3.x.x or 4.x.x) yarn.log-aggregation-enable is turned on by default, and container logs might then be missing from S3.

This behavior changed in later EMR AMIs such as 4.3.0, where YARN log aggregation and EMR log aggregation can work together. EMR introduced a patch, described in https://issues.apache.org/jira/browse/YARN-3929, to all EMR AMIs after EMR 4.3.0 that have the Hadoop branch-2.7.3-amzn-0. Note that this patch has not been adopted upstream and is now part of EMR's Hadoop.

With this patch, there is an option to keep the files on the local machines after log aggregation, managed by the "yarn.log-aggregation.enable-local-cleanup" property in yarn-site.xml on the respective core/task nodes. This property is not public and can only be set on EMR distributions. It defaults to FALSE, which means the cleanup on the local machines WILL NOT take place. LogPusher needs these logs on the local machines to push them to S3, and LogPusher is responsible for removing the local logs after they are copied over and after a certain retention period (4 hours for containers).

For the logs to be deleted from the local disks, flip this property to true with the configurations API while launching the cluster. On a live cluster, yarn-site.xml on all core/task nodes should be updated and the NodeManager restarted. After the restart, old container logs might still be present.

*** This option might not be recommended, because LogPusher will NOT be able to push the local container logs to your (the service's) S3 bucket if it is set to true,
** and the only source of container logs will then be the aggregated logs on HDFS, which are not as persistent.


If you decide not to rely on either EMR's LogPusher (which pushes to S3) or the YARN NodeManager (which aggregates logs to HDFS), and instead have your own monitoring solution that uploads logs to, say, Elasticsearch or Splunk, then there are a few things to consider:

1. Disable YARN log aggregation using yarn.log-aggregation-enable = false. This means the YARN NodeManager on the core nodes will not push the container logs on local disk to centralized HDFS. Note that LogPusher can still delete (and can try to upload) the container logs to S3 after 4 hours (see /etc/logpusher/hadoop.config).
2. Once log aggregation is disabled, yarn.nodemanager.log.retain-seconds comes into the picture; by default it deletes logs on local disks after 3 hours. This means the NodeManager may remove logs even before LogPusher has sent them to S3 and deleted them itself. So make sure you increase this time so that your custom monitoring application has enough time to send the logs to your preferred destination. A hedged configuration sketch follows this list.
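A hedged sketch of these two settings as a yarn-site classification at cluster launch (the release label, instance settings, and the 6-hour retention value below are placeholders):

aws emr create-cluster \
  --release-label emr-4.7.0 \
  --applications Name=Hadoop \
  --use-default-roles \
  --instance-type m3.xlarge --instance-count 3 \
  --configurations '[{"Classification":"yarn-site",
    "Properties":{
      "yarn.log-aggregation-enable":"false",
      "yarn.nodemanager.log.retain-seconds":"21600"
    }}]'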


EMR 4.3.0 and above / 5.0.0 and above:

YARN log aggregation is enabled by default and the logs will also get pushed to your S3 log bucket.


EMR 4.x.x :

AMI 4.0.0 doesn't support Hue, so yarn.log-aggregation-enable=false is the default.

To enable it on 4.0.0:

Step 1: For 4.0.0, you may need to enable it by following the procedure for 3.x.x below.

As AMI 4.x.x uses upstart, you need to use the following command rather than "sudo service":

sudo restart hadoop-mapreduce-historyserver

The /etc/init/ folder has all the service configuration files.

Step 2: /etc/hadoop/conf.empty/yarn-site.xml is the configuration file that needs to be edited.

Step 3: While launching the cluster, you can use the yarn-site classification for your release label to make the changes necessary to enable log aggregation.

Configuration that needs to be enabled :

yarn.log-aggregation-enable=true

You can refer to the following page to configure your cluster.
https://docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/emr-configure-apps.html
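A hedged example of launching a 4.x cluster with that classification (the release label and instance settings are placeholders):

aws emr create-cluster \
  --release-label emr-4.0.0 \
  --applications Name=Hadoop \
  --use-default-roles \
  --instance-type m3.xlarge --instance-count 3 \
  --configurations '[{"Classification":"yarn-site",
    "Properties":{"yarn.log-aggregation-enable":"true"}}]'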

4.1.0, 4.2.0:

If you don't install Hue, you need to turn log aggregation on explicitly using EMR's configuration API (yarn.log-aggregation-enable=true) with the yarn-site classification.

If you turn on yarn.log-aggregation-enable explicitly or via Hue, application container logs will not be saved to your S3 location; they will stay on HDFS for YARN's log aggregation feature.


EMR 3.x.x :

Here's how you enable log aggregation on a 3.x.x EMR cluster:

Step 1:

Change yarn-site.xml settings to enable log aggregation

On Master node:
> vim /home/hadoop/conf/yarn-site.xml
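If you prefer a one-liner over editing the file by hand, here is a hedged sed command that injects the property discussed earlier (yarn.log-aggregation-enable=true) before the closing </configuration> tag (assumes GNU sed, which EMR AMIs ship):

# Add yarn.log-aggregation-enable=true to yarn-site.xml on the master node
sudo sed -i 's|</configuration>|  <property>\n    <name>yarn.log-aggregation-enable</name>\n    <value>true</value>\n  </property>\n</configuration>|' /home/hadoop/conf/yarn-site.xml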

Step 2:

Copy this configuration to all nodes.

yarn node -list|sed -n "s/^\(ip[^:]*\):.*/\1/p" | xargs -t -I{} -P10 scp -o StrictHostKeyChecking=no -i ~/MyKeyName.pem /home/hadoop/conf/yarn-site.xml hadoop@{}:/home/hadoop/conf/

Where MyKeyName.pem is the private key for SSH’ing into slaves.

The above command lists all slave nodes and uses scp to copy (replace) the yarn-site.xml on the master onto the slave nodes.

Step 3:

Restart the Hadoop HistoryServer daemon on all nodes:

yarn node -list|sed -n "s/^\(ip[^:]*\):.*/\1/p" | xargs -t -I{} -P10 ssh -o StrictHostKeyChecking=no -i ~/MyKeyName.pem hadoop@{} "sudo service hadoop-mapreduce-historyserver restart"




Using DynamoDB as session provider with AWS SDK V3

Written by mannem. Posted in Dynamo DB

The DynamoDB Session Handler is a custom session handler for PHP that allows developers to use Amazon DynamoDB as a session store. Using DynamoDB for session storage alleviates issues that occur with session handling in a distributed web application by moving sessions off of the local file system and into a shared location. DynamoDB is fast, scalable, easy to setup, and handles replication of your data automatically.

Setting up:

1. Make sure you have PHP >= 5.5.0.
2. Install the AWS PHP SDK (v3): http://docs.aws.amazon.com/aws-sdk-php/v3/guide/getting-started/installation.html
3. Configure the PHP SDK to use any of the credentials options mentioned here: http://docs.aws.amazon.com/aws-sdk-php/v3/guide/guide/credentials.html
4. See more details about the DynamoDB-provided session handler here: https://docs.aws.amazon.com/aws-sdk-php/v3/guide/service/dynamodb-session-handler.html
5. Create a DynamoDB table to store session info, with 'id' (String) as the hash key (see the sketch after this list).
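A hedged sketch of step 5 using the AWS CLI (the table name and throughput values are placeholders; the hash key must be a String attribute named 'id'):

aws dynamodb create-table \
  --table-name sessions \
  --attribute-definitions AttributeName=id,AttributeType=S \
  --key-schema AttributeName=id,KeyType=HASH \
  --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5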

End to End PHP code with debug turned on:

> php sessionProvider.php
successfully connected

Now, check the DynamoDB table to confirm the session information was stored successfully.

Here is an example item structure (DynamoDB JSON format):

References :


hbase snapshot / export

Written by mannem. Posted in EMR || Elastic Map Reduce

I observed that exporting large HBase tables with the HBase-provided 'Export' utility is very CPU bound. If you are using default cluster configurations, the mappers may consume 100% CPU and may crash the RegionServer (core node) and your HBase. This article discusses some tuning for your map/reduce HBase export job. It also focuses on an alternate HBase utility called 'ExportSnapshot', which mitigates the problems with the HBase Export utility.

This article covers the import/export tools that ship with HBase and shows how to use them efficiently.

A small intro to the HBase utilities:

————————————————-
org.apache.hadoop.hbase.mapreduce.Export

Export is a utility that will dump the contents of table to HDFS in a sequence file.

org.apache.hadoop.hbase.mapreduce.CopyTable

CopyTable is a utility that can copy part or all of a table, either to the same cluster or to another cluster. The target table must first exist. The usage is as follows:

org.apache.hadoop.hbase.snapshot.ExportSnapshot

The ExportSnapshot tool copies all the data related to a snapshot (hfiles, logs, snapshot metadata) to another cluster. The tool executes a Map-Reduce job, similar to distcp, to copy files between the two clusters, and since it works at file-system level the hbase cluster does not have to be online.
————————————————-
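Hedged usage sketches for the two table-level utilities described above (the table names, output path, and peer ZooKeeper quorum are placeholders):

# Export: dump the table contents to sequence files on HDFS
hbase org.apache.hadoop.hbase.mapreduce.Export 'myTable' /user/hadoop/myTable-export

# CopyTable: copy the table to a peer cluster (the target table must already exist there)
hbase org.apache.hadoop.hbase.mapreduce.CopyTable \
  --peer.adr=zk1,zk2,zk3:2181:/hbase --new.name=myTable myTable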

The main difference between Exporting a Snapshot and Copying/Exporting a table is that ExportSnapshot operates at HDFS level.

This means that the Master and RegionServers are not involved in this operation.

Consequently, no unnecessary caches for data are created and there is no triggering of additional GC pauses due to the number of objects created during the scan process.

So, when exporting a snapshot, the job is no longer CPU bound.

Please see :

  • http://hbase.apache.org/book.html#ops.snapshots
  • http://blog.cloudera.com/blog/2013/03/introduction-to-apache-hbase-snapshots/
  • http://www.cloudera.com/content/cloudera/en/documentation/core/v5-3-x/topics/cm_bdr_managing_hbase_snapshots.html
Using snapshots, I was able to export my HBase table from a source cluster on the EMR 3.0.0 AMI to a target cluster on the latest (at the time of writing this post) EMR 3.9.0 AMI. This job was considerably faster than HBase Export.

    Prepping source and target clusters:

    ————————————————-

    —Enabling snapshots on Hbase—

You must add the 'hbase.snapshot.enabled' property with value 'true' to hbase-site.xml on the HBase Master node:

    /home/hadoop/hbase/conf/hbase-site.xml

    Stop the HMaster process.

    > sudo service hbase-master stop

    service-nanny will restart this process again. This will load the new configuration.

    —Enabling communication between source and Target clusters—

    To transfer snapshot from source cluster to target cluster,

The source cluster needs to communicate with the target master node's NameNode process, which listens on the HDFS default port 9000 (hdfs getconf -confKey fs.default.name).

If both clusters are using the default EMR security groups, they can communicate by default. Otherwise, you may need to modify the security groups to allow communication over that port.

(Check connectivity to port 9000 with telnet from the source cluster's master.)

    ————————————————-

    —Creating a snapshot on Source cluster—

    On source cluster(EMR 3.0.0 )

    > hbase shell
    hbase(main):001:0> snapshot 'sourceTable', 'snapshotName'

Check that the snapshot was created under HDFS at /hbase/.hbase-snapshot/.

—Exporting this snapshot (snapshotName) to HDFS /hbase/.hbase-snapshot/ of the target cluster (EMR 3.9.0), whose master node's internal IP is 172.31.42.191 and which listens on HDFS port 9000, using 16 mappers with 'ExportSnapshot'—

    > hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot snapshotName -copy-to hdfs://172.31.42.191:9000/hbase -mappers 16

    ————————————————-

On the target cluster, check that the snapshot was exported:

    > hadoop fs -ls /hbase/.hbase-snapshot/

    —Clone this snapshot to actual Hbase table on Target cluster—

    > hbase shell
    hbase(main):001:0> clone_snapshot 'snapshotName', 'newTableName'

    ————————————————-

    Installing and using Apache sqoop to export/Import large datasets (MySQL, S3) (CSV/TSV..) on EMR cluster

Written by mannem. Posted in EMR || Elastic Map Reduce


Oftentimes the export/import activity is limited by several performance bottlenecks, so the activity may be faster if a distributed transfer is used instead of a normal transfer. Some of the bottlenecks include read throughput, write throughput, and how the code parses the data (inline or batch, etc.). Apache Sqoop(TM) is a tool designed for efficiently transferring bulk data between Apache Hadoop and structured datastores such as relational databases.

    http://sqoop.apache.org/

This guide shows you how to:
> Install Sqoop, and
> Export/import MySQL tables (from S3 to RDS and from RDS to S3, respectively).

Consider a sample MySQL table in RDS:

    > mysql -h myrds.crezaaaruhfx.us-west-2.rds.amazonaws.com -u mannem -p

    mysql> describe dailyStockPrices;

    mysql> SELECT table_name AS "Table",
    -> round(((data_length + index_length) / 1024 / 1024), 2) "Size in MB"
    -> FROM information_schema.TABLES
    -> WHERE table_schema = "pipetest"
    -> AND table_name = "dailyStockPrices";

> My MySQL table dataset has a lot of commas in its fields, so I chose the TSV format instead of CSV for import/export.
> If I used the CSV format, Sqoop would get confused parsing the data.

    Sqoop on EMR 4.4.0 + is pre-installed

Starting with EMR AMI version 4.4.0, Sqoop 1.4.6 is available as a sandbox application. It can be installed by simply selecting this option while provisioning the EMR cluster. By default, Sqoop on EMR has MariaDB and PostgreSQL drivers installed. To install an alternate set of JDBC connectors for Sqoop, you need to place them in /usr/lib/sqoop/lib (see the sketch below).

    http://docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/emr-sandbox.html#emr-sqoop
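A hedged sketch of dropping in a MySQL connector on a 4.4.0+ cluster (the connector version and download URL are assumptions; substitute whichever JDBC jar you actually need):

# Download a MySQL Connector/J release and place the jar on Sqoop's classpath
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.38.tar.gz
tar -xzf mysql-connector-java-5.1.38.tar.gz
sudo cp mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/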

    Sqoop on EMR 3.x.x can be installed with the following script:

    Import Commands:

Import with Sqoop
This command copies a MySQL table from RDS to S3. The S3 file content type is TSV and the file name will be in the "part-m-00000" format.
Note that with -m 1, I am using a single mapper task.
    sqoop import --connect jdbc:mysql://myrds.crezaaaruhfx.us-west-2.rds.amazonaws.com/pipetest --username mannem --password Password123 --table dailyStockPrices --target-dir s3://mannem/sqoopmatrix -m 1 --fields-terminated-by '\t' --lines-terminated-by '\n'
    Check S3 contents
    hadoop fs -cat s3://mannem/sqoopmatrix/part-m-00000
    Sqoop command usage:
    http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html

    Export commands :

Before exporting, the destination MySQL/PostgreSQL table should already be created with a similar schema.
    Export to RDS(MySQL)
    This command copies TSV file from S3 to MySQL Table.
    sqoop export --connect jdbc:mysql://myrds.crezaaaruhfx.us-west-2.rds.amazonaws.com/pipetest --username mannem --password Password123 --table dailyStockPrices --input-fields-terminated-by '\t' --input-lines-terminated-by '\n' --export-dir s3://mannem/sqoopmatrix/part-m-00000
    Export to Redshift(PSQL)
    sqoop export --connect jdbc:redshift://$MYREDSHIFTHOST:5439/mydb --table mysqoopexport --export-dir s3://mybucket/myinputfiles/ --driver com.amazon.redshift.jdbc41.Driver --username master --password Mymasterpass1
    Export commands with mariadb connection string
    sqoop export --connect jdbc:mariadb://$HOSTNAME:3306/mydb --table mysqoopexport --export-dir s3://mybucket/myinputfiles/ --driver org.mariadb.jdbc.Driver --username master --password Mymasterpass1
Export using Secure Socket Layer (SSL) encryption (note the quotes around the JDBC URL so the shell does not interpret the & characters)
sqoop export --connect "jdbc:mariadb://$HOSTNAME:3306/mydb?verifyServerCertificate=false&useSSL=true&requireSSL=true" --table mysqoopexport --export-dir s3://mybucket/myinputfiles/ --driver org.mariadb.jdbc.Driver --username master --password Mymasterpass1

    How to retrieve Cluster ID / JobFlow ID from EMR master node.

Written by mannem. Posted in EMR || Elastic Map Reduce

You can look at /mnt/var/lib/info/ on the master node to find a lot of info about your EMR cluster setup.

    More specifically /mnt/var/lib/info/job-flow.json contains the jobFlowId or ClusterID.

I was able to install a JSON parser on my master node to retrieve the jobFlowId.

    # Install JSON PARSER
    # Install pkgs required for jsawk
    wget http://pkgs.repoforge.org/js/js-1.60-1.el6.rf.x86_64.rpm
    wget http://pkgs.repoforge.org/js/js-devel-1.60-1.el6.rf.x86_64.rpm
    sudo yum install -y js-1.60-1.el6.rf.x86_64.rpm
    sudo yum install -y js-devel-1.60-1.el6.rf.x86_64.rpm
# Install JSAWK
curl -L http://github.com/micha/jsawk/raw/master/jsawk > jsawk
    chmod 755 jsawk && sudo mv jsawk /usr/bin/
    # Parse and print job flow ID
    jsawk 'return this.jobFlowId' < /mnt/var/lib/info/job-flow.json
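Alternatively, a simpler hedged one-liner that avoids installing anything, assuming Python (which ships on EMR AMIs) is available:

# Read the jobFlowId field straight from job-flow.json
python -c "import json; print(json.load(open('/mnt/var/lib/info/job-flow.json'))['jobFlowId'])"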