transport-accessibility/transport_accessibility/pt_map/bridge.py
2024-06-17 14:26:23 +02:00

230 lines
7.0 KiB
Python

"""
Bridge
======
Bridge between Django with its models and database and gtfs.GTFS as intermediate object for File IO.
Contents
--------
Constants
---------
reversed_file_mapping : dict{str,str}
Map CamelCased filenames to '_'-separated
class_names : dict{str,str}
Map CamelCase, singularized class names to pluralized, snake_cased file names
primary_keys : dict{str, (str or None)}
For all pt_map.models, map primary keys if applicable
foreign_keys
For all pt_map.models, map foreign keys if any. Also ordered for model creation without foreign reference conflicts.
time_delta : int
Unix time for Jan 1, 2024. To be used to calculate time prefix strings.
Functions
---------
toCamelCase(s):
Converts '_'-separated str to CamelCase with capital first letter
standardize_time(time_str):
Converts a str encoding a time to %H:%M:%S format with normalized 24 hour time
is_NaN(v):
Checks if given variable is either a str expressing NaN or NaN as object
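stdz(v, m, f):
Standardize date and time values for field f of model m
to_snake_case(name):
Converts CamelCase str to snake_case
unqfk(ts, fk):
Prepends the time prefix ts to a primary or foreign key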
gtfs_to_db(g):
Write an existing gtfs.GTFS object to the database using the GTFS compliant models
db_to_gtfs(q, folder_path):
Convert list of query sets to a gtfs.GTFS object with the given folder_path, run its validation for GTFS compliance and return it.
"""
import pt_map.gtfs
import pt_map.models
import pandas as pd
from pattern.text.en import singularize, pluralize
import math
import numbers
import email.utils
import time
import datetime
import django.db.models
import time
from pt_map.gtfs_schema import gtfs_schema
from.class_names import *
def toCamelCase(s: str):
    """
    Turn a '_'-separated str into CamelCase, first letter capitalized as well.

    Parameters
    ----------
    s : str
        '_'-separated string

    Returns
    -------
    str
        The pieces of s between underscores, each run through str.capitalize
        and concatenated without a separator
    """
    pieces = []
    for chunk in s.split('_'):
        # str.capitalize upper-cases the first character and lower-cases the rest
        pieces.append(chunk.capitalize())
    return ''.join(pieces)
def standardize_time(time_str: str):
    """
    Bring a time given as str into the standardized '%H:%M:%S' form.

    Parameters
    ----------
    time_str: str
        str encoding time

    Returns
    -------
    str in format '%H:%M:%S'
    """
    # A fixed dummy date is put in front so the value can be parsed as a full date
    parsed = email.utils.parsedate(f"Jan 19, 1999 {time_str}")
    # Going through a timestamp lets the platform normalize the parsed fields
    # before formatting (local time is used in both directions)
    seconds = time.mktime(parsed)
    moment = datetime.datetime.fromtimestamp(seconds)
    return moment.strftime('%H:%M:%S')
def is_NaN(v):
    """
    Tell whether v stands for NaN.

    Returns
    -------
    True
        If v is a str spelling "nan" (case-insensitive) or a number for which math.isnan holds
    False
        Otherwise
    """
    if isinstance(v, str):
        return v.lower() == "nan"
    if isinstance(v, numbers.Number):
        return math.isnan(v)
    # Neither text nor a number, so it cannot be NaN
    return False
def stdz(v, m: django.db.models.Model, f: str):
    """
    Standardize a value according to the type of the model field it is meant for.

    Parameters
    ----------
    v : object
        object to be standardized
    m : django.db.models.Model
        model to be written to
    f : str
        field name in question

    Returns
    -------
    str(v)
        If m.f is a DateField
    Result of standardize_time(v)
        If m.f is a TimeField
    Unchanged v
        Otherwise (this includes ForeignKey fields, which get no special treatment)
    """
    field_type = m._meta.get_field(f).get_internal_type()
    if field_type == 'DateField':
        return str(v)
    if field_type == 'TimeField':
        return standardize_time(v)
    # Every other field type, ForeignKey included, is passed through untouched
    return v
def to_snake_case(name):
    """
    Convert CamelCase to snake_case.

    Every upper-case character starts a new word: it is lower-cased and, unless
    it is the very first character, preceded by an underscore. All other
    characters are copied unchanged.

    Parameters
    ----------
    name : str
        str in CamelCase

    Returns
    -------
    Str in snake_case. An empty str is returned unchanged.
    """
    pieces = []
    for index, char in enumerate(name):
        if char.isupper():
            # No leading underscore in front of the first character
            if index > 0:
                pieces.append('_')
            pieces.append(char.lower())
        else:
            pieces.append(char)
    # str is immutable, so the result is assembled from pieces instead of edited in place
    return ''.join(pieces)
def unqfk(ts, fk):
    """
    Build a prefixed key from a time prefix and a primary or foreign key.

    Primary keys of imported data are likely to overlap with keys already in the database.
    To avoid this, the time in seconds since Jan 1, 2024 is put in front of the key.
    Foreign key references have to point at the new key, so they are processed the same way;
    this works because the same prefix is used for all objects of one import.

    Parameters
    ----------
    ts : str
        time in seconds to be prepended
    fk : primary or foreign key to be processed.
        Anything that is not a str is converted via int() first.

    Returns
    -------
    Str with prefix, surrounding whitespace stripped
    """
    # Non-str keys go through int() so that e.g. 12.0 becomes "12"
    key = fk if isinstance(fk, str) else str(int(fk))
    prefixed = f"{ts}{key}"
    return prefixed.strip()
def gtfs_to_db(g: pt_map.gtfs.GTFS):
    """
    Given a gtfs.GTFS object, write its contents to the db by creating the matching model objects.

    The models are processed in the order given by foreign_keys. For every non-empty
    dataframe each row becomes one model object. Foreign key columns are replaced by the
    referenced objects, and primary as well as foreign keys get a common time prefix
    (see unqfk).

    Parameters
    ----------
    g : gtfs.GTFS
        GTFS object to be saved to db
    """
    # Current time in seconds since Jan 1, 2024; prepended to ids to make them more or less unique
    ts = str(int(time.time()) - time_delta)
    for entry in foreign_keys:
        m = entry[0]
        file_key = class_names[m.__name__]
        # Dataframe of this model inside the gtfs.GTFS object
        df = getattr(g, file_key).data
        if df.empty:
            # GTFS file not present, nothing to import for this model
            continue
        field_names = gtfs_schema[file_key]
        # Each row is one entry of the GTFS file and becomes one instance of the db model
        for _, row in df.iterrows():
            # Swap foreign key values for the objects of the foreign model they refer to
            for fk in entry[1]:
                if row.get(fk[1]):
                    lookup = {primary_keys[fk[0]]: unqfk(ts, row[fk[1]])}
                    row[fk[1]] = fk[0].objects.get(**lookup)
            # Fields and values of the model object to create; falsy and NaN values are left out
            defaults = {}
            for field in field_names:
                if row.get(field) and not is_NaN(row[field]):
                    defaults[field] = stdz(row.get(field), m, field)
            pk = primary_keys[m]
            if not pk:
                # Model without a primary key of its own: always create a new object
                m.objects.create(**defaults)
                continue
            # Primary keys should be unique, so the shared time prefix is put in front
            row[pk] = unqfk(ts, row[pk])
            defaults[pk] = row[pk]
            try:
                # Only create when no object with this primary key exists yet
                m.objects.get(**{pk: row[pk]})
            except m.DoesNotExist:
                m.objects.update_or_create(
                    defaults=defaults,
                    **{pk: row[pk]}
                )
def db_to_gtfs(q: list[django.db.models.query.QuerySet], folder_path: str = ""):
    """
    Build a gtfs.GTFS object out of the given list of query sets.

    Parameters
    ----------
    q : list[django.db.models.query.QuerySet]
        List of QuerySets containing the retrieved data to be converted
    folder_path : str
        path to be set as the results folder_path instance variable

    Returns
    -------
    gtfs.GTFS
        object containing the queried data; validate() has been called on it
    """
    dfs = {}
    for query_set in q:
        # Key is looked up in reversed_file_mapping via the model's class name
        file_name = reversed_file_mapping[query_set.model.__name__]
        if query_set:
            dfs[file_name] = pd.DataFrame(list(query_set.values()))
        else:
            # Empty query sets are represented by an empty dataframe
            dfs[file_name] = pd.DataFrame()
    g = pt_map.gtfs.GTFS(folder_path, dfs)
    g.validate()
    return g