builder added, removed old files

This commit is contained in:
Hadrian Burkhardt
2023-11-30 02:32:40 +01:00
committed by f4lco
parent f714fc4c79
commit 58a53e608f
10 changed files with 447 additions and 306 deletions
+316
View File
@@ -0,0 +1,316 @@
from xml.dom import minidom
from datetime import date
class Builder:
def __init__(self):
self._doc = minidom.Document()
self._om = self._doc.createElement("openmensa")
self._om.setAttribute("version", "2.1")
self._om.setAttribute("xmlns", "http://openmensa.org/open-mensa-v2")
self._om.setAttribute("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance")
self._om.setAttribute(
"xsi:schemaLocation",
"http://openmensa.org/open-mensa-v2 http://openmensa.org/open-mensa-v2.xsd",
)
self._version = None
self._name = None
self._address = None
self._city = None
self._phone = None
self._email = None
self._location = None
self._availability = None
self._times = None
self._feed = None
self._day = None
self._days: dict[date, dict[str, list]] = {}
@property
def version(self):
return self._version
@version.setter
def version(self, value: str):
self._version = value
@version.deleter
def version(self):
del self._version
@property
def name(self):
return self._name
@name.setter
def name(self, value: str):
self._name = value
@name.deleter
def name(self):
del self._name
@property
def address(self):
return self._address
@address.setter
def address(self, value: tuple[str, str, str]):
street_nr, zip, city = value
self._address = f"{street_nr}, {zip} {city}"
@address.deleter
def address(self):
del self._address
@property
def city(self):
return self._city
@city.setter
def city(self, value: str):
self._city = value
@city.deleter
def city(self):
del self._city
@property
def phone(self):
return self._phone
@phone.setter
def phone(self, value: str):
self._phone = value
@phone.deleter
def phone(self):
del self._phone
@property
def email(self):
return self._email
@email.setter
def email(self, value: str):
self._email = value
@email.deleter
def email(self):
del self._email
@property
def location(self):
return (self._longitude, self._latitude)
@location.setter
def location(self, value: tuple[float, float]):
self._longitude = value[0]
self._latitude = value[1]
@location.deleter
def location(self):
del self._longitude
del self._latitude
@property
def availability(self):
return self._availability
@availability.setter
def availability(self, value: str):
if value == "pulbic" or value == "restricted":
self._availability = value
else:
raise ValueError("only 'public' or 'restricted are allowed.")
@availability.deleter
def availability(self):
del self._availability
@property
def times(self):
return self._times
@times.setter
def times(self, value: dict[str, str]):
def attach_weekday(tag: str, value: str):
if value == "":
return
d = self._doc.createElement(tag)
if value == "geschlossen":
d.setAttribute("closed", "true")
else:
d.setAttribute("open", value)
self._times.appendChild(d)
weekdays = (
"monday",
"tuesday",
"wednesday",
"thursday",
"friday",
"saturday",
"sunday",
)
self._times = self._doc.createElement("times")
self._times.setAttribute("type", "opening")
for weekday in weekdays:
v = value.get(weekday)
if v:
attach_weekday(weekday, v)
@times.deleter
def times(self):
del self._times
@property
def feed(self):
return self._feed
@feed.setter
def feed(self, value: dict):
name: str = value.get("name")
priority: int = value.get("priority")
url: str = value.get("url")
source: str = value.get("source")
hour: int = value.get("hour")
dayOfMonth: int | str = value.get("dayOfMonth") if value.get("dayOfMonth") else "*"
dayOfWeek: int | str = value.get("dayOfWeek") if value.get("dayOfWeek") else "*"
month: int | str = value.get("month") if value.get("month") else "*"
minute: int = value.get("minute") if value.get("minute") else 0
retry: str = value.get("retry")
self._feed = self._doc.createElement("feed")
self._feed.setAttribute("name", name)
self._feed.setAttribute("priority", str(priority))
schedule = self._doc.createElement("schedule")
self._feed.appendChild(schedule)
schedule.setAttribute("dayOfMonth", str(dayOfMonth))
schedule.setAttribute("dayOfWeek", str(dayOfWeek))
schedule.setAttribute("month", str(month))
schedule.setAttribute("hour", str(hour))
schedule.setAttribute("minute", str(minute))
if retry:
schedule.setAttribute("retry", retry)
el = self._doc.createElement("url")
self._feed.appendChild(el)
node = self._doc.createTextNode(url)
el.appendChild(node)
el = self._doc.createElement("source")
self._feed.appendChild(el)
node = self._doc.createTextNode(source)
el.appendChild(node)
@feed.deleter
def feed(self):
del self._feed
@property
def day(self):
return self._day
@day.setter
def day(self, value):
self._day = value
@day.deleter
def day(self):
del self._day
def add_meal(
self,
date: date,
category: str,
name: str,
prices: dict[str, float],
note: str = None,
):
meal = self._doc.createElement("meal")
node = self._doc.createElement("name")
meal.appendChild(node)
data = self._doc.createTextNode(name)
node.appendChild(data)
if note:
node = self._doc.createElement("note")
meal.appendChild(node)
data = self._doc.createTextNode(note)
node.appendChild(data)
if prices["student"]:
node = self._doc.createElement("price")
meal.appendChild(node)
node.setAttribute("role", "student")
data = self._doc.createTextNode(f'{prices["student"]:.2f}')
node.appendChild(data)
if prices["employee"]:
node = self._doc.createElement("price")
meal.appendChild(node)
node.setAttribute("role", "employee")
data = self._doc.createTextNode(f'{prices["employee"]:.2f}')
node.appendChild(data)
if prices["other"]:
node = self._doc.createElement("price")
meal.appendChild(node)
node.setAttribute("role", "other")
data = self._doc.createTextNode(f'{prices["other"]:.2f}')
node.appendChild(data)
category_dict = self._days.get(date, dict())
if not category_dict:
self._days[date] = category_dict
meal_list = category_dict.get(category, list())
if not meal_list:
category_dict[category] = meal_list
meal_list.append(meal)
def __append_node(self, tag: str, value: str):
elem = self._doc.createElement(tag)
self._canteen.appendChild(elem)
node = self._doc.createTextNode(value)
elem.appendChild(node)
def toXML(self):
self._doc.appendChild(self._om)
if self.version:
self.__append_node("version", self.version)
self._canteen = self._doc.createElement("canteen")
self._om.appendChild(self._canteen)
if self.name:
self.__append_node("name", self.name)
if self.address:
self.__append_node("address", self.address)
if self.city:
self.__append_node("city", self.city)
if self.phone:
self.__append_node("phone", self.phone)
if self.email:
self.__append_node("email", self.email)
if self._longitude and self._latitude:
location = self._doc.createElement("location")
self._canteen.appendChild(location)
location.setAttribute("latitude", str(self._latitude))
location.setAttribute("longitude", str(self._longitude))
if self.availability:
self.__append_node("availability", self.availability)
if self.times:
self._canteen.appendChild(self.times)
if self.feed:
self._canteen.appendChild(self.feed)
for date, category_dict in sorted(self._days.items()):
day = self._doc.createElement("day")
self._canteen.appendChild(day)
day.setAttribute("date", str(date))
for category_name, meals in sorted(category_dict.items()):
category = self._doc.createElement("category")
day.appendChild(category)
category.setAttribute("name", category_name)
for meal in meals:
category.appendChild(meal)
return self._doc.toprettyxml(encoding="UTF-8")
-46
View File
@@ -1,46 +0,0 @@
# -*- encoding: utf-8 -*-
import json
import logging
from collections import namedtuple
import requests
MenuParams = namedtuple('MenuParams', ('canteen_id', 'chash'))
URL = 'https://www.studentenwerk-potsdam.de' + \
'/essen/unsere-mensen/detailinfos/'
LOG = logging.getLogger(__name__)
def _param_json(to_serialize):
"""Obtain JSON string of an object without whitespace on delimiters.
:param dict it: The data structure to serialize
:return: JSON string, no whitespace between separators
"""
return json.dumps(to_serialize, separators=(',', ':'))
def download_menu(menu_params):
"""Download the menu for a specific canteen.
:param MenuParams menu_params: the target canteen
"""
params = {
'tx_ddfmensa_ddfmensajson[interneid]': menu_params.canteen_id,
'type': 14529821235,
'cHash': menu_params.chash
}
body = {
'data': False
}
request = requests.post(URL, params=params, json=body, timeout=30)
# urllib3 does not log response bodies - requests no longer supports it:
# https://2.python-requests.org//en/master/api/#api-changes
LOG.debug('Response:\n>>>>>\n%s\n<<<<<', request.text)
return request.json()
-114
View File
@@ -1,114 +0,0 @@
# -*- encoding: utf-8 -*-
from pyopenmensa.feed import LazyBuilder
PRICE_ROLE_MAPPING = {
'student': 'preis_s',
'other': 'preis_g',
'employee': 'preis_m'
}
def _active_days(menu):
for container in menu['wochentage']:
day = container['datum']
active = 'angebote' in day
if active:
yield day
def _notes(offer):
result = []
for label in offer['labels']:
result.append(label['name'].capitalize())
return result
def _prices(offer):
result = {}
for role, api_role in PRICE_ROLE_MAPPING.items():
if api_role not in offer:
continue
price = offer[api_role]
# When no price is set, this can be empty dict
if isinstance(price, str) and price.strip():
result[role] = price
return result
def _process_day(builder, day):
for offer in _offers(day):
builder.addMeal(date=day['data'],
category=_category(offer),
name=offer['beschreibung'],
notes=_notes(offer),
prices=_prices(offer),
roles=None)
def _category(offer):
# 'Angebot' is just a placeholder. We cannot tell if 'Angebot' or 'Info' is
# more appropriate because offers lacking the 'titel' attribute are real
# meals or informational texts.
return offer['titel'] or 'Angebot'
def _offers(day):
offers = day['angebote']
if isinstance(offers, list):
return offers
if isinstance(offers, dict):
# allows for the following structure:
# {'-1': <garbage>, '0': first_offer, ...}
# This case is degenerate and occurs only on semi-regular basis
# as of 2020-10-20. The assumption that offers at logical index -1
# are garbage can be challenged, it is simply a result of observing
# the API responses over several months.
return [offer for index, offer in offers.items() if int(index) >= 0]
raise AssertionError(f'cannot handle offers of type {type(offers)}')
def render_menu(menu):
"""Render the menu for a canteen into an OpenMensa XML feed.
:param dict menu: the Python representation of the API JSON response
:return: the XML feed as string
"""
builder = LazyBuilder()
if menu:
for day in _active_days(menu):
_process_day(builder, day)
return builder.toXMLFeed()
def render_meta(canteen, menu_feed_url):
"""Render a OpenMensa XML meta feed for a given canteen.
:param Canteen canteen: the canteen
:param menu_feed_url: the canteen menu URL
:return: the XML meta feed as string
"""
builder = LazyBuilder()
builder.name = canteen.name
builder.address = canteen.street
builder.city = canteen.city
builder.define(name='full',
priority='0',
url=menu_feed_url,
source=None,
dayOfWeek='*',
dayOfMonth='*',
hour='8-18',
minute='0',
retry='30 1')
return builder.toXMLFeed()
+6 -5
View File
@@ -6,10 +6,11 @@ import json
class SWP_Webspeiseplan_API:
URL_BASE = "https://swp.webspeiseplan.de"
def __init__(self):
logging.basicConfig()
self.logger = logging.getLogger(__name__)
self.url_base = "https://swp.webspeiseplan.de"
self.__parse_token()
params = {
"token": self.proxy_token,
@@ -25,7 +26,7 @@ class SWP_Webspeiseplan_API:
for outlet in self.outlets.values():
params["model"] = "menu"
params["location"] = outlet["standortID"]
params["languagetype"] = 2
params["languagetype"] = 1
params["_"] = int(time.time() * 1000)
menu = self.__parse_model(params)
self.menus[outlet["name"]] = menu
@@ -37,12 +38,12 @@ class SWP_Webspeiseplan_API:
self.meal_categories[outlet["name"]] = id2cat
def __parse_token(self):
req = urllib.request.Request(self.url_base)
req = urllib.request.Request(self.URL_BASE)
with urllib.request.urlopen(req) as resp:
txt = resp.read().decode("utf-8")
match = re.findall(r"/main.[0-9a-f]+.js", txt)[0]
self.logger.debug(f"__parse_token: downloading script {match}")
req = urllib.request.Request(f"{self.url_base}{match}")
req = urllib.request.Request(f"{self.URL_BASE}{match}")
with urllib.request.urlopen(req) as resp:
txt = resp.read().decode("utf-8")
self.proxy_token = re.findall(r"PROXY_TOKEN:\"([0-9a-f]+)\"", txt)[0]
@@ -70,7 +71,7 @@ class SWP_Webspeiseplan_API:
req.add_header("X-Requested-With", "XMLHttpRequest")
def __parse_model(self, params: dict):
url = f"{self.url_base}/index.php?" + "&".join(
url = f"{self.URL_BASE}/index.php?" + "&".join(
[f"{k}={v}" for k, v in params.items()]
)
self.logger.debug(f"__parse_model: {url}")
+55 -33
View File
@@ -1,70 +1,92 @@
import logging
from pyopenmensa.feed import LazyBuilder
from datetime import datetime
from stw_potsdam.builder import Builder
from stw_potsdam.swp_webspeiseplan_api import SWP_Webspeiseplan_API
class SWP_Webspeiseplan_Parser:
def __init__(
self, menu_data: list[dict], meal_categories: list[dict], outlet_data: dict
self,
menu_data: list[dict],
meal_categories: list[dict],
outlet_data: dict,
url: str,
):
logging.basicConfig()
self.logger = logging.getLogger(__name__)
self.menu_data = menu_data
self.meal_categories = meal_categories
self.outlet_data = outlet_data
self.canteen = None
self.url = url
self._builder = Builder()
self.__parse_canteen(outlet_data)
self.__parse_feed()
self.__parse_meals()
def __parse_canteen(self, outlet: dict):
builder = LazyBuilder()
builder.name = outlet["name"]
builder.address = outlet["addressInfo"]["street"]
builder.city = (
f'{outlet["addressInfo"]["postalCode"]} {outlet["addressInfo"]["city"]}'
canteen = self._builder
canteen.name = outlet["name"]
canteen.address = (
outlet["addressInfo"]["street"],
outlet["addressInfo"]["postalCode"],
outlet["addressInfo"]["city"],
)
builder.phone = outlet["contactInfo"][0]["phone"]
builder.email = outlet["contactInfo"][0]["email"]
canteen.city = outlet["addressInfo"]["city"]
canteen.phone = outlet["contactInfo"][0]["phone"]
canteen.email = outlet["contactInfo"][0]["email"]
if outlet["positionInfo"]:
builder.location(
str(outlet["positionInfo"]["longitude"]),
str(outlet["positionInfo"]["latitude"]),
canteen.location = (
outlet["positionInfo"]["longitude"],
outlet["positionInfo"]["latitude"],
)
builder.availability = f"Montag: {outlet['moZeit1']}, {outlet['moZeit2']}\n"
builder.availability += f"Dienstag: {outlet['diZeit1']}, {outlet['diZeit2']}\n"
builder.availability += f"Mittwoch: {outlet['miZeit1']}, {outlet['miZeit2']}\n"
builder.availability += (
f"Donnerstag: {outlet['doZeit1']}, {outlet['doZeit2']}\n"
)
builder.availability += f"Freitag: {outlet['frZeit1']}, {outlet['frZeit2']}\n"
builder.availability += f"Samstag: {outlet['saZeit1']}, {outlet['saZeit2']}\n"
builder.availability += f"Sonntag: {outlet['soZeit1']}, {outlet['soZeit2']}"
builder.availability = (
builder.availability.replace("None, None", "")
.replace("None,", "")
.replace(", None", "")
)
# TODO: availability via locations isPublic
self.canteen = builder
times = {
"monday": f"{outlet['moZeit1']}, {outlet['moZeit2']}",
"tuesday": f"{outlet['diZeit1']}, {outlet['diZeit2']}",
"wednesday": f"{outlet['miZeit1']}, {outlet['miZeit2']}",
"thursday": f"{outlet['doZeit1']}, {outlet['doZeit2']}",
"friday": f"{outlet['frZeit1']}, {outlet['frZeit2']}",
"saturday": f"{outlet['saZeit1']}, {outlet['saZeit2']}",
"sunday": f"{outlet['soZeit1']}, {outlet['soZeit2']}",
}
times = {
k: v.replace("None, None", "").replace("None,", "").replace(", None", "")
for k, v in times.items()
}
canteen.times = times
def __parse_feed(self):
feed = {
"name": "full",
"priority": 0,
"hour": "8-14",
"retry": "30 1",
"url": self.url,
"source": SWP_Webspeiseplan_API.URL_BASE,
}
self._builder.feed = feed
def __parse_meals(self):
for menu in self.menu_data:
for meal in menu["speiseplanGerichtData"]:
info = meal["speiseplanAdvancedGericht"]
date = datetime.fromisoformat(info["datum"]).date()
additional_info = meal["zusatzinformationen"]
self.canteen.addMeal(
self._builder.add_meal(
date=date,
category=self.meal_categories[info["gerichtkategorieID"]]["name"],
name=info["gerichtname"],
prices={
"employee": f'{additional_info["mitarbeiterpreisDecimal2"]:.2f}',
"other": f'{additional_info["gaestepreisDecimal2"]:.2f}',
"student": additional_info["mitarbeiterpreisDecimal2"],
"employee": additional_info["price3Decimal2"],
"other": additional_info["gaestepreisDecimal2"],
},
)
@property
def xml_feed(self):
return self.canteen.toXMLFeed()
return self._builder.toXML()
+29 -62
View File
@@ -8,9 +8,7 @@ import cachetools as ct
from flask import Flask, jsonify, make_response, url_for
from flask.logging import create_logger
from stw_potsdam import feed
from stw_potsdam.config import read_canteen_config
from stw_potsdam.canteen_api import MenuParams, download_menu
from stw_potsdam.swp_webspeiseplan_api import SWP_Webspeiseplan_API
from stw_potsdam.swp_webspeiseplan_parser import SWP_Webspeiseplan_Parser
@@ -23,94 +21,63 @@ app.url_map.strict_slashes = False
log = create_logger(app)
if 'BASE_URL' in os.environ: # pragma: no cover
base_url = urllib.parse.urlparse(os.environ.get('BASE_URL'))
if "BASE_URL" in os.environ: # pragma: no cover
base_url = urllib.parse.urlparse(os.environ.get("BASE_URL"))
if base_url.scheme:
app.config['PREFERRED_URL_SCHEME'] = base_url.scheme
app.config["PREFERRED_URL_SCHEME"] = base_url.scheme
if base_url.netloc:
app.config['SERVER_NAME'] = base_url.netloc
app.config["SERVER_NAME"] = base_url.netloc
if base_url.path:
app.config['APPLICATION_ROOT'] = base_url.path
app.config["APPLICATION_ROOT"] = base_url.path
cache = ct.TTLCache(maxsize=30, ttl=CACHE_TIMEOUT)
swp_api = SWP_Webspeiseplan_API()
def canteen_not_found(config, canteen_name):
log.warning('Canteen %s not found', canteen_name)
configured = ', '.join(f"'{c}'" for c in config.keys())
log.warning("Canteen %s not found", canteen_name)
configured = ", ".join(f"'{c}'" for c in config.keys())
message = f"Canteen '{canteen_name}' not found, available: {configured}"
return make_response(message, 404)
def _menu_params(canteen):
return MenuParams(canteen_id=canteen.id, chash=canteen.chash)
@ct.cached(cache=cache)
def get_menu():
log.debug("Downloading menu for SWP")
return SWP_Webspeiseplan_API()
@ct.cached(cache=cache, key=_menu_params)
def get_menu(canteen):
log.info('Downloading menu for %s', canteen)
params = _menu_params(canteen)
return download_menu(params)
def _canteen_feed_xml(xml):
response = make_response(xml)
response.mimetype = 'text/xml'
return response
def canteen_menu_feed_xml(menu):
xml = feed.render_menu(menu)
return _canteen_feed_xml(xml)
def canteen_meta_feed_xml(canteen):
menu_feed_url = url_for('canteen_menu_feed',
canteen_name=canteen.key,
_external=True)
xml = feed.render_meta(canteen, menu_feed_url)
return _canteen_feed_xml(xml)
@app.route('/canteens/<canteen_name>')
@app.route('/canteens/<canteen_name>/meta')
def canteen_meta_feed(canteen_name):
config = read_canteen_config()
if canteen_name not in config:
return canteen_not_found(config, canteen_name)
canteen = config[canteen_name]
return canteen_meta_feed_xml(canteen)
@app.route('/canteens/<canteen_name>/menu')
def canteen_menu_feed(canteen_name):
@app.route("/canteens/<canteen_name>")
@app.route("/canteens/<canteen_name>/xml")
def canteen_xml_feed(canteen_name):
config = read_canteen_config()
if canteen_name not in config:
return canteen_not_found(config, canteen_name)
canteen = config[canteen_name]
swp_api = get_menu()
swp_parser = SWP_Webspeiseplan_Parser(
swp_api.menus[canteen.name],
swp_api.meal_categories[canteen.name],
swp_api.outlets[canteen.name],
url_for("canteen_xml_feed", canteen_name=canteen.key, _external=True),
)
return _canteen_feed_xml(swp_parser.xml_feed)
xml = swp_parser.xml_feed.decode()
response = make_response(xml)
response.mimetype = 'text/xml'
return response
@app.route('/')
@app.route('/canteens')
@app.route("/")
@app.route("/canteens")
def canteen_index():
config = read_canteen_config()
return jsonify({
key: url_for('canteen_meta_feed', canteen_name=key, _external=True)
for key in config
})
return jsonify(
{
key: url_for("canteen_xml_feed", canteen_name=key, _external=True)
for key in config
}
)
@app.route('/health_check')
@app.route("/health_check")
def health_check():
return make_response("OK", 200)