
Converting Concatenated or Streaming JSON -> newline-delimited JSON for Kinesis Analytics to Athena

Written by mannem on . Posted in Athena, AWS BIG DATA, Kinesis


Athena’s supported JSON SerDes (the Hive/HCatalog JsonSerDe and the OpenX SerDe) need JSON records separated by a delimiter (“newline-delimited JSON”) to identify each record, and they do not support concatenated JSON (a JSON stream). However, records emitted by Kinesis Analytics can only be in concatenated JSON format, which cannot be used with Athena. This article walks through the different options and shows how to convert the JSON with Firehose transformations on Lambda.

Kinesis Analytics, being a real-time streaming solution, emits concatenated/streaming JSON (instead of the usual newline-delimited JSON) for its JSON output, since that format better supports records that contain delimiters such as \n within themselves. So records like <code>{"id": 1}{"id": 2}</code> are expected, and a streaming consumer has to parse individual records in that case.

https://en.wikipedia.org/wiki/JSON_Streaming

Athena’s supported SerDes (the Hive/HCatalog JsonSerDe and the OpenX SerDe) need JSON records with a delimiter (“newline-delimited JSON”) to identify each record and will not accept concatenated JSON (a JSON stream) like the output in this case. So you need an alternative in order to eventually query this concatenated JSON data with Athena.
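To make the difference concrete, here is a small Python sketch (illustrative only, not part of the pipeline) showing that a strict JSON parser rejects concatenated records, while newline-delimited JSON can be read one record per line, which is what the SerDes above expect:

<pre><code>
import json

concatenated = '{"id": 1}{"id": 2}'           # concatenated / streaming JSON
newline_delimited = '{"id": 1}\n{"id": 2}\n'  # newline-delimited JSON

# A strict parser fails on concatenated JSON because of the trailing data.
try:
    json.loads(concatenated)
except json.JSONDecodeError as err:
    print("concatenated JSON rejected:", err)  # "Extra data: ..."

# Newline-delimited JSON can be consumed one record per line.
for line in newline_delimited.splitlines():
    print(json.loads(line))
</code></pre>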

For a pipeline like the following that needs to run both Kinesis Analytics and Athena, I see three options.
Current pipeline : (Kinesis Stream -> Kinesis Analytics(JSON output) -> Firehose -> S3 -> Athena )

————————————————-
1. You can keep the old pipeline, but you need to write and run an additional parser at the end of S3 to convert concatenated JSON -> newline-delimited JSON.

(Kinesis Stream -> Kinesis Analytics(JSON output) -> Firehose -> S3 -> StreamingParser(JSON) ->  Athena )

The easiest way to solve this is to use Firehose in-line transformations to add a newline character at the end of every record. Alternatively, you could configure a Lambda function triggered by the S3 put to do the same thing.

Firehose in-line transformations:

Kinesis Analytics uses the Firehose PutRecordBatch API to emit records to a destination like Firehose, and each JSON record is translated to one Firehose record. You should be able to perform an in-line transformation that appends a newline character to every record by writing a custom Lambda function or editing one of the existing blueprints; a rough sketch follows the links below.

http://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

Sample blog post on transformations : https://aws.amazon.com/blogs/compute/amazon-kinesis-firehose-data-transformation-with-aws-lambda/
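As a rough idea of what such a transformation function could look like, here is a minimal Python sketch that follows the Firehose data-transformation record contract (base64-encoded data in; recordId, result, and data out). Treat it as a starting point rather than a drop-in implementation:

<pre><code>
import base64

def lambda_handler(event, context):
    """Firehose data-transformation Lambda: append '\n' to every record."""
    output = []
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')

        # Append a newline so each record lands on its own line in S3.
        if not payload.endswith('\n'):
            payload += '\n'

        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8')
        })
    return {'records': output}
</code></pre>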

————————————————-

2. You can keep the old pipeline, but you need to write and run an additional parser, most likely a Lambda function triggered by the S3 put, to convert concatenated JSON -> newline-delimited JSON.

(Kinesis Stream -> Kinesis Analytics(JSON output) -> Firehose -> S3 -> StreamingParser(JSON) on Lambda -> Athena )
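A minimal sketch of such an S3-triggered parser in Python/boto3 might look like the following. The output prefix (ndjson/) is just an illustrative choice so the rewritten objects do not re-trigger the function or mix with the raw data:

<pre><code>
import json
import boto3

s3 = boto3.client('s3')

def lambda_handler(event, context):
    """On S3 put: rewrite a concatenated-JSON object as newline-delimited JSON."""
    decoder = json.JSONDecoder()
    for rec in event['Records']:
        bucket = rec['s3']['bucket']['name']
        key = rec['s3']['object']['key']  # may need URL-decoding for special characters
        body = s3.get_object(Bucket=bucket, Key=key)['Body'].read().decode('utf-8')

        # Walk the concatenated stream, decoding one JSON document at a time.
        records, pos = [], 0
        while pos < len(body):
            # Skip any whitespace between concatenated documents.
            while pos < len(body) and body[pos].isspace():
                pos += 1
            if pos >= len(body):
                break
            obj, pos = decoder.raw_decode(body, pos)
            records.append(json.dumps(obj))

        # Write the newline-delimited copy under a separate (illustrative) prefix
        # that Athena can be pointed at.
        s3.put_object(Bucket=bucket,
                      Key='ndjson/' + key,
                      Body=('\n'.join(records) + '\n').encode('utf-8'))
</code></pre>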

————————————————-
3. Some customers seem to use Analytics simply as a pass-through that moves Kinesis stream records to Firehose (and on to S3) without writing any additional consumer. Unless Analytics adds support for delimited JSON output that Athena can process, they would need to eliminate it and replace it with a consumer that has a built-in streaming parser and writes delimited JSON to S3 (a sketch of such a consumer follows the pipelines below).

(Kinesis Stream -> consumer(can be Lambda) -> Firehose -> S3 -> Athena ) or,
(Kinesis Stream -> consumer(can be Lambda) -> S3 -> Athena )
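A rough sketch of such a consumer, written as a Python Lambda attached to the Kinesis stream, could look like this. The delivery-stream name is illustrative, and retry handling for failed records is omitted:

<pre><code>
import base64
import boto3

firehose = boto3.client('firehose')
DELIVERY_STREAM = 'my-delivery-stream'  # illustrative name

def lambda_handler(event, context):
    """Kinesis-stream consumer: forward each record to Firehose with a trailing newline."""
    batch = []
    for record in event['Records']:
        payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
        if not payload.endswith('\n'):
            payload += '\n'
        batch.append({'Data': payload.encode('utf-8')})

    # PutRecordBatch accepts up to 500 records per call, so chunk the batch.
    for i in range(0, len(batch), 500):
        firehose.put_record_batch(DeliveryStreamName=DELIVERY_STREAM,
                                  Records=batch[i:i + 500])
</code></pre>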

However, if Kinesis Analytics does support newline-delimited JSON output in the future, they can keep the old pipeline.
————————————————-

Querying DynamoDB export data using Athena (with LIST attribute)

Written by mannem on . Posted in Athena, AWS BIG DATA

Presto supports multiple array and JSON functions that you can use to write queries that return the required results. There is no single way to define your CREATE TABLE and, later, your queries. You can have CREATE TABLE predefine the schema structure and then have your queries refer to the elements you need in that schema. Or you can define your schema as a string and later use functions in your queries to parse that string and get the required results.

https://prestodb.io/docs/current/functions/array.html

https://prestodb.io/docs/current/functions/json.html

In addition, the Athena documentation has examples covering most of the Presto functions:

http://docs.aws.amazon.com/athena/latest/ug/querying-arrays.html

http://docs.aws.amazon.com/athena/latest/ug/rows-and-structs.html

http://docs.aws.amazon.com/athena/latest/ug/querying-JSON.html

Now, I was able to use the following CREATE TABLE syntax on DynamoDB items that have a list of strings, and I was able to flatten out the list of strings using functions like CAST. Please note that this is not the only way to define the table and query it. My query might be overcomplicating what you are trying to get, and there might be a simpler way as well. So, it is really important that you understand the data types you define and what each query returns, and that you use the correct functions, since each function takes one datatype and returns another.