This commit is contained in:
Fyodor Alyokhin 2026-02-02 13:21:04 +01:00
parent 22e996640d
commit 75c374dedf
6 changed files with 280 additions and 17 deletions

View file

@ -1,9 +1,11 @@
from auth.providers.base import SSOProvider from auth.providers.base import SSOProvider
from auth.providers.kit import KITProvider from auth.providers.kit import KITProvider
from auth.providers.tum import TUMProvider
# Registry of available SSO providers # Registry of available SSO providers
PROVIDERS: dict[str, type[SSOProvider]] = { PROVIDERS: dict[str, type[SSOProvider]] = {
"kit": KITProvider, "kit": KITProvider,
"tum": TUMProvider,
} }

79
auth/providers/tum.py Normal file
View file

@ -0,0 +1,79 @@
import html,sys
from auth.providers.base import SSOProvider
from utils.helpers import extract_html_value
class TUMProvider(SSOProvider):
"""SSO provider for Karlsruhe Institute of Technology (KIT)."""
name = "TUM"
domain = "tum.de"
def authenticate(self) -> str:
# Implement SAML authentication flow
# Use self.session, self.redirect_response, self.username, self.password
# Return HTML containing SAMLResponse
# to get this done on tum we have to:
# 1. do one post to e1s1
# 2. do post to e1s2 with login data
# ->
self.session.headers.pop('x-requested-with', None)
self.session.headers.pop('x-inertia', None)
self.session.headers.pop('x-inertia-version', None)
csrf_token1 = extract_html_value(
self.redirect_response.text,
r'name="csrf_token" value="([^"]+)"'
)
response1 = self.session.post(
# 'https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO?execution=e1s1',
'https://login.tum.de/idp/profile/SAML2/Redirect/SSO?execution=e1s1',
data={
'csrf_token': csrf_token1,
'shib_idp_ls_exception.shib_idp_session_ss': '',
'shib_idp_ls_success.shib_idp_session_ss': 'true',
'shib_idp_ls_value.shib_idp_session_ss': '',
'shib_idp_ls_exception.shib_idp_persistent_ss': '',
'shib_idp_ls_success.shib_idp_persistent_ss': 'true',
'shib_idp_ls_value.shib_idp_persistent_ss': '',
'shib_idp_ls_supported': 'true',
'_eventId_proceed': '',
}
)
# print(response1.text)
csrf_token2 = extract_html_value(
response1.text,
r'name="csrf_token" value="([^"]+)"'
)
response2 = self.session.post(
'https://login.tum.de/idp/profile/SAML2/Redirect/SSO?execution=e1s2',
data={
'csrf_token': csrf_token2,
'j_username': self.username,
'j_password': self.password,
'donotcache': '1',
'_eventId_proceed': '',
}
)
# print(response2.text)
saml_response = extract_html_value(
response2.text,
r'name="SAMLResponse" value="([^"]+)"'
)
if len(saml_response)<3:
raise ValueError("TUM auth no work:(")
else:
print("nice we got saml response starting with: "+saml_response[0:49])
return response2.text
# print(saml_response)
# sys.exit()
# if "/consume" not in html.unescape(response.text):
# raise ValueError("TUM authentication failed - invalid credentials or SSO error")
# return response.text

View file

@ -36,7 +36,8 @@ class BookingClient:
try: try:
resources = response.json().get('data', []) resources = response.json().get('data', [])
except (ValueError, JSONDecodeError): except (ValueError, JSONDecodeError):
print(f"❌ Invalid JSON response when fetching resources: {response.text[:200]}") print(f"❌ Invalid JSON response when fetching resources: {response.text}")
# print(f"❌ Invalid JSON response when fetching resources: {response.text[:200]}")
return None return None
return resources[-1]['id'] if resources else None return resources[-1]['id'] if resources else None
@ -56,10 +57,13 @@ class BookingClient:
"customer_note": "", "customer_note": "",
"add_ons_by_service": {SERVICE_ID: [[]]}, "add_ons_by_service": {SERVICE_ID: [[]]},
"sub_bookings_by_service": {}, "sub_bookings_by_service": {},
"booking_quota_grant_id":"24735199",
"strategy": "multi-resource" "strategy": "multi-resource"
} }
# data = '{"resource_id":["15994"],"service_id":{"601":1},"start_date":"2026-02-03T22:30:00+01:00","end_date":"2026-02-03T23:30:00+01:00","description":"","customer_note":"","add_ons_by_service":{"601":[[]]},"sub_bookings_by_service":{},"booking_quota_grant_id":"24735199","strategy":"multi-resource"}'
) )
print("resource id: %s" % resource_id)
if not booking.ok: if not booking.ok:
print(f"❌ Booking failed: HTTP {booking.status_code}") print(f"❌ Booking failed: HTTP {booking.status_code}")
return False return False
@ -67,7 +71,8 @@ class BookingClient:
try: try:
data = booking.json().get("data", {}) data = booking.json().get("data", {})
except (ValueError, JSONDecodeError): except (ValueError, JSONDecodeError):
print(f"❌ Invalid JSON response from booking request: {booking.text[:200]}") # print(f"❌ Invalid JSON response from booking request: {booking.text[:200]}")
print(f"❌ Invalid JSON response from booking request: {booking.text}")
return False return False
oid = data.get("id") oid = data.get("id")
@ -85,7 +90,8 @@ class BookingClient:
try: try:
customer = checkout.json().get("default", {}).get("customer", {}) customer = checkout.json().get("default", {}).get("customer", {})
except (ValueError, JSONDecodeError): except (ValueError, JSONDecodeError):
print(f"❌ Invalid JSON response from checkout form: {checkout.text[:200]}") print(f"❌ Invalid JSON response from checkout form: {checkout.text}")
# print(f"❌ Invalid JSON response from checkout form: {checkout.text[:200]}")
return False return False
final = self.session.post( final = self.session.post(

View file

@ -3,10 +3,13 @@ ANNY_BASE_URL = "https://anny.eu"
BOOKING_API_BASE = "https://b.anny.eu/api/v1" BOOKING_API_BASE = "https://b.anny.eu/api/v1"
CHECKOUT_FORM_API = "https://b.anny.eu/api/ui/checkout-form" CHECKOUT_FORM_API = "https://b.anny.eu/api/ui/checkout-form"
RESOURCE_URL = f"{BOOKING_API_BASE}/resources/1-lehrbuchsammlung-eg-und-1-og/children" RESOURCE_URL = f"{BOOKING_API_BASE}/resources/1-lehrbuchsammlung-eg-und-1-og/children"
SERVICE_ID = "449" SERVICE_ID = "601"
# SERVICE_ID = "449"
RESOURCE_ID = None # "5957" # Will be set dynamically if None, else use the given ID RESOURCE_ID = None # "5957" # Will be set dynamically if None, else use the given ID
RESOURCE_ID =15994
TIMEZONE = "Europe/Berlin" TIMEZONE = "Europe/Berlin"
SSO_PROVIDER = "kit" # Available: kit (add more in auth/providers/) # SSO_PROVIDER = "kit" # Available: kit (add more in auth/providers/)
SSO_PROVIDER = "tum" # Available: kit (add more in auth/providers/)
DEFAULT_HEADERS = { DEFAULT_HEADERS = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0', 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0',
@ -17,15 +20,19 @@ DEFAULT_HEADERS = {
# Booking time slots (in order of priority) # Booking time slots (in order of priority)
BOOKING_TIMES = [ BOOKING_TIMES = [
{ {
'start': '14:00:00', 'start': '12:00:00',
'end': '19:00:00' 'end': '22:00:00'
},
{
'start': '09:00:00',
'end': '13:00:00'
},
{
'start': '20:00:00',
'end': '23:45:00'
}, },
# {
# 'start': '22:30:00',
# 'end': '23:30:00'
# },
# {
# 'start': '09:00:00',
# 'end': '13:00:00'
# },
# {
# 'start': '20:00:00',
# 'end': '23:45:00'
# },
] ]

169
debug_sso_redirect.py Normal file
View file

@ -0,0 +1,169 @@
import requests
import urllib.parse
import re
import html
AUTH_BASE_URL = "https://auth.anny.eu"
DEFAULT_HEADERS = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0',
'accept': 'application/vnd.api+json',
'accept-encoding': 'plain'
}
def extract_html_value(text, pattern):
match = re.search(pattern, text)
return match.group(1) if match else None
def debug_sso():
session = requests.Session()
# 1. Init Headers like AnnySession
session.headers.update({
**DEFAULT_HEADERS,
'accept': 'text/html, application/xhtml+xml',
'referer': AUTH_BASE_URL + '/',
'origin': AUTH_BASE_URL
})
print("--- 1. Initial Access ---")
r1 = session.get(f"{AUTH_BASE_URL}/login/sso")
session.headers['X-XSRF-TOKEN'] = urllib.parse.unquote(r1.cookies['XSRF-TOKEN'])
page_data_match = re.search(r'data-page="(.*?)"', r1.text)
if not page_data_match:
print("Could not find data-page")
return
page_data = page_data_match.group(1).replace('&quot;', '"')
version = re.search(r'"version"\s*:\s*"([a-f0-9]{32})"', page_data)
x_inertia_version = version.group(1) if version else '66b32acea13402d3aef4488ccd239c93'
session.headers.update({
'x-requested-with': 'XMLHttpRequest',
'x-inertia': 'true',
'x-inertia-version': x_inertia_version
})
print(f"\n--- 2. SSO Request (TUM) ---")
r2 = session.post(f"{AUTH_BASE_URL}/login/sso", json={"domain": "tum.de"})
if 'x-inertia-location' not in r2.headers:
print("No x-inertia-location header found")
print(r2.text[:500])
return
redirect_url = r2.headers['x-inertia-location']
print(f"Redirect URL: {redirect_url}")
print("\n--- 3. Following Redirect (Simulating Provider Handoff) ---")
headers_to_remove = ['x-requested-with', 'x-inertia', 'x-inertia-version']
for h in headers_to_remove:
session.headers.pop(h, None)
print("Cleaned Anny-specific headers.")
current_url = redirect_url
# Loop to handle intermediate pages (e.g. cookie check)
for step in range(1, 6):
print(f"\n--- Step {step}: GET {current_url} ---")
if step == 1:
resp = session.get(current_url)
else:
# We are posting from previous step
pass
# Logic handled inside loop
# We need to render the response from the previous action.
# But wait, the loop structure is easier if we just do "process page".
pass
# Re-writing loop logic for clarity
resp = session.get(redirect_url)
for step in range(1, 6):
print(f"\n--- Step {step}: Processing Page {resp.url} ---")
# Check for login fields
if 'j_username' in resp.text:
print("FOUND: j_username field. This is the LOGIN PAGE.")
break
# Check for error
if "Cookies" in resp.text and "disabled" in resp.text:
print("ERROR: Page says Cookies disabled.")
# Extract form action
action_url = extract_html_value(resp.text, r'form[^>]*action="([^"]+)"')
if not action_url:
print("No form action found. End of flow?")
print(resp.text[:500])
break
if action_url.startswith('/'):
parsed = urllib.parse.urlparse(resp.url)
base = f"{parsed.scheme}://{parsed.netloc}"
action_url = base + action_url
print(f"Form Action: {action_url}")
# Extract inputs
inputs = re.findall(r'<input[^>]*name="([^"]+)"[^>]*value="([^"]*)"', resp.text)
data = {name: val for name, val in inputs}
print(f"Hidden inputs: {list(data.keys())}")
# If it's the cookie check page, there might be specific JS that auto-submits.
# usually just posting the form works.
if 'shib_idp_ls_success.shib_idp_session_ss' in data:
print("Detected Shibboleth LocalStorage check.")
# Submit to move to next page
print("Submitting form to proceed...")
# Ensure csrf if not in inputs
if 'csrf_token' not in data:
csrf = extract_html_value(resp.text, r'name="csrf_token" value="([^"]+)"')
if csrf: data['csrf_token'] = csrf
# Add basic things that might be needed
data['_eventId_proceed'] = ''
resp = session.post(action_url, data=data)
else:
print("Max steps reached without finding login page.")
return
# If we broke headers, we are at login page.
print("\n--- Attempting Login on Final Page ---")
action_url = extract_html_value(resp.text, r'form[^>]*action="([^"]+)"')
if action_url:
if action_url.startswith('/'):
parsed = urllib.parse.urlparse(resp.url)
base = f"{parsed.scheme}://{parsed.netloc}"
action_url = base + action_url
inputs = re.findall(r'<input[^>]*name="([^"]+)"[^>]*value="([^"]*)"', resp.text)
data = {name: val for name, val in inputs}
data['j_username'] = 'dummy_user'
data['j_password'] = 'dummy_pass'
data['_eventId_proceed'] = ''
data['donotcache'] = '1'
if 'csrf_token' not in data:
csrf = extract_html_value(resp.text, r'name="csrf_token" value="([^"]+)"')
if csrf: data['csrf_token'] = csrf
print(f"Posting creds to {action_url}")
r4 = session.post(action_url, data=data)
print(f"Result Code: {r4.status_code}")
unescaped = html.unescape(r4.text)
if "Identifizierung gescheitert" in unescaped or "Authentication failed" in unescaped:
print("SUCCESS: Got expected 'Authentication failed' message.")
else:
print("Result unknown.")
print(unescaped[:500])
if __name__ == "__main__":
debug_sso()

View file

@ -4,9 +4,9 @@ import datetime
import pytz import pytz
from config.constants import TIMEZONE from config.constants import TIMEZONE
def get_future_datetime(days_ahead=3, hour="13:00:00"): def get_future_datetime(days_ahead=4, hour="13:00:00"):
dt = datetime.datetime.now(pytz.timezone(TIMEZONE)) + datetime.timedelta(days=days_ahead) dt = datetime.datetime.now(pytz.timezone(TIMEZONE)) + datetime.timedelta(days=days_ahead)
return dt.strftime(f"%Y-%m-%dT{hour}+02:00") return dt.strftime(f"%Y-%m-%dT{hour}+01:00")
def extract_html_value(text, pattern): def extract_html_value(text, pattern):
match = re.search(pattern, text) match = re.search(pattern, text)