transport-accessibility/transport_accessibility/api/io.py
Johannes Randerath f314bfb396 Got rid of bridge and fixed GTFS compliance
- Ported the bridge that was using a custom GTFS class and Pandas
  DataFrames to a native Django solution for an interface between db
  and csv (see api/io.py)
- Fixed some issues regarding the compliance of the exported csv files
  with the GTFS reference. E.g. now allowing times 24:00:00 <= t <=
  24:59:59
2024-06-24 14:21:53 +02:00

80 lines
3.3 KiB
Python

import csv
import io
import os
from pt_map.models import *
import pt_map
import pt_map.class_names
from django.db import models
import inspect
import datetime
import sys
def csv_queryset(q: models.query.QuerySet) -> str:
    """Serialize every object of the queryset ``q`` into one csv string.

    The header row holds the names of ``q.model._meta.fields``; one data row
    is written per object in ``q``.

    Value conversion per field type:
      * ``None`` values are skipped, so ``csv.DictWriter`` emits an empty cell.
      * ``DateField`` values are written as ``YYYYMMDD``.
      * ``ForeignKey`` values are written as the primary key of the referenced
        object, except for references to ``Shape``, which are written as that
        object's ``shape_id``.
      * ``BooleanField`` values are written as ``0`` / ``1``.
      * Everything else is written unchanged.

    Many-to-many relations are not exported: ``_meta.fields`` does not contain
    them, so they never reach the conversion below.

    Args:
        q: Queryset whose objects are exported.

    Returns:
        The csv document, header line included.
    """
    fields = q.model._meta.fields
    result = io.StringIO()
    csv_writer = csv.DictWriter(result, [field.name for field in fields])
    csv_writer.writeheader()
    for obj in q:
        row = {}
        for field in fields:
            value = getattr(obj, field.name)
            if value is None:
                continue
            if isinstance(field, models.DateField):
                row[field.name] = value.strftime("%Y%m%d")
            elif isinstance(field, models.ForeignKey):
                # The previous check compared the exported instance with the
                # Shape class itself, which is never equal, so shape_id was
                # never used. The referenced object is what has to be tested.
                row[field.name] = value.shape_id if isinstance(value, Shape) else value.pk
            elif isinstance(field, models.BooleanField):
                row[field.name] = int(value)
            else:
                row[field.name] = value
        csv_writer.writerow(row)
    return result.getvalue()
def models_csv(path: str) -> list[models.Model]:
    """Import the GTFS csv files found in directory ``path`` into the database.

    The model classes in ``pt_map.class_names.fks`` and
    ``pt_map.class_names.mtm`` are processed first, followed by the remaining
    classes of ``pt_map.models``. For each class the file named by
    ``pt_map.class_names.file_names`` is read if it exists in ``path``, and
    one object is created per csv row.

    The first object created is treated as the feed. Its primary key is
    prefixed to the primary keys, ``service_id`` values, ``Shape.shape_id``
    values and foreign-key lookups of all later rows, and it is passed to
    every later row as ``feed_info_id``.

    Files are addressed relative to ``path``; the working directory of the
    process is left untouched.

    Args:
        path: Directory containing the csv files.

    Raises:
        FileNotFoundError: If ``path`` is not an existing directory.

    NOTE(review): no value is returned although the annotation announces a
    list of models - confirm what callers expect.
    """
    # Placeholder switch: the feed is imported without any validation.
    assume_compliance = True
    if assume_compliance:
        if not os.path.isdir(path):
            raise FileNotFoundError(f"GTFS directory not found: {path}")
        feed = None
        candidates = [
            *pt_map.class_names.fks.values(),
            *pt_map.class_names.mtm.values(),
            *[m for _, m in inspect.getmembers(pt_map.models, inspect.isclass)],
        ]
        # dict.fromkeys drops duplicates while keeping first-seen order.
        for m in dict.fromkeys(candidates):
            file_path = os.path.join(path, pt_map.class_names.file_names[m])
            if os.path.exists(file_path):
                feed = _import_gtfs_file(m, file_path, feed)


def _import_gtfs_file(m, file_path, feed):
    """Create one ``m`` object per row of ``file_path`` and return the feed object."""
    # utf-8-sig strips an optional BOM that would otherwise end up in the first
    # column name; newline="" is what the csv module expects for its input.
    with open(file_path, newline="", encoding="utf-8-sig") as f:
        csvreader = csv.DictReader(f)
        header = set(csvreader.fieldnames or ())
        present_fields = [field for field in m._meta.fields if field.name in header]
        mtm = {}
        for row in csvreader:
            for field in present_fields:
                raw = row[field.name]
                if not raw:
                    # Empty cells are dropped so the model default applies.
                    del row[field.name]
                    continue
                if isinstance(field, models.ForeignKey):
                    # Referenced objects were stored under a feed-prefixed key.
                    row[field.name] = pt_map.class_names.fks[field.name].objects.get(pk=f"{feed.pk}_{raw}")
                elif isinstance(field, models.DateField):
                    row[field.name] = _parse_gtfs_date(raw)
                elif (field.primary_key and feed is not None) or field.name == 'service_id':
                    row[field.name] = f"{feed.pk}_{raw}"
            for field in m._meta.many_to_many:
                # Related objects are attached after creation; the column must
                # not be passed to create().
                mtm[field.name] = pt_map.class_names.mtm[field.name].objects.filter(**{field.name: row[field.name]})
                del row[field.name]
            if feed is not None:
                row['feed_info_id'] = feed
            if m == pt_map.models.Shape:
                row['shape_id'] = f"{feed.pk}_{row['shape_id']}"
            obj = m.objects.create(**row)
            for name, value in mtm.items():
                getattr(obj, name).set(value)
            if feed is None:
                # The very first object imported serves as the feed for the rest.
                feed = obj
    return feed


def _parse_gtfs_date(raw: str) -> datetime.datetime:
    """Parse a ``YYYYMMDD`` (or ISO formatted) date string into a ``datetime``."""
    # datetime.fromisoformat only understands the compact YYYYMMDD form from
    # Python 3.11 on, so that form is parsed explicitly.
    if len(raw) == 8 and raw.isdigit():
        return datetime.datetime.strptime(raw, "%Y%m%d")
    return datetime.datetime.fromisoformat(raw)