Backend changes and sql convenience

- Added sql files to create and drop db and to feed example data.
- Added bridge to import (and later export) data from a gtfs.GTFS object
  to the database.
- Updated models and migrations to implement the whole GTFS reference.
This commit is contained in:
Johannes Randerath
2024-06-02 20:58:39 +02:00
parent e8d02517af
commit af5a2c862d
26 changed files with 31006 additions and 64 deletions

8
bin/calc-prorate Executable file
View File

@@ -0,0 +1,8 @@
#!/home/johannes/code/transport-accessibility/bin/python
# -*- coding: utf-8 -*-
import re
import sys
from tempora import calculate_prorated_values
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(calculate_prorated_values())

8
bin/cheroot Executable file
View File

@@ -0,0 +1,8 @@
#!/home/johannes/code/transport-accessibility/bin/python
# -*- coding: utf-8 -*-
import re
import sys
from cheroot.cli import main
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(main())

8
bin/cherryd Executable file
View File

@@ -0,0 +1,8 @@
#!/home/johannes/code/transport-accessibility/bin/python
# -*- coding: utf-8 -*-
import re
import sys
from cherrypy.__main__ import run
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(run())

473
bin/dumppdf.py Executable file
View File

@@ -0,0 +1,473 @@
#!/home/johannes/code/transport-accessibility/bin/python
"""Extract pdf structure in XML format"""
import logging
import os.path
import re
import sys
from typing import Any, Container, Dict, Iterable, List, Optional, TextIO, Union, cast
from argparse import ArgumentParser
import pdfminer
from pdfminer.pdfdocument import PDFDocument, PDFNoOutlines, PDFXRefFallback
from pdfminer.pdfpage import PDFPage
from pdfminer.pdfparser import PDFParser
from pdfminer.pdftypes import PDFObjectNotFound, PDFValueError
from pdfminer.pdftypes import PDFStream, PDFObjRef, resolve1, stream_value
from pdfminer.psparser import PSKeyword, PSLiteral, LIT
from pdfminer.utils import isnumber
logging.basicConfig()
logger = logging.getLogger(__name__)
ESC_PAT = re.compile(r'[\000-\037&<>()"\042\047\134\177-\377]')
def escape(s: Union[str, bytes]) -> str:
if isinstance(s, bytes):
us = str(s, "latin-1")
else:
us = s
return ESC_PAT.sub(lambda m: "&#%d;" % ord(m.group(0)), us)
def dumpxml(out: TextIO, obj: object, codec: Optional[str] = None) -> None:
if obj is None:
out.write("<null />")
return
if isinstance(obj, dict):
out.write('<dict size="%d">\n' % len(obj))
for (k, v) in obj.items():
out.write("<key>%s</key>\n" % k)
out.write("<value>")
dumpxml(out, v)
out.write("</value>\n")
out.write("</dict>")
return
if isinstance(obj, list):
out.write('<list size="%d">\n' % len(obj))
for v in obj:
dumpxml(out, v)
out.write("\n")
out.write("</list>")
return
if isinstance(obj, (str, bytes)):
out.write('<string size="%d">%s</string>' % (len(obj), escape(obj)))
return
if isinstance(obj, PDFStream):
if codec == "raw":
# Bug: writing bytes to text I/O. This will raise TypeError.
out.write(obj.get_rawdata()) # type: ignore [arg-type]
elif codec == "binary":
# Bug: writing bytes to text I/O. This will raise TypeError.
out.write(obj.get_data()) # type: ignore [arg-type]
else:
out.write("<stream>\n<props>\n")
dumpxml(out, obj.attrs)
out.write("\n</props>\n")
if codec == "text":
data = obj.get_data()
out.write('<data size="%d">%s</data>\n' % (len(data), escape(data)))
out.write("</stream>")
return
if isinstance(obj, PDFObjRef):
out.write('<ref id="%d" />' % obj.objid)
return
if isinstance(obj, PSKeyword):
# Likely bug: obj.name is bytes, not str
out.write("<keyword>%s</keyword>" % obj.name) # type: ignore [str-bytes-safe]
return
if isinstance(obj, PSLiteral):
# Likely bug: obj.name may be bytes, not str
out.write("<literal>%s</literal>" % obj.name) # type: ignore [str-bytes-safe]
return
if isnumber(obj):
out.write("<number>%s</number>" % obj)
return
raise TypeError(obj)
def dumptrailers(
out: TextIO, doc: PDFDocument, show_fallback_xref: bool = False
) -> None:
for xref in doc.xrefs:
if not isinstance(xref, PDFXRefFallback) or show_fallback_xref:
out.write("<trailer>\n")
dumpxml(out, xref.get_trailer())
out.write("\n</trailer>\n\n")
no_xrefs = all(isinstance(xref, PDFXRefFallback) for xref in doc.xrefs)
if no_xrefs and not show_fallback_xref:
msg = (
"This PDF does not have an xref. Use --show-fallback-xref if "
"you want to display the content of a fallback xref that "
"contains all objects."
)
logger.warning(msg)
return
def dumpallobjs(
out: TextIO,
doc: PDFDocument,
codec: Optional[str] = None,
show_fallback_xref: bool = False,
) -> None:
visited = set()
out.write("<pdf>")
for xref in doc.xrefs:
for objid in xref.get_objids():
if objid in visited:
continue
visited.add(objid)
try:
obj = doc.getobj(objid)
if obj is None:
continue
out.write('<object id="%d">\n' % objid)
dumpxml(out, obj, codec=codec)
out.write("\n</object>\n\n")
except PDFObjectNotFound as e:
print("not found: %r" % e)
dumptrailers(out, doc, show_fallback_xref)
out.write("</pdf>")
return
def dumpoutline(
outfp: TextIO,
fname: str,
objids: Any,
pagenos: Container[int],
password: str = "",
dumpall: bool = False,
codec: Optional[str] = None,
extractdir: Optional[str] = None,
) -> None:
fp = open(fname, "rb")
parser = PDFParser(fp)
doc = PDFDocument(parser, password)
pages = {
page.pageid: pageno
for (pageno, page) in enumerate(PDFPage.create_pages(doc), 1)
}
def resolve_dest(dest: object) -> Any:
if isinstance(dest, (str, bytes)):
dest = resolve1(doc.get_dest(dest))
elif isinstance(dest, PSLiteral):
dest = resolve1(doc.get_dest(dest.name))
if isinstance(dest, dict):
dest = dest["D"]
if isinstance(dest, PDFObjRef):
dest = dest.resolve()
return dest
try:
outlines = doc.get_outlines()
outfp.write("<outlines>\n")
for (level, title, dest, a, se) in outlines:
pageno = None
if dest:
dest = resolve_dest(dest)
pageno = pages[dest[0].objid]
elif a:
action = a
if isinstance(action, dict):
subtype = action.get("S")
if subtype and repr(subtype) == "/'GoTo'" and action.get("D"):
dest = resolve_dest(action["D"])
pageno = pages[dest[0].objid]
s = escape(title)
outfp.write('<outline level="{!r}" title="{}">\n'.format(level, s))
if dest is not None:
outfp.write("<dest>")
dumpxml(outfp, dest)
outfp.write("</dest>\n")
if pageno is not None:
outfp.write("<pageno>%r</pageno>\n" % pageno)
outfp.write("</outline>\n")
outfp.write("</outlines>\n")
except PDFNoOutlines:
pass
parser.close()
fp.close()
return
LITERAL_FILESPEC = LIT("Filespec")
LITERAL_EMBEDDEDFILE = LIT("EmbeddedFile")
def extractembedded(fname: str, password: str, extractdir: str) -> None:
def extract1(objid: int, obj: Dict[str, Any]) -> None:
filename = os.path.basename(obj.get("UF") or cast(bytes, obj.get("F")).decode())
fileref = obj["EF"].get("UF") or obj["EF"].get("F")
fileobj = doc.getobj(fileref.objid)
if not isinstance(fileobj, PDFStream):
error_msg = (
"unable to process PDF: reference for %r is not a "
"PDFStream" % filename
)
raise PDFValueError(error_msg)
if fileobj.get("Type") is not LITERAL_EMBEDDEDFILE:
raise PDFValueError(
"unable to process PDF: reference for %r "
"is not an EmbeddedFile" % (filename)
)
path = os.path.join(extractdir, "%.6d-%s" % (objid, filename))
if os.path.exists(path):
raise IOError("file exists: %r" % path)
print("extracting: %r" % path)
os.makedirs(os.path.dirname(path), exist_ok=True)
out = open(path, "wb")
out.write(fileobj.get_data())
out.close()
return
with open(fname, "rb") as fp:
parser = PDFParser(fp)
doc = PDFDocument(parser, password)
extracted_objids = set()
for xref in doc.xrefs:
for objid in xref.get_objids():
obj = doc.getobj(objid)
if (
objid not in extracted_objids
and isinstance(obj, dict)
and obj.get("Type") is LITERAL_FILESPEC
):
extracted_objids.add(objid)
extract1(objid, obj)
return
def dumppdf(
outfp: TextIO,
fname: str,
objids: Iterable[int],
pagenos: Container[int],
password: str = "",
dumpall: bool = False,
codec: Optional[str] = None,
extractdir: Optional[str] = None,
show_fallback_xref: bool = False,
) -> None:
fp = open(fname, "rb")
parser = PDFParser(fp)
doc = PDFDocument(parser, password)
if objids:
for objid in objids:
obj = doc.getobj(objid)
dumpxml(outfp, obj, codec=codec)
if pagenos:
for (pageno, page) in enumerate(PDFPage.create_pages(doc)):
if pageno in pagenos:
if codec:
for obj in page.contents:
obj = stream_value(obj)
dumpxml(outfp, obj, codec=codec)
else:
dumpxml(outfp, page.attrs)
if dumpall:
dumpallobjs(outfp, doc, codec, show_fallback_xref)
if (not objids) and (not pagenos) and (not dumpall):
dumptrailers(outfp, doc, show_fallback_xref)
fp.close()
if codec not in ("raw", "binary"):
outfp.write("\n")
return
def create_parser() -> ArgumentParser:
parser = ArgumentParser(description=__doc__, add_help=True)
parser.add_argument(
"files",
type=str,
default=None,
nargs="+",
help="One or more paths to PDF files.",
)
parser.add_argument(
"--version",
"-v",
action="version",
version="pdfminer.six v{}".format(pdfminer.__version__),
)
parser.add_argument(
"--debug",
"-d",
default=False,
action="store_true",
help="Use debug logging level.",
)
procedure_parser = parser.add_mutually_exclusive_group()
procedure_parser.add_argument(
"--extract-toc",
"-T",
default=False,
action="store_true",
help="Extract structure of outline",
)
procedure_parser.add_argument(
"--extract-embedded", "-E", type=str, help="Extract embedded files"
)
parse_params = parser.add_argument_group(
"Parser", description="Used during PDF parsing"
)
parse_params.add_argument(
"--page-numbers",
type=int,
default=None,
nargs="+",
help="A space-seperated list of page numbers to parse.",
)
parse_params.add_argument(
"--pagenos",
"-p",
type=str,
help="A comma-separated list of page numbers to parse. Included for "
"legacy applications, use --page-numbers for more idiomatic "
"argument entry.",
)
parse_params.add_argument(
"--objects",
"-i",
type=str,
help="Comma separated list of object numbers to extract",
)
parse_params.add_argument(
"--all",
"-a",
default=False,
action="store_true",
help="If the structure of all objects should be extracted",
)
parse_params.add_argument(
"--show-fallback-xref",
action="store_true",
help="Additionally show the fallback xref. Use this if the PDF "
"has zero or only invalid xref's. This setting is ignored if "
"--extract-toc or --extract-embedded is used.",
)
parse_params.add_argument(
"--password",
"-P",
type=str,
default="",
help="The password to use for decrypting PDF file.",
)
output_params = parser.add_argument_group(
"Output", description="Used during output generation."
)
output_params.add_argument(
"--outfile",
"-o",
type=str,
default="-",
help='Path to file where output is written. Or "-" (default) to '
"write to stdout.",
)
codec_parser = output_params.add_mutually_exclusive_group()
codec_parser.add_argument(
"--raw-stream",
"-r",
default=False,
action="store_true",
help="Write stream objects without encoding",
)
codec_parser.add_argument(
"--binary-stream",
"-b",
default=False,
action="store_true",
help="Write stream objects with binary encoding",
)
codec_parser.add_argument(
"--text-stream",
"-t",
default=False,
action="store_true",
help="Write stream objects as plain text",
)
return parser
def main(argv: Optional[List[str]] = None) -> None:
parser = create_parser()
args = parser.parse_args(args=argv)
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
if args.outfile == "-":
outfp = sys.stdout
else:
outfp = open(args.outfile, "w")
if args.objects:
objids = [int(x) for x in args.objects.split(",")]
else:
objids = []
if args.page_numbers:
pagenos = {x - 1 for x in args.page_numbers}
elif args.pagenos:
pagenos = {int(x) - 1 for x in args.pagenos.split(",")}
else:
pagenos = set()
password = args.password
if args.raw_stream:
codec: Optional[str] = "raw"
elif args.binary_stream:
codec = "binary"
elif args.text_stream:
codec = "text"
else:
codec = None
for fname in args.files:
if args.extract_toc:
dumpoutline(
outfp,
fname,
objids,
pagenos,
password=password,
dumpall=args.all,
codec=codec,
extractdir=None,
)
elif args.extract_embedded:
extractembedded(fname, password=password, extractdir=args.extract_embedded)
else:
dumppdf(
outfp,
fname,
objids,
pagenos,
password=password,
dumpall=args.all,
codec=codec,
extractdir=None,
show_fallback_xref=args.show_fallback_xref,
)
outfp.close()
if __name__ == "__main__":
main()

8
bin/futurize Executable file
View File

@@ -0,0 +1,8 @@
#!/home/johannes/code/transport-accessibility/bin/python
# -*- coding: utf-8 -*-
import re
import sys
from libfuturize.main import main
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(main())

8
bin/nltk Executable file
View File

@@ -0,0 +1,8 @@
#!/home/johannes/code/transport-accessibility/bin/python
# -*- coding: utf-8 -*-
import re
import sys
from nltk.cli import cli
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(cli())

8
bin/normalizer Executable file
View File

@@ -0,0 +1,8 @@
#!/home/johannes/code/transport-accessibility/bin/python
# -*- coding: utf-8 -*-
import re
import sys
from charset_normalizer.cli import cli_detect
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(cli_detect())

8
bin/pasteurize Executable file
View File

@@ -0,0 +1,8 @@
#!/home/johannes/code/transport-accessibility/bin/python
# -*- coding: utf-8 -*-
import re
import sys
from libpasteurize.main import main
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(main())

317
bin/pdf2txt.py Executable file
View File

@@ -0,0 +1,317 @@
#!/home/johannes/code/transport-accessibility/bin/python
"""A command line tool for extracting text and images from PDF and
output it to plain text, html, xml or tags."""
import argparse
import logging
import sys
from typing import Any, Container, Iterable, List, Optional
import pdfminer.high_level
from pdfminer.layout import LAParams
from pdfminer.utils import AnyIO
logging.basicConfig()
OUTPUT_TYPES = ((".htm", "html"), (".html", "html"), (".xml", "xml"), (".tag", "tag"))
def float_or_disabled(x: str) -> Optional[float]:
if x.lower().strip() == "disabled":
return None
try:
return float(x)
except ValueError:
raise argparse.ArgumentTypeError("invalid float value: {}".format(x))
def extract_text(
files: Iterable[str] = [],
outfile: str = "-",
laparams: Optional[LAParams] = None,
output_type: str = "text",
codec: str = "utf-8",
strip_control: bool = False,
maxpages: int = 0,
page_numbers: Optional[Container[int]] = None,
password: str = "",
scale: float = 1.0,
rotation: int = 0,
layoutmode: str = "normal",
output_dir: Optional[str] = None,
debug: bool = False,
disable_caching: bool = False,
**kwargs: Any
) -> AnyIO:
if not files:
raise ValueError("Must provide files to work upon!")
if output_type == "text" and outfile != "-":
for override, alttype in OUTPUT_TYPES:
if outfile.endswith(override):
output_type = alttype
if outfile == "-":
outfp: AnyIO = sys.stdout
if sys.stdout.encoding is not None:
codec = "utf-8"
else:
outfp = open(outfile, "wb")
for fname in files:
with open(fname, "rb") as fp:
pdfminer.high_level.extract_text_to_fp(fp, **locals())
return outfp
def create_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description=__doc__, add_help=True)
parser.add_argument(
"files",
type=str,
default=None,
nargs="+",
help="One or more paths to PDF files.",
)
parser.add_argument(
"--version",
"-v",
action="version",
version="pdfminer.six v{}".format(pdfminer.__version__),
)
parser.add_argument(
"--debug",
"-d",
default=False,
action="store_true",
help="Use debug logging level.",
)
parser.add_argument(
"--disable-caching",
"-C",
default=False,
action="store_true",
help="If caching or resources, such as fonts, should be disabled.",
)
parse_params = parser.add_argument_group(
"Parser", description="Used during PDF parsing"
)
parse_params.add_argument(
"--page-numbers",
type=int,
default=None,
nargs="+",
help="A space-seperated list of page numbers to parse.",
)
parse_params.add_argument(
"--pagenos",
"-p",
type=str,
help="A comma-separated list of page numbers to parse. "
"Included for legacy applications, use --page-numbers "
"for more idiomatic argument entry.",
)
parse_params.add_argument(
"--maxpages",
"-m",
type=int,
default=0,
help="The maximum number of pages to parse.",
)
parse_params.add_argument(
"--password",
"-P",
type=str,
default="",
help="The password to use for decrypting PDF file.",
)
parse_params.add_argument(
"--rotation",
"-R",
default=0,
type=int,
help="The number of degrees to rotate the PDF "
"before other types of processing.",
)
la_params = LAParams() # will be used for defaults
la_param_group = parser.add_argument_group(
"Layout analysis", description="Used during layout analysis."
)
la_param_group.add_argument(
"--no-laparams",
"-n",
default=False,
action="store_true",
help="If layout analysis parameters should be ignored.",
)
la_param_group.add_argument(
"--detect-vertical",
"-V",
default=la_params.detect_vertical,
action="store_true",
help="If vertical text should be considered during layout analysis",
)
la_param_group.add_argument(
"--line-overlap",
type=float,
default=la_params.line_overlap,
help="If two characters have more overlap than this they "
"are considered to be on the same line. The overlap is specified "
"relative to the minimum height of both characters.",
)
la_param_group.add_argument(
"--char-margin",
"-M",
type=float,
default=la_params.char_margin,
help="If two characters are closer together than this margin they "
"are considered to be part of the same line. The margin is "
"specified relative to the width of the character.",
)
la_param_group.add_argument(
"--word-margin",
"-W",
type=float,
default=la_params.word_margin,
help="If two characters on the same line are further apart than this "
"margin then they are considered to be two separate words, and "
"an intermediate space will be added for readability. The margin "
"is specified relative to the width of the character.",
)
la_param_group.add_argument(
"--line-margin",
"-L",
type=float,
default=la_params.line_margin,
help="If two lines are close together they are considered to "
"be part of the same paragraph. The margin is specified "
"relative to the height of a line.",
)
la_param_group.add_argument(
"--boxes-flow",
"-F",
type=float_or_disabled,
default=la_params.boxes_flow,
help="Specifies how much a horizontal and vertical position of a "
"text matters when determining the order of lines. The value "
"should be within the range of -1.0 (only horizontal position "
"matters) to +1.0 (only vertical position matters). You can also "
"pass `disabled` to disable advanced layout analysis, and "
"instead return text based on the position of the bottom left "
"corner of the text box.",
)
la_param_group.add_argument(
"--all-texts",
"-A",
default=la_params.all_texts,
action="store_true",
help="If layout analysis should be performed on text in figures.",
)
output_params = parser.add_argument_group(
"Output", description="Used during output generation."
)
output_params.add_argument(
"--outfile",
"-o",
type=str,
default="-",
help="Path to file where output is written. "
'Or "-" (default) to write to stdout.',
)
output_params.add_argument(
"--output_type",
"-t",
type=str,
default="text",
help="Type of output to generate {text,html,xml,tag}.",
)
output_params.add_argument(
"--codec",
"-c",
type=str,
default="utf-8",
help="Text encoding to use in output file.",
)
output_params.add_argument(
"--output-dir",
"-O",
default=None,
help="The output directory to put extracted images in. If not given, "
"images are not extracted.",
)
output_params.add_argument(
"--layoutmode",
"-Y",
default="normal",
type=str,
help="Type of layout to use when generating html "
"{normal,exact,loose}. If normal,each line is"
" positioned separately in the html. If exact"
", each character is positioned separately in"
" the html. If loose, same result as normal "
"but with an additional newline after each "
"text line. Only used when output_type is html.",
)
output_params.add_argument(
"--scale",
"-s",
type=float,
default=1.0,
help="The amount of zoom to use when generating html file. "
"Only used when output_type is html.",
)
output_params.add_argument(
"--strip-control",
"-S",
default=False,
action="store_true",
help="Remove control statement from text. "
"Only used when output_type is xml.",
)
return parser
def parse_args(args: Optional[List[str]]) -> argparse.Namespace:
parsed_args = create_parser().parse_args(args=args)
# Propagate parsed layout parameters to LAParams object
if parsed_args.no_laparams:
parsed_args.laparams = None
else:
parsed_args.laparams = LAParams(
line_overlap=parsed_args.line_overlap,
char_margin=parsed_args.char_margin,
line_margin=parsed_args.line_margin,
word_margin=parsed_args.word_margin,
boxes_flow=parsed_args.boxes_flow,
detect_vertical=parsed_args.detect_vertical,
all_texts=parsed_args.all_texts,
)
if parsed_args.page_numbers:
parsed_args.page_numbers = {x - 1 for x in parsed_args.page_numbers}
if parsed_args.pagenos:
parsed_args.page_numbers = {int(x) - 1 for x in parsed_args.pagenos.split(",")}
if parsed_args.output_type == "text" and parsed_args.outfile != "-":
for override, alttype in OUTPUT_TYPES:
if parsed_args.outfile.endswith(override):
parsed_args.output_type = alttype
return parsed_args
def main(args: Optional[List[str]] = None) -> int:
parsed_args = parse_args(args)
outfp = extract_text(**vars(parsed_args))
outfp.close()
return 0
if __name__ == "__main__":
sys.exit(main())

8
bin/tqdm Executable file
View File

@@ -0,0 +1,8 @@
#!/home/johannes/code/transport-accessibility/bin/python
# -*- coding: utf-8 -*-
import re
import sys
from tqdm.cli import main
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(main())