Taskrunner & workergroup threads

Written by mannem on . Posted in Data Pipelines


Task Runner Configuration Options :

--tasks : The number of task poll threads to run simultaneously. Optional. The default is 2. however, these threads are just for polling.

Maximum number of Activities that can be run in parallel by Taskrunner :

If you have 10 Activities(which may be from different Data-pipelines) currently running on that single Taskrunner worker-group , new activities cannot be processed by this taskrunner and will be waiting for the previous activities to finish. i.e Taskrunner tied to a worker-group has a hard limit that it can execute a Max of 10 Activities in Parallel.

For example :

Suppose 10 activities are to be executed. By default poller pulls 2 Tasks(Activities) per second. In the next seconds it pulls another 2 activities. so, at the end of 5 seconds , there will be 10 activities submitted by poller to worker-group.

SqlActivity & RedshiftCopyActivity fails ? Use shellCommandActivity instead

Written by mannem on . Posted in Data Pipelines

There are several limitations of SQLActivity and RedshiftCopyActivity. If the psql/sql commands are too complex, these activities fail to prepare the statements correctly and will throw out some errors which cannot be easily rectified. So, you always have the option to use shellCommandActivity to run your complex script.

This article guides you to create a shell Script and corresponding Data-Pipeline template to run your complex script directly(part 1)or when present in S3(Part 2 ). As the true purpose of Data-Pipelines is automation, The script can also take arguments that you can reference using placeholders.


The following shell script takes arguments referenced in ScriptArguments object of the ShellCommandActivity. Its runs COPY command to copy files from S3 to PostgreRDS. Another example shows a copy from S3 to Redshift.

Run a PSQL command to copy from S3 to PostgreRDS

Run a PSQL command to copy from S3 to Redshift

A sample Pipeline template to copy from s3 to RDSPostgres

You may use a similar definition to copy from S3 to Redshift.


If your script is in S3 and you wanna pass arguments to your script:

Ex: insert into :v1 select * from source where source.id = :v2; -> s3://YourBucket/event_data_yyyymm.sql

$1 -> S3File location
$2 -> Password for Redshift
$3 -> Table Name
$4 -> condition value

A snippet of Definition for ShellCommandActivity can be :

How Data-Pipeline’s RDS to Redshift copy template works ? (and limitations)

Written by mannem on . Posted in Data Pipelines

The template contains 4 Activities.

1. RDSToS3CopyActivity – Creates a CSV file in S3 based on mySQL table.
2. RedshiftTableCreateActivity – Translates MySQL table to PSQL and creates a table(if it does not exist).
3. S3ToRedshiftCopyActivity – Runs a Redshift COPY command.
4. S3StagingCleanupActivity – Cleans up the staging S3 directory.

RedshiftTableCreateActivity is the key activity and where all the complexity lies.

It runs a shell script using ShellCommandActivity which Translates the MySQL table structure to Psql syntax , and creates a table on Redshift with translated table structure. the data-type translation between RDS and Redhsift is provided here

Now, if you dig more into the ShellCommandActivity, The conversion script is downloaded as part of s3://datapipeline-us-east-1/sample-scripts/dbconv.sh , which in-turn downloads the translator python script

curl -O https://s3.amazonaws.com/datapipeline-us-east-1/sample-scripts/mysql_to_redshift.py

You can check out the contents of this script on how exactly it translates.


mysql> create table TestTable (Id int not null , Lname varchar(20));

According to translation table , this activity , translates to a psql script , which runs on the ec2 instance.

Make note of limitations on this script while using it !

mysql_to_redshift.py :

Run Data Pipelines Taskrunner jar forever on Existing Resources

Written by mannem on . Posted in Data Pipelines

When you run Data Pipelines Taskrunner on your own resource, according to the following document , it do not exit out from shell.

To get the Taskrunner detached from the terminal, use nohup , nohup

nohup java -jar /home/ec2-user/TaskRunner.jar —config /home/ec2-user/credentials.json --workerGroup=group-emr --region=us-west-2 --logUri=s3://mannem.ne/foldername &

An alternative is using screen/tmux/byobu, which will keep the shell running, independent of the terminal.

Now, if you want to run this Taskrunner every time your machine Boots, It really depends on the OS distribution of your machine.

On a Amazon linux which are based on RedHat(RHEL) :

Create a script like taskrunner-bootup

Put your script in /etc/init.d/, owned by root and executable. At the top of the script, you can give a directive for chkconfig. Example, the following script is used to start this Taskrunner java application as ec2-user. As user root you can use chkconfig to enable or disable the script at startup,

chkconfig –list taskrunner-bootup
chkconfig –add taskrunner-bootup
and you can use service start/stop taskrunner-bootup

You can also use cloud-init if you wish.

If this is an AMI, you can use USER-DATA script which only works during first start.

Here’s some solutions for

Ubuntu : http://www.askubuntu.com/a/228313
Debian : http://www.cyberciti.biz/tips/linux-how-to-run-a-command-when-boots-up.html

All about AWS Data-Pipelines Taskrunner

Written by mannem on . Posted in Data Pipelines

How Data-Pipeline installs taskrunner on Ec2 instance?

Data-pipeline launches an Ec2 instances on your behalf using with the following user-data script.



> It downloads a script called remote-runner-install which installs the Taskrunner with options passed from Data-Pipelines service.

Here’s how the script looks like:

Now, this script in-turn runs and passes all arguments to aws-datapipeline-taskrunner-v2.sh. aws-datapipeline-taskrunner-v2.sh script is responsible for running the task runner by invoking the actual TaskRunner jar.

As you can see , Just like installing taskrunner on existing resources[1], Data-Pipelines runs the command java -cp "$TASKRUNNER_CLASSPATH" amazonaws.datapipeline.taskrunner.Main \
--workerGroup "$WORKER_GROUP" --endpoint "$ENDPOINT" --region "$REGION" --logUri "$LOG_URI" --taskrunnerId "$TASKRUNNER_ID"
, by passing all required arguments to the taskrunner.

Taskrunner process is responsible for polling AWS Data Pipeline service for tasks and then performs those tasks.

Task Lifecycle

Running complex queries on redshift with Data-pipelines

Written by mannem on . Posted in Data Pipelines, Redshift

Sometimes AWS Data-Pipelines SQLActivity may not support complex queries. This is because Data-Pieplines SqlActivity passes this script to JDBS executeStatement(Prepared statement). This script is supposed to be idempotent. So here’s an alternative to run psql/sql commands using Data-Pipelines.

Suppose you have the following psql command,

select 'insert into event_data_' ||to_char(current_date,'yyyymm')|| '_v2 select * from stage_event_data_v2 where event_time::date >= '''||to_char(current_date, 'YYYY-MM')||'-01'' and event_time::date <= '''||last_day(current_date)||''';';

and it should output,

insert into event_data_201511_v2 select * from stage_event_data_v2 where event_time::date >= '2015-11-01' and event_time::date <= '2015-11-30';

This is a valid command in psql and can be successfully executed with workbenches and psql shell.

But using Data-pipelines, executing the above command will throw and error:

ERROR processing query/statement. Error: Parsing failed

This is because the script appears to be changing(not idempotent) when it is executed.

If you have a complex redshift commands and wish to performing operations against Redshift that involve custom logic. You could rather write a program in your favorite language and run it
using ShellCommandActivity. This is a quite valid way of interacting with Redshift.

There are several ways to do this. I am including a shell script and its Data-pipelne template as a reference here.

Sample shell command:

Sample Data-pipelines template:

Some info on the script and Data-pipeline:

1. This script file has 2 arguments (Arg 1 is the sql script that you need to execute , Arg 2 is the Redshift password). These arguments are provided in Data-pipeline shellCommandActivity object using scriptArgument field.

2. The script outputs its result to v2.sql and uploads to s3 bucket (with -t tuples only option), so that you can run the script later.

3. The Data-pipeline template uses the *myRedshiftPass parameter id to hide the password from DataPipelines.

Incremental Load: avoiding data loss

Written by mannem on . Posted in Data Pipelines, Redshift

While copying data from RDS to Redshift..

To avoid data loss, start the ‘Incremental copy template’ before the ‘Full copy’

A sample implementation can be,

Incremental copy scheduled start time – 1:50 PM

Full copy start time – 2:00 PM
A DB Insert – 2:10 PM
Full copy End Time – 4:00 PM

A DB Insert – 4:05 PM

Incremental copy First run – 4:10 PM

> In the above example, the contents of first DB Insert at 2:10 may or may not be included in FULL copy.
> Contents of the second insert will not be included in Full copy.

How to ensure that these new inserts will show up in Redshift database ?

> As the ‘Incremental copy template’ uses TIME SERIES scheduling, the actual ‘Incremental copy activity’ run wont start at scheduled start time(1:50), rather it will start and the end of scheduled start time(4:10). All the DB changes between ‘scheduled start date/time’ and ‘first run of the actual copy activity’ will be copied to redshift.
> So, the first incremental copy run will copy all new DB inserts between 1:50 PM and 4:10 PM to redshift. This includes the contents of two DB inserts which are happening during/after FULL copy activity.

  • cloudformation












  • snap





    Storage Gatewa




    Cloud Front

  • r53

    Route 53











  • sns







    Cloud Search


    App Stream



  • opsworks



    Cloud Watch


    Elastic Beanstalk


    Code Deploy



  • dynamodb