- """This code is part of the final project.
- After auditing is complete the next step is to prepare the data to be inserted into a SQL database.
- To do so, I will parse the elements in the OSM XML file, transforming them from document format to
- tabular format, thus making it possible to write to .csv files. These csv files can then easily be
- imported to a SQL database as tables.
- """
# Initial imports
import csv
import pprint
import re
import xml.etree.ElementTree as ET  # cElementTree was removed in Python 3.9

from audit import *  # Imports all the functions from the audit.py file
import cerberus
import schema
# The path to the OSM file.
OSM_PATH = 'san-francisco_california.osm'

# The paths where the created CSV files will be written.
NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

# The SQL schema that is defined in the schema.py file.
# Both files need to be in the same directory.
SCHEMA = schema.schema
# Regular expression pattern to find problematic characters in 'k' attributes.
problem_chars = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')
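# For example (illustrative): problem_chars.search('addr:street') is None,
# while problem_chars.search('addr street') matches on the space.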
# Regular expression pattern to capture the street type (the last word)
# in a street name.
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)
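# For example (illustrative): street_type_re.search('24th St.').group()
# returns 'St.'.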
# The list of street types that we want to have.
expected = ["Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square",
            "Lane", "Road", "Trail", "Parkway", "Commons"]

# A dictionary mapping abbreviated or misspelled street types
# to the forms in the 'expected' list.
mapping = {"St": "Street", "St.": "Street", "street": "Street",
           "Ave": "Avenue", "Ave.": "Avenue", "AVE": "Avenue",
           "avenue": "Avenue", "Rd.": "Road", "Rd": "Road", "road": "Road",
           "Blvd": "Boulevard", "Blvd.": "Boulevard", "Blvd,": "Boulevard",
           "boulevard": "Boulevard", "broadway": "Broadway",
           "square": "Square", "Sq": "Square",
           "way": "Way",
           "Dr.": "Drive", "Dr": "Drive",
           "ct": "Court", "Ct": "Court", "court": "Court",
           "cres": "Crescent", "Cres": "Crescent", "Ctr": "Center",
           "Hwy": "Highway", "hwy": "Highway",
           "Ln": "Lane", "Ln.": "Lane",
           "parkway": "Parkway"}
# The columns in the CSV files. The same columns need to be created for the database.
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']
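# A nodes.csv row will therefore look like this (values are hypothetical):
#   id,lat,lon,user,uid,version,changeset,timestamp
#   26034433,37.77,-122.41,some_user,12345,3,987654,2015-01-01T00:00:00Z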
def shape_element(element):
    """A function to shape each element into several data structures.

    Args:
        element: 'node' and 'way' tags that are passed to this function from
            the get_element function; mainly called by the process_map function.

    The function goes through node and way elements, defining the values for
    the node, node_tags, way, way_tags, and way_nodes dictionaries.
    The "node" field holds a dictionary of the following top-level node
    attributes: id, user, uid, version, lat, lon, timestamp, changeset.
    The "way" field holds a dictionary of the following top-level way
    attributes: id, user, uid, version, timestamp, changeset.
    The "node_tags" and "way_tags" fields hold a list of dictionaries, one per
    secondary tag. Secondary tags are child tags of a node or way which have
    the tag name/type "tag". Each dictionary has the following fields from the
    secondary tag attributes:
        - id: the top-level node id attribute value
        - key: the full tag "k" attribute value if no colon is present, or the
          characters after the colon if one is.
        - value: the tag "v" attribute value
        - type: either the characters before the colon in the tag "k" value,
          or "regular" if a colon is not present.

    For the value field, I call the update_name and update_postcode functions
    to clean problematic street names or postcodes. I call these functions on
    both node and way elements.

    Returns:
        The following dictionaries will be returned:
        - node
        - node_tags
        - way
        - way_nodes
        - way_tags
    """
    node_attribs = {}  # Handles the attributes in a node element
    way_attribs = {}   # Handles the attributes in a way element
    way_nodes = []     # Handles the 'nd' tags in a way element
    tags = []          # Handles secondary tags the same way for both node and way elements

    # Handling node elements.
    if element.tag == 'node':
        for item in NODE_FIELDS:
            # If an attribute (e.g. 'uid') is missing, "9999999" is used
            # as a placeholder value.
            try:
                node_attribs[item] = element.attrib[item]
            except KeyError:
                node_attribs[item] = "9999999"
        # Iterating through the 'tag' tags in the node element.
        for tg in element.iter('tag'):
            # Ignoring keys that contain problematic characters.
            if not problem_chars.search(tg.attrib['k']):
                tag_dict_node = {}
                tag_dict_node['id'] = element.attrib['id']
                # Calling the update_name function to clean up problematic
                # street names based on the audit.py file.
                if is_street_name(tg):
                    better_name = update_name(tg.attrib['v'], mapping)
                    tag_dict_node['value'] = better_name
                # Calling the update_postcode function to clean up problematic
                # postcodes based on the audit.py file.
                elif get_postcode(tg):
                    better_postcode = update_postcode(tg.attrib['v'])
                    tag_dict_node['value'] = better_postcode
                # For other values that are neither street names nor postcodes.
                else:
                    tag_dict_node['value'] = tg.attrib['v']
                if ':' not in tg.attrib['k']:
                    tag_dict_node['key'] = tg.attrib['k']
                    tag_dict_node['type'] = 'regular'
                # Splitting the key into the parts before and after a colon ':'.
                else:
                    character_before_colon = re.findall('^[a-zA-Z]*:', tg.attrib['k'])
                    character_after_colon = re.findall(':[a-zA-Z_]+', tg.attrib['k'])
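                    # e.g. for k="addr:street" (illustrative):
                    # character_before_colon is ['addr:'] and
                    # character_after_colon is [':street'], which yields
                    # type 'addr' and key 'street' below.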
                    if len(character_after_colon) != 0:
                        tag_dict_node['key'] = character_after_colon[0][1:]
                    else:  # If nothing follows the colon, fall back to 'regular'.
                        tag_dict_node['key'] = 'regular'
                    if len(character_before_colon) != 0:
                        tag_dict_node['type'] = character_before_colon[0][:-1]
                    else:  # If nothing precedes the colon, fall back to 'regular'.
                        tag_dict_node['type'] = 'regular'
                tags.append(tag_dict_node)
        return {'node': node_attribs, 'node_tags': tags}
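    # A shaped node looks roughly like this (values are hypothetical):
    # {'node': {'id': '123', 'lat': '37.77', 'lon': '-122.41', ...},
    #  'node_tags': [{'id': '123', 'key': 'street',
    #                 'value': 'Mission Street', 'type': 'addr'}]}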
    # Handling way elements.
    elif element.tag == 'way':
        for item in WAY_FIELDS:
            # If an attribute (e.g. 'uid') is missing, "9999999" is used
            # as a placeholder value.
            try:
                way_attribs[item] = element.attrib[item]
            except KeyError:
                way_attribs[item] = "9999999"
        # Iterating through the 'tag' tags in the way element.
        for tg in element.iter('tag'):
            if not problem_chars.search(tg.attrib['k']):
                tag_dict_way = {}
                tag_dict_way['id'] = element.attrib['id']
                # Calling the update_name function to clean up problematic
                # street names based on the audit.py file.
                if is_street_name(tg):
                    better_name_way = update_name(tg.attrib['v'], mapping)
                    tag_dict_way['value'] = better_name_way
                # Calling the update_postcode function to clean up problematic
                # postcodes based on the audit.py file.
                # Note: this must be 'elif'; a plain 'if' would let the final
                # 'else' overwrite an already-cleaned street name.
                elif get_postcode(tg):
                    better_postcode_way = update_postcode(tg.attrib['v'])
                    tag_dict_way['value'] = better_postcode_way
                # For other values that are neither street names nor postcodes.
                else:
                    tag_dict_way['value'] = tg.attrib['v']
                if ':' not in tg.attrib['k']:
                    tag_dict_way['key'] = tg.attrib['k']
                    tag_dict_way['type'] = 'regular'
                # Splitting the key into the parts before and after a colon ':'.
                else:
                    character_before_colon = re.findall('^[a-zA-Z]*:', tg.attrib['k'])
                    character_after_colon = re.findall(':[a-zA-Z_]+', tg.attrib['k'])
                    if len(character_after_colon) == 1:
                        tag_dict_way['key'] = character_after_colon[0][1:]
                    # Keys with two colons (e.g. 'gnis:county:id') keep both
                    # trailing segments, e.g. key 'county:id'.
                    if len(character_after_colon) > 1:
                        tag_dict_way['key'] = character_after_colon[0][1:] + character_after_colon[1]
                    if len(character_before_colon) != 0:
                        tag_dict_way['type'] = character_before_colon[0][:-1]
                    else:  # If nothing precedes the colon, fall back to 'regular'.
                        tag_dict_way['type'] = 'regular'
                tags.append(tag_dict_way)
        # Iterating through the 'nd' tags in the way element, recording the
        # position of each referenced node within the way.
        for count, tg in enumerate(element.iter('nd')):
            tag_dict_nd = {}
            tag_dict_nd['id'] = element.attrib['id']
            tag_dict_nd['node_id'] = tg.attrib['ref']
            tag_dict_nd['position'] = count
            way_nodes.append(tag_dict_nd)
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}
# Helper functions.
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield each element if it is the right type of tag."""
    context = ET.iterparse(osm_file, events=('start', 'end'))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            # Clearing the root keeps memory usage flat on large files.
            root.clear()
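# A quick sanity check (illustrative): count the ways without loading the
# whole file into memory:
#   sum(1 for _ in get_element(OSM_PATH, tags=('way',)))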
# Validating that, while the CSV files are created, the fields all match
# the columns that should be in the CSV files.
def validate_element(element, validator, schema=SCHEMA):
    """Raise an exception if the element does not match the schema."""
    if validator.validate(element, schema) is not True:
        field, errors = next(iter(validator.errors.items()))
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_string = pprint.pformat(errors)
        raise Exception(message_string.format(field, error_string))
class UnicodeDictWriter(csv.DictWriter, object):
    """Extend csv.DictWriter to handle Unicode input.

    Under Python 3 the csv module writes Unicode natively, so rows are
    passed through unchanged (encoding values to bytes here would write
    b'...' literals into the CSV files).
    """
    def writerow(self, row):
        super(UnicodeDictWriter, self).writerow(row)

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)
# Creating the CSV files.
def process_map(file_in, validate):
    """Iteratively process each XML element and write it to the CSV file(s)."""
    # newline='' is required by the csv module; utf-8 keeps Unicode intact.
    with open(NODES_PATH, 'w', encoding='utf-8', newline='') as nodes_file, \
         open(NODE_TAGS_PATH, 'w', encoding='utf-8', newline='') as nodes_tags_file, \
         open(WAYS_PATH, 'w', encoding='utf-8', newline='') as ways_file, \
         open(WAY_NODES_PATH, 'w', encoding='utf-8', newline='') as way_nodes_file, \
         open(WAY_TAGS_PATH, 'w', encoding='utf-8', newline='') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        nodes_writer.writeheader()
        node_tags_writer.writeheader()
        ways_writer.writeheader()
        way_nodes_writer.writeheader()
        way_tags_writer.writeheader()
        validator = cerberus.Validator()

        # A counter to show how many elements the code has processed.
        count = 1
        for element in get_element(file_in, tags=('node', 'way')):
            if count % 10000 == 0:
                print(count)
            count += 1
            el = shape_element(element)
            if el:
                if validate:
                    validate_element(el, validator)
                if element.tag == 'node':
                    nodes_writer.writerow(el['node'])
                    node_tags_writer.writerows(el['node_tags'])
                elif element.tag == 'way':
                    ways_writer.writerow(el['way'])
                    way_nodes_writer.writerows(el['way_nodes'])
                    way_tags_writer.writerows(el['way_tags'])
if __name__ == '__main__':
    # Note: if validation is set to True,
    # the process takes much longer than when it is set to False.
    process_map(OSM_PATH, validate=False)