transport-accessibility/transport_accessibility/pt_map/views.py
2024-06-20 01:26:53 +02:00

189 lines
9.2 KiB
Python

"""
Views
=====
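Views reacting to HTTP requests by interfacing between backend and frontend.

Functions
---------
index(request)
    Home page
timetable(request)
    Timetable of a single route as JSON
data(request)
    Create, replace, modify, read and delete database objects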
"""
from django.shortcuts import render
from django.http import HttpResponse, HttpResponseBadRequest, HttpResponseNotAllowed, HttpRequest
from django.core.exceptions import BadRequest, ObjectDoesNotExist
from django.core import serializers
import django.db.models
from MySQLdb import IntegrityError
from .models import *
from .forms import *
import json
from datetime import datetime, date
from django.views.decorators.csrf import csrf_exempt
from .class_names import *
class GTFSSerializer(serializers.json.Serializer):
    """
    JSON serializer emitting only the non-empty fields of each object.

    The parent serializer's output is parsed again; of every record only the
    entries under its 'fields' key are kept, everything else in the record is
    discarded.
    """

    def serialize(self, queryset, **options):
        """
        Serialize a queryset to a JSON list of field dicts.

        Parameters
        ----------
        queryset : iterable
            Objects to serialize, passed on to the parent serializer.
        **options
            Passed on to the parent serializer unchanged.

        Returns
        -------
        str
            JSON list with one dict per object, mapping field names to values.
            Fields whose value is falsy are left out, except values equal to 0.
        """
        records = json.loads(super().serialize(queryset, **options))
        slimmed = []
        for record in records:
            kept = {}
            for name, value in record['fields'].items():
                # Values equal to 0 are real data and must survive the filter.
                if value == 0 or value:
                    kept[name] = value
            slimmed.append(kept)
        return json.dumps(slimmed)
def get_timetable(r, trips_r, stop_sequence):
    """
    Given a pt_map.models.Route, calculate the timetable for all its stops.

    Parameters
    ----------
    r : pt_map.models.Route
        Route the timetable should be calculated for. Not read by the
        calculation itself; the route is represented by `trips_r`.
    trips_r : list(pt_map.models.Trip)
        Trips travelling on the route.
    stop_sequence : list(str)
        stop_ids the route serves, in serving order.

    Returns
    -------
    dict{"stop_sequence": list(str), "stop_times": dict(str, list(str))}
        Dict containing two elements:
        "stop_sequence" : list(str)
            list of stop_ids the route serves
        "stop_times" : dict(str, list(str))
            dict mapping stop_ids from stop_sequence to "%H:%M" time strings
            the route is departing from the stop at
    """
    stop_times = {}
    for stop in stop_sequence:
        times = []
        for t in trips_r:
            # Only the stop times of *this* stop belong in its column; without
            # the stop restriction every stop would list the departures of all
            # stops of the trip.
            for st in StopTime.objects.filter(trip_id=t.trip_id, stop_id__stop_id=stop):
                times.append(st.departure_time.strftime("%H:%M"))
        stop_times[stop] = times
    return {"stop_sequence": stop_sequence, "stop_times": stop_times}
def index(request):
    """
    Home page.

    Renders "map.html" with all stops, all routes and, if the GET parameter
    "timetable" names an existing route_id, the timetable of that route.

    Parameters
    ----------
    request : django.http.HttpRequest
        Incoming request. The optional GET parameter "timetable" holds a route_id.

    Returns
    -------
    django.http.HttpResponse
        Rendered template. The context holds the JSON strings "stops", "routes"
        and "timetable"; "timetable" is an empty JSON object if none was
        requested or the requested route does not exist.
    """
    stops = {
        s.stop_id: {name: getattr(s, name) for name in ['stop_name', 'stop_lat', 'stop_lon']}
        for s in Stop.objects.all()
    }
    routes = [
        {
            "route_id": r.route_id,
            "route_type": r.route_type,
            # Prefer the short name and fall back to the long one.
            "route_name": r.route_short_name if r.route_short_name else r.route_long_name,
            "agency_id": r.agency_id.agency_id,
        }
        for r in Route.objects.all()
    ]
    trips = {r["route_id"]: list(Trip.objects.filter(route_id_id=r["route_id"])) for r in routes}

    # The first trip of a route is taken as reference for its stops and their order.
    stop_sequences = {}
    for r in routes:
        route_trips = trips[r["route_id"]]
        if not route_trips:
            # A route without trips has no reference trip; it serves no stops
            # rather than breaking the whole page.
            stop_sequences[r["route_id"]] = []
            continue
        reference_stop_times = sorted(
            StopTime.objects.filter(trip_id_id__exact=route_trips[0].trip_id),
            key=lambda st: st.stop_sequence,
        )
        stop_sequences[r["route_id"]] = [st.stop_id.stop_id for st in reference_stop_times]

    timetable_data = {}
    if request.GET.get("timetable"):
        try:
            r = Route.objects.get(route_id=request.GET.get("timetable"))
            timetable_data = get_timetable(r, trips[r.route_id], stop_sequences[r.route_id])
        except Route.DoesNotExist:
            # An unknown route is not fatal: the page is rendered without a timetable.
            print(f"Invalid request for Route with id {request.GET['timetable']}")

    context = {
        "stops": json.dumps(stops),
        "routes": json.dumps(routes),
        "timetable": json.dumps(timetable_data),
    }
    return render(request, "map.html", context)
def get_field_names(model: models.Model):
    """
    List the names of a model's fields.

    Parameters
    ----------
    model : models.Model
        Model whose `_meta.fields` are read.

    Returns
    -------
    list
        The `name` attribute of every entry in `model._meta.fields`, in order.
    """
    names = []
    for field in model._meta.fields:
        names.append(field.name)
    return names
def timetable(request):
    """
    Return the timetable of a single route as JSON.

    Parameters
    ----------
    request : django.http.HttpRequest
        Must be a GET request carrying the parameter "route_id".

    Returns
    -------
    django.http.HttpResponse
        200 with the JSON-encoded result of `get_timetable` for the route.
        A route without trips yields a timetable with an empty stop sequence.
        400 if "route_id" is missing or no route with that id exists.
        405 for any method other than GET.
    """
    if request.method != "GET":
        return HttpResponseNotAllowed(["GET"])

    # Only the lookup itself is guarded, so that a KeyError from anywhere else
    # is not misreported as a missing parameter.
    try:
        r = Route.objects.get(route_id=request.GET["route_id"])
    except KeyError:
        return HttpResponseBadRequest("route_id missing or malformed.")
    except Route.DoesNotExist:
        return HttpResponseBadRequest("Route not found.")

    trips_r = list(Trip.objects.filter(route_id_id=r.route_id))
    stop_sequence = []
    if trips_r:
        # The first trip is taken as reference for the stops and their order.
        reference_stop_times = sorted(
            StopTime.objects.filter(trip_id_id__exact=trips_r[0].trip_id),
            key=lambda st: st.stop_sequence,
        )
        stop_sequence = [st.stop_id.stop_id for st in reference_stop_times]

    route_timetable = get_timetable(r, trips_r, stop_sequence)
    # NOTE(review): "text/json" is kept for existing clients; the registered
    # media type would be "application/json".
    return HttpResponse(json.dumps(route_timetable), content_type="text/json")
def get_pks_from_get(req_get):
    """
    Collect the primary key values given as GET parameters.

    Parameters
    ----------
    req_get : query dict
        GET parameters of a request; must offer `keys()` and `getlist()`.

    Returns
    -------
    dict
        Maps each entry of `classes_by_primary_keys` whose key appears as a
        parameter name to the list of values given for that parameter.

    Raises
    ------
    ValueError
        If no parameter name is a key of `classes_by_primary_keys`.
    """
    found = {
        classes_by_primary_keys[param]: req_get.getlist(param)
        for param in req_get.keys()
        if param in classes_by_primary_keys.keys()
    }
    if found:
        return found
    raise ValueError("No pks found.")
def get_obj_by_pk(mdl: models.Model, pks: list[str]):
    """
    Fetch the objects of one model identified by the given primary keys.

    Parameters
    ----------
    mdl : models.Model
        Model class to query; its lookup field name is taken from `primary_keys`.
    pks : list[str]
        Primary key values, one object is fetched per value.

    Returns
    -------
    list
        The fetched objects in the order of `pks`, with falsy results removed.
        Whatever `mdl.objects.get` raises for an unknown key is not caught here.
    """
    fetched = []
    for pk in pks:
        lookup = {primary_keys[mdl]: pk}
        fetched.append(mdl.objects.get(**lookup))
    # Drop falsy entries only after every lookup has been performed.
    return list(filter(None, fetched))
def obj_from_get(req_get) -> dict:
    """
    Fetch all objects whose primary keys are given as GET parameters.

    Parameters
    ----------
    req_get : query dict
        GET parameters of a request, interpreted by `get_pks_from_get`.

    Returns
    -------
    dict
        Maps each model found by `get_pks_from_get` to the list of its objects
        returned by `get_obj_by_pk`.

    Raises
    ------
    ValueError
        Passed on from `get_pks_from_get` if no primary keys were given.
    """
    # Built exactly once: every entry costs one database lookup per primary key.
    return {mdl: get_obj_by_pk(mdl, keys) for mdl, keys in get_pks_from_get(req_get).items()}
@csrf_exempt
def data(request):
    """
    Handle database requests from the frontend. Using HTTP semantics to specify what to do with the data.

    PUT, PATCH and DELETE requests must have the content type application/json
    and carry objects in the format understood by Django's 'json' deserializer.
    Any error is answered with 400 and a describing message.

    Request
    -------
    PUT
        Create a new object if no object with the given primary key exists in the database or delete and replace an existing object.
        Note that if there is an error in creating the new object, the object to replace will probably already have been deleted.
        Successful response is 200 with the number of new and of replaced objects.
    PATCH
        Modify existing objects, identified by their primary keys, given the fields in the body.
        Only fields with a truthy value are applied, so a field cannot be cleared or set to 0 this way.
        Responds 400 if any of the primary keys given does not exist; objects handled before that one stay modified.
        Successful response is 200 with the number of modified objects.
    GET
        Return json of models identified by primary keys given as GET parameters.
        Responds 400 if no valid primary key is given or any of the requested pks does not exist.
    DELETE
        Delete the objects given in the body, identified by their primary keys.
        Responds 400 if any of the primary keys given does not exist in the database; objects handled before that one stay deleted.
        Successful response is 200 and the number of deleted models.

    Any other method is answered with 405.
    """
    if request.method in ["PUT", "PATCH", "DELETE"]:
        new = 0
        modified = 0
        # request.content_type is the media type without parameters and is an
        # empty string when the header is missing, so this neither fails on a
        # missing header nor on "application/json; charset=utf-8".
        if request.content_type != 'application/json':
            return HttpResponseBadRequest('Request must be JSON.')
        try:
            for o in serializers.deserialize('json', request.body):
                try:
                    obj = o.object.__class__.objects.get(pk=o.object.pk)
                except ObjectDoesNotExist:
                    # Only PUT may create objects.
                    if not request.method == "PUT":
                        return HttpResponseBadRequest(f"Object(s) not found. {modified} objects touched.")
                    new += 1
                    o.save()
                    continue
                modified += 1
                if request.method == "PATCH":
                    for f, v in o.object.__dict__.items():
                        # Private attributes such as the model's internal
                        # state are bookkeeping, not data to be patched.
                        if f.startswith("_"):
                            continue
                        # Falsy values are treated as "not specified".
                        if v:
                            setattr(obj, f, v)
                    obj.save()
                else:
                    obj.delete()
                    if request.method == "PUT":
                        o.save()
        # The database errors are handled on this level so that they are also
        # caught when saving a new object fails.
        except django.db.IntegrityError:
            return HttpResponseBadRequest("Could not write to database. Probably the objects you were trying to create or update where not compliant with GTFS's or the API's specification.")
        except IntegrityError:
            return HttpResponseBadRequest("There was an error while trying to write to the database. Probably a foreign key could not be resolved. Did you specify the objects in the correct order, if they depend on each other?")
        except serializers.base.DeserializationError:
            return HttpResponseBadRequest("Could not process the JSON you sent me correctly. Did you comply with the format required by the API and used valid JSON?")
        except json.JSONDecodeError:
            return HttpResponseBadRequest("Invalid JSON.")
        except django.db.utils.DataError:
            return HttpResponseBadRequest("One of your objects has fields that do not fit in their corresponding fields in the database. Did you comply to all data type requirements?")
        if request.method == "PUT":
            return HttpResponse(f"OK. {new} new objects, {modified} replaced.")
        return HttpResponse(f"OK. {'Patched' if request.method == 'PATCH' else 'Deleted'} {modified} objects.")
    elif request.method == "GET":
        try:
            pks = get_pks_from_get(request.GET)
        except ValueError:
            return HttpResponseBadRequest("No valid pks given.")
        try:
            # Each value is itself a JSON string produced by GTFSSerializer.
            return HttpResponse(json.dumps({mdl._meta.object_name: GTFSSerializer().serialize(v) for mdl, v in obj_from_get(request.GET).items()}), content_type='application/json')
        except ObjectDoesNotExist:
            return HttpResponseBadRequest("Object(s) not found.")
    return HttpResponseNotAllowed(['GET', 'PUT', 'PATCH', 'DELETE'])