Using AWS Athena to query S3 Server Access Logs.
S3 server access logs can grow very large over time, and it becomes hard for a single machine to process, query, and analyze them. Distributed computing lets us query the logs quickly; the usual candidates are Apache Hadoop, Hive, Spark, or Presto, but the simplicity of the serverless AWS Athena service makes it even easier. This article shows how to use Athena to process your S3 access logs, with example queries and partitioning considerations that can help you query terabytes of logs in just a few seconds.
The server access log files consist of a sequence of new-line delimited log records. Each log record represents one request and consists of space-delimited fields. The following is an example log consisting of six log records.
Sample Data:
79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be mybucket [06/Feb/2014:00:00:38 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be 3E57427F3EXAMPLE REST.GET.VERSIONING - "GET /mybucket?versioning HTTP/1.1" 200 - 113 - 7 - "-" "S3Console/0.4" -
79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be mybucket [06/Feb/2014:00:00:38 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be 891CE47D2EXAMPLE REST.GET.LOGGING_STATUS - "GET /mybucket?logging HTTP/1.1" 200 - 242 - 11 - "-" "S3Console/0.4" -
79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be mybucket [06/Feb/2014:00:00:38 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be A1206F460EXAMPLE REST.GET.BUCKETPOLICY - "GET /mybucket?policy HTTP/1.1" 404 NoSuchBucketPolicy 297 - 38 - "-" "S3Console/0.4" -
79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be mybucket [06/Feb/2014:00:01:00 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be 7B4A0FABBEXAMPLE REST.GET.VERSIONING - "GET /mybucket?versioning HTTP/1.1" 200 - 113 - 33 - "-" "S3Console/0.4" -
79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be mybucket [06/Feb/2014:00:01:57 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be DD6CC733AEXAMPLE REST.PUT.OBJECT s3-dg.pdf "PUT /mybucket/s3-dg.pdf HTTP/1.1" 200 - - 4406583 41754 28 "-" "S3Console/0.4" -
79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be mybucket [06/Feb/2014:00:03:21 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be BC3C074D0EXAMPLE REST.GET.VERSIONING - "GET /mybucket?versioning HTTP/1.1" 200 - 113 - 28 - "-" "S3Console/0.4" -
Data Format considerations:
The Time, RequestURI, and User-Agent fields can contain spaces in their data (for example [06/Feb/2014:00:00:38 +0000], "GET /gdelt/1980.csv.gz HTTP/1.1", and aws-cli/1.7.36 Python/2.7.9 Linux/3.14.44-32.39.amzn1.x86_64), which interferes with the SerDe parsing.
For RequestURI & User-Agent field, the data is enclosed in quotes with spaces inside it. This can be parsed by any SerDe’s that support Quotes. Unfortunately, Athena does not support such SerDe’s like org.apache.hadoop.hive.serde2.OpenCSVSerde which does has quotes feature.
The org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe included with Athena does not support quotes yet.
A custom SerDe called com.amazon.emr.hive.serde.s3.S3LogDeserializer ships with all EMR AMIs just for parsing these logs. A query like the following would create the table easily. However, this SerDe is not supported by Athena.
CREATE EXTERNAL TABLE IF NOT EXISTS s3AccessLogsTable
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.s3.S3LogDeserializer'
LOCATION 's3://${REGION}.elasticmapreduce.samples/s3-access-logs/logs/';
Given these limitations, org.apache.hadoop.hive.serde2.RegexSerDe seems like the only feasible option. Writing a RegEx for your log structure is a bit time consuming, but I was able to write it with the help of the ELB log structure example and http://regexr.com/.
I used the following RegEx for the S3 server access logs:
([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) \\\"([^ ]*) ([^ ]*) (- |[^ ]*)\\\" (-|[0-9]*) (-|[0-9]*) ([-0-9]*) ([-0-9]*) ([-0-9]*) ([-0-9]*) ([^ ]*) (\"[^\"]*\") ([^ ]*)$
It can also be found at http://regexr.com/3erct, where I removed some extra escape characters like the \'s. In the DDL, we need to add those escape characters back, as in the DDL below.
([^ ]*)
– matches a continuous string. Because number fields like 'time' can contain a '-' character, I did not use the ([-0-9]*) pattern that is normally used for numbers. For the same reason, I had to use STRING for all fields in the DDL. We can use Presto string functions to convert the strings where appropriate and then apply comparison operators in DML.
\\[(.*?)\\] or \\\[(.*?)\\\]
– is for the Time field. For a string like [06/Feb/2014:00:00:38 +0000], it matches and captures the timestamp without the brackets (e.g. 06/Feb/2014:00:00:38 +0000), which makes querying between times easier.
\\\"([^ ]*) ([^ ]*) (- |[^ ]*)\\\"
– is for Request-URI
field – removes the quotes and splits the "GET /mybucket?versioning HTTP/1.1"
into three groups.
(\"[^\"]*\")
– is for User-Agent
– matches the whole string with quotes.
(-|[0-9]*)
– is for HTTPstatus, which is always a number like 200 or 404. If you expect a '-' string, you can use ([^ ]*) instead.
CREATE TABLE DDL:
CREATE DATABASE s3_AccessLogsDB;

CREATE EXTERNAL TABLE IF NOT EXISTS s3_AccessLogsDB.Accesslogs(
 BucketOwner string,
 Bucket string,
 RequestDateTime string,
 RemoteIP string,
 Requester string,
 RequestID string,
 Operation string,
 Key string,
 RequestURI_operation string,
 RequestURI_key string,
 RequestURI_httpProtoversion string,
 HTTPstatus string,
 ErrorCode string,
 BytesSent string,
 ObjectSize string,
 TotalTime string,
 TurnAroundTime string,
 Referrer string,
 UserAgent string,
 VersionId string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
 'serialization.format' = '1',
 'input.regex' = '([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) \\\"([^ ]*) ([^ ]*) (- |[^ ]*)\\\" (-|[0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\") ([^ ]*)$'
)
LOCATION 's3://s3-server-access/logs/';
Where s3://s3-server-access/logs/ is the bucket and prefix of your access logs.
Once you create the table, you can query it for specific information.
Example Queries:
-- list some rows
select * from Accesslogs limit 10;

-- Get count of all records in the table
SELECT COUNT(*) FROM Accesslogs;

-- Show all 'REST.GET.OBJECT' calls made between time period T1 and T2,
-- using Presto date functions: https://prestodb.io/docs/current/functions/datetime.html
SELECT Requester, Operation, RequestDateTime
FROM Accesslogs
WHERE Operation='REST.GET.OBJECT'
AND parse_datetime(RequestDateTime,'dd/MMM/yyyy:HH:mm:ss Z')
 BETWEEN parse_datetime('2016-12-05:16:56:36','yyyy-MM-dd:HH:mm:ss')
 AND parse_datetime('2016-12-05:16:56:40','yyyy-MM-dd:HH:mm:ss');

-- Get distinct s3 paths which were accessed
SELECT DISTINCT(Key) FROM Accesslogs;

-- Number of GET/HEAD requests per user agent
SELECT UserAgent, count(*) AS cnt
FROM Accesslogs
WHERE RequestURI_operation LIKE '%GET%' OR RequestURI_operation LIKE '%HEAD%'
GROUP BY UserAgent
ORDER BY cnt DESC;

-- Get access count for each s3 path after a given timestamp
SELECT Key, count(*) AS cnt
FROM Accesslogs
WHERE parse_datetime(RequestDateTime,'dd/MMM/yyyy:HH:mm:ss Z') > parse_datetime('2016-12-05:16:56:40','yyyy-MM-dd:HH:mm:ss')
GROUP BY Key
ORDER BY cnt DESC;
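Because every column in this table is declared as STRING, numeric fields such as BytesSent or TotalTime need an explicit conversion before they can be aggregated or compared. Here is a minimal sketch of such a conversion, using the column names from the DDL above and assuming you want to treat the '-' placeholder as 0:

-- Total bytes sent per key, mapping the '-' placeholder to 0 before casting
SELECT Key,
       SUM(CASE WHEN BytesSent = '-' THEN 0 ELSE CAST(BytesSent AS bigint) END) AS total_bytes_sent
FROM Accesslogs
GROUP BY Key
ORDER BY total_bytes_sent DESC;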
Considerations:
If the S3 prefix is huge with a lot of log files, then depending on the query you might hit Athena's "Query timeout: 30 minutes" limit (http://docs.aws.amazon.com/athena/latest/ug/service-limits.html). Some optimizations are discussed here: https://aws.amazon.com/blogs/big-data/top-10-performance-tuning-tips-for-amazon-athena/
Partitioning / Compression:
Over time, the logs in your target prefix (s3://s3-server-access/logs/) can grow large. This is especially true if you use the same bucket/prefix as the target for other buckets' access logs: if you enable logging on multiple source buckets that identify the same target bucket, the target bucket will have access logs for all those source buckets, but each log object will report access log records for a specific source bucket.
Amazon S3 uses the following object key format for the log objects it uploads to the target bucket: TargetPrefixYYYY-mm-DD-HH-MM-SS-UniqueString. So it essentially creates many files, each containing a batch of log records, in the S3 bucket.
Querying can be slow if there are a large number of small files in text format. To speed things up, we need at least one of the following options to consolidate these logs.
1. PARTITIONING – work on a subset of files instead of scanning all of them.
2. CONVERT TO COLUMNAR STORAGE – like ORC or Parquet, for faster query performance.
3. ENABLE COMPRESSION – Snappy or GZIP.
4. BLOOM FILTERS
Note that S3 access log data is not partitioned and is not organized with prefixes like /year/month/day or /year=2010/month=10/day=11/, which Athena could use to create static or dynamic partitions respectively. The access logs are also uncompressed flat text files.
Unfortunately, we cannot enable all of the above options just by defining a DDL. We need to copy or move our text-based data set with the above options applied. One option is to use AWS EMR to periodically structure and partition the S3 access logs so that you can query them easily with Athena. At this moment, it is not possible to use Athena itself to convert non-partitioned data into partitioned data.
On EMR, we could use either
– HIVE: to partition/compress/convert, or
– the S3-DIST-CP utility: to compress/groupBy.
Note that we could also use EMR's Presto or Spark to query the logs, but Athena, being serverless, is much easier to query without managing anything.
Here's an example of converting non-partitioned S3 access logs to partitioned S3 access logs on EMR:
# Suppose we already have s3 access logs on s3://s3-server-access/logs/
aws s3 ls s3://s3-server-access/logs/ | awk '{print $4}'

2016-12-05-17-17-06-5D51435906E10586
2016-12-05-17-17-16-025EA7CBD09F2B9B
2016-12-05-17-17-21-66445C690EEC5C2C
2016-12-05-17-17-23-D144F63F117328BF
2016-12-05-17-17-55-7882BD5AD4F94E0A
2016-12-05-17-18-08-2C48D7E4D1CDC8EF
2016-12-05-17-20-42-81A990C0C1857E62
2017-01-02-02-36-57-5A32C2EFB524C5BB
2017-01-02-08-28-41-064793A24240CD17

-------------------------------------------------

# We want to COPY all s3 logs from the original s3 access log bucket (s3://s3-server-access/logs/)
# into (s3://s3-server-access/partitioned-logs/), organizing (partitioning) them and converting them to ORC.
# At this moment, Athena doesn't support "INSERT OVERWRITE TABLE .. PARTITION"
# for creating dynamic partitions on S3 buckets.
# So, you will need to use EMR to run the statements below on EMR Hive.
# The size of the EMR cluster that you want to provision really depends on
# the data size that you are trying to convert
# and how fast you want the initial conversion job to go through.
# Choosing a correct configuration for EMR DOES matter, and that discussion is beyond the scope of this article.

# ssh to the created EMR cluster's master
ssh -i ~/retro.pem hadoop@ec2-xx-xx-xx-xx.us-west-2.compute.amazonaws.com

# We want our shell not to die in the middle of our job.
# I prefer to use screen and later 'screen -dr' to recover, or "ctrl+a d" to detach.
# https://www.rackaid.com/blog/linux-screen-tutorial-and-how-to/
screen

hive

-- set desired parameters to cope with your data size and cluster size
set hive.exec.max.dynamic.partitions.pernode=100000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=1000000;
set hive.exec.parallel=true;
set hive.exec.dynamic.partition.mode=nonstrict;

-- use either the MR engine or the TEZ engine (if you specify TEZ, make sure you have it on EMR while provisioning)
set hive.execution.engine=mr;
set hive.stats.autogather=true;
set hive.optimize.sort.dynamic.partition=true;

-- some MapReduce settings for the MR engine
set mapred.job.reduce.input.buffer.percent=0.0;
set mapreduce.input.fileinputformat.split.minsize=240000000;
set mapreduce.input.fileinputformat.split.minsize.per.node=240000000;
set mapreduce.input.fileinputformat.split.minsize.per.rack=240000000;
-- set mapred.map.child.java.opts=-server -Xmx2800m -Djava.net.preferIPv4Stack=true;
-- set mapred.reduce.child.java.opts=-server -Xms1024m -Xmx3800m -Djava.net.preferIPv4Stack=true;
-- set mapreduce.map.memory.mb=3072;
-- set mapreduce.reduce.memory.mb=4096;
-- set io.sort.mb=800;

-- some hive settings for the TEZ engine
set hive.tez.java.opts=-XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps -XX:+UseNUMA -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/;

-- Create the source un-partitioned external table (just like we did on Athena)
CREATE EXTERNAL TABLE IF NOT EXISTS Accesslogs(
 BucketOwner string,
 Bucket string,
 RequestDateTime string,
 RemoteIP string,
 Requester string,
 RequestID string,
 Operation string,
 Key string,
 RequestURI_operation string,
 RequestURI_key string,
 RequestURI_httpProtoversion string,
 HTTPstatus string,
 ErrorCode string,
 BytesSent string,
 ObjectSize string,
 TotalTime string,
 TurnAroundTime string,
 Referrer string,
 UserAgent string,
 VersionId string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
 'serialization.format' = '1',
 'input.regex' = '([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) \\\"([^ ]*) ([^ ]*) (- |[^ ]*)\\\" (-|[0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\") ([^ ]*)$'
)
LOCATION 's3://s3-server-access/logs/';

-- Create a partitioned external table to specify how you want your s3 logs to look.
-- Here I want the partitioning by year, month, and day of the S3 request.
-- You might extend/alter it to partition by other data columns like BUCKET / RequestID as well.
-- This table also uses the columnar ORC format instead of text files to speed queries up.
CREATE EXTERNAL TABLE Accesslogs_partitionedbyYearMonthDay(
 BucketOwner string,
 Bucket string,
 RemoteIP string,
 Requester string,
 RequestID string,
 Operation string,
 Key string,
 RequestURI_operation string,
 RequestURI_key string,
 RequestURI_httpProtoversion string,
 HTTPstatus string,
 ErrorCode string,
 BytesSent string,
 ObjectSize string,
 TotalTime string,
 TurnAroundTime string,
 Referrer string,
 UserAgent string,
 VersionId string)
PARTITIONED BY (Request_YEAR STRING, Request_MONTH STRING, Request_DAY STRING)
STORED AS ORC
LOCATION 's3://s3-server-access/partitioned-logs/';

-- Alter to enable ZLIB compression to reduce the size of the logs.
-- (bloom filters aren't supported by the Presto ORC reader at this moment; otherwise we could also add 'orc.bloom.filter.columns'='*')
ALTER TABLE Accesslogs_partitionedbyYearMonthDay SET TBLPROPERTIES('orc.compress'='ZLIB');

-- Now, we COPY all s3 logs from the original s3 access log bucket (s3://s3-server-access/logs/)
-- into (s3://s3-server-access/partitioned-logs/), partitioning them and converting them to ORC.
-- The query below spins up an MR/TEZ job that automatically creates partitions and copies the relevant data into them.
-- This is to be run only on Hive, not on Athena.
INSERT OVERWRITE TABLE Accesslogs_partitionedbyYearMonthDay PARTITION(Request_YEAR, Request_MONTH, Request_DAY)
select
 BucketOwner,
 Bucket,
 RemoteIP,
 Requester,
 RequestID,
 Operation,
 Key,
 RequestURI_operation,
 RequestURI_key,
 RequestURI_httpProtoversion,
 HTTPstatus,
 ErrorCode,
 BytesSent,
 ObjectSize,
 TotalTime,
 TurnAroundTime,
 Referrer,
 UserAgent,
 VersionId,
 year(from_unixtime(unix_timestamp(RequestDateTime,'dd/MMM/yyyy:HH:mm:ss Z'))),
 month(from_unixtime(unix_timestamp(RequestDateTime,'dd/MMM/yyyy:HH:mm:ss Z'))),
 day(from_unixtime(unix_timestamp(RequestDateTime,'dd/MMM/yyyy:HH:mm:ss Z')))
from Accesslogs;

hive> exit;

-------------------------------------------------

# Once the job is completed, we can see the following output
aws s3 ls s3://s3-server-access/partitioned-logs/ --recursive | awk '{print $4}'

partitioned-logs/
partitioned-logs/request_year=2016/request_month=12/request_day=5/000000_0
partitioned-logs/request_year=2017/request_month=1/request_day=2/000000_0

-------------------------------------------------

Now we have our data partitioned, compressed, and in columnar format on the S3 bucket.
From this point onwards, it is very similar to what you find in this article: http://docs.aws.amazon.com/athena/latest/ug/partitions.html
1. Create a table referencing this location in Athena (just like the 'Accesslogs_partitionedbyYearMonthDay' CREATE above).
2. MSCK REPAIR TABLE Accesslogs_partitionedbyYearMonthDay – to load all partitions on S3 into Athena's metadata/catalog (dynamic partitioning – Athena automatically recognizes all our partitions).
3. Run queries on this table with WHERE clauses on specific year/month/day partitions to speed the querying up.
Ex: select * from Accesslogs_partitionedbyYearMonthDay where request_year='2016' AND request_month='12';

-------------------------------------------------
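For clarity, here is a minimal sketch of those Athena-side steps, assuming the same Accesslogs_partitionedbyYearMonthDay DDL from above has been re-created in Athena against s3://s3-server-access/partitioned-logs/:

-- Load all partitions that already exist on S3 into Athena's catalog
MSCK REPAIR TABLE Accesslogs_partitionedbyYearMonthDay;

-- Verify the partitions were picked up
SHOW PARTITIONS Accesslogs_partitionedbyYearMonthDay;

-- Query with partition predicates so Athena only scans the relevant prefixes
SELECT Key, count(*) AS cnt
FROM Accesslogs_partitionedbyYearMonthDay
WHERE request_year='2016' AND request_month='12' AND request_day='5'
GROUP BY Key
ORDER BY cnt DESC;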
Continuing the Partitioning for incoming logs:
Note that the above discussion covers a one-time creation of all partitions. There are two things to note:
1. While the above job is running, new logs may arrive that will not be partitioned into the destination.
2. After the job is completed, you keep getting new logs.
Which S3 files are missed by the above job depends on the Hive client's split calculation on your source S3 bucket. When you run the 'INSERT OVERWRITE' command, the Hive client calculates splits up front by listing all objects under the S3 prefix. This usually takes minutes, depending on the number of S3 objects. After the split calculation is done, the actual partitioning work is done by the mappers and reducers. So any log files pushed to the source prefix after the split calculation is done will not be included in your job.
If you want the Athena table (Accesslogs_partitionedbyYearMonthDay) to pick up partitions automatically as new files with newer request days arrive in your S3 access log bucket/prefix, you will need either a 'batch job on schedule' or a 'trigger mechanism' to make sure the new data is also partitioned accordingly.
Batch job on schedule:
Note that our source S3 access logs are dumped into one single prefix, with file names like 2016-12-05-17-17-06-5D51435906E10586. Since we run these batch jobs after the above one-time bulk job, the daily (or more frequent) batches need to skip the files that the one-time job already processed. For this, we can rely on the file name itself (it starts with a timestamp) to get the list of newly generated files and then use an ETL job to convert those files to partitioned data. We could also maintain a manifest file listing the files already processed and the files still to be processed; a rough sketch of such an incremental Hive step is shown after this section.
Managing all of that is beyond the scope of this blog. There are multiple ways to do the ETL. One way is to run an AWS Data Pipeline with EMR on a schedule and run the Hive queries on the new data.
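As an illustration, here is a hedged sketch of what that incremental Hive step could look like, relying on Hive's INPUT__FILE__NAME virtual column and a watermark (the key of the last file handled by the previous run); the watermark value below is just a placeholder taken from the earlier listing and would normally come from your scheduler or manifest:

-- Run on EMR Hive, not Athena. Appends only records coming from log files
-- whose names sort after the last processed file (file names start with a timestamp).
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT INTO TABLE Accesslogs_partitionedbyYearMonthDay
PARTITION (Request_YEAR, Request_MONTH, Request_DAY)
SELECT
 BucketOwner, Bucket, RemoteIP, Requester, RequestID, Operation, Key,
 RequestURI_operation, RequestURI_key, RequestURI_httpProtoversion,
 HTTPstatus, ErrorCode, BytesSent, ObjectSize, TotalTime, TurnAroundTime,
 Referrer, UserAgent, VersionId,
 year(from_unixtime(unix_timestamp(RequestDateTime,'dd/MMM/yyyy:HH:mm:ss Z'))),
 month(from_unixtime(unix_timestamp(RequestDateTime,'dd/MMM/yyyy:HH:mm:ss Z'))),
 day(from_unixtime(unix_timestamp(RequestDateTime,'dd/MMM/yyyy:HH:mm:ss Z')))
FROM Accesslogs
-- placeholder watermark: key of the last file processed by the previous run
WHERE INPUT__FILE__NAME > 's3://s3-server-access/logs/2017-01-02-08-28-41-064793A24240CD17';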
Trigger-based solution:
If you want to run a partitioning job on every S3 PUT (or batch of PUTs), you can use AWS Lambda, which can trigger a piece of code on every S3 object PUT. You get metadata about the PUT and can handle it in your Lambda code accordingly, for example firing off an ETL job to do the partitioning. AWS Lambda comes with the AWS SDK, and you can make all kinds of AWS API calls with the proper permissions.
http://docs.aws.amazon.com/lambda/latest/dg/with-s3.html
http://docs.aws.amazon.com/lambda/latest/dg/with-s3-example.html
Other Input RegExes:
- CloudFront
'^(?!#)([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+[^\(]+[\(]([^\;]+).*\%20([^\/]+)[\/](.*)$'
- ELB access logs
'([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*):([0-9]*) ([.0-9]*) ([.0-9]*) ([.0-9]*) (-|[0-9]*) (-|[0-9]*) ([-0-9]*) ([-0-9]*) \\\"([^ ]*) ([^ ]*) (- |[^ ]*)\\\" ?(\"[^\"]*\")? ?([A-Z0-9-]+)? ?([A-Za-z0-9.-]*)?$'
- Apache web logs
'([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?'
You may also like:
Using Athena to Query CloudTrail logs.
Parsing AWS Cloudtrail logs with EMR Hive and Spark