Converting Concatenated or Streaming JSON -> newline-delimited JSON for Kinesis Analytics to Athena

Written by mannem. Posted in Athena, AWS BIG DATA, Kinesis


Athena's supported JSON SerDes (the Hive/HCatalog JsonSerDe and the OpenX JSON SerDe) need JSON records separated by some kind of delimiter ("newline-delimited JSON") to identify each record, and they do not support concatenated JSON (a JSON stream). However, the records emitted by Kinesis Analytics can only be in concatenated JSON format, which cannot be used with Athena. This article walks through the different options and shows how to convert the JSON with a Firehose transformation on Lambda.

Kinesis Analytics, being a real-time streaming solution, emits concatenated/streaming JSON (instead of the usual newline-delimited JSON) for its JSON output, since that format better supports records that contain delimiters like \n within themselves. So records like {"id": 1}{"id": 2} are expected, and a streaming consumer would have to parse the individual records in that case.

https://en.wikipedia.org/wiki/JSON_Streaming

As noted above, Athena's supported JSON SerDes (the Hive/HCatalog JsonSerDe and the OpenX JSON SerDe) need newline-delimited JSON records to identify each record and will not read concatenated JSON (a JSON stream) like this. So you likely need an alternative in order to work on this concatenated JSON data with Athena.

For a pipeline like the following that needs both Kinesis Analytics and Athena, I see three options.
Current pipeline : (Kinesis Stream -> Kinesis Analytics(JSON output) -> Firehose -> S3 -> Athena )

————————————————-
1. Keep the current pipeline, but use a Firehose in-line (Lambda) transformation to convert the concatenated JSON -> newline-delimited JSON before it lands in S3.

(Kinesis Stream -> Kinesis Analytics(JSON output) -> Firehose with Lambda transformation -> S3 -> Athena )

This is the easiest way to solve the problem: the Firehose in-line transformation simply appends a newline character at the end of every record. Alternatively, you can configure a Lambda function triggered by the S3 PUT to do the same thing, which is option 2 below.

Firehose in-line transformations :

Kinesis Analytics uses the Firehose PutRecordBatch API to emit records to a destination like Firehose, and each JSON record becomes one Firehose record. You can perform an in-line transformation that appends a newline character to each record by writing a custom Lambda function or editing one of the existing blueprints.

http://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

Sample blog post on transformations : https://aws.amazon.com/blogs/compute/amazon-kinesis-firehose-data-transformation-with-aws-lambda/
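As a rough illustration (not one of the official blueprints), a minimal Python transformation Lambda that appends a newline to every record could look like this:

import base64

def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"])
        # append a newline so each JSON record lands on its own line in S3
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(payload + b"\n").decode("utf-8"),
        })
    return {"records": output}

Firehose buffers the transformed records and delivers them to S3, where every record now sits on its own line and can be read by Athena's JSON SerDes.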

————————————————-

2. Keep the current pipeline, but write and run an additional parser, most likely a Lambda function triggered by the S3 PUT, to convert the concatenated JSON -> newline-delimited JSON.

(Kinesis Stream -> Kinesis Analytics(JSON output) -> Firehose -> S3 -> StreamingParser(JSON) on Lambda -> Athena )
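A minimal sketch of such an S3-triggered parser in Python, assuming the delivered objects are small enough to process in memory and using a hypothetical ndjson/ output prefix for the converted files:

import json
import urllib.parse
import boto3

s3 = boto3.client("s3")
decoder = json.JSONDecoder()

def split_concatenated_json(text):
    # walk the string and decode one JSON value at a time
    idx = 0
    while idx < len(text):
        while idx < len(text) and text[idx].isspace():
            idx += 1
        if idx >= len(text):
            break
        obj, idx = decoder.raw_decode(text, idx)
        yield obj

def lambda_handler(event, context):
    for rec in event["Records"]:
        bucket = rec["s3"]["bucket"]["name"]
        key = urllib.parse.unquote_plus(rec["s3"]["object"]["key"])
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
        ndjson = "\n".join(json.dumps(obj) for obj in split_concatenated_json(body)) + "\n"
        # write to a separate prefix so the Athena table only sees newline-delimited JSON
        s3.put_object(Bucket=bucket, Key="ndjson/" + key, Body=ndjson.encode("utf-8"))

Point the Athena table at the ndjson/ prefix rather than at the raw Firehose output, so the original concatenated objects are not picked up.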

————————————————-
3. Some customers seem to use Analytics as just a pass-through that relays Kinesis stream records to Firehose (and on to S3) without writing any additional consumer. Unless Analytics adds support for delimited JSON output that Athena can process, they would need to eliminate Analytics and replace it with a consumer that has a built-in streaming parser and writes newline-delimited JSON to S3.

(Kinesis Stream -> consumer(can be Lambda) -> Firehose -> S3 -> Athena ) or,
(Kinesis Stream -> consumer(can be Lambda) -> S3 -> Athena )
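As a rough sketch of such a consumer, a Python Lambda subscribed to the Kinesis stream could append the newline itself and forward the records to Firehose (the delivery stream name below is a placeholder):

import base64
import boto3

firehose = boto3.client("firehose")
DELIVERY_STREAM = "my-delivery-stream"  # placeholder: your Firehose delivery stream

def lambda_handler(event, context):
    records = []
    for rec in event["Records"]:
        payload = base64.b64decode(rec["kinesis"]["data"])
        # newline-terminate each record so the S3 output becomes newline-delimited JSON
        records.append({"Data": payload + b"\n"})
    # PutRecordBatch accepts at most 500 records per call
    for i in range(0, len(records), 500):
        firehose.put_record_batch(DeliveryStreamName=DELIVERY_STREAM,
                                  Records=records[i:i + 500])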

However, if Analytics supports newline-delimited JSON output in the future, they can keep the old pipeline.
————————————————-

Using Kinesis agent to push data to multiple streams on multiple AWS accounts

Written by mannem. Posted in Kinesis

A single Kinesis agent process cannot push data to multiple accounts, so we need to run multiple independent agents, one agent per account.

This post discusses the Kinesis agent and guides you through running multiple agents on an Amazon EC2 instance. It also has some sample scripts to build and run your own agent from source.

When you install the Kinesis agent as outlined in the following documentation, it runs as a service.

http://docs.aws.amazon.com/streams/latest/dev/writing-with-agents.html#agent-config-settings

The service executes the Java class "com.amazon.kinesis.streaming.agent.Agent" with a command like the one below:
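The exact classpath and option names can vary between agent versions (treat this as an assumed sketch and check the agent's help output), but it is roughly:

java -cp "/usr/share/aws-kinesis-agent/lib/*" \
    com.amazon.kinesis.streaming.agent.Agent \
    --configuration /etc/aws-kinesis/agent.json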

Running this command with the help option (or looking at AgentOptions) lists the options that you can specify on the agent, so you can run multiple agents, each with a different configuration. Currently (Oct 2016), each agent can only have one authentication configuration, which means each agent process can only write to streams in just one account. So, with separate config files, you can send data to different accounts. Each agent can authenticate with IAM user credentials or with an assumed role.

Example agent configurations and how to run them :
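As an assumed sketch (the key names follow the agent configuration settings documented at the link above; the stream names, log paths, and credentials are placeholders), two per-account configuration files might look like this:

/etc/aws-kinesis/agent-account-a.json

{
  "awsAccessKeyId": "ACCESS_KEY_FOR_ACCOUNT_A",
  "awsSecretAccessKey": "SECRET_KEY_FOR_ACCOUNT_A",
  "kinesis.endpoint": "kinesis.us-east-1.amazonaws.com",
  "flows": [
    { "filePattern": "/var/log/app-a/*.log", "kinesisStream": "stream-in-account-a" }
  ]
}

/etc/aws-kinesis/agent-account-b.json

{
  "awsAccessKeyId": "ACCESS_KEY_FOR_ACCOUNT_B",
  "awsSecretAccessKey": "SECRET_KEY_FOR_ACCOUNT_B",
  "kinesis.endpoint": "kinesis.us-west-2.amazonaws.com",
  "flows": [
    { "filePattern": "/var/log/app-b/*.log", "kinesisStream": "stream-in-account-b" }
  ]
}

Each agent then runs as its own process, pointed at its own configuration file, for example:

java -cp "/usr/share/aws-kinesis-agent/lib/*" com.amazon.kinesis.streaming.agent.Agent --configuration /etc/aws-kinesis/agent-account-a.json &
java -cp "/usr/share/aws-kinesis-agent/lib/*" com.amazon.kinesis.streaming.agent.Agent --configuration /etc/aws-kinesis/agent-account-b.json &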


Building Kinesis Agent :

Instead of running the agent as a service, if you wish to build and use your own agent, you can use https://github.com/awslabs/amazon-kinesis-agent .

A sample script to do so:
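This is a minimal sketch, assuming an Amazon Linux instance and that the repository's setup script still supports the --install flag:

# install build dependencies (adjust the package manager/JDK for your distro)
sudo yum install -y git java-1.8.0-openjdk-devel

# fetch the agent source, then build and install it with the bundled setup script
git clone https://github.com/awslabs/amazon-kinesis-agent.git
cd amazon-kinesis-agent
sudo ./setup --install

# afterwards, point the agent at your own agent.json (see the examples above) and start it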

Push data to AWS Kinesis Firehose with AWS API Gateway via Service proxy

Written by mannem. Posted in Kinesis

 


With API Gateway, developers can create and operate APIs for their back-end services without developing and maintaining infrastructure to handle authorization and access control, traffic management, monitoring and analytics, version management, and software development kit (SDK) generation.

API Gateway is designed for web and mobile developers who are looking to provide secure, reliable access to back-end APIs from mobile apps, web apps, and server apps that are built internally or by third-party ecosystem partners. The business logic behind the APIs can either be provided by a publicly accessible endpoint that API Gateway proxies calls to, or it can be run entirely as a Lambda function.

In this article, we will create a publicly accessible API endpoint to which your application can issue POST requests. Via the service proxy, the contents of the POST request go to Firehose as a PutRecord API call, and the data eventually lands in S3/Redshift/an ES cluster based on your Firehose settings. Using a service proxy eliminates the need to invoke an AWS Lambda function.

The end result would be :

1. Your application issues a POST request to the API Gateway endpoint that you create, for example with a curl call like the one shown after this list.

2. API Gateway authenticates this request and translates it into a PutRecord API call via the service proxy, putting the data "SampleDataStringToFirehose" into your Firehose stream.

3. Firehose eventually hydrates the destination (either S3 or Redshift) with the data from your POST requests.
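With curl, such a request might look like this (the endpoint URL and resource path are hypothetical placeholders for the ones you create below):

curl -X POST \
    -H "Content-Type: application/json" \
    -d '{"DeliveryStreamName": "test", "Record": {"Data": "SampleDataStringToFirehose"}}' \
    https://abcd1234.execute-api.us-west-2.amazonaws.com/prod/firehose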


Here's a step-by-step walkthrough on setting this up:

This walkthrough assumes you have explored the other walkthroughs in http://docs.aws.amazon.com/apigateway/latest/developerguide/getting-started-intro.html
1. Creating Gateway:

> Create an API in the API Gateway web console.
> Create a resource under that API and create a POST method on it.
> In this method, choose the integration type "Advanced" and select "AWS Service Proxy".
> Method settings:

Select the desired Region,
Service -> Firehose,
Leave Subdomain empty,
HTTP method -> POST,
Action -> PutRecord,
Role -> ARN of a role that can be assumed by API Gateway and has a policy allowing at least the 'PutRecord' action on your Firehose stream. A sample role that allows all Firehose actions is attached at the end of this post.
Ex: arn:aws:iam::618548141234:role/RoleToAllowPUTsOnFirehose

Confused? You can also check out a sample role creation here: http://docs.aws.amazon.com/apigateway/latest/developerguide/getting-started-aws-proxy.html#getting-started-aws-proxy-add-roles

2. Testing:

Save the method and TEST it with a request body like the one below; the format is documented on the PutRecord API reference page.
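Using that PutRecord request syntax, with the sample stream name and data string used later in this post:

{
    "DeliveryStreamName": "test",
    "Record": {
        "Data": "SampleDataStringToFirehose"
    }
}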

Replace 'test' with your Firehose delivery stream name.

http://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecord.html

3. Verify S3 contents:

Now, if you look at the S3 contents that Firehose is supposed to hydrate (after the S3 buffer interval or buffer size, whichever is satisfied first), you will see that the contents are in a binary-looking format like äfiõÚ)≤ÈøäœÏäjeÀ˜nöløµÏm˛áˇ∂ø¶∏ß∂)‡ , which isn't the data that you just pushed via the API call.

This is because Firehose expects the data blob to be encoded in Base64. (You can confirm this by running ( aws firehose put-record --delivery-stream-name test --debug --region us-west-2 --record Data=SampleDataStringToFirehose ), which automatically encodes the data blob in Base64 before sending the request.) While we specify 'SampleDataStringToFirehose' as the data, the AWS CLI actually sends 'U2FtcGxlRGF0YVN0cmluZ1RvRmlyZWhvc2U=':

{'body': '{"Record": {"Data": "U2FtcGxlRGF0YVN0cmluZ1RvRmlyZWhvc2U="}, "DeliveryStreamName": "test"}' ,

where base64-encoded(SampleDataStringToFirehose) = 'U2FtcGxlRGF0YVN0cmluZ1RvRmlyZWhvc2U='

So, you need to apply a transformation to your POST payload to encode the Data field in Base64.

You can use the $util.base64Encode() utility to do the Base64 encoding in a mapping template at the API Gateway layer.

4. Applying transformations:

Using transformations, you can modify the JSON schema during the request and response cycles.

By defining a mapping template, the request and response payloads can be transformed to reflect a custom schema.

For a request body like the one used in the Testing step above (with the Data value still in plain text), here's a sample mapping template that I created by checking the documentation (content type application/json):
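A minimal sketch of such a template, assuming the incoming body keeps the PutRecord field names (adjust the $input.path expressions if your client sends a different shape):

{
    "DeliveryStreamName": "$input.path('$.DeliveryStreamName')",
    "Record": {
        "Data": "$util.base64Encode($input.path('$.Record.Data'))"
    }
}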

Usage:

> While testing the resource: Integration Request -> Add mapping template -> Content-Type = application/json
> Instead of Input passthrough, choose the mapping template option, paste your template, and save.
> Now test with a request body similar to the one you used before, and verify "Method request body after transformations" in the Logs section; it should look like the transformed body shown after this list.
> You may need to modify the mapping template so that it includes whatever payload fields your application needs.
> Instead of applying these transformations in API Gateway, you can also have your client encode the data before framing the request to API Gateway.
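With the sample template above, the transformed body for the test request should look roughly like this, where the Data value is the Base64 encoding of 'SampleDataStringToFirehose' (matching the CLI debug output shown earlier):

{
    "DeliveryStreamName": "test",
    "Record": {
        "Data": "U2FtcGxlRGF0YVN0cmluZ1RvRmlyZWhvc2U="
    }
}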

5. Deployment:

Now that we have a working method that can issue PutRecord to Firehose, we deploy the API to get a publicly accessible HTTP endpoint for POST requests. Your application can issue POST requests to this endpoint; the contents of those requests go to Firehose as PutRecord API calls, and the data eventually lands in S3/Redshift based on your Firehose settings.

Make sure you include the Content-Type: application/json header in the POST request. You can also try application/x-amz-json-1.1.

6. Monitor and Extend:
  • Monitoring – Check the CloudWatch monitoring tab on the Firehose stream for incoming records and bytes. You can also check CloudWatch Logs to investigate failures.
  • Also verify the contents of the S3 bucket / Redshift tables / ES cluster.
  • Extend – You may extend this setup to other API calls on other AWS services required by your client app. A similar setup can be used to POST data to Kinesis streams from your applications.


A sample role :
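A minimal sketch of such a role: a trust policy that lets API Gateway assume it, plus a permissions policy allowing all Firehose actions (scope this down to firehose:PutRecord on your delivery stream for production use).

Trust relationship:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "apigateway.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}

Permissions policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "firehose:*",
      "Resource": "*"
    }
  ]
}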