hive server 2 crashing with OutOfMemoryError (OOM) ?

Written by mannem on . Posted in EMR || Elastic Map Reduce

emr ear  

Often times HiveServer2 can be single point of failure. It can easy crash with OOM. If HiveServer2 restarts now-and-then , it must be due to OOM where it is likely set to be killed and re-spawned. We need to check the JVM options to see the behavior of this process on OOM , analyze thread dumps and check the underlying issue to mitigate such issues.

This post explains – How to identify OOM issue on HiveServer2 – How to analyze Thread/Heap dumps – How to Mitigate this issue. Some of the discussion is generic to EMR.

PART 1 : How to identify OOM issue on HiveServer2

Default JVM options of HS2 on EMR 4.7.2 AMI :

Some observations from above JVM:

1. Heap space for HiveServer2 JVM is 1000MB and this option is sourced from hive-env.sh.
2. Hive logging options like(-Dhive.log.dir=/var/log/hive -Dhive.log.file=hive-server2.log -Dhive.log.threshold=INFO -server) are sourced from /etc/init/hive-server2.conf
3. All other options starting from ‘-server’ are set by EMR.
4. If this JVM goes OutOfMemoryError , -XX:OnOutOfMemoryError=kill -9 %p option will kill HS2 on OOM.

If for some reason the HiveServer2 goes OOM , it simply gets killed and there is no way to find why.. hive-server2.log and hive-server2.out may not show OOM at all. you can check hive-server2.log to verify HiveServer2 getting restarted . So, we tweak JVM options for HS2 to include GC verbose and to Dump Heap On OutOfMemoryError.

With the following settings , if the HS2 process does go OOM , it would display the error like following on /var/log/hive/hive-server2.out. This message means that for some reason the garbage collector of this HS2 JVM is taking an excessive amount of time and recovers very little memory in each run.

An example script to enable GC verbose on HS2 JVM – can be like following on EMR 4.x.x and 5.x.x:



PART 2 : How to analyze Thread/Heap dumps

With above options if HS2 fails with OOM , it will log such error to /var/log/hive/hive-server2.out. It will also heap dump to a file like /var/log/hive/java_pid8548.hprof . There are multiple ways to open and analyze this file. I found “eclipse memory analyzer tool” helpful in identifying memory leaks and exploring thread dumps on java classes and sub-classes.

generatedata

generatedata



PART 3 : How to Mitigate this issue

1. Tweaking hive-site.xml by checking stack trace:

Best way to mitigate the OOM’s is to check the stack trace of the Hive-server2 process and see if there’s any leaks. It is also a good idea to check for top consumers and act accordingly. For each of those threads , see if any of the hive-site.xml settings would reduce its memory consumption. Some times you may not have control on any of the thread specifics , in which case , you may need to further increase the heap space.

For example : In the stack trace ,

– if you see multiple threads getting blocked etc, you can edit a setting like hive.server2.thrift.http.max.worker.threads

2. Mitigating OOM obviously involves increasing the Heap space for Hive-server2.

The heap space is defined in hive-env.sh using env variable . Its important to identify memory requirements of 3 hive services HiveServer2 , Hive metastore , Hive clients in advance , using load/concurrency tests before moving to PROD. If you observe any OOM’s , you may need to increase memory accordingly.

Identifying hive services and increasing memory :

if ‘export HADOOP_HEAPSIZE=2048’ string is present on hive-env.sh , it would be applied to all of the 3 hive services if restarted. So, you can use if-statements to provide different settings(HADOOP_HEAPSIZE and HADOOP_OPTS) for 3 of these hive services.

Example contents of hive-env.sh :

3. High availability and LoadBalancing of Hiveserver2 :

Increasing heap space for HS2 may help , However , As per Cloudera , after certain memory limit, there is possibility that you continue to hit OOM no mater how much you increase the memory for HS2. In that case, “Cloudera recommends splitting HiveServer2 into multiple instances and load balancing once you start allocating >12 GB to HiveServer2. The objective is to size to reduce impact of Java garbage collection on active processing by the service.”

You may checkout the limitations here : https://www.cloudera.com/documentation/enterprise/5-5-x/topics/cdh_ig_hive_install.html#concept_alp_4kl_3q

4. Watch out for any bugs in the LockManager :

https://hive.apache.org/javadocs/r2.0.0/api/org/apache/hadoop/hive/ql/lockmgr/

If you are using Hive’s Table Lock Manager Service by setting hive.support.concurrency to true , check if there’s issues with the Lockmanager which is responsible for maintaining locks for concurrent user support. Lockmanager can be ZooKeeperHiveLockManager based or can be EmbeddedLockManager which is shared lock manager for dedicated hive server , where – all locks are managed in memory. if the issue seems to be with these lock managers , you may need to edit their configs.

5. Check for generic version issues :

On Apache JIRA issues for hive-server2.

6. GC Tuning :

If the HS2 JVM is spending too much time on GC , you might consider some GC Tuning as discussed here : http://www.cubrid.org/blog/dev-platform/how-to-tune-java-garbage-collection/



spark-redshift library from databricks – Installation on EMR and using InstanceProfile

Written by mannem on . Posted in EMR || Elastic Map Reduce

ear earearspark-redshift is a library to load data into Spark SQL DataFrames from Amazon Redshift, and write them back to Redshift tables. Amazon S3 is used to efficiently transfer data in and out of Redshift, and JDBC is used to automatically trigger the appropriate COPY and UNLOAD commands on Redshift.This library is more suited to ETL than interactive queries, since large amounts of data could be extracted to S3 for each query execution

spark-redshift installation instructions on EMR:
Steps 1-8 shows how to compile your own spark-redshift package(JAR). You can directly skip to step 9 if you wish to use pre-compiled JAR’s(V 0.6.1). Later we use spark-shell to invoke these JAR’s and run scala code to query Redshift table and put contents into a dataframe. This guide assumes you had followed github page https://github.com/databricks/spark-redshift and its tutorial.

Download pre-req’s to compile the your own JAR’s

————————————————-

If you wanna skip above steps , you can download the pre-compiled JAR’s using –

wget https://s3.amazonaws.com/emrsupport/spark/spark-redshift-databricks/minimal-json-0.9.5-SNAPSHOT.jar

wget https://s3.amazonaws.com/emrsupport/spark/spark-redshift-databricks/spark-redshift_2.10-0.6.1-SNAPSHOT.jar

With these JAR’s you can skip all above options (1-8)

————————————————-

10. In SCALA shell, you can run the following commands to init SQLContext. Note that the below code automatically uses IAM role’s (Instance profile ) cred’s to authenticate against S3

Sample Scala code uses Instance profile

————————————————-


Sample spark-sql invocation

spark-sql --jars RedshiftJDBC41-1.1.13.1013.jar,spark-redshift_2.10-0.6.1-SNAPSHOT.jar,minimal-json-0.9.5-SNAPSHOT.jar


Note that while running the above commands , the spark-redshift executes a COPY/Unload with a command like below:

These temporary credentials are from IAM role’s (EMR_EC2_DefaultRole) and this role should have policy that allows S3 access to atleast temp bucket mentioned in the command.


SqlActivity & RedshiftCopyActivity fails ? Use shellCommandActivity instead

Written by mannem on . Posted in Data Pipelines

There are several limitations of SQLActivity and RedshiftCopyActivity. If the psql/sql commands are too complex, these activities fail to prepare the statements correctly and will throw out some errors which cannot be easily rectified. So, you always have the option to use shellCommandActivity to run your complex script.

This article guides you to create a shell Script and corresponding Data-Pipeline template to run your complex script directly(part 1)or when present in S3(Part 2 ). As the true purpose of Data-Pipelines is automation, The script can also take arguments that you can reference using placeholders.

PART 1:

The following shell script takes arguments referenced in ScriptArguments object of the ShellCommandActivity. Its runs COPY command to copy files from S3 to PostgreRDS. Another example shows a copy from S3 to Redshift.

Run a PSQL command to copy from S3 to PostgreRDS

Run a PSQL command to copy from S3 to Redshift

A sample Pipeline template to copy from s3 to RDSPostgres

You may use a similar definition to copy from S3 to Redshift.


PART 2:

If your script is in S3 and you wanna pass arguments to your script:

Ex: insert into :v1 select * from source where source.id = :v2; -> s3://YourBucket/event_data_yyyymm.sql

$1 -> S3File location
$2 -> Password for Redshift
$3 -> Table Name
$4 -> condition value

A snippet of Definition for ShellCommandActivity can be :

How Data-Pipeline’s RDS to Redshift copy template works ? (and limitations)

Written by mannem on . Posted in Data Pipelines

The template contains 4 Activities.

1. RDSToS3CopyActivity – Creates a CSV file in S3 based on mySQL table.
2. RedshiftTableCreateActivity – Translates MySQL table to PSQL and creates a table(if it does not exist).
3. S3ToRedshiftCopyActivity – Runs a Redshift COPY command.
4. S3StagingCleanupActivity – Cleans up the staging S3 directory.

RedshiftTableCreateActivity is the key activity and where all the complexity lies.

It runs a shell script using ShellCommandActivity which Translates the MySQL table structure to Psql syntax , and creates a table on Redshift with translated table structure. the data-type translation between RDS and Redhsift is provided here

http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-template-redshift.html
Now, if you dig more into the ShellCommandActivity, The conversion script is downloaded as part of s3://datapipeline-us-east-1/sample-scripts/dbconv.sh , which in-turn downloads the translator python script

curl -O https://s3.amazonaws.com/datapipeline-us-east-1/sample-scripts/mysql_to_redshift.py

You can check out the contents of this script on how exactly it translates.

Ex:

mysql> create table TestTable (Id int not null , Lname varchar(20));

According to translation table , this activity , translates to a psql script , which runs on the ec2 instance.

Make note of limitations on this script while using it !

mysql_to_redshift.py :

Run Data Pipelines Taskrunner jar forever on Existing Resources

Written by mannem on . Posted in Data Pipelines

When you run Data Pipelines Taskrunner on your own resource, according to the following document , it do not exit out from shell.

To get the Taskrunner detached from the terminal, use nohup , nohup

nohup java -jar /home/ec2-user/TaskRunner.jar —config /home/ec2-user/credentials.json --workerGroup=group-emr --region=us-west-2 --logUri=s3://mannem.ne/foldername &

An alternative is using screen/tmux/byobu, which will keep the shell running, independent of the terminal.


Now, if you want to run this Taskrunner every time your machine Boots, It really depends on the OS distribution of your machine.

On a Amazon linux which are based on RedHat(RHEL) :

Create a script like taskrunner-bootup

Put your script in /etc/init.d/, owned by root and executable. At the top of the script, you can give a directive for chkconfig. Example, the following script is used to start this Taskrunner java application as ec2-user. As user root you can use chkconfig to enable or disable the script at startup,

chkconfig –list taskrunner-bootup
chkconfig –add taskrunner-bootup
and you can use service start/stop taskrunner-bootup

You can also use cloud-init if you wish.

If this is an AMI, you can use USER-DATA script which only works during first start.

Here’s some solutions for

Ubuntu : http://www.askubuntu.com/a/228313
Debian : http://www.cyberciti.biz/tips/linux-how-to-run-a-command-when-boots-up.html

Push data to AWS Kinesis firehose with AWS API gateway via Service proxy

Written by mannem on . Posted in Kinesis

 

firehose

With API Gateway, developers can create and operate APIs for their back-end services without developing and maintaining infrastructure to handle authorization and access control, traffic management, monitoring and analytics, version management, and software development kit (SDK) generation.

API Gateway is designed for web and mobile developers who are looking to provide secure, reliable access to back-end APIs for access from mobile apps, web apps, and server apps that are built internally or by third-party ecosystem partners. The business logic behind the APIs can either be provided by a publicly accessible endpoint API Gateway proxies call to, or it can be entirely run as a Lambda function.

In this article, we will create an Publicly accessible API endpoint on which your application can issue POST requests. Via Service proxy, the contents of this post request go to Firehose as PutRecord API call and eventually the data goes to S3/Redshift/ES-Cluster based on your Firehose settings. Usage of service proxy eliminates invoking an AWS Lambda function.

The end result would be :

1. Your application issues a POST request to the API gateway endpoint that you create –

Ex:

2. The API gateway translates and authenticates this request as PutRecord API call via Service proxy and puts data “SampleDataStringToFirehose” into your Firehose.

3. The Firehose eventually hydrates the destination (Either S3 or Redshift) with the data from your POST requests.


Here’s step by step walkthrough on setting this up:

This walkthrough assumes you had explored other walkthrough’s in http://docs.aws.amazon.com/apigateway/latest/developerguide/getting-started-intro.html
1. Creating Gateway:

> Create an API Gateway by going through the web console.
> Create a resource under that API and create a POST method.
> In this method, choose integration type as Advanced and select “AWS Service Proxy”.
> Method settings:

Select desired Region,
Service as Firehose ,
Leave subdomain empty ,
Http Method -> POST,
Ation -> PutRecord
Role -> ARN of the role that can be assumed by API gateway and had policies to allow at-least ‘PutRecord’ action on your firehose. A sample role which allows all actions – is attached later.
Ex: arn:aws:iam::618548141234:role/RoleToAllowPUTsOnFirehose

Confused? you can also checkout a sample role creation here: http://docs.aws.amazon.com/apigateway/latest/developerguide/getting-started-aws-proxy.html#getting-started-aws-proxy-add-roles

2. Testing:

Save this method and TEST the method with following request body that can be found on PutRecord API call webpage.

Replace ‘test’ with your Firehose stream name.

http://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecord.html

3. Verify S3 contents:

Now if you see the S3 contents that the firehose is supposed to hydrate (after s3 buffer interval or Buffer size, which ever satisfied first) ,

The contents will be binary format like äfiõÚ)≤ÈøäœÏäjeÀ˜nöløµÏm˛áˇ∂ø¶∏ß∂)‡ which isn’t the data that you just pushed via API call.

This is because the Firehose expects the datablob to be encoded in Base64. (This can be confirmed by running ( aws firehose put-record --delivery-stream-name test --debug --region us-west-2 --record Data=SampleDataStringToFirehose ) , which automatically encodes the data blob in base64 before sending the request ). While we mention ‘SampleDataStringToFirehose’ as data , we see AWS CLI actually sends ‘U2FtcGxlRGF0YVN0cmluZ1RvRmlyZWhvc2U=’

{‘body’: ‘{“Record”: {“Data”: “U2FtcGxlRGF0YVN0cmluZ1RvRmlyZWhvc2U=”}, “DeliveryStreamName”: “test”}’ ,

where base64-encoded(SampleDataStringToFirehose) = ‘U2FtcGxlRGF0YVN0cmluZ1RvRmlyZWhvc2U=’

So, You need to apply transformations on your POST payload to encode the Data in base64.

You can use a $util variable like $util.base64Encode() to encode in base64 at the API Gateway layer.

4. Applying transformations:

Using transformations, you can modify the JSON schema during the request and response cycles.

By defining a mapping template, the request and response payloads can be transformed to reflect a custom schema.

For a request body like-

Here’s a sample mapping template that I created checking documentation (application/json):

Usage:

> While testing a Resource -> Integration Request -> Add mapping template -> Content-Type = application/json
> Instead of Input passthrough, use mapping template to paste your template and save.
> Now test with a request body similar to what you had used before, and Verify in the Logs section – “Method request body after transformations” ,
it should look like

> You may need to modify the mapping template, so that include whatever payload you want for your application.
> Instead of using these transformations on API GW, you can also choose your client to encode the data before framing a request to API GW.

5. Deployment:

Now that we have a working method that can issue PutRecord to Firehose, we deploy the API to get a publicly accessible HTTP endpoint to issue POST requests. Your application can issue POST requests on this Endpoint and contents of this post requests go to Firehose as PutRecord API call and eventually the data goes to S3/Redshift based on your firehose settings.

Make sure you include Content-Type: application/json header in the POST request. You can also try application/x-amz-json-1.1

6. Monitor and Extend:
  • Monitoring – Check Cloudwatch monitoring tab on AWS Firehose for incoming Records and Bytes. You can also verify Cloudwatch Logs to verify failures.
  • Of course you verify the contents of the S3 bucket / Redshift tables / ES cluster
  • Extend – You may extend the functionality to work with other API calls on other AWS Services required by your client App. Similar setup can be used to POST data to Kinesis streams from your Applications.

scratch

A sample role :

Search DynamoDB tables using Elasticsearch/Kibana via Logstash plugin

Written by mannem on . Posted in Dynamo DB, Elasticsearch


The Logstash plugin for Amazon DynamoDB gives you a nearly real-time view of the data in your DynamoDB table. The Logstash plugin for DynamoDB uses DynamoDB Streams to parse and output data as it is added to a DynamoDB table. After you install and activate the Logstash plugin for DynamoDB, it scans the data in the specified table, and then it starts consuming your updates using Streams and then outputs them to Elasticsearch, or a Logstash output of your choice.

Logstash is a data pipeline service that processes data, parses data, and then outputs it to a selected location in a selected format. Elasticsearch is a distributed, full-text search server. For more information about Logstash and Elasticsearch, go to https://www.elastic.co/products/elasticsearch.

Amazon Elasticsearch Service is a managed service that makes it easy to deploy, operate, and scale Elasticsearch in the AWS Cloud. aws.amazon.com/elasticsearch-service/


This article includes an installation guide that is tested on EC2 instance where all the per-requsites are installed and Logstash is configured so that it connects to Amazon ElasticSearch using the input/Output plugins to start indexing records from DynamoDB. Click here to get all the instructions :
https://github.com/mannem/logstash-input-dynamodb


Logstash configuration:

After running a similar command on the shell, Logstash should successfully start and begin indexing the records from your DynamoDB table.


Throughput considerations:


Kibana:


References:
http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Tools.DynamoDBLogstash.html

https://aws.amazon.com/blogs/aws/new-logstash-plugin-search-dynamodb-content-using-elasticsearch/

https://github.com/awslabs/logstash-input-dynamodb


Similar plugins:

https://github.com/kzwang/elasticsearch-river-dynamodb

All about AWS Data-Pipelines Taskrunner

Written by mannem on . Posted in Data Pipelines

How Data-Pipeline installs taskrunner on Ec2 instance?

Data-pipeline launches an Ec2 instances on your behalf using with the following user-data script.

————————————————-

————————————————-

> It downloads a script called remote-runner-install which installs the Taskrunner with options passed from Data-Pipelines service.

Here’s how the script looks like:

Now, this script in-turn runs and passes all arguments to aws-datapipeline-taskrunner-v2.sh. aws-datapipeline-taskrunner-v2.sh script is responsible for running the task runner by invoking the actual TaskRunner jar.


As you can see , Just like installing taskrunner on existing resources[1], Data-Pipelines runs the command java -cp "$TASKRUNNER_CLASSPATH" amazonaws.datapipeline.taskrunner.Main \
--workerGroup "$WORKER_GROUP" --endpoint "$ENDPOINT" --region "$REGION" --logUri "$LOG_URI" --taskrunnerId "$TASKRUNNER_ID"
, by passing all required arguments to the taskrunner.

Taskrunner process is responsible for polling AWS Data Pipeline service for tasks and then performs those tasks.

Task Lifecycle

Using a single hive warehouse for all EMR(Hadoop) clusters

Written by mannem on . Posted in EMR || Elastic Map Reduce

As the EMR/Hadoop cluster’s are transient, tracking all those databases and tables across clusters may be difficult. So, Instead of having different warehouse directories across clusters, You can use a single permanent hive warehouse across all EMR clusters. S3 would be a great choice as it is persistent storage and had robust architecture providing redundancy and read-after-write consistency.

For each cluster:

This can be configured using hive.metastore.warehouse.dir property on hive-site.xml.

You may need to update this setting on all nodes.

On a single hive session:

this can be configured using a command like set hive.metastore.warehouse.dir ="s3n://bucket/hive_warehouse"

or initialize hive cli with the following invocation -hiveconf hive.metastore.warehouse.dir=s3n://bucket/hive_warehouse

Note that using above configuration, all default databases and tables will be stored on s3 on path like s3://bucket/hive_warehouse/myHiveDatabase.db/

Running complex queries on redshift with Data-pipelines

Written by mannem on . Posted in Data Pipelines, Redshift

Sometimes AWS Data-Pipelines SQLActivity may not support complex queries. This is because Data-Pieplines SqlActivity passes this script to JDBS executeStatement(Prepared statement). This script is supposed to be idempotent. So here’s an alternative to run psql/sql commands using Data-Pipelines.

Suppose you have the following psql command,

select 'insert into event_data_' ||to_char(current_date,'yyyymm')|| '_v2 select * from stage_event_data_v2 where event_time::date >= '''||to_char(current_date, 'YYYY-MM')||'-01'' and event_time::date <= '''||last_day(current_date)||''';';

and it should output,

insert into event_data_201511_v2 select * from stage_event_data_v2 where event_time::date >= '2015-11-01' and event_time::date <= '2015-11-30';

This is a valid command in psql and can be successfully executed with workbenches and psql shell.

But using Data-pipelines, executing the above command will throw and error:

ERROR processing query/statement. Error: Parsing failed

This is because the script appears to be changing(not idempotent) when it is executed.

If you have a complex redshift commands and wish to performing operations against Redshift that involve custom logic. You could rather write a program in your favorite language and run it
using ShellCommandActivity. This is a quite valid way of interacting with Redshift.

There are several ways to do this. I am including a shell script and its Data-pipelne template as a reference here.

Sample shell command:

Sample Data-pipelines template:


-------------------------------------------------
Some info on the script and Data-pipeline:

1. This script file has 2 arguments (Arg 1 is the sql script that you need to execute , Arg 2 is the Redshift password). These arguments are provided in Data-pipeline shellCommandActivity object using scriptArgument field.

2. The script outputs its result to v2.sql and uploads to s3 bucket (with -t tuples only option), so that you can run the script later.

3. The Data-pipeline template uses the *myRedshiftPass parameter id to hide the password from DataPipelines.
-------------------------------------------------