How Data Pipeline's RDS to Redshift copy template works (and its limitations)
The template contains four activities:
1. RDSToS3CopyActivity – Creates a CSV file in S3 from the MySQL table.
2. RedshiftTableCreateActivity – Translates the MySQL table definition to PostgreSQL syntax and creates the table on Redshift (if it does not exist).
3. S3ToRedshiftCopyActivity – Runs a Redshift COPY command (a sketch of this step follows the list).
4. S3StagingCleanupActivity – Cleans up the staging S3 directory.
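To make step 3 concrete, here is a minimal sketch of the kind of COPY statement the activity ends up issuing against the cluster. This is not the template's actual code (the activity is built on the pipeline's RedshiftCopyActivity machinery); the endpoint, credentials, staging prefix, and table name below are all placeholders:

import psycopg2

# Connect to the Redshift cluster (all connection details are hypothetical).
conn = psycopg2.connect(
    host="example-cluster.abc123.us-east-1.redshift.amazonaws.com",
    port=5439, dbname="dev", user="admin", password="secret")
cur = conn.cursor()
# Load the CSV staged by RDSToS3CopyActivity into the target table.
cur.execute("""
    COPY RedshiftTarget
    FROM 's3://my-staging-bucket/rds-export/'
    CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789012:role/MyRedshiftCopyRole'
    CSV;
""")
conn.commit()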
RedshiftTableCreateActivity
is the key activity and where all the complexity lies. It runs a shell script via ShellCommandActivity, which translates the MySQL table structure to PostgreSQL (psql) syntax and creates a table on Redshift with the translated structure. The data-type translation between RDS and Redshift is documented here:
http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-template-redshift.html
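The documented mapping mirrors what the translator script (quoted in full later in this post) actually does. A few representative conversions, summarized here as a Python dict purely for illustration (the script itself uses an if/elif chain, not a lookup table):

# Representative MySQL -> Redshift type conversions performed by
# mysql_to_redshift.py; an illustrative summary only, not exhaustive.
MYSQL_TO_REDSHIFT = {
    "tinyint(1)":  "boolean",
    "int(11)":     "integer",       # "bigint" when unsigned
    "bigint(20)":  "bigint",        # "varchar(80)" when unsigned
    "datetime":    "timestamp",
    "time":        "varchar(40)",
    "text":        "varchar(max)",  # likewise longtext, mediumtext, blob
    "enum(...)":   "varchar(510)",  # enums and sets collapse to varchar
    "varchar(20)": "varchar(80)",   # char lengths are multiplied by 4
}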
Now, if you dig deeper into the ShellCommandActivity, you'll find that the conversion script is downloaded from s3://datapipeline-us-east-1/sample-scripts/dbconv.sh, which in turn downloads the translator Python script:
curl -O https://s3.amazonaws.com/datapipeline-us-east-1/sample-scripts/mysql_to_redshift.py
You can read through this script to see exactly how each type is translated.
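Based on the script's usage() function, it can be driven from the command line along these lines; the input dump, output file, table name, and keys below are made-up examples, not values the template actually passes:

import subprocess

# Convert a mysqldump file into Redshift-compatible DDL.
# mysql_to_redshift.py is a Python 2 script; all names here are hypothetical.
subprocess.check_call([
    "python", "mysql_to_redshift.py",
    "-i", "TestTable.mysql",   # mysqldump --compatible=postgresql output
    "-o", "TestTable.psql",    # converted DDL to run against Redshift
    "-t", "RedshiftTarget",    # table name to generate
    "-d", "Id",                # distribution key
    "-s", "Id",                # sort key(s), comma-separated
])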
Ex:
mysql> create table TestTable (Id int not null , Lname varchar(20));
According to the translation table, this activity translates it into a psql script, which runs on the EC2 instance:
[ec2-user@ip-172-31-1-122 ~]$ cat red0303180850.psql
-- Converted by MySql to Redshift db converter
START TRANSACTION;
CREATE TABLE RedshiftTarget (Id int not null , Lname varchar(80));
COMMIT;
Note that varchar(20) became varchar(80): the converter multiplies character lengths by 4, presumably because Redshift measures varchar length in bytes and a UTF-8 character can occupy up to 4 bytes.

mysql_to_redshift.py:
__author__ = 'ramaraa'
#!/usr/bin/env python
"""
Fixes a MySQL dump made with the right format so it can be imported to a Redshift database.

Dump using:
mysqldump --compatible=postgresql --default-character-set=utf8 -r databasename.mysql -u root databasename tablename
"""

import re
import sys
import os
import time
import getopt


def file_len(filename):
    """
    Returns the equivalent of wc -l
    :param filename: file name
    :return: number of lines in the file
    """
    with open(filename) as f:
        i = 0
        for i, l in enumerate(f):
            pass
    return i + 1


def parse(input_filename, output_filename, table_name, distribution_key, redshift_sort_keys, type_map):
    # State storage
    """
    Feed it a mysqldump file, and it'll output a redshift equivalent one.
    Optional params specified here are designed for mysqldump containing only one table definition
    :rtype : void
    :param input_filename: input mysqldump file
    :param output_filename:
    :param table_name: optional. generated table name for the first table encountered, else the same as input.
    :param distribution_key: optional. distribution key column in redshift for the first table encountered.
    :param redshift_sort_keys: optional. sort key(s) column(s) in redshift for the first table encountered.
    :param type_map: optional. override type conversion from mysql to redshift.
        tinyint(1):smallint,char(35):varchar(70),bigint(20) unsigned:bigint
    :param insert_mode: optional. https://docs.aws.amazon.com/console/datapipeline/redshiftcopyactivity.
        Determines how to handle pre-existing data in the target table that overlaps with rows in the data to be loaded.
        "allowedValues": [ "OVERWRITE_EXISTING", "KEEP_EXISTING", "TRUNCATE"]
        if insert_mode is OVERWRITE_EXISTING and distribution_key not specified, first primary key is set as dist key.
    """
    if input_filename == "-":
        num_lines = -1
    else:
        num_lines = int(file_len(input_filename))
    tables = {}
    current_table = None
    creation_lines = []
    foreign_key_lines = []
    num_inserts = 0
    started = time.time()
    table_primary_key_not_done = True

    # Open output file and write header. Logging file handle will be std out
    # unless we're writing output to std out, in which case NO PROGRESS FOR YOU.
    if output_filename == "-":
        output = sys.stdout
        logging = open(os.devnull, "w")
    else:
        output = open(output_filename, "w")
        logging = sys.stdout

    if input_filename == "-":
        input_fh = sys.stdin
    else:
        input_fh = open(input_filename)

    output.write("-- Converted by MySql to Redshift db converter\n")
    output.write("START TRANSACTION;\n")

    for i, line in enumerate(input_fh):
        time_taken = time.time() - started
        percentage_done = (i + 1) / float(num_lines)
        secs_left = (time_taken / percentage_done) - time_taken
        logging.write("\rLine %i (of %s: %.2f%%) [%s tables] [%s inserts] [ETA: %i min %i sec]" % (
            i + 1,
            num_lines,
            ((i + 1) / float(num_lines)) * 100,
            len(tables),
            num_inserts,
            secs_left // 60,
            secs_left % 60,
        ))
        logging.flush()
        line = line.decode("utf8").strip()
        # Ignore comment lines
        if line.startswith("--") or line.startswith("/*") or line.startswith("LOCK TABLES") or line.startswith(
                "DROP TABLE") or line.startswith("UNLOCK TABLES") or not line:
            continue

        # Outside of anything handling
        if current_table is None:
            # Start of a table creation statement?
            if line.startswith("CREATE TABLE"):
                # current_table is the table name in MySQL
                current_table = line.split('"')[1]
                if table_name and not table_name.isspace():
                    logging.write("\rLine %i [Encountered table %s ] [Generated table name %s]" % (
                        i + 1, current_table, table_name))
                    current_table = table_name
                    table_name = None
                tables[current_table] = {"columns": []}
                creation_lines = []
                table_primary_key_not_done = True
            # Inserting data into a table?
            elif line.startswith("INSERT INTO"):
                output.write(line.encode("utf8").replace("'0000-00-00 00:00:00'", "NULL") + "\n")
                num_inserts += 1
            # ???
            else:
                print "\n ! Ignore. Unknown line in main body: %s" % line
        # Inside-create-statement handling
        else:
            # Is it a column?
            if line.startswith('"'):
                useless, name, definition = line.strip(",").split('"', 2)
                try:
                    sql_type, extra = definition.strip().split(" ", 1)
                    # This must be a tricky enum
                    if ')' in extra:
                        sql_type, extra = definition.strip().split(")")
                except ValueError:
                    sql_type = definition.strip()
                    extra = ""
                # check if extra contains unsigned
                unsigned = "unsigned" in extra.lower()
                # remove unsigned now
                extra = re.sub("CHARACTER SET [\w\d]+\s*", "", extra.replace("unsigned", ""))
                extra = re.sub("COLLATE [\w\d]+\s*", "", extra.replace("unsigned", ""))
                extra = extra.replace("AUTO_INCREMENT", "")
                extra = extra.replace("SERIAL", "")
                extra = extra.replace("ZEROFILL", "")
                extra = extra.replace("UNSIGNED", "")
                sql_type = sql_type.lower()
                if type_map is not None and sql_type in type_map:
                    red_type = type_map[sql_type]
                elif type_map is not None and unsigned and sql_type + " unsigned" in type_map:
                    red_type = type_map[sql_type + " unsigned"]
                elif sql_type == "tinyint(1)":
                    red_type = "boolean"
                elif sql_type.startswith("tinyint("):
                    red_type = "smallint"
                elif sql_type.startswith("smallint("):
                    if unsigned:
                        red_type = "integer"
                    else:
                        red_type = "smallint"
                elif sql_type.startswith("mediumint("):
                    red_type = "integer"
                elif sql_type.startswith("int("):
                    if unsigned:
                        red_type = "bigint"
                    else:
                        red_type = "integer"
                elif sql_type.startswith("bigint("):
                    if unsigned:
                        red_type = "varchar(80)"
                    else:
                        red_type = "bigint"
                elif sql_type.startswith("float"):
                    red_type = "real"
                elif sql_type.startswith("double"):
                    red_type = "double precision"
                elif sql_type.startswith("decimal"):
                    # same decimal
                    red_type = sql_type
                elif sql_type.startswith("char("):
                    size = int(sql_type.split("(")[1].rstrip(")"))
                    red_type = "varchar(%s)" % (size * 4)
                elif sql_type.startswith("varchar("):
                    size = int(sql_type.split("(")[1].rstrip(")"))
                    red_type = "varchar(%s)" % (size * 4)
                elif sql_type == "longtext":
                    red_type = "varchar(max)"
                elif sql_type == "mediumtext":
                    red_type = "varchar(max)"
                elif sql_type == "tinytext":
                    red_type = "text(%s)" % (255 * 4)
                elif sql_type == "text":
                    red_type = "varchar(max)"
                elif sql_type.startswith("enum(") or sql_type.startswith("set("):
                    red_type = "varchar(%s)" % (255 * 2)
                elif sql_type == "blob":
                    red_type = "varchar(max)"
                elif sql_type == "mediumblob":
                    red_type = "varchar(max)"
                elif sql_type == "longblob":
                    red_type = "varchar(max)"
                elif sql_type == "tinyblob":
                    red_type = "varchar(255)"
                elif sql_type.startswith("binary"):
                    red_type = "varchar(255)"
                elif sql_type == "date":
                    # same
                    red_type = sql_type
                elif sql_type == "time":
                    red_type = "varchar(40)"
                elif sql_type == "datetime":
                    red_type = "timestamp"
                elif sql_type == "year":
                    red_type = "varchar(16)"
                elif sql_type == "timestamp":
                    # same
                    red_type = sql_type
                else:
                    # all else, e.g., varchar binary
                    red_type = "varchar(max)"
                # Record it
                creation_lines.append('"%s" %s %s' % (name, red_type, extra))
                tables[current_table]['columns'].append((name, red_type, extra))
            # Is it a constraint or something?
            elif line.startswith("PRIMARY KEY"):
                # composite primary key not supported in redshift
                if table_primary_key_not_done:
                    # aws datapipeline redshift copy only supports 1 primary key. remove this restriction after that is fixed
                    # creation_lines.append(line.rstrip(","))
                    first_pkey = line.rstrip(",").rstrip(")").lstrip("PRIMARY KEY").lstrip("(\"").split(",")[0].rstrip("\"")
                    pkey_line = 'PRIMARY KEY("' + first_pkey + '")'
                    creation_lines.append(pkey_line)
                    if not distribution_key:
                        distribution_key = first_pkey
                    table_primary_key_not_done = False
            elif line.startswith("CONSTRAINT"):
                # Try adding foreign key in a different transaction. If it fails, no big deal.
                foreign_key_lines.append("ALTER TABLE \"%s\" ADD CONSTRAINT %s DEFERRABLE INITIALLY DEFERRED" % (
                    current_table, line.split("CONSTRAINT")[1].strip().rstrip(",")))
                # No need for index on foreign key column as Redshift does not support indexes
            elif line.startswith("UNIQUE KEY"):
                creation_lines.append("UNIQUE (%s)" % line.split("(")[1].split(")")[0])
            elif line.startswith("FULLTEXT KEY"):
                # No indexes in Redshift
                pass
            elif line.startswith("KEY"):
                pass
            # Is it the end of the table?
            elif line == ");":
                output.write("CREATE TABLE IF NOT EXISTS \"%s\" (\n" % current_table)
                for j, outline in enumerate(creation_lines):
                    output.write("    %s%s\n" % (outline, "," if j != (len(creation_lines) - 1) else ""))
                output.write(')\n')
                if distribution_key and not distribution_key.isspace():
                    output.write('distkey(%s)\n' % distribution_key)
                    distribution_key = None
                if redshift_sort_keys and not redshift_sort_keys.isspace():
                    output.write('sortkey(%s)\n' % redshift_sort_keys)
                    redshift_sort_keys = None
                output.write(';\n\n')
                current_table = None
            # ???
            else:
                print "\n ! Ignore. Unknown line inside table creation: %s" % line

    # Finish file
    output.write("COMMIT;\n")
    # output.write("START TRANSACTION;\n")
    # Write FK constraints out
    # output.write("\n-- Foreign keys --\n")
    # for line in foreign_key_lines:
    #     output.write("%s;\n" % line)
    # Finish file
    # output.write("\n")
    # output.write("COMMIT;\n")
    print ""


def usage():
    print("Usage: %s -i input -o output [-t table_name] [-d DistKey] [-s SortKey1,SortKey2...] "
          "[-m MySQL1:RedType1,MySQL2:RedType2...]" % sys.argv[0])


if __name__ == "__main__":
    input_file = ''
    output_file = ''
    gen_table_name = None
    dist_key = None
    sort_keys = None
    map_types = None
    insert_mode = None
    try:
        opts, args = getopt.getopt(sys.argv[1:], "i:o:t:d:s:m:n:",
                                   ["input_file=", "output_file=", "table_name=", "dist_key=",
                                    "sort_keys=", "map_types=", "insert_mode="])
    except getopt.GetoptError as e:
        print (str(e))
        usage()
        sys.exit(2)
    for opt, arg in opts:
        if opt in ("-i", "--input_file"):
            input_file = arg
        elif opt in ("-o", "--output_file"):
            output_file = arg
        elif opt in ("-t", "--table_name"):
            gen_table_name = arg
        elif opt in ("-d", "--dist_key"):
            dist_key = arg
        elif opt in ("-s", "--sort_keys"):
            sort_keys = arg
        elif opt in ("-m", "--map_types"):
            map_types = arg
        elif opt in ("-n", "--insert_mode"):
            insert_mode = arg
    map_types_dict = None
    if map_types and not map_types.isspace():
        map_types_dict_temp = dict([arg.split(':') for arg in map_types.lower().lstrip().split(',')])
        # sanitize keys to strip whitespaces
        map_types_dict = dict()
        for key in map_types_dict_temp:
            map_types_dict[key.lstrip().rstrip()] = map_types_dict_temp[key]
    parse(input_file, output_file, gen_table_name, dist_key, sort_keys, map_types_dict)
    sys.exit(0)
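This is also where the limitations show up. Composite primary keys are not carried over: the script's own comment notes that the Data Pipeline Redshift copy supports only one primary key, so only the first column of the key survives (and becomes the distkey when none is supplied). KEY and FULLTEXT KEY entries are dropped outright because Redshift has no indexes, and unsigned bigint columns land as varchar(80) since Redshift has no unsigned 64-bit integer type. A quick illustration of the primary-key collapse, replaying that branch of the script on a hypothetical mysqldump line:

# Replaying the PRIMARY KEY branch of mysql_to_redshift.py on a sample
# (hypothetical) composite-key line: only the first column survives.
line = 'PRIMARY KEY ("OrderId","LineNo"),'
first_pkey = line.rstrip(",").rstrip(")").lstrip("PRIMARY KEY").lstrip("(\"").split(",")[0].rstrip("\"")
print('PRIMARY KEY("%s")' % first_pkey)  # prints: PRIMARY KEY("OrderId")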