From 8ad5f1964ef4353ead6d9e40f3f9823f7ead801e Mon Sep 17 00:00:00 2001 From: i Date: Fri, 6 Feb 2026 17:42:40 +0100 Subject: [PATCH] only formatting hopefully --- auth/providers/kit.py | 27 ++++---- auth/providers/tum.py | 56 +++++++-------- auth/session.py | 63 ++++++++++------- booking/client.py | 80 +++++++++++++--------- config/constants.py | 33 ++++----- debug_sso_redirect.py | 155 +++++++++++++++++++++++------------------- main.py | 16 +++-- resource_id_helper.py | 3 +- utils/helpers.py | 6 +- 9 files changed, 241 insertions(+), 198 deletions(-) diff --git a/auth/providers/kit.py b/auth/providers/kit.py index 71f06bb..f98c02b 100644 --- a/auth/providers/kit.py +++ b/auth/providers/kit.py @@ -10,27 +10,28 @@ class KITProvider(SSOProvider): domain = "kit.edu" def authenticate(self) -> str: - self.session.headers.pop('x-requested-with', None) - self.session.headers.pop('x-inertia', None) - self.session.headers.pop('x-inertia-version', None) + self.session.headers.pop("x-requested-with", None) + self.session.headers.pop("x-inertia", None) + self.session.headers.pop("x-inertia-version", None) csrf_token = extract_html_value( - self.redirect_response.text, - r'name="csrf_token" value="([^"]+)"' + self.redirect_response.text, r'name="csrf_token" value="([^"]+)"' ) response = self.session.post( - 'https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO?execution=e1s1', + "https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO?execution=e1s1", data={ - 'csrf_token': csrf_token, - 'j_username': self.username, - 'j_password': self.password, - '_eventId_proceed': '', - 'fudis_web_authn_assertion_input': '', - } + "csrf_token": csrf_token, + "j_username": self.username, + "j_password": self.password, + "_eventId_proceed": "", + "fudis_web_authn_assertion_input": "", + }, ) if "/consume" not in html.unescape(response.text): - raise ValueError("KIT authentication failed - invalid credentials or SSO error") + raise ValueError( + "KIT authentication failed - invalid credentials or SSO error" + ) return response.text diff --git a/auth/providers/tum.py b/auth/providers/tum.py index 2b3598d..a80228d 100644 --- a/auth/providers/tum.py +++ b/auth/providers/tum.py @@ -1,4 +1,3 @@ -import html,sys from auth.providers.base import SSOProvider from utils.helpers import extract_html_value @@ -18,58 +17,55 @@ class TUMProvider(SSOProvider): # 1. do one post to e1s1 # 2. do post to e1s2 with login data # -> - self.session.headers.pop('x-requested-with', None) - self.session.headers.pop('x-inertia', None) - self.session.headers.pop('x-inertia-version', None) + self.session.headers.pop("x-requested-with", None) + self.session.headers.pop("x-inertia", None) + self.session.headers.pop("x-inertia-version", None) csrf_token1 = extract_html_value( - self.redirect_response.text, - r'name="csrf_token" value="([^"]+)"' + self.redirect_response.text, r'name="csrf_token" value="([^"]+)"' ) response1 = self.session.post( # 'https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO?execution=e1s1', - 'https://login.tum.de/idp/profile/SAML2/Redirect/SSO?execution=e1s1', + "https://login.tum.de/idp/profile/SAML2/Redirect/SSO?execution=e1s1", data={ - 'csrf_token': csrf_token1, - 'shib_idp_ls_exception.shib_idp_session_ss': '', - 'shib_idp_ls_success.shib_idp_session_ss': 'true', - 'shib_idp_ls_value.shib_idp_session_ss': '', - 'shib_idp_ls_exception.shib_idp_persistent_ss': '', - 'shib_idp_ls_success.shib_idp_persistent_ss': 'true', - 'shib_idp_ls_value.shib_idp_persistent_ss': '', - 'shib_idp_ls_supported': 'true', - '_eventId_proceed': '', - } + "csrf_token": csrf_token1, + "shib_idp_ls_exception.shib_idp_session_ss": "", + "shib_idp_ls_success.shib_idp_session_ss": "true", + "shib_idp_ls_value.shib_idp_session_ss": "", + "shib_idp_ls_exception.shib_idp_persistent_ss": "", + "shib_idp_ls_success.shib_idp_persistent_ss": "true", + "shib_idp_ls_value.shib_idp_persistent_ss": "", + "shib_idp_ls_supported": "true", + "_eventId_proceed": "", + }, ) # print(response1.text) csrf_token2 = extract_html_value( - response1.text, - r'name="csrf_token" value="([^"]+)"' + response1.text, r'name="csrf_token" value="([^"]+)"' ) response2 = self.session.post( - 'https://login.tum.de/idp/profile/SAML2/Redirect/SSO?execution=e1s2', + "https://login.tum.de/idp/profile/SAML2/Redirect/SSO?execution=e1s2", data={ - 'csrf_token': csrf_token2, - 'j_username': self.username, - 'j_password': self.password, - 'donotcache': '1', - '_eventId_proceed': '', - } + "csrf_token": csrf_token2, + "j_username": self.username, + "j_password": self.password, + "donotcache": "1", + "_eventId_proceed": "", + }, ) # print(response2.text) saml_response = extract_html_value( - response2.text, - r'name="SAMLResponse" value="([^"]+)"' + response2.text, r'name="SAMLResponse" value="([^"]+)"' ) - if len(saml_response)<3: + if len(saml_response) < 3: raise ValueError("TUM auth no work:(") else: - print("nice we got saml response starting with: "+saml_response[0:49]) + print("nice we got saml response starting with: " + saml_response[0:49]) return response2.text # print(saml_response) # sys.exit() diff --git a/auth/session.py b/auth/session.py index 38dd2ea..77d3b57 100644 --- a/auth/session.py +++ b/auth/session.py @@ -35,29 +35,39 @@ class AnnySession: return None def _init_headers(self): - self.session.headers.update({ - **DEFAULT_HEADERS, - 'accept': 'text/html, application/xhtml+xml', - 'referer': AUTH_BASE_URL + '/', - 'origin': AUTH_BASE_URL - }) + self.session.headers.update( + { + **DEFAULT_HEADERS, + "accept": "text/html, application/xhtml+xml", + "referer": AUTH_BASE_URL + "/", + "origin": AUTH_BASE_URL, + } + ) def _sso_login(self): r1 = self.session.get(f"{AUTH_BASE_URL}/login/sso") - self.session.headers['X-XSRF-TOKEN'] = urllib.parse.unquote(r1.cookies['XSRF-TOKEN']) + self.session.headers["X-XSRF-TOKEN"] = urllib.parse.unquote( + r1.cookies["XSRF-TOKEN"] + ) page_data = extract_html_value(r1.text, r'data-page="(.*?)"') version = re.search(r'"version"\s*:\s*"([a-f0-9]{32})"', page_data) - x_inertia_version = version.group(1) if version else '66b32acea13402d3aef4488ccd239c93' + x_inertia_version = ( + version.group(1) if version else "66b32acea13402d3aef4488ccd239c93" + ) - self.session.headers.update({ - 'x-requested-with': 'XMLHttpRequest', - 'x-inertia': 'true', - 'x-inertia-version': x_inertia_version - }) + self.session.headers.update( + { + "x-requested-with": "XMLHttpRequest", + "x-inertia": "true", + "x-inertia-version": x_inertia_version, + } + ) - r2 = self.session.post(f"{AUTH_BASE_URL}/login/sso", json={"domain": self.provider.domain}) - redirect_url = r2.headers['x-inertia-location'] + r2 = self.session.post( + f"{AUTH_BASE_URL}/login/sso", json={"domain": self.provider.domain} + ) + redirect_url = r2.headers["x-inertia-location"] redirect_response = self.session.get(redirect_url) # Pass session and redirect response to provider @@ -69,13 +79,20 @@ class AnnySession: self.saml_response_html = self.provider.authenticate() def _consume_saml(self): - consume_url = extract_html_value(self.saml_response_html, r'form action="([^"]+)"') - relay_state = extract_html_value(self.saml_response_html, r'name="RelayState" value="([^"]+)"') - saml_response = extract_html_value(self.saml_response_html, r'name="SAMLResponse" value="([^"]+)"') + consume_url = extract_html_value( + self.saml_response_html, r'form action="([^"]+)"' + ) + relay_state = extract_html_value( + self.saml_response_html, r'name="RelayState" value="([^"]+)"' + ) + saml_response = extract_html_value( + self.saml_response_html, r'name="SAMLResponse" value="([^"]+)"' + ) - self.session.post(consume_url, data={ - 'RelayState': relay_state, - 'SAMLResponse': saml_response - }) + self.session.post( + consume_url, data={"RelayState": relay_state, "SAMLResponse": saml_response} + ) - self.session.get(f"{ANNY_BASE_URL}/en-us/login?target=/en-us/home?withoutIntent=true") + self.session.get( + f"{ANNY_BASE_URL}/en-us/login?target=/en-us/home?withoutIntent=true" + ) diff --git a/booking/client.py b/booking/client.py index 42b955e..424667c 100644 --- a/booking/client.py +++ b/booking/client.py @@ -1,45 +1,57 @@ import requests from requests.exceptions import JSONDecodeError -from config.constants import RESOURCE_URL, BOOKING_API_BASE, CHECKOUT_FORM_API, ANNY_BASE_URL, SERVICE_ID +from config.constants import ( + RESOURCE_URL, + BOOKING_API_BASE, + CHECKOUT_FORM_API, + ANNY_BASE_URL, + SERVICE_ID, +) + class BookingClient: def __init__(self, cookies): self.session = requests.Session() self.session.cookies = cookies - self.token = cookies.get('anny_shop_jwt') + self.token = cookies.get("anny_shop_jwt") - self.session.headers.update({ - 'authorization': f'Bearer {self.token}', - 'content-type': 'application/vnd.api+json', - 'origin': ANNY_BASE_URL, - 'referer': ANNY_BASE_URL + '/', - 'user-agent': 'Mozilla/5.0' - }) + self.session.headers.update( + { + "authorization": f"Bearer {self.token}", + "content-type": "application/vnd.api+json", + "origin": ANNY_BASE_URL, + "referer": ANNY_BASE_URL + "/", + "user-agent": "Mozilla/5.0", + } + ) def find_available_resource(self, start, end): - response = self.session.get(RESOURCE_URL, params={ - 'page[number]': 1, - 'page[size]': 250, - 'filter[available_from]': start, - 'filter[available_to]': end, - 'filter[availability_exact_match]': 1, - 'filter[exclude_hidden]': 0, - 'filter[exclude_child_resources]': 0, - 'filter[availability_service_id]': int(SERVICE_ID), - 'filter[include_unavailable]': 0, - 'filter[pre_order_ids]': '', - 'sort': 'name' - }) + response = self.session.get( + RESOURCE_URL, + params={ + "page[number]": 1, + "page[size]": 250, + "filter[available_from]": start, + "filter[available_to]": end, + "filter[availability_exact_match]": 1, + "filter[exclude_hidden]": 0, + "filter[exclude_child_resources]": 0, + "filter[availability_service_id]": int(SERVICE_ID), + "filter[include_unavailable]": 0, + "filter[pre_order_ids]": "", + "sort": "name", + }, + ) if not response.ok: print(f"❌ Failed to fetch resources: HTTP {response.status_code}") return None try: - resources = response.json().get('data', []) + resources = response.json().get("data", []) except (ValueError, JSONDecodeError): print(f"❌ Invalid JSON response when fetching resources: {response.text}") # print(f"❌ Invalid JSON response when fetching resources: {response.text[:200]}") return None - return resources[-1]['id'] if resources else None + return resources[-1]["id"] if resources else None def reserve(self, resource_id, start, end): print(start) @@ -47,8 +59,8 @@ class BookingClient: booking = self.session.post( f"{BOOKING_API_BASE}/order/bookings", params={ - 'stateless': '1', - 'include': 'customer,voucher,bookings.booking_add_ons.add_on.cover_image,bookings.sub_bookings.resource,bookings.sub_bookings.service,bookings.customer,bookings.service.custom_forms.custom_fields,bookings.service.add_ons.cover_image,bookings.service.add_ons.group,bookings.cancellation_policy,bookings.resource.cover_image,bookings.resource.parent,bookings.resource.location,bookings.resource.category,bookings.reminders,bookings.booking_series,bookings.sequenced_bookings.resource,bookings.sequenced_bookings.service,bookings.sequenced_bookings.service.add_ons.cover_image,bookings.sequenced_bookings.service.add_ons.group,bookings.booking_participants,sub_orders.bookings,sub_orders.organization.legal_documents' + "stateless": "1", + "include": "customer,voucher,bookings.booking_add_ons.add_on.cover_image,bookings.sub_bookings.resource,bookings.sub_bookings.service,bookings.customer,bookings.service.custom_forms.custom_fields,bookings.service.add_ons.cover_image,bookings.service.add_ons.group,bookings.cancellation_policy,bookings.resource.cover_image,bookings.resource.parent,bookings.resource.location,bookings.resource.category,bookings.reminders,bookings.booking_series,bookings.sequenced_bookings.resource,bookings.sequenced_bookings.service,bookings.sequenced_bookings.service.add_ons.cover_image,bookings.sequenced_bookings.service.add_ons.group,bookings.booking_participants,sub_orders.bookings,sub_orders.organization.legal_documents", }, json={ "resource_id": [resource_id], @@ -60,9 +72,9 @@ class BookingClient: "add_ons_by_service": {SERVICE_ID: [[]]}, "sub_bookings_by_service": {}, # "booking_quota_grant_id":"24735199", - "booking_quota_grant_id":"24735202", - "strategy": "multi-resource" - } + "booking_quota_grant_id": "24735202", + "strategy": "multi-resource", + }, # data = '{"resource_id":["15994"],"service_id":{"601":1},"start_date":"2026-02-03T22:30:00+01:00","end_date":"2026-02-03T23:30:00+01:00","description":"","customer_note":"","add_ons_by_service":{"601":[[]]},"sub_bookings_by_service":{},"booking_quota_grant_id":"24735199","strategy":"multi-resource"}' ) @@ -85,7 +97,9 @@ class BookingClient: print("❌ Missing booking ID or access token in response") return False - checkout = self.session.get(f"{CHECKOUT_FORM_API}?oid={oid}&oat={oat}&stateless=1") + checkout = self.session.get( + f"{CHECKOUT_FORM_API}?oid={oid}&oat={oat}&stateless=1" + ) if not checkout.ok: print(f"❌ Checkout form failed: HTTP {checkout.status_code}") return False @@ -103,14 +117,14 @@ class BookingClient: "customer": { "given_name": customer.get("given_name"), "family_name": customer.get("family_name"), - "email": customer.get("email") + "email": customer.get("email"), }, "accept_terms": True, "payment_method": "", "success_url": f"{ANNY_BASE_URL}/checkout/success?oids={oid}&oats={oat}", "cancel_url": f"{ANNY_BASE_URL}/checkout?step=checkout&childResource={resource_id}", - "meta": {"timezone": "Europe/Berlin"} - } + "meta": {"timezone": "Europe/Berlin"}, + }, ) if final.ok: diff --git a/config/constants.py b/config/constants.py index 945bb83..75bf241 100644 --- a/config/constants.py +++ b/config/constants.py @@ -10,16 +10,16 @@ SERVICE_ID = "601" # RESOURCE_ID =15994 # 91 height adjustable desk # RESOURCE_ID =16402 # 222 study room # RESOURCE_ID =15883 #66 second best -RESOURCE_ID =15502 # 17 cool snipe with view +RESOURCE_ID = 15502 # 17 cool snipe with view TIMEZONE = "Europe/Berlin" # SSO_PROVIDER = "kit" # Available: kit (add more in auth/providers/) SSO_PROVIDER = "tum" # Available: kit (add more in auth/providers/) DEFAULT_HEADERS = { - 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0', - 'accept': 'application/vnd.api+json', - 'accept-encoding': 'plain' + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0", + "accept": "application/vnd.api+json", + "accept-encoding": "plain", } # Booking time slots (in order of priority) @@ -39,28 +39,22 @@ DEFAULT_HEADERS = { BOOKING_TIMES = [ # SNIPE -# { -# 'start': '13:00:00', -# 'end': '18:00:00' -# }, + # { + # 'start': '13:00:00', + # 'end': '18:00:00' + # }, # BEST -# { -# 'start': '12:00:00', -# 'end': '22:00:00' -# }, + # { + # 'start': '12:00:00', + # 'end': '22:00:00' + # }, # OPTIMAL # 40/7=5.7 - { - 'start': '12:00:00', - 'end': '17:30:00' - }, - - + {"start": "12:00:00", "end": "17:30:00"}, # { # 'start': '12:00:00', # 'end': '22:00:00' # }, - # { # 'start': '22:30:00', # 'end': '23:30:00' @@ -73,3 +67,4 @@ BOOKING_TIMES = [ # 'start': '20:00:00', # 'end': '23:45:00' # }, +] diff --git a/debug_sso_redirect.py b/debug_sso_redirect.py index e72c7b6..4acb275 100644 --- a/debug_sso_redirect.py +++ b/debug_sso_redirect.py @@ -5,64 +5,72 @@ import html AUTH_BASE_URL = "https://auth.anny.eu" DEFAULT_HEADERS = { - 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0', - 'accept': 'application/vnd.api+json', - 'accept-encoding': 'plain' + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0", + "accept": "application/vnd.api+json", + "accept-encoding": "plain", } + def extract_html_value(text, pattern): match = re.search(pattern, text) return match.group(1) if match else None + def debug_sso(): session = requests.Session() # 1. Init Headers like AnnySession - session.headers.update({ - **DEFAULT_HEADERS, - 'accept': 'text/html, application/xhtml+xml', - 'referer': AUTH_BASE_URL + '/', - 'origin': AUTH_BASE_URL - }) + session.headers.update( + { + **DEFAULT_HEADERS, + "accept": "text/html, application/xhtml+xml", + "referer": AUTH_BASE_URL + "/", + "origin": AUTH_BASE_URL, + } + ) print("--- 1. Initial Access ---") r1 = session.get(f"{AUTH_BASE_URL}/login/sso") - - session.headers['X-XSRF-TOKEN'] = urllib.parse.unquote(r1.cookies['XSRF-TOKEN']) - + + session.headers["X-XSRF-TOKEN"] = urllib.parse.unquote(r1.cookies["XSRF-TOKEN"]) + page_data_match = re.search(r'data-page="(.*?)"', r1.text) if not page_data_match: print("Could not find data-page") return - - page_data = page_data_match.group(1).replace('"', '"') + + page_data = page_data_match.group(1).replace(""", '"') version = re.search(r'"version"\s*:\s*"([a-f0-9]{32})"', page_data) - x_inertia_version = version.group(1) if version else '66b32acea13402d3aef4488ccd239c93' - - session.headers.update({ - 'x-requested-with': 'XMLHttpRequest', - 'x-inertia': 'true', - 'x-inertia-version': x_inertia_version - }) - - print(f"\n--- 2. SSO Request (TUM) ---") + x_inertia_version = ( + version.group(1) if version else "66b32acea13402d3aef4488ccd239c93" + ) + + session.headers.update( + { + "x-requested-with": "XMLHttpRequest", + "x-inertia": "true", + "x-inertia-version": x_inertia_version, + } + ) + + print("\n--- 2. SSO Request (TUM) ---") r2 = session.post(f"{AUTH_BASE_URL}/login/sso", json={"domain": "tum.de"}) - - if 'x-inertia-location' not in r2.headers: + + if "x-inertia-location" not in r2.headers: print("No x-inertia-location header found") print(r2.text[:500]) return - redirect_url = r2.headers['x-inertia-location'] + redirect_url = r2.headers["x-inertia-location"] print(f"Redirect URL: {redirect_url}") - + print("\n--- 3. Following Redirect (Simulating Provider Handoff) ---") - headers_to_remove = ['x-requested-with', 'x-inertia', 'x-inertia-version'] + headers_to_remove = ["x-requested-with", "x-inertia", "x-inertia-version"] for h in headers_to_remove: session.headers.pop(h, None) print("Cleaned Anny-specific headers.") current_url = redirect_url - + # Loop to handle intermediate pages (e.g. cookie check) for step in range(1, 6): print(f"\n--- Step {step}: GET {current_url} ---") @@ -70,65 +78,66 @@ def debug_sso(): resp = session.get(current_url) else: # We are posting from previous step - pass + pass # Logic handled inside loop - - # We need to render the response from the previous action. + + # We need to render the response from the previous action. # But wait, the loop structure is easier if we just do "process page". - pass + pass # Re-writing loop logic for clarity resp = session.get(redirect_url) - + for step in range(1, 6): print(f"\n--- Step {step}: Processing Page {resp.url} ---") - + # Check for login fields - if 'j_username' in resp.text: + if "j_username" in resp.text: print("FOUND: j_username field. This is the LOGIN PAGE.") break - + # Check for error if "Cookies" in resp.text and "disabled" in resp.text: print("ERROR: Page says Cookies disabled.") - + # Extract form action action_url = extract_html_value(resp.text, r'form[^>]*action="([^"]+)"') if not action_url: print("No form action found. End of flow?") print(resp.text[:500]) break - - if action_url.startswith('/'): + + if action_url.startswith("/"): parsed = urllib.parse.urlparse(resp.url) base = f"{parsed.scheme}://{parsed.netloc}" action_url = base + action_url - + print(f"Form Action: {action_url}") - + # Extract inputs inputs = re.findall(r']*name="([^"]+)"[^>]*value="([^"]*)"', resp.text) data = {name: val for name, val in inputs} print(f"Hidden inputs: {list(data.keys())}") - + # If it's the cookie check page, there might be specific JS that auto-submits. # usually just posting the form works. - - if 'shib_idp_ls_success.shib_idp_session_ss' in data: + + if "shib_idp_ls_success.shib_idp_session_ss" in data: print("Detected Shibboleth LocalStorage check.") - + # Submit to move to next page print("Submitting form to proceed...") # Ensure csrf if not in inputs - if 'csrf_token' not in data: - csrf = extract_html_value(resp.text, r'name="csrf_token" value="([^"]+)"') - if csrf: data['csrf_token'] = csrf - + if "csrf_token" not in data: + csrf = extract_html_value(resp.text, r'name="csrf_token" value="([^"]+)"') + if csrf: + data["csrf_token"] = csrf + # Add basic things that might be needed - data['_eventId_proceed'] = '' - + data["_eventId_proceed"] = "" + resp = session.post(action_url, data=data) - + else: print("Max steps reached without finding login page.") return @@ -137,32 +146,36 @@ def debug_sso(): print("\n--- Attempting Login on Final Page ---") action_url = extract_html_value(resp.text, r'form[^>]*action="([^"]+)"') if action_url: - if action_url.startswith('/'): - parsed = urllib.parse.urlparse(resp.url) - base = f"{parsed.scheme}://{parsed.netloc}" - action_url = base + action_url - + if action_url.startswith("/"): + parsed = urllib.parse.urlparse(resp.url) + base = f"{parsed.scheme}://{parsed.netloc}" + action_url = base + action_url + inputs = re.findall(r']*name="([^"]+)"[^>]*value="([^"]*)"', resp.text) data = {name: val for name, val in inputs} - - data['j_username'] = 'dummy_user' - data['j_password'] = 'dummy_pass' - data['_eventId_proceed'] = '' - data['donotcache'] = '1' - - if 'csrf_token' not in data: - csrf = extract_html_value(resp.text, r'name="csrf_token" value="([^"]+)"') - if csrf: data['csrf_token'] = csrf - + + data["j_username"] = "dummy_user" + data["j_password"] = "dummy_pass" + data["_eventId_proceed"] = "" + data["donotcache"] = "1" + + if "csrf_token" not in data: + csrf = extract_html_value(resp.text, r'name="csrf_token" value="([^"]+)"') + if csrf: + data["csrf_token"] = csrf + print(f"Posting creds to {action_url}") r4 = session.post(action_url, data=data) print(f"Result Code: {r4.status_code}") unescaped = html.unescape(r4.text) - if "Identifizierung gescheitert" in unescaped or "Authentication failed" in unescaped: + if ( + "Identifizierung gescheitert" in unescaped + or "Authentication failed" in unescaped + ): print("SUCCESS: Got expected 'Authentication failed' message.") else: - print("Result unknown.") - print(unescaped[:500]) + print("Result unknown.") + print(unescaped[:500]) if __name__ == "__main__": diff --git a/main.py b/main.py index dc37466..e21798b 100644 --- a/main.py +++ b/main.py @@ -9,8 +9,9 @@ from utils.helpers import get_future_datetime import pytz from config.constants import RESOURCE_ID, TIMEZONE, SSO_PROVIDER, BOOKING_TIMES + def main(): - load_dotenv('.env', override=True) + load_dotenv(".env", override=True) username = os.getenv("USERNAME") password = os.getenv("PASSWORD") @@ -30,20 +31,22 @@ def main(): # Only wait for midnight if within 10 minutes, otherwise execute immediately now = datetime.datetime.now(tz) - midnight = (now + datetime.timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0) + midnight = (now + datetime.timedelta(days=1)).replace( + hour=0, minute=0, second=0, microsecond=0 + ) seconds_until_midnight = (midnight - now).total_seconds() max_wait_seconds = 10 * 60 # 10 minutes if 0 < seconds_until_midnight <= max_wait_seconds: - print(f"⏳ Waiting {seconds_until_midnight:.0f} seconds until midnight...") + print("⏳ Waiting {seconds_until_midnight:.0f} seconds until midnight...") time.sleep(seconds_until_midnight) elif seconds_until_midnight > max_wait_seconds: - print(f"⚡ More than 10 min until midnight, executing immediately...") + print("⚡ More than 10 min until midnight, executing immediately...") for time_ in BOOKING_TIMES: try: - start = get_future_datetime(hour=time_['start']) - end = get_future_datetime(hour=time_['end']) + start = get_future_datetime(hour=time_["start"]) + end = get_future_datetime(hour=time_["end"]) if RESOURCE_ID: resource_id = RESOURCE_ID @@ -57,5 +60,6 @@ def main(): except Exception as e: print(f"❌ Error booking slot {time_['start']}-{time_['end']}: {e}") + if __name__ == "__main__": main() diff --git a/resource_id_helper.py b/resource_id_helper.py index d05caaf..f8e9055 100644 --- a/resource_id_helper.py +++ b/resource_id_helper.py @@ -4,6 +4,5 @@ with open("nice.json", "r", encoding="utf-8") as f: data = json.load(f) for i in data["data"]: - print ("%-32s id: %s" % (i["attributes"]["name"], i["id"])) + print("%-32s id: %s" % (i["attributes"]["name"], i["id"])) print(type(data)) # usually - diff --git a/utils/helpers.py b/utils/helpers.py index 08b321c..5fd5ad1 100644 --- a/utils/helpers.py +++ b/utils/helpers.py @@ -4,10 +4,14 @@ import datetime import pytz from config.constants import TIMEZONE + def get_future_datetime(days_ahead=4, hour="13:00:00"): - dt = datetime.datetime.now(pytz.timezone(TIMEZONE)) + datetime.timedelta(days=days_ahead) + dt = datetime.datetime.now(pytz.timezone(TIMEZONE)) + datetime.timedelta( + days=days_ahead + ) return dt.strftime(f"%Y-%m-%dT{hour}+01:00") + def extract_html_value(text, pattern): match = re.search(pattern, text) if not match: