transport-accessibility/transport_accessibility/pt_map/bridge.py
2024-06-11 17:20:25 +02:00

543 lines
16 KiB
Python

"""
Bridge
======
Bridge between Django with its models and database and gtfs.GTFS as intermediate object for File IO.
Contents
--------
Constants
---------
gtfs_schema : dict{str,list[str]}
Maps GTFS file names (without filename extension) to fields described by the GTFS Reference
reversed_file_mapping : dict(str,str)
Map CamelCased filenames to '_'-separated
Functions
---------
toCamelCase(s):
Converts '_'-separated str to CamelCase with capital first letter
standardize_time(time_str):
Converts str in unicode time format to %H:%M:%S format with normalized 24 hour time
is_NaN(v):
Checks if given variable is either a str expressing NaN or NaN as object
stdz(v, m, f):
Standardize date and time formats
gtfs_to_db(g):
Write an existing gtfs.GTFS object to the database using the GTFS compliant models
db_to_gtfs(q, folder_path):
Convert list of query sets to gtfs.GTFS object and write to specified folder if validation for GTFS compliance passes.
"""
import pt_map.gtfs
import pt_map.models
import pandas as pd
from pattern.text.en import singularize, pluralize
import math
import numbers
import email.utils
import time
import datetime
import django.db.models
import time
# Whole-second Unix timestamp of 1 Jan 2024, 00:00 (naive datetime, so it is
# interpreted in the local time zone). gtfs_to_db subtracts it from the
# current time when building the prefix it puts in front of ids.
time_delta = int(datetime.datetime(year=2024, month=1, day=1).timestamp())
# Column names handled for each GTFS table, keyed by the file name without
# its extension. gtfs_to_db copies only the columns listed here from a row
# into the keyword arguments used to create the model.
# NOTE(review): primary_keys names "booking_rule_id" and "fare_media_id",
# while the lists below contain "rule_id" and "media_id" - confirm which
# names the models and the GTFS reference actually use.
gtfs_schema = {
    "agency": [
        "agency_id", "agency_name", "agency_url", "agency_timezone",
        "agency_lang", "agency_phone", "agency_email", "agency_fare_url",
    ],
    "stops": [
        "stop_id", "stop_code", "stop_name", "stop_desc", "stop_lat",
        "stop_lon", "zone_id", "stop_url", "location_type", "parent_station",
        "stop_timezone", "wheelchair_boarding", "level_id", "platform_code",
    ],
    "routes": [
        "route_id", "agency_id", "route_short_name", "route_long_name",
        "route_desc", "route_type", "route_url", "route_color",
        "route_text_color", "route_sort_order", "continuous_pickup",
        "continuous_drop_off",
    ],
    "trips": [
        "trip_id", "route_id", "service_id", "trip_headsign",
        "trip_short_name", "direction_id", "block_id", "shape_id",
        "wheelchair_accessible", "bikes_allowed",
    ],
    "stop_times": [
        "trip_id", "arrival_time", "departure_time", "stop_id",
        "stop_sequence", "stop_headsign", "pickup_type", "drop_off_type",
        "shape_dist_traveled", "timepoint",
    ],
    "calendar": [
        "service_id", "monday", "tuesday", "wednesday", "thursday", "friday",
        "saturday", "sunday", "start_date", "end_date",
    ],
    "calendar_dates": ["service_id", "date", "exception_type"],
    "fare_attributes": [
        "fare_id", "price", "currency_type", "payment_method", "transfers",
        "transfer_duration",
    ],
    "fare_rules": [
        "fare_id", "route_id", "origin_id", "destination_id", "contains_id",
    ],
    "timeframes": [
        "timeframe_id", "start_time", "end_time", "headway_sec", "exact_times",
    ],
    "fare_media": ["media_id", "agency_id", "fare_id", "seat_type", "price"],
    "fare_products": [
        "product_id", "agency_id", "product_type", "fare_id", "product_name",
        "short_name", "description", "duration", "transfers",
    ],
    "fare_leg_rules": [
        "fare_id", "route_id", "origin_id", "destination_id", "contains_id",
    ],
    "fare_transfer_rules": [
        "from_fare_id", "to_fare_id", "transfer_type", "min_transfer_time",
    ],
    "areas": ["area_id", "area_name", "area_description"],
    "stop_areas": [
        "stop_area_id", "stop_id", "area_id", "location_type",
        "parent_station", "fare_zone_id",
    ],
    "networks": ["network_id", "network_name", "network_description"],
    "route_networks": ["route_id", "network_id"],
    "shapes": [
        "shape_id", "shape_pt_lat", "shape_pt_lon", "shape_pt_sequence",
        "shape_dist_traveled",
    ],
    "frequencies": [
        "trip_id", "start_time", "end_time", "headway_secs", "exact_times",
    ],
    "transfers": [
        "from_stop_id", "to_stop_id", "transfer_type", "min_transfer_time",
    ],
    "pathways": [
        "pathway_id", "from_stop_id", "to_stop_id", "pathway_mode",
        "is_bidirectional", "length", "traversal_time", "stair_count",
        "max_slope", "min_width", "signposted_as", "reversed_signposted_as",
    ],
    "levels": ["level_id", "level_index", "level_name"],
    "location_groups": ["location_group_id", "location_group_name"],
    "location_group_stops": ["location_group_id", "stop_id"],
    "locations_geojson": ["type", "features"],
    "booking_rules": [
        "rule_id", "stop_id", "rule_type", "booking_url", "admission_rules",
        "admission_requirements",
    ],
    "translations": ["table_name", "field_name", "language", "translation"],
    "feed_info": [
        "feed_publisher_name", "feed_publisher_url", "feed_lang",
        "default_lang", "feed_start_date", "feed_end_date", "feed_version",
        "feed_contact_email", "feed_contact_url",
    ],
    "attributions": ["attribution_id", "organization_name", "is_producer"],
}
# Name of the identifying column for each model, or None when the model has
# no such column. gtfs_to_db uses the name both to prefix/look up a row's own
# id and as the lookup keyword when resolving references to that model.
# NOTE(review): foreign_keys lists Area, FareProduct, Timeframe and
# LocationsGeojson as reference targets, yet they map to None here; a lookup
# with a None keyword cannot succeed - confirm the intended key names.
primary_keys = {
    pt_map.models.Agency: "agency_id",
    pt_map.models.Level: "level_id",
    pt_map.models.Stop: "stop_id",
    pt_map.models.Route: "route_id",
    pt_map.models.Shape: "shape_id",
    pt_map.models.Calendar: "service_id",
    pt_map.models.CalendarDate: None,
    pt_map.models.Trip: "trip_id",
    pt_map.models.LocationGroup: "location_group_id",
    pt_map.models.LocationsGeojson: None,
    pt_map.models.StopTime: None,
    pt_map.models.FareAttribute: "fare_id",
    pt_map.models.FareRule: None,
    pt_map.models.Frequency: None,
    pt_map.models.Transfer: None,
    pt_map.models.Pathway: "pathway_id",
    pt_map.models.FeedInfo: None,
    pt_map.models.BookingRule: "booking_rule_id",
    pt_map.models.Translation: None,
    pt_map.models.Attribution: "attribution_id",
    pt_map.models.LocationGroupStop: None,
    pt_map.models.Network: "network_id",
    pt_map.models.RouteNetwork: None,
    pt_map.models.Area: None,
    pt_map.models.StopArea: None,
    pt_map.models.FareMedium: "fare_media_id",
    pt_map.models.FareProduct: None,
    pt_map.models.Timeframe: None,
    pt_map.models.FareLegRule: None,
    pt_map.models.FareTransferRule: None,
}
# (model, [(referenced model, referencing column), ...]) pairs.
# The list order is significant: gtfs_to_db walks it front to back and looks
# up every referenced object in the database while writing a row, so a model
# has to come after the models it references.
foreign_keys = [
    (pt_map.models.Agency, []),
    (pt_map.models.Level, []),
    (pt_map.models.Stop, [
        (pt_map.models.Stop, 'parent_station'),
        (pt_map.models.Level, 'level_id'),
    ]),
    (pt_map.models.Route, [
        (pt_map.models.Agency, 'agency_id'),
    ]),
    (pt_map.models.Shape, []),
    (pt_map.models.Calendar, []),
    (pt_map.models.CalendarDate, []),
    (pt_map.models.Trip, [
        (pt_map.models.Route, 'route_id'),
        (pt_map.models.Shape, 'shape_id'),
    ]),
    (pt_map.models.LocationGroup, []),
    (pt_map.models.LocationsGeojson, []),
    (pt_map.models.StopTime, [
        (pt_map.models.Trip, 'trip_id'),
        (pt_map.models.Stop, 'stop_id'),
        (pt_map.models.LocationGroup, 'location_group_id'),
        (pt_map.models.LocationsGeojson, 'location_id'),
    ]),
    (pt_map.models.FareAttribute, [
        (pt_map.models.Agency, 'agency_id'),
    ]),
    (pt_map.models.FareRule, [
        (pt_map.models.FareAttribute, 'fare_id'),
        (pt_map.models.Route, 'route_id'),
    ]),
    (pt_map.models.Frequency, [
        (pt_map.models.Trip, 'trip_id'),
    ]),
    (pt_map.models.Transfer, [
        (pt_map.models.Stop, 'from_stop_id'),
        (pt_map.models.Stop, 'to_stop_id'),
        (pt_map.models.Route, 'from_route_id'),
        (pt_map.models.Route, 'to_route_id'),
        (pt_map.models.Trip, 'from_trip_id'),
        (pt_map.models.Trip, 'to_trip_id'),
    ]),
    (pt_map.models.Pathway, [
        (pt_map.models.Stop, 'from_stop_id'),
        (pt_map.models.Stop, 'to_stop_id'),
    ]),
    (pt_map.models.FeedInfo, []),
    (pt_map.models.BookingRule, [
        (pt_map.models.Trip, 'trip_id'),
    ]),
    (pt_map.models.Translation, []),
    (pt_map.models.Attribution, [
        (pt_map.models.Agency, 'agency_id'),
        (pt_map.models.Route, 'route_id'),
        (pt_map.models.Trip, 'trip_id'),
    ]),
    (pt_map.models.LocationGroupStop, [
        (pt_map.models.LocationGroup, 'location_group_id'),
        (pt_map.models.Stop, 'stop_id'),
    ]),
    (pt_map.models.Network, []),
    (pt_map.models.RouteNetwork, [
        (pt_map.models.Network, 'network_id'),
        (pt_map.models.Route, 'route_id'),
    ]),
    (pt_map.models.Area, []),
    (pt_map.models.StopArea, [
        (pt_map.models.Area, 'area_id'),
        (pt_map.models.Stop, 'stop_id'),
    ]),
    (pt_map.models.FareMedium, []),
    (pt_map.models.FareProduct, []),
    (pt_map.models.Timeframe, []),
    (pt_map.models.FareLegRule, [
        (pt_map.models.Network, 'network_id'),
        (pt_map.models.Area, 'from_area_id'),
        (pt_map.models.Area, 'to_area_id'),
        (pt_map.models.Timeframe, 'from_timeframe_group_id'),
        (pt_map.models.Timeframe, 'to_timeframe_group_id'),
        (pt_map.models.FareProduct, 'fare_product_id'),
    ]),
    (pt_map.models.FareTransferRule, [
        (pt_map.models.FareProduct, 'fare_product_id'),
    ]),
]
# Model class name -> key used in gtfs_schema; gtfs_to_db also uses the value
# as the attribute name under which the table is read from the GTFS object.
class_names = {
    'Agency': 'agency',
    'Stop': 'stops',
    'Route': 'routes',
    'Trip': 'trips',
    'StopTime': 'stop_times',
    'Calendar': 'calendar',
    'CalendarDate': 'calendar_dates',
    'FareAttribute': 'fare_attributes',
    'FareRule': 'fare_rules',
    'Timeframe': 'timeframes',
    'FareMedium': 'fare_media',
    'FareProduct': 'fare_products',
    'FareLegRule': 'fare_leg_rules',
    'FareTransferRule': 'fare_transfer_rules',
    'Area': 'areas',
    'StopArea': 'stop_areas',
    'Network': 'networks',
    'RouteNetwork': 'route_networks',
    'Shape': 'shapes',
    'Frequency': 'frequencies',
    'Transfer': 'transfers',
    'Pathway': 'pathways',
    'Level': 'levels',
    'LocationGroup': 'location_groups',
    'LocationGroupStop': 'location_group_stops',
    'LocationsGeojson': 'locations_geojson',
    'BookingRule': 'booking_rules',
    'Translation': 'translations',
    'FeedInfo': 'feed_info',
    'Attribution': 'attributions',
}
def toCamelCase(s: str):
    """
    Turn a '_'-separated str into CamelCase, first letter capitalized.

    Each '_'-separated part is passed through str.capitalize, so letters after
    the first one of a part are lower-cased.

    Parameters
    ----------
    s : str
        '_'-separated string

    Returns
    -------
    str
        The capitalized parts joined without a separator
    """
    parts = s.split('_')
    return ''.join(map(str.capitalize, parts))
def standardize_time(time_str: str):
    """
    Convert a time str to the standardized %H:%M:%S format.

    The value is parsed with email.utils.parsedate after a fixed dummy date
    has been put in front of it, so 'H:MM' and 'H:MM:SS' style inputs are
    accepted. Only the clock part of the result is used. Hours of 24 and above
    wrap around midnight ('25:30:00' becomes '01:30:00'). The computation is
    plain arithmetic and does not depend on the local time zone.

    Parameters
    ----------
    time_str : str
        str encoding a time of day

    Returns
    -------
    str
        Time in format '%H:%M:%S'

    Raises
    ------
    ValueError
        If time_str cannot be parsed as a time
    """
    parsed = email.utils.parsedate(f"Jan 19, 1999 {time_str}")
    if parsed is None:
        # parsedate signals failure by returning None rather than raising.
        raise ValueError(f"Cannot parse time string: {time_str!r}")
    hours, minutes, seconds = parsed[3:6]
    # Normalize to one 24 hour day; the modulo also folds negative values.
    seconds_of_day = (hours * 3600 + minutes * 60 + seconds) % 86400
    hh, remainder = divmod(seconds_of_day, 3600)
    mm, ss = divmod(remainder, 60)
    return f"{hh:02d}:{mm:02d}:{ss:02d}"
def is_NaN(v):
    """
    Check whether v stands for NaN.

    Parameters
    ----------
    v : object
        Value to be checked

    Returns
    -------
    True
        If v is either a str spelling 'nan' (in any letter case) or a number
        that is NaN; a complex number counts as NaN when its real or imaginary
        part is NaN
    False
        Otherwise
    """
    if isinstance(v, str):
        return v.lower() == "nan"
    if not isinstance(v, numbers.Number):
        return False
    if isinstance(v, numbers.Complex) and not isinstance(v, numbers.Real):
        # math.isnan rejects complex values with a TypeError, so test the
        # two components separately.
        return math.isnan(v.real) or math.isnan(v.imag)
    return math.isnan(v)
def stdz(v, m: django.db.models.Model, f: str):
    """
    Bring a value into the form expected by field f of model m.

    The decision is made on the internal type reported by the model field.

    Parameters
    ----------
    v : object
        object to be standardized
    m : django.db.models.Model
        model to be written to
    f : str
        field name in question

    Returns
    -------
    str(v)
        If m.f is a DateField
    standardize_time(v)
        If m.f is a TimeField
    v, unchanged
        Otherwise (this includes foreign keys)
    """
    field_type = m._meta.get_field(f).get_internal_type()
    if field_type == 'DateField':
        return str(v)
    if field_type == 'TimeField':
        return standardize_time(v)
    # Every remaining field type, ForeignKey included, is passed through.
    return v
def to_snake_case(name):
    """
    Convert a CamelCase str to snake_case.

    Every upper-case character except the first character of the string is
    preceded by '_', and all characters are lower-cased. Consecutive capitals
    are therefore split one by one ('ABc' becomes 'a_bc').

    Parameters
    ----------
    name : str
        CamelCased string

    Returns
    -------
    str
        '_'-separated lower-case str; the empty str for empty input
    """
    pieces = []
    for index, char in enumerate(name):
        if index > 0 and char.isupper():
            pieces.append('_')
        pieces.append(char.lower())
    return ''.join(pieces)
def unqfk(ts, fk):
    """
    Build a prefixed key from ts and fk.

    Parameters
    ----------
    ts : object
        Prefix; formatted as text in front of the key
    fk : object
        Key value. A str is used as is, anything else is converted with int()
        first (so 45.0 becomes '45')

    Returns
    -------
    str
        ts directly followed by the key, with surrounding whitespace stripped
    """
    key = fk if isinstance(fk, str) else str(int(fk))
    return "{}{}".format(ts, key).strip()
def _has_value(value):
    """Tell whether a table cell carries data: not missing, not '' and not NaN (0 counts as data)."""
    if value is None or is_NaN(value):
        return False
    return not (isinstance(value, str) and value == "")


def gtfs_to_db(g: pt_map.gtfs.GTFS):
    """
    Given a gtfs.GTFS object, write it to the db by creating the matching models.

    Models are processed in the order of foreign_keys; Calendar and
    CalendarDate are skipped. For every row the referenced objects are looked
    up in the database, the columns listed in gtfs_schema are standardized
    with stdz and the model is created. Every id written, and every id used to
    look up a referenced object, is prefixed through unqfk with a string
    derived from the current time, computed once per call.

    Models with an entry in primary_keys are created only if no object with
    the same prefixed id exists yet; models without one are always created.

    Parameters
    ----------
    g : gtfs.GTFS
        GTFS object to be saved to db. For each model the attribute named in
        class_names is read and its ``data`` attribute is iterated as a
        pandas DataFrame.

    Raises
    ------
    <Model>.DoesNotExist
        If a row references an object that is not in the database under the
        prefixed id
    KeyError
        If a table lacks the column named in primary_keys for its model
    """
    # Imported locally so the function does not depend on a module-level logger.
    import logging
    log = logging.getLogger(__name__)

    ts = str(int(time.time()) - time_delta)
    skipped_models = (pt_map.models.Calendar, pt_map.models.CalendarDate)
    for m, references in foreign_keys:
        if m in skipped_models:
            continue
        table = class_names[m.__name__]
        df = getattr(g, table).data
        if df.empty:
            continue
        fields = gtfs_schema[table]
        pk_name = primary_keys[m]
        for _, row in df.iterrows():
            # Referenced objects are kept apart from the pandas row instead of
            # being assigned into it: the row may have a numeric dtype that
            # cannot hold model instances.
            resolved = {}
            for target, column in references:
                raw = row.get(column)
                # Skip absent/NaN references; unqfk cannot convert NaN.
                if _has_value(raw):
                    # NOTE(review): primary_keys[target] is None for some
                    # targets (e.g. Area); the lookup then fails with a
                    # TypeError - confirm the intended key names.
                    resolved[column] = target.objects.get(
                        **{primary_keys[target]: unqfk(ts, raw)}
                    )
            defaults = {}
            for field in fields:
                value = resolved.get(field, row.get(field))
                # Zero is valid data (e.g. direction_id, stop_sequence), so
                # test for presence rather than truthiness.
                if _has_value(value):
                    defaults[field] = stdz(value, m, field)
            log.debug("%s", m)
            if m == pt_map.models.StopTime:
                log.debug("%s", row)
            if pk_name:
                pk_value = unqfk(ts, row[pk_name])
                defaults[pk_name] = pk_value
                # Leaves an existing object untouched, creates it otherwise.
                m.objects.get_or_create(defaults=defaults, **{pk_name: pk_value})
            else:
                m.objects.create(**defaults)
# Model class name -> name under which db_to_gtfs stores that model's
# DataFrame before handing the collection to gtfs.GTFS.
# Same entries as class_names except for LocationsGeojson, which maps to
# "locations.geojson" here.
reversed_file_mapping = dict(
    Agency="agency",
    Stop="stops",
    Route="routes",
    Trip="trips",
    StopTime="stop_times",
    Calendar="calendar",
    CalendarDate="calendar_dates",
    FareAttribute="fare_attributes",
    FareRule="fare_rules",
    Timeframe="timeframes",
    FareMedium="fare_media",
    FareProduct="fare_products",
    FareLegRule="fare_leg_rules",
    FareTransferRule="fare_transfer_rules",
    Area="areas",
    StopArea="stop_areas",
    Network="networks",
    RouteNetwork="route_networks",
    Shape="shapes",
    Frequency="frequencies",
    Transfer="transfers",
    Pathway="pathways",
    Level="levels",
    LocationGroup="location_groups",
    LocationGroupStop="location_group_stops",
    LocationsGeojson="locations.geojson",
    BookingRule="booking_rules",
    Translation="translations",
    FeedInfo="feed_info",
    Attribution="attributions",
)
def db_to_gtfs(q: list[django.db.models.query.QuerySet], folder_path: str = ""):
    """
    Convert given list of query sets to a gtfs.GTFS object.

    Each query set becomes one pandas DataFrame built from its values(); an
    empty query set yields an empty DataFrame. The frames are stored under the
    name reversed_file_mapping gives for the query set's model, passed to
    gtfs.GTFS together with folder_path, and the result is validated.

    Parameters
    ----------
    q : list[django.db.models.query.QuerySet]
        List of QuerySets containing the retrieved data to be converted
    folder_path : str
        path handed to gtfs.GTFS as its first argument

    Returns
    -------
    gtfs.GTFS
        object containing the queried data, after validate() has been called

    Raises
    ------
    KeyError
        If a query set's model name is not a key of reversed_file_mapping
    """
    dfs = {}
    for query_set in q:
        name = reversed_file_mapping[query_set.model.__name__]
        # Materialize the rows once. Testing the query set for truth and then
        # calling values() would evaluate it against the database twice.
        rows = list(query_set.values())
        dfs[name] = pd.DataFrame(rows)
    g = pt_map.gtfs.GTFS(folder_path, dfs)
    g.validate()
    return g