""" Bridge ====== Bridge between Django with its models and database and gtfs.GTFS as intermediate object for File IO. Contents -------- Constants --------- gtfs_schema : dir{str,list[str]} Maps GTFS file names (without filename extension) to fields described by the GTFS Reference reversed_file_mapping : dict(str,str) Map CamelCased filenames to '_'-separated Functions --------- to_camel_case(s): Converts '_'-separated str to CamelCase with capital first letter standardize_time(time_str): Converts str in unicode time format to %H:%M:%S format with normalized 24 hour time is_NaN(v): Checks if given variable is either a str expressing NaN or NaN as object stdz(v): Standardize date and time formats gtfs_to_db(g): Write an existing gtfs.GTFS object to the database using the GTFS compliant models db_to_gtfs(q, folder_path): Convert list of query sets to gtfs.GTFS object and write to specified folder if validation for GTFS compliance passes. """ import pt_map.gtfs import pt_map.models import pandas as pd from pattern.text.en import singularize, pluralize import math import numbers import email.utils import time import datetime import django.db.models import time time_delta = int(datetime.datetime(2024,1,1).timestamp()) gtfs_schema = { "agency": [ "agency_id", "agency_name", "agency_url", "agency_timezone", "agency_lang", "agency_phone", "agency_email", "agency_fare_url" ], "stops": [ "stop_id", "stop_code", "stop_name", "stop_desc", "stop_lat", "stop_lon", "zone_id", "stop_url", "location_type", "parent_station", "stop_timezone", "wheelchair_boarding", "level_id", "platform_code" ], "routes": [ "route_id", "agency_id", "route_short_name", "route_long_name", "route_desc", "route_type", "route_url", "route_color", "route_text_color", "route_sort_order", "continuous_pickup", "continuous_drop_off" ], "trips": [ "trip_id", "route_id", "service_id", "trip_headsign", "trip_short_name", "direction_id", "block_id", "shape_id", "wheelchair_accessible", "bikes_allowed" ], "stop_times": [ "trip_id", "arrival_time", "departure_time", "stop_id", "stop_sequence", "stop_headsign", "pickup_type", "drop_off_type", "shape_dist_traveled", "timepoint" ], "calendar": [ "service_id", "monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday", "start_date", "end_date" ], "calendar_dates": [ "service_id", "date", "exception_type" ], "fare_attributes": [ "fare_id", "price", "currency_type", "payment_method", "transfers", "transfer_duration" ], "fare_rules": [ "fare_id", "route_id", "origin_id", "destination_id", "contains_id" ], "timeframes": [ "timeframe_id", "start_time", "end_time", "headway_sec", "exact_times" ], "fare_media": [ "media_id", "agency_id", "fare_id", "seat_type", "price" ], "fare_products": [ "product_id", "agency_id", "product_type", "fare_id", "product_name", "short_name", "description", "duration", "transfers" ], "fare_leg_rules": [ "fare_id", "route_id", "origin_id", "destination_id", "contains_id" ], "fare_transfer_rules": [ "from_fare_id", "to_fare_id", "transfer_type", "min_transfer_time" ], "areas": [ "area_id", "area_name", "area_description" ], "stop_areas": [ "stop_area_id", "stop_id", "area_id", "location_type", "parent_station", "fare_zone_id" ], "networks": [ "network_id", "network_name", "network_description" ], "route_networks": [ "route_id", "network_id" ], "shapes": [ "shape_id", "shape_pt_lat", "shape_pt_lon", "shape_pt_sequence", "shape_dist_traveled" ], "frequencies": [ "trip_id", "start_time", "end_time", "headway_secs", "exact_times" ], "transfers": [ "from_stop_id", "to_stop_id", "transfer_type", "min_transfer_time" ], "pathways": [ "pathway_id", "from_stop_id", "to_stop_id", "pathway_mode", "is_bidirectional", "length", "traversal_time", "stair_count", "max_slope", "min_width", "signposted_as", "reversed_signposted_as" ], "levels": [ "level_id", "level_index", "level_name" ], "location_groups": [ "location_group_id", "location_group_name" ], "location_group_stops": [ "location_group_id", "stop_id" ], "locations_geojson": [ "type", "features" ], "booking_rules": [ "rule_id", "stop_id", "rule_type", "booking_url", "admission_rules", "admission_requirements" ], "translations": [ "table_name", "field_name", "language", "translation" ], "feed_info": [ "feed_publisher_name", "feed_publisher_url", "feed_lang", "default_lang", "feed_start_date", "feed_end_date", "feed_version", "feed_contact_email", "feed_contact_url" ], "attributions": [ "attribution_id", "organization_name", "is_producer" ] } primary_keys = { pt_map.models.Agency: "agency_id", pt_map.models.Level: "level_id", pt_map.models.Stop: "stop_id", pt_map.models.Route: "route_id", pt_map.models.Shape: "shape_id", pt_map.models.Calendar: "service_id", pt_map.models.CalendarDate: None, pt_map.models.Trip: "trip_id", pt_map.models.LocationGroup: "location_group_id", pt_map.models.LocationsGeojson: None, pt_map.models.StopTime: None, pt_map.models.FareAttribute: "fare_id", pt_map.models.FareRule: None, pt_map.models.Frequency: None, pt_map.models.Transfer: None, pt_map.models.Pathway: "pathway_id", pt_map.models.FeedInfo: None, pt_map.models.BookingRule: "booking_rule_id", pt_map.models.Translation: None, pt_map.models.Attribution: "attribution_id", pt_map.models.LocationGroupStop: None, pt_map.models.Network: "network_id", pt_map.models.RouteNetwork: None, pt_map.models.Area: None, pt_map.models.StopArea: None, pt_map.models.FareMedium: "fare_media_id", pt_map.models.FareProduct: None, pt_map.models.Timeframe: None, pt_map.models.FareLegRule: None, pt_map.models.FareTransferRule: None, } foreign_keys = [ (pt_map.models.Agency, []), (pt_map.models.Level, []), (pt_map.models.Stop, [(pt_map.models.Stop, 'parent_station'), (pt_map.models.Level, 'level_id'), ]), (pt_map.models.Route, [(pt_map.models.Agency, 'agency_id'), ]), (pt_map.models.Shape, []), (pt_map.models.Calendar, []), (pt_map.models.CalendarDate, []), (pt_map.models.Trip, [(pt_map.models.Route, 'route_id'), (pt_map.models.Shape, 'shape_id'), ]), (pt_map.models.LocationGroup, []), (pt_map.models.LocationsGeojson, []), (pt_map.models.StopTime, [(pt_map.models.Trip, 'trip_id'), (pt_map.models.Stop, 'stop_id'), (pt_map.models.LocationGroup, 'location_group_id'), (pt_map.models.LocationsGeojson, 'location_id'), ]), (pt_map.models.FareAttribute, [(pt_map.models.Agency, 'agency_id'), ]), (pt_map.models.FareRule, [(pt_map.models.FareAttribute, 'fare_id'), (pt_map.models.Route, 'route_id'), ]), (pt_map.models.Frequency, [(pt_map.models.Trip, 'trip_id'), ]), (pt_map.models.Transfer, [(pt_map.models.Stop, 'from_stop_id'), (pt_map.models.Stop, 'to_stop_id'), (pt_map.models.Route, 'from_route_id'), (pt_map.models.Route, 'to_route_id'), (pt_map.models.Trip, 'from_trip_id'), (pt_map.models.Trip, 'to_trip_id'), ]), (pt_map.models.Pathway, [(pt_map.models.Stop, 'from_stop_id'), (pt_map.models.Stop, 'to_stop_id'), ]), (pt_map.models.FeedInfo, []), (pt_map.models.BookingRule, [(pt_map.models.Trip, 'trip_id'), ]), (pt_map.models.Translation, []), (pt_map.models.Attribution, [(pt_map.models.Agency, 'agency_id'), (pt_map.models.Route, 'route_id'), (pt_map.models.Trip, 'trip_id'), ]), (pt_map.models.LocationGroupStop, [(pt_map.models.LocationGroup, 'location_group_id'), (pt_map.models.Stop, 'stop_id'), ]), (pt_map.models.Network, []), (pt_map.models.RouteNetwork, [(pt_map.models.Network, 'network_id'), (pt_map.models.Route, 'route_id'), ]), (pt_map.models.Area, []), (pt_map.models.StopArea, [(pt_map.models.Area, 'area_id'), (pt_map.models.Stop, 'stop_id'), ]), (pt_map.models.FareMedium, []), (pt_map.models.FareProduct, []), (pt_map.models.Timeframe, []), (pt_map.models.FareLegRule, [(pt_map.models.Network, 'network_id'), (pt_map.models.Area, 'from_area_id'), (pt_map.models.Area, 'to_area_id'), (pt_map.models.Timeframe, 'from_timeframe_group_id'), (pt_map.models.Timeframe, 'to_timeframe_group_id'), (pt_map.models.FareProduct, 'fare_product_id'), ]), (pt_map.models.FareTransferRule, [(pt_map.models.FareProduct, 'fare_product_id'), ]), ] class_names = {'Agency': 'agency', 'Stop': 'stops', 'Route': 'routes', 'Trip': 'trips', 'StopTime': 'stop_times', 'Calendar': 'calendar', 'CalendarDate': 'calendar_dates', 'FareAttribute': 'fare_attributes', 'FareRule': 'fare_rules', 'Timeframe': 'timeframes', 'FareMedium': 'fare_media', 'FareProduct': 'fare_products', 'FareLegRule': 'fare_leg_rules', 'FareTransferRule': 'fare_transfer_rules', 'Area': 'areas', 'StopArea': 'stop_areas', 'Network': 'networks', 'RouteNetwork': 'route_networks', 'Shape': 'shapes', 'Frequency': 'frequencies', 'Transfer': 'transfers', 'Pathway': 'pathways', 'Level': 'levels', 'LocationGroup': 'location_groups', 'LocationGroupStop': 'location_group_stops', 'LocationsGeojson': 'locations_geojson', 'BookingRule': 'booking_rules', 'Translation': 'translations', 'FeedInfo': 'feed_info', 'Attribution': 'attributions'} def toCamelCase(s: str): """ Convert '_'-separated str to CamelCase with the first letter capitalized. Parameters ---------- s : str '_'-separated string Returns ------- str CamelCased str, first letter capitalized """ return ''.join(word.capitalize() for word in s.split('_')) def standardize_time(time_str: str): """ Convert time str to standardized %H:%M:%S format. Parameters ---------- time_str: str str encoding time Returns ------- str in format '%H:%M:%S' """ date_str = f"Jan 19, 1999 {time_str}" ntuple=email.utils.parsedate(date_str) timestamp=time.mktime(ntuple) date=datetime.datetime.fromtimestamp(timestamp) return date.strftime('%H:%M:%S') def is_NaN(v): """ Returns ------- True If v is either a str representing NaN or NaN as an object False Otherwise """ return (isinstance(v, str) and v.lower() == "nan") or (isinstance(v, numbers.Number) and math.isnan(v)) def stdz(v, m: django.db.models.Model, f: str): """ If f is a time or date field, convert to a format our db can easily work with. If f is a foreign key Parameters ---------- v : object object to be standardized m : django.db.models.Model model to be written to f : str field name in question Returns ------- Converted str If m.f is a DateField or a TimeField Unchanged str Otherwise """ if m._meta.get_field(f).get_internal_type() == 'DateField': return str(v) if m._meta.get_field(f).get_internal_type() == 'TimeField': return standardize_time(v) if m._meta.get_field(f).get_internal_type() == 'ForeignKey': pass return v def to_snake_case(name): name = name[0].lower() + name[1:] for c in name[1:]: if c.isupper(): name.insert(i,'_') else: c.lower() return name def unqfk(ts, fk): if not isinstance(fk, str): fk = str(int(fk)) print(f"fk: {fk}") return f"{ts}{fk}".strip() def gtfs_to_db(g: pt_map.gtfs.GTFS): """ Given a gtfs.GTFS object, write GTFS-compliantly to db by creating the correct models Parameters ---------- g : gtfs.GTFS GTFS object to be saved to db """ ts = str(int(time.time())-time_delta) for model in foreign_keys: if model[0] in [pt_map.models.Calendar, pt_map.models.CalendarDate, ]: continue m = model[0] df = getattr(g, class_names[m.__name__]).data if not df.empty: v = gtfs_schema[class_names[m.__name__]] for _, row in df.iterrows(): for fk in model[1]: if row.get(fk[1]): row[fk[1]] = fk[0].objects.get(**{primary_keys[fk[0]]: unqfk(ts, row[fk[1]])}) defaults = {field: stdz(row.get(field), m, field) for field in v if row.get(field) and not is_NaN(row[field])} if primary_keys[m]: row[primary_keys[m]] = unqfk(ts, row[primary_keys[m]]) defaults[primary_keys[m]] = row[primary_keys[m]] try: m.objects.get(**{primary_keys[m]: row[primary_keys[m]]}) except m.DoesNotExist: m.objects.update_or_create( defaults = defaults, #**kw_args, **{primary_keys[m]: row[primary_keys[m]]} ) else: m.objects.update_or_create(defaults=defaults) reversed_file_mapping = { "Agency": "agency", "Stop": "stops", "Route": "routes", "Trip": "trips", "StopTime": "stop_times", "Calendar": "calendar", "CalendarDate": "calendar_dates", "FareAttribute": "fare_attributes", "FareRule": "fare_rules", "Timeframe": "timeframes", "FareMedium": "fare_media", "FareProduct": "fare_products", "FareLegRule": "fare_leg_rules", "FareTransferRule": "fare_transfer_rules", "Area": "areas", "StopArea": "stop_areas", "Network": "networks", "RouteNetwork": "route_networks", "Shape": "shapes", "Frequency": "frequencies", "Transfer": "transfers", "Pathway": "pathways", "Level": "levels", "LocationGroup": "location_groups", "LocationGroupStop": "location_group_stops", "LocationsGeojson": "locations.geojson", "BookingRule": "booking_rules", "Translation": "translations", "FeedInfo": "feed_info", "Attribution": "attributions" } def db_to_gtfs(q: list[django.db.models.query.QuerySet], folder_path: str = ""): """ Convert given list of query sets to gtfs.GTFS object Parameters ---------- q : list[django.db.models.query.QuerySet] List of QuerySets containing the retrieved data to be Converted folder_path : str path to be set as the results folder_path instance variable Returns ------- gtfs.GTFS object containing the queried data """ dfs = {reversed_file_mapping[m.model.__name__]: (pd.DataFrame(list(m.values())) if m else pd.DataFrame()) for m in q} g = pt_map.gtfs.GTFS(folder_path, dfs) g.validate() return g