diff --git a/auth/providers/__init__.py b/auth/providers/__init__.py index 1b10234..4e96b99 100644 --- a/auth/providers/__init__.py +++ b/auth/providers/__init__.py @@ -1,9 +1,11 @@ from auth.providers.base import SSOProvider from auth.providers.kit import KITProvider +from auth.providers.tum import TUMProvider # Registry of available SSO providers PROVIDERS: dict[str, type[SSOProvider]] = { "kit": KITProvider, + "tum": TUMProvider, } diff --git a/auth/providers/tum.py b/auth/providers/tum.py new file mode 100644 index 0000000..2b3598d --- /dev/null +++ b/auth/providers/tum.py @@ -0,0 +1,79 @@ +import html,sys +from auth.providers.base import SSOProvider +from utils.helpers import extract_html_value + + +class TUMProvider(SSOProvider): + """SSO provider for Karlsruhe Institute of Technology (KIT).""" + + name = "TUM" + domain = "tum.de" + + def authenticate(self) -> str: + # Implement SAML authentication flow + # Use self.session, self.redirect_response, self.username, self.password + # Return HTML containing SAMLResponse + + # to get this done on tum we have to: + # 1. do one post to e1s1 + # 2. do post to e1s2 with login data + # -> + self.session.headers.pop('x-requested-with', None) + self.session.headers.pop('x-inertia', None) + self.session.headers.pop('x-inertia-version', None) + + csrf_token1 = extract_html_value( + self.redirect_response.text, + r'name="csrf_token" value="([^"]+)"' + ) + + response1 = self.session.post( + # 'https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO?execution=e1s1', + 'https://login.tum.de/idp/profile/SAML2/Redirect/SSO?execution=e1s1', + data={ + 'csrf_token': csrf_token1, + 'shib_idp_ls_exception.shib_idp_session_ss': '', + 'shib_idp_ls_success.shib_idp_session_ss': 'true', + 'shib_idp_ls_value.shib_idp_session_ss': '', + 'shib_idp_ls_exception.shib_idp_persistent_ss': '', + 'shib_idp_ls_success.shib_idp_persistent_ss': 'true', + 'shib_idp_ls_value.shib_idp_persistent_ss': '', + 'shib_idp_ls_supported': 'true', + '_eventId_proceed': '', + } + ) + + # print(response1.text) + csrf_token2 = extract_html_value( + response1.text, + r'name="csrf_token" value="([^"]+)"' + ) + + response2 = self.session.post( + 'https://login.tum.de/idp/profile/SAML2/Redirect/SSO?execution=e1s2', + data={ + 'csrf_token': csrf_token2, + 'j_username': self.username, + 'j_password': self.password, + 'donotcache': '1', + '_eventId_proceed': '', + } + ) + + # print(response2.text) + saml_response = extract_html_value( + response2.text, + r'name="SAMLResponse" value="([^"]+)"' + ) + + if len(saml_response)<3: + raise ValueError("TUM auth no work:(") + else: + print("nice we got saml response starting with: "+saml_response[0:49]) + return response2.text + # print(saml_response) + # sys.exit() + # if "/consume" not in html.unescape(response.text): + # raise ValueError("TUM authentication failed - invalid credentials or SSO error") + + # return response.text diff --git a/booking/client.py b/booking/client.py index da2f84f..fe6aff1 100644 --- a/booking/client.py +++ b/booking/client.py @@ -36,7 +36,8 @@ class BookingClient: try: resources = response.json().get('data', []) except (ValueError, JSONDecodeError): - print(f"❌ Invalid JSON response when fetching resources: {response.text[:200]}") + print(f"❌ Invalid JSON response when fetching resources: {response.text}") + # print(f"❌ Invalid JSON response when fetching resources: {response.text[:200]}") return None return resources[-1]['id'] if resources else None @@ -56,10 +57,13 @@ class BookingClient: "customer_note": "", "add_ons_by_service": {SERVICE_ID: [[]]}, "sub_bookings_by_service": {}, + "booking_quota_grant_id":"24735199", "strategy": "multi-resource" } + # data = '{"resource_id":["15994"],"service_id":{"601":1},"start_date":"2026-02-03T22:30:00+01:00","end_date":"2026-02-03T23:30:00+01:00","description":"","customer_note":"","add_ons_by_service":{"601":[[]]},"sub_bookings_by_service":{},"booking_quota_grant_id":"24735199","strategy":"multi-resource"}' ) + print("resource id: %s" % resource_id) if not booking.ok: print(f"❌ Booking failed: HTTP {booking.status_code}") return False @@ -67,7 +71,8 @@ class BookingClient: try: data = booking.json().get("data", {}) except (ValueError, JSONDecodeError): - print(f"❌ Invalid JSON response from booking request: {booking.text[:200]}") + # print(f"❌ Invalid JSON response from booking request: {booking.text[:200]}") + print(f"❌ Invalid JSON response from booking request: {booking.text}") return False oid = data.get("id") @@ -85,7 +90,8 @@ class BookingClient: try: customer = checkout.json().get("default", {}).get("customer", {}) except (ValueError, JSONDecodeError): - print(f"❌ Invalid JSON response from checkout form: {checkout.text[:200]}") + print(f"❌ Invalid JSON response from checkout form: {checkout.text}") + # print(f"❌ Invalid JSON response from checkout form: {checkout.text[:200]}") return False final = self.session.post( diff --git a/config/constants.py b/config/constants.py index edcf239..8dc9d2c 100644 --- a/config/constants.py +++ b/config/constants.py @@ -3,10 +3,13 @@ ANNY_BASE_URL = "https://anny.eu" BOOKING_API_BASE = "https://b.anny.eu/api/v1" CHECKOUT_FORM_API = "https://b.anny.eu/api/ui/checkout-form" RESOURCE_URL = f"{BOOKING_API_BASE}/resources/1-lehrbuchsammlung-eg-und-1-og/children" -SERVICE_ID = "449" +SERVICE_ID = "601" +# SERVICE_ID = "449" RESOURCE_ID = None # "5957" # Will be set dynamically if None, else use the given ID +RESOURCE_ID =15994 TIMEZONE = "Europe/Berlin" -SSO_PROVIDER = "kit" # Available: kit (add more in auth/providers/) +# SSO_PROVIDER = "kit" # Available: kit (add more in auth/providers/) +SSO_PROVIDER = "tum" # Available: kit (add more in auth/providers/) DEFAULT_HEADERS = { 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0', @@ -17,15 +20,19 @@ DEFAULT_HEADERS = { # Booking time slots (in order of priority) BOOKING_TIMES = [ { - 'start': '14:00:00', - 'end': '19:00:00' - }, - { - 'start': '09:00:00', - 'end': '13:00:00' - }, - { - 'start': '20:00:00', - 'end': '23:45:00' + 'start': '12:00:00', + 'end': '22:00:00' }, + # { + # 'start': '22:30:00', + # 'end': '23:30:00' + # }, + # { + # 'start': '09:00:00', + # 'end': '13:00:00' + # }, + # { + # 'start': '20:00:00', + # 'end': '23:45:00' + # }, ] \ No newline at end of file diff --git a/debug_sso_redirect.py b/debug_sso_redirect.py new file mode 100644 index 0000000..e72c7b6 --- /dev/null +++ b/debug_sso_redirect.py @@ -0,0 +1,169 @@ +import requests +import urllib.parse +import re +import html + +AUTH_BASE_URL = "https://auth.anny.eu" +DEFAULT_HEADERS = { + 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0', + 'accept': 'application/vnd.api+json', + 'accept-encoding': 'plain' +} + +def extract_html_value(text, pattern): + match = re.search(pattern, text) + return match.group(1) if match else None + +def debug_sso(): + session = requests.Session() + # 1. Init Headers like AnnySession + session.headers.update({ + **DEFAULT_HEADERS, + 'accept': 'text/html, application/xhtml+xml', + 'referer': AUTH_BASE_URL + '/', + 'origin': AUTH_BASE_URL + }) + + print("--- 1. Initial Access ---") + r1 = session.get(f"{AUTH_BASE_URL}/login/sso") + + session.headers['X-XSRF-TOKEN'] = urllib.parse.unquote(r1.cookies['XSRF-TOKEN']) + + page_data_match = re.search(r'data-page="(.*?)"', r1.text) + if not page_data_match: + print("Could not find data-page") + return + + page_data = page_data_match.group(1).replace('"', '"') + version = re.search(r'"version"\s*:\s*"([a-f0-9]{32})"', page_data) + x_inertia_version = version.group(1) if version else '66b32acea13402d3aef4488ccd239c93' + + session.headers.update({ + 'x-requested-with': 'XMLHttpRequest', + 'x-inertia': 'true', + 'x-inertia-version': x_inertia_version + }) + + print(f"\n--- 2. SSO Request (TUM) ---") + r2 = session.post(f"{AUTH_BASE_URL}/login/sso", json={"domain": "tum.de"}) + + if 'x-inertia-location' not in r2.headers: + print("No x-inertia-location header found") + print(r2.text[:500]) + return + + redirect_url = r2.headers['x-inertia-location'] + print(f"Redirect URL: {redirect_url}") + + print("\n--- 3. Following Redirect (Simulating Provider Handoff) ---") + headers_to_remove = ['x-requested-with', 'x-inertia', 'x-inertia-version'] + for h in headers_to_remove: + session.headers.pop(h, None) + print("Cleaned Anny-specific headers.") + + current_url = redirect_url + + # Loop to handle intermediate pages (e.g. cookie check) + for step in range(1, 6): + print(f"\n--- Step {step}: GET {current_url} ---") + if step == 1: + resp = session.get(current_url) + else: + # We are posting from previous step + pass + # Logic handled inside loop + + # We need to render the response from the previous action. + # But wait, the loop structure is easier if we just do "process page". + pass + + # Re-writing loop logic for clarity + resp = session.get(redirect_url) + + for step in range(1, 6): + print(f"\n--- Step {step}: Processing Page {resp.url} ---") + + # Check for login fields + if 'j_username' in resp.text: + print("FOUND: j_username field. This is the LOGIN PAGE.") + break + + # Check for error + if "Cookies" in resp.text and "disabled" in resp.text: + print("ERROR: Page says Cookies disabled.") + + # Extract form action + action_url = extract_html_value(resp.text, r'form[^>]*action="([^"]+)"') + if not action_url: + print("No form action found. End of flow?") + print(resp.text[:500]) + break + + if action_url.startswith('/'): + parsed = urllib.parse.urlparse(resp.url) + base = f"{parsed.scheme}://{parsed.netloc}" + action_url = base + action_url + + print(f"Form Action: {action_url}") + + # Extract inputs + inputs = re.findall(r']*name="([^"]+)"[^>]*value="([^"]*)"', resp.text) + data = {name: val for name, val in inputs} + print(f"Hidden inputs: {list(data.keys())}") + + # If it's the cookie check page, there might be specific JS that auto-submits. + # usually just posting the form works. + + if 'shib_idp_ls_success.shib_idp_session_ss' in data: + print("Detected Shibboleth LocalStorage check.") + + # Submit to move to next page + print("Submitting form to proceed...") + # Ensure csrf if not in inputs + if 'csrf_token' not in data: + csrf = extract_html_value(resp.text, r'name="csrf_token" value="([^"]+)"') + if csrf: data['csrf_token'] = csrf + + # Add basic things that might be needed + data['_eventId_proceed'] = '' + + resp = session.post(action_url, data=data) + + else: + print("Max steps reached without finding login page.") + return + + # If we broke headers, we are at login page. + print("\n--- Attempting Login on Final Page ---") + action_url = extract_html_value(resp.text, r'form[^>]*action="([^"]+)"') + if action_url: + if action_url.startswith('/'): + parsed = urllib.parse.urlparse(resp.url) + base = f"{parsed.scheme}://{parsed.netloc}" + action_url = base + action_url + + inputs = re.findall(r']*name="([^"]+)"[^>]*value="([^"]*)"', resp.text) + data = {name: val for name, val in inputs} + + data['j_username'] = 'dummy_user' + data['j_password'] = 'dummy_pass' + data['_eventId_proceed'] = '' + data['donotcache'] = '1' + + if 'csrf_token' not in data: + csrf = extract_html_value(resp.text, r'name="csrf_token" value="([^"]+)"') + if csrf: data['csrf_token'] = csrf + + print(f"Posting creds to {action_url}") + r4 = session.post(action_url, data=data) + print(f"Result Code: {r4.status_code}") + unescaped = html.unescape(r4.text) + if "Identifizierung gescheitert" in unescaped or "Authentication failed" in unescaped: + print("SUCCESS: Got expected 'Authentication failed' message.") + else: + print("Result unknown.") + print(unescaped[:500]) + + +if __name__ == "__main__": + debug_sso() diff --git a/utils/helpers.py b/utils/helpers.py index 8821547..08b321c 100644 --- a/utils/helpers.py +++ b/utils/helpers.py @@ -4,9 +4,9 @@ import datetime import pytz from config.constants import TIMEZONE -def get_future_datetime(days_ahead=3, hour="13:00:00"): +def get_future_datetime(days_ahead=4, hour="13:00:00"): dt = datetime.datetime.now(pytz.timezone(TIMEZONE)) + datetime.timedelta(days=days_ahead) - return dt.strftime(f"%Y-%m-%dT{hour}+02:00") + return dt.strftime(f"%Y-%m-%dT{hour}+01:00") def extract_html_value(text, pattern): match = re.search(pattern, text)