Got rid of bridge and fixed GTFS compliance
- Ported the bridge that was using a custom GTFS class and Pandas Dataframes to a native Django solution for and interface between db and csv (see api/io.py) - Fixed some issues regarding the compliance of the exported csv files with the GTFS reference. I.e. now allowing times 24:00:00 <= t >= 24:59:59
This commit is contained in:
79
transport_accessibility/api/io.py
Normal file
79
transport_accessibility/api/io.py
Normal file
@@ -0,0 +1,79 @@
|
||||
import csv
|
||||
import io
|
||||
import os
|
||||
from pt_map.models import *
|
||||
import pt_map
|
||||
import pt_map.class_names
|
||||
from django.db import models
|
||||
import inspect
|
||||
import datetime
|
||||
import sys
|
||||
|
||||
def csv_queryset(q: models.query.QuerySet) -> str:
|
||||
fields = [field.name for field in q.model._meta.fields]
|
||||
result = io.StringIO()
|
||||
csv_writer = csv.DictWriter(result, fields)
|
||||
csv_writer.writeheader()
|
||||
|
||||
for model in q:
|
||||
row = {}
|
||||
for field_name in fields:
|
||||
field = model._meta.get_field(field_name)
|
||||
value = model.__getattribute__(field.name)
|
||||
if value is None:
|
||||
continue
|
||||
if isinstance(field, models.DateField):
|
||||
row[field.name] = value.strftime("%Y%m%d")
|
||||
elif isinstance(field, models.ForeignKey):
|
||||
row[field.name] = value.pk if not model == Shape else value.shape_id
|
||||
elif isinstance(field, models.ManyToManyField):
|
||||
row[field.name] = value.all().first().__getattribute__(field.name)
|
||||
elif isinstance(field, models.BooleanField):
|
||||
row[field.name] = int(value)
|
||||
else:
|
||||
row[field.name] = value
|
||||
csv_writer.writerow(row)
|
||||
return result.getvalue()
|
||||
|
||||
def models_csv(path: str) -> list[models.Model]:
|
||||
assume_compliance = True
|
||||
if assume_compliance:
|
||||
os.chdir(path)
|
||||
feed = None
|
||||
order = []
|
||||
for m in [*pt_map.class_names.fks.values(), *pt_map.class_names.mtm.values(), *[m for _,m in inspect.getmembers(pt_map.models, inspect.isclass)]]:
|
||||
if m not in order:
|
||||
order.append(m)
|
||||
for m in order:
|
||||
if os.path.exists(pt_map.class_names.file_names[m]):
|
||||
with open(pt_map.class_names.file_names[m], 'r') as f:
|
||||
csvreader = csv.DictReader(f)
|
||||
mtm = {}
|
||||
for row in csvreader:
|
||||
for field in [field for field in m._meta.fields if field.name in csvreader.fieldnames]:
|
||||
if not row[field.name]:
|
||||
del row[field.name]
|
||||
continue
|
||||
if isinstance(field, models.ForeignKey):
|
||||
row[field.name] = pt_map.class_names.fks[field.name].objects.get(pk=f"{feed.pk}_{row[field.name]}")
|
||||
elif isinstance(field, models.DateField):
|
||||
row[field.name] = datetime.datetime.fromisoformat(row[field.name])
|
||||
elif (field.primary_key and feed) or field.name == 'service_id':
|
||||
row[field.name] = f"{feed.pk}_{row[field.name]}"
|
||||
for field in m._meta.many_to_many:
|
||||
mtm[field.name] = pt_map.class_names.mtm[field.name].objects.filter(**{field.name: row[field.name]})
|
||||
del row[field.name]
|
||||
if feed:
|
||||
row['feed_info_id'] = feed
|
||||
if m == pt_map.models.Shape:
|
||||
row['shape_id'] = f"{feed.pk}_{row['shape_id']}"
|
||||
obj = m.objects.create(**row)
|
||||
for name, value in mtm.items():
|
||||
getattr(obj, name).set(value)
|
||||
if not feed:
|
||||
feed = obj
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user