Archive for October 10, 2019

Converting Concatenated or Streaming JSON -> newline-delimited JSON for Kinesis Anaytics to Athena

Written by mannem on . Posted in Athena, AWS BIG DATA, Kinesis

kinesis

kinesis

Athena supporting (Hive/HCatalog JsonSerDe and the OpenX SerDe) needs JSON records with some kind of delimiter(“newline-delimited JSON”) to identify every record and it will not support concatenated JSON(or JSON stream). However records emitting from Kinesis Analytics can only be in concatenated JSON format which cannot be used for Athena. This arcticle guldes you on different options and shows how to convert the JSON with Firehose transformations on Lambda.

Kinesis Analytics being a real time streaming solution , will choose concatenated/streaming JSON (instead of usual newline-delimited JSON) for its JSON output which better supports records which has delimiters like /n within itself. So records like  <code>{“id”: 1}{“id”: 2}</code> are expected and a streaming consumer would have to parse individual records in that case.

https://en.wikipedia.org/wiki/JSON_Streaming

Athena supporting (Hive/HCatalog JsonSerDe and the OpenX SerDe) will be needing JSON records with some kind of delimiter(“newline-delimited JSON”) to identify every record and it will not support concatenated JSON(or JSON stream) like in this case. So, you likely need to some alternatives to eventually work on this concatenated JSON data with Athena.

For a Pipeline like the following who wants to do both Kinesis Analytics & Run Athena , I see 2 options
Current pipeline : (Kinesis Stream -> Kinesis Analytics(JSON output) -> Firehose -> S3 -> Athena )

————————————————-
1.  They can keep the old pipeline , but need to write and run an additional parser at end of S3 to convert concatenated JSON) -> newline-delimited JSON.

(Kinesis Stream -> Kinesis Analytics(JSON output) -> Firehose -> S3 -> StreamingParser(JSON) ->  Athena )

The easiest way for the customer to solve this is to use Firehose in-line transformations to add a new line character at the end of every record. Alternatively, the customer could configure a Lambda function to trigger based on the S3 put to do the same thing.

Firehose in-line transformations :

Kinesis Analytics uses PutRecordBatch API(firehose) to emit the records to the Destination like Firehose. Each JSON record is translated to a Firehose record. You should be able to perform in-line transformation to append newline character per record, by writing a custom function on the Lambda or editing some existing blue prints.

http://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

Sample blog post on transformations : https://aws.amazon.com/blogs/compute/amazon-kinesis-firehose-data-transformation-with-aws-lambda/

————————————————-

2. You can keep the old pipeline , but need to write and run an additional parser probably a Lambda function to trigger based on the S3 put, to convert concatenated JSON) -> newline-delimited JSON.

(Kinesis Stream -> Kinesis Analytics(JSON output) -> Firehose -> S3 -> StreamingParser(JSON) on Lambda -> Athena

————————————————-
3. Some customers seem to use Analytics as just a plug  which can translate the Kinesis stream records to Firehose(to S3) without writing any additional consumer. They probably need to eliminate Analytics unless it supports JSON with delimiters as output that can be processed using Ahena. They may need to replace it with some consumer with inbuilt streaming parser which can put to S3 with delimited JSON.

(Kinesis Stream -> consumer(can be Lambda) -> Firehose -> S3 -> Athena ) or,
(Kinesis Stream -> consumer(can be Lambda) -> S3 -> Athena )

However, if Analytics in future does support JSON with new line delimiter , they can keep old Pipeline.
————————————————-