only formatting hopefully

This commit is contained in:
i 2026-02-06 17:42:40 +01:00
parent 025c87216d
commit 8ad5f1964e
9 changed files with 241 additions and 198 deletions

View file

@ -10,27 +10,28 @@ class KITProvider(SSOProvider):
domain = "kit.edu" domain = "kit.edu"
def authenticate(self) -> str: def authenticate(self) -> str:
self.session.headers.pop('x-requested-with', None) self.session.headers.pop("x-requested-with", None)
self.session.headers.pop('x-inertia', None) self.session.headers.pop("x-inertia", None)
self.session.headers.pop('x-inertia-version', None) self.session.headers.pop("x-inertia-version", None)
csrf_token = extract_html_value( csrf_token = extract_html_value(
self.redirect_response.text, self.redirect_response.text, r'name="csrf_token" value="([^"]+)"'
r'name="csrf_token" value="([^"]+)"'
) )
response = self.session.post( response = self.session.post(
'https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO?execution=e1s1', "https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO?execution=e1s1",
data={ data={
'csrf_token': csrf_token, "csrf_token": csrf_token,
'j_username': self.username, "j_username": self.username,
'j_password': self.password, "j_password": self.password,
'_eventId_proceed': '', "_eventId_proceed": "",
'fudis_web_authn_assertion_input': '', "fudis_web_authn_assertion_input": "",
} },
) )
if "/consume" not in html.unescape(response.text): if "/consume" not in html.unescape(response.text):
raise ValueError("KIT authentication failed - invalid credentials or SSO error") raise ValueError(
"KIT authentication failed - invalid credentials or SSO error"
)
return response.text return response.text

View file

@ -1,4 +1,3 @@
import html,sys
from auth.providers.base import SSOProvider from auth.providers.base import SSOProvider
from utils.helpers import extract_html_value from utils.helpers import extract_html_value
@ -18,58 +17,55 @@ class TUMProvider(SSOProvider):
# 1. do one post to e1s1 # 1. do one post to e1s1
# 2. do post to e1s2 with login data # 2. do post to e1s2 with login data
# -> # ->
self.session.headers.pop('x-requested-with', None) self.session.headers.pop("x-requested-with", None)
self.session.headers.pop('x-inertia', None) self.session.headers.pop("x-inertia", None)
self.session.headers.pop('x-inertia-version', None) self.session.headers.pop("x-inertia-version", None)
csrf_token1 = extract_html_value( csrf_token1 = extract_html_value(
self.redirect_response.text, self.redirect_response.text, r'name="csrf_token" value="([^"]+)"'
r'name="csrf_token" value="([^"]+)"'
) )
response1 = self.session.post( response1 = self.session.post(
# 'https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO?execution=e1s1', # 'https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO?execution=e1s1',
'https://login.tum.de/idp/profile/SAML2/Redirect/SSO?execution=e1s1', "https://login.tum.de/idp/profile/SAML2/Redirect/SSO?execution=e1s1",
data={ data={
'csrf_token': csrf_token1, "csrf_token": csrf_token1,
'shib_idp_ls_exception.shib_idp_session_ss': '', "shib_idp_ls_exception.shib_idp_session_ss": "",
'shib_idp_ls_success.shib_idp_session_ss': 'true', "shib_idp_ls_success.shib_idp_session_ss": "true",
'shib_idp_ls_value.shib_idp_session_ss': '', "shib_idp_ls_value.shib_idp_session_ss": "",
'shib_idp_ls_exception.shib_idp_persistent_ss': '', "shib_idp_ls_exception.shib_idp_persistent_ss": "",
'shib_idp_ls_success.shib_idp_persistent_ss': 'true', "shib_idp_ls_success.shib_idp_persistent_ss": "true",
'shib_idp_ls_value.shib_idp_persistent_ss': '', "shib_idp_ls_value.shib_idp_persistent_ss": "",
'shib_idp_ls_supported': 'true', "shib_idp_ls_supported": "true",
'_eventId_proceed': '', "_eventId_proceed": "",
} },
) )
# print(response1.text) # print(response1.text)
csrf_token2 = extract_html_value( csrf_token2 = extract_html_value(
response1.text, response1.text, r'name="csrf_token" value="([^"]+)"'
r'name="csrf_token" value="([^"]+)"'
) )
response2 = self.session.post( response2 = self.session.post(
'https://login.tum.de/idp/profile/SAML2/Redirect/SSO?execution=e1s2', "https://login.tum.de/idp/profile/SAML2/Redirect/SSO?execution=e1s2",
data={ data={
'csrf_token': csrf_token2, "csrf_token": csrf_token2,
'j_username': self.username, "j_username": self.username,
'j_password': self.password, "j_password": self.password,
'donotcache': '1', "donotcache": "1",
'_eventId_proceed': '', "_eventId_proceed": "",
} },
) )
# print(response2.text) # print(response2.text)
saml_response = extract_html_value( saml_response = extract_html_value(
response2.text, response2.text, r'name="SAMLResponse" value="([^"]+)"'
r'name="SAMLResponse" value="([^"]+)"'
) )
if len(saml_response)<3: if len(saml_response) < 3:
raise ValueError("TUM auth no work:(") raise ValueError("TUM auth no work:(")
else: else:
print("nice we got saml response starting with: "+saml_response[0:49]) print("nice we got saml response starting with: " + saml_response[0:49])
return response2.text return response2.text
# print(saml_response) # print(saml_response)
# sys.exit() # sys.exit()

View file

@ -35,29 +35,39 @@ class AnnySession:
return None return None
def _init_headers(self): def _init_headers(self):
self.session.headers.update({ self.session.headers.update(
{
**DEFAULT_HEADERS, **DEFAULT_HEADERS,
'accept': 'text/html, application/xhtml+xml', "accept": "text/html, application/xhtml+xml",
'referer': AUTH_BASE_URL + '/', "referer": AUTH_BASE_URL + "/",
'origin': AUTH_BASE_URL "origin": AUTH_BASE_URL,
}) }
)
def _sso_login(self): def _sso_login(self):
r1 = self.session.get(f"{AUTH_BASE_URL}/login/sso") r1 = self.session.get(f"{AUTH_BASE_URL}/login/sso")
self.session.headers['X-XSRF-TOKEN'] = urllib.parse.unquote(r1.cookies['XSRF-TOKEN']) self.session.headers["X-XSRF-TOKEN"] = urllib.parse.unquote(
r1.cookies["XSRF-TOKEN"]
)
page_data = extract_html_value(r1.text, r'data-page="(.*?)"') page_data = extract_html_value(r1.text, r'data-page="(.*?)"')
version = re.search(r'"version"\s*:\s*"([a-f0-9]{32})"', page_data) version = re.search(r'"version"\s*:\s*"([a-f0-9]{32})"', page_data)
x_inertia_version = version.group(1) if version else '66b32acea13402d3aef4488ccd239c93' x_inertia_version = (
version.group(1) if version else "66b32acea13402d3aef4488ccd239c93"
)
self.session.headers.update({ self.session.headers.update(
'x-requested-with': 'XMLHttpRequest', {
'x-inertia': 'true', "x-requested-with": "XMLHttpRequest",
'x-inertia-version': x_inertia_version "x-inertia": "true",
}) "x-inertia-version": x_inertia_version,
}
)
r2 = self.session.post(f"{AUTH_BASE_URL}/login/sso", json={"domain": self.provider.domain}) r2 = self.session.post(
redirect_url = r2.headers['x-inertia-location'] f"{AUTH_BASE_URL}/login/sso", json={"domain": self.provider.domain}
)
redirect_url = r2.headers["x-inertia-location"]
redirect_response = self.session.get(redirect_url) redirect_response = self.session.get(redirect_url)
# Pass session and redirect response to provider # Pass session and redirect response to provider
@ -69,13 +79,20 @@ class AnnySession:
self.saml_response_html = self.provider.authenticate() self.saml_response_html = self.provider.authenticate()
def _consume_saml(self): def _consume_saml(self):
consume_url = extract_html_value(self.saml_response_html, r'form action="([^"]+)"') consume_url = extract_html_value(
relay_state = extract_html_value(self.saml_response_html, r'name="RelayState" value="([^"]+)"') self.saml_response_html, r'form action="([^"]+)"'
saml_response = extract_html_value(self.saml_response_html, r'name="SAMLResponse" value="([^"]+)"') )
relay_state = extract_html_value(
self.saml_response_html, r'name="RelayState" value="([^"]+)"'
)
saml_response = extract_html_value(
self.saml_response_html, r'name="SAMLResponse" value="([^"]+)"'
)
self.session.post(consume_url, data={ self.session.post(
'RelayState': relay_state, consume_url, data={"RelayState": relay_state, "SAMLResponse": saml_response}
'SAMLResponse': saml_response )
})
self.session.get(f"{ANNY_BASE_URL}/en-us/login?target=/en-us/home?withoutIntent=true") self.session.get(
f"{ANNY_BASE_URL}/en-us/login?target=/en-us/home?withoutIntent=true"
)

View file

@ -1,45 +1,57 @@
import requests import requests
from requests.exceptions import JSONDecodeError from requests.exceptions import JSONDecodeError
from config.constants import RESOURCE_URL, BOOKING_API_BASE, CHECKOUT_FORM_API, ANNY_BASE_URL, SERVICE_ID from config.constants import (
RESOURCE_URL,
BOOKING_API_BASE,
CHECKOUT_FORM_API,
ANNY_BASE_URL,
SERVICE_ID,
)
class BookingClient: class BookingClient:
def __init__(self, cookies): def __init__(self, cookies):
self.session = requests.Session() self.session = requests.Session()
self.session.cookies = cookies self.session.cookies = cookies
self.token = cookies.get('anny_shop_jwt') self.token = cookies.get("anny_shop_jwt")
self.session.headers.update({ self.session.headers.update(
'authorization': f'Bearer {self.token}', {
'content-type': 'application/vnd.api+json', "authorization": f"Bearer {self.token}",
'origin': ANNY_BASE_URL, "content-type": "application/vnd.api+json",
'referer': ANNY_BASE_URL + '/', "origin": ANNY_BASE_URL,
'user-agent': 'Mozilla/5.0' "referer": ANNY_BASE_URL + "/",
}) "user-agent": "Mozilla/5.0",
}
)
def find_available_resource(self, start, end): def find_available_resource(self, start, end):
response = self.session.get(RESOURCE_URL, params={ response = self.session.get(
'page[number]': 1, RESOURCE_URL,
'page[size]': 250, params={
'filter[available_from]': start, "page[number]": 1,
'filter[available_to]': end, "page[size]": 250,
'filter[availability_exact_match]': 1, "filter[available_from]": start,
'filter[exclude_hidden]': 0, "filter[available_to]": end,
'filter[exclude_child_resources]': 0, "filter[availability_exact_match]": 1,
'filter[availability_service_id]': int(SERVICE_ID), "filter[exclude_hidden]": 0,
'filter[include_unavailable]': 0, "filter[exclude_child_resources]": 0,
'filter[pre_order_ids]': '', "filter[availability_service_id]": int(SERVICE_ID),
'sort': 'name' "filter[include_unavailable]": 0,
}) "filter[pre_order_ids]": "",
"sort": "name",
},
)
if not response.ok: if not response.ok:
print(f"❌ Failed to fetch resources: HTTP {response.status_code}") print(f"❌ Failed to fetch resources: HTTP {response.status_code}")
return None return None
try: try:
resources = response.json().get('data', []) resources = response.json().get("data", [])
except (ValueError, JSONDecodeError): except (ValueError, JSONDecodeError):
print(f"❌ Invalid JSON response when fetching resources: {response.text}") print(f"❌ Invalid JSON response when fetching resources: {response.text}")
# print(f"❌ Invalid JSON response when fetching resources: {response.text[:200]}") # print(f"❌ Invalid JSON response when fetching resources: {response.text[:200]}")
return None return None
return resources[-1]['id'] if resources else None return resources[-1]["id"] if resources else None
def reserve(self, resource_id, start, end): def reserve(self, resource_id, start, end):
print(start) print(start)
@ -47,8 +59,8 @@ class BookingClient:
booking = self.session.post( booking = self.session.post(
f"{BOOKING_API_BASE}/order/bookings", f"{BOOKING_API_BASE}/order/bookings",
params={ params={
'stateless': '1', "stateless": "1",
'include': 'customer,voucher,bookings.booking_add_ons.add_on.cover_image,bookings.sub_bookings.resource,bookings.sub_bookings.service,bookings.customer,bookings.service.custom_forms.custom_fields,bookings.service.add_ons.cover_image,bookings.service.add_ons.group,bookings.cancellation_policy,bookings.resource.cover_image,bookings.resource.parent,bookings.resource.location,bookings.resource.category,bookings.reminders,bookings.booking_series,bookings.sequenced_bookings.resource,bookings.sequenced_bookings.service,bookings.sequenced_bookings.service.add_ons.cover_image,bookings.sequenced_bookings.service.add_ons.group,bookings.booking_participants,sub_orders.bookings,sub_orders.organization.legal_documents' "include": "customer,voucher,bookings.booking_add_ons.add_on.cover_image,bookings.sub_bookings.resource,bookings.sub_bookings.service,bookings.customer,bookings.service.custom_forms.custom_fields,bookings.service.add_ons.cover_image,bookings.service.add_ons.group,bookings.cancellation_policy,bookings.resource.cover_image,bookings.resource.parent,bookings.resource.location,bookings.resource.category,bookings.reminders,bookings.booking_series,bookings.sequenced_bookings.resource,bookings.sequenced_bookings.service,bookings.sequenced_bookings.service.add_ons.cover_image,bookings.sequenced_bookings.service.add_ons.group,bookings.booking_participants,sub_orders.bookings,sub_orders.organization.legal_documents",
}, },
json={ json={
"resource_id": [resource_id], "resource_id": [resource_id],
@ -60,9 +72,9 @@ class BookingClient:
"add_ons_by_service": {SERVICE_ID: [[]]}, "add_ons_by_service": {SERVICE_ID: [[]]},
"sub_bookings_by_service": {}, "sub_bookings_by_service": {},
# "booking_quota_grant_id":"24735199", # "booking_quota_grant_id":"24735199",
"booking_quota_grant_id":"24735202", "booking_quota_grant_id": "24735202",
"strategy": "multi-resource" "strategy": "multi-resource",
} },
# data = '{"resource_id":["15994"],"service_id":{"601":1},"start_date":"2026-02-03T22:30:00+01:00","end_date":"2026-02-03T23:30:00+01:00","description":"","customer_note":"","add_ons_by_service":{"601":[[]]},"sub_bookings_by_service":{},"booking_quota_grant_id":"24735199","strategy":"multi-resource"}' # data = '{"resource_id":["15994"],"service_id":{"601":1},"start_date":"2026-02-03T22:30:00+01:00","end_date":"2026-02-03T23:30:00+01:00","description":"","customer_note":"","add_ons_by_service":{"601":[[]]},"sub_bookings_by_service":{},"booking_quota_grant_id":"24735199","strategy":"multi-resource"}'
) )
@ -85,7 +97,9 @@ class BookingClient:
print("❌ Missing booking ID or access token in response") print("❌ Missing booking ID or access token in response")
return False return False
checkout = self.session.get(f"{CHECKOUT_FORM_API}?oid={oid}&oat={oat}&stateless=1") checkout = self.session.get(
f"{CHECKOUT_FORM_API}?oid={oid}&oat={oat}&stateless=1"
)
if not checkout.ok: if not checkout.ok:
print(f"❌ Checkout form failed: HTTP {checkout.status_code}") print(f"❌ Checkout form failed: HTTP {checkout.status_code}")
return False return False
@ -103,14 +117,14 @@ class BookingClient:
"customer": { "customer": {
"given_name": customer.get("given_name"), "given_name": customer.get("given_name"),
"family_name": customer.get("family_name"), "family_name": customer.get("family_name"),
"email": customer.get("email") "email": customer.get("email"),
}, },
"accept_terms": True, "accept_terms": True,
"payment_method": "", "payment_method": "",
"success_url": f"{ANNY_BASE_URL}/checkout/success?oids={oid}&oats={oat}", "success_url": f"{ANNY_BASE_URL}/checkout/success?oids={oid}&oats={oat}",
"cancel_url": f"{ANNY_BASE_URL}/checkout?step=checkout&childResource={resource_id}", "cancel_url": f"{ANNY_BASE_URL}/checkout?step=checkout&childResource={resource_id}",
"meta": {"timezone": "Europe/Berlin"} "meta": {"timezone": "Europe/Berlin"},
} },
) )
if final.ok: if final.ok:

View file

@ -10,16 +10,16 @@ SERVICE_ID = "601"
# RESOURCE_ID =15994 # 91 height adjustable desk # RESOURCE_ID =15994 # 91 height adjustable desk
# RESOURCE_ID =16402 # 222 study room # RESOURCE_ID =16402 # 222 study room
# RESOURCE_ID =15883 #66 second best # RESOURCE_ID =15883 #66 second best
RESOURCE_ID =15502 # 17 cool snipe with view RESOURCE_ID = 15502 # 17 cool snipe with view
TIMEZONE = "Europe/Berlin" TIMEZONE = "Europe/Berlin"
# SSO_PROVIDER = "kit" # Available: kit (add more in auth/providers/) # SSO_PROVIDER = "kit" # Available: kit (add more in auth/providers/)
SSO_PROVIDER = "tum" # Available: kit (add more in auth/providers/) SSO_PROVIDER = "tum" # Available: kit (add more in auth/providers/)
DEFAULT_HEADERS = { DEFAULT_HEADERS = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0', "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0",
'accept': 'application/vnd.api+json', "accept": "application/vnd.api+json",
'accept-encoding': 'plain' "accept-encoding": "plain",
} }
# Booking time slots (in order of priority) # Booking time slots (in order of priority)
@ -39,28 +39,22 @@ DEFAULT_HEADERS = {
BOOKING_TIMES = [ BOOKING_TIMES = [
# SNIPE # SNIPE
# { # {
# 'start': '13:00:00', # 'start': '13:00:00',
# 'end': '18:00:00' # 'end': '18:00:00'
# }, # },
# BEST # BEST
# {
# 'start': '12:00:00',
# 'end': '22:00:00'
# },
# OPTIMAL
# 40/7=5.7
{
'start': '12:00:00',
'end': '17:30:00'
},
# { # {
# 'start': '12:00:00', # 'start': '12:00:00',
# 'end': '22:00:00' # 'end': '22:00:00'
# }, # },
# OPTIMAL
# 40/7=5.7
{"start": "12:00:00", "end": "17:30:00"},
# {
# 'start': '12:00:00',
# 'end': '22:00:00'
# },
# { # {
# 'start': '22:30:00', # 'start': '22:30:00',
# 'end': '23:30:00' # 'end': '23:30:00'
@ -73,3 +67,4 @@ BOOKING_TIMES = [
# 'start': '20:00:00', # 'start': '20:00:00',
# 'end': '23:45:00' # 'end': '23:45:00'
# }, # },
]

View file

@ -5,58 +5,66 @@ import html
AUTH_BASE_URL = "https://auth.anny.eu" AUTH_BASE_URL = "https://auth.anny.eu"
DEFAULT_HEADERS = { DEFAULT_HEADERS = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0', "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0",
'accept': 'application/vnd.api+json', "accept": "application/vnd.api+json",
'accept-encoding': 'plain' "accept-encoding": "plain",
} }
def extract_html_value(text, pattern): def extract_html_value(text, pattern):
match = re.search(pattern, text) match = re.search(pattern, text)
return match.group(1) if match else None return match.group(1) if match else None
def debug_sso(): def debug_sso():
session = requests.Session() session = requests.Session()
# 1. Init Headers like AnnySession # 1. Init Headers like AnnySession
session.headers.update({ session.headers.update(
{
**DEFAULT_HEADERS, **DEFAULT_HEADERS,
'accept': 'text/html, application/xhtml+xml', "accept": "text/html, application/xhtml+xml",
'referer': AUTH_BASE_URL + '/', "referer": AUTH_BASE_URL + "/",
'origin': AUTH_BASE_URL "origin": AUTH_BASE_URL,
}) }
)
print("--- 1. Initial Access ---") print("--- 1. Initial Access ---")
r1 = session.get(f"{AUTH_BASE_URL}/login/sso") r1 = session.get(f"{AUTH_BASE_URL}/login/sso")
session.headers['X-XSRF-TOKEN'] = urllib.parse.unquote(r1.cookies['XSRF-TOKEN']) session.headers["X-XSRF-TOKEN"] = urllib.parse.unquote(r1.cookies["XSRF-TOKEN"])
page_data_match = re.search(r'data-page="(.*?)"', r1.text) page_data_match = re.search(r'data-page="(.*?)"', r1.text)
if not page_data_match: if not page_data_match:
print("Could not find data-page") print("Could not find data-page")
return return
page_data = page_data_match.group(1).replace('&quot;', '"') page_data = page_data_match.group(1).replace("&quot;", '"')
version = re.search(r'"version"\s*:\s*"([a-f0-9]{32})"', page_data) version = re.search(r'"version"\s*:\s*"([a-f0-9]{32})"', page_data)
x_inertia_version = version.group(1) if version else '66b32acea13402d3aef4488ccd239c93' x_inertia_version = (
version.group(1) if version else "66b32acea13402d3aef4488ccd239c93"
)
session.headers.update({ session.headers.update(
'x-requested-with': 'XMLHttpRequest', {
'x-inertia': 'true', "x-requested-with": "XMLHttpRequest",
'x-inertia-version': x_inertia_version "x-inertia": "true",
}) "x-inertia-version": x_inertia_version,
}
)
print(f"\n--- 2. SSO Request (TUM) ---") print("\n--- 2. SSO Request (TUM) ---")
r2 = session.post(f"{AUTH_BASE_URL}/login/sso", json={"domain": "tum.de"}) r2 = session.post(f"{AUTH_BASE_URL}/login/sso", json={"domain": "tum.de"})
if 'x-inertia-location' not in r2.headers: if "x-inertia-location" not in r2.headers:
print("No x-inertia-location header found") print("No x-inertia-location header found")
print(r2.text[:500]) print(r2.text[:500])
return return
redirect_url = r2.headers['x-inertia-location'] redirect_url = r2.headers["x-inertia-location"]
print(f"Redirect URL: {redirect_url}") print(f"Redirect URL: {redirect_url}")
print("\n--- 3. Following Redirect (Simulating Provider Handoff) ---") print("\n--- 3. Following Redirect (Simulating Provider Handoff) ---")
headers_to_remove = ['x-requested-with', 'x-inertia', 'x-inertia-version'] headers_to_remove = ["x-requested-with", "x-inertia", "x-inertia-version"]
for h in headers_to_remove: for h in headers_to_remove:
session.headers.pop(h, None) session.headers.pop(h, None)
print("Cleaned Anny-specific headers.") print("Cleaned Anny-specific headers.")
@ -84,7 +92,7 @@ def debug_sso():
print(f"\n--- Step {step}: Processing Page {resp.url} ---") print(f"\n--- Step {step}: Processing Page {resp.url} ---")
# Check for login fields # Check for login fields
if 'j_username' in resp.text: if "j_username" in resp.text:
print("FOUND: j_username field. This is the LOGIN PAGE.") print("FOUND: j_username field. This is the LOGIN PAGE.")
break break
@ -99,7 +107,7 @@ def debug_sso():
print(resp.text[:500]) print(resp.text[:500])
break break
if action_url.startswith('/'): if action_url.startswith("/"):
parsed = urllib.parse.urlparse(resp.url) parsed = urllib.parse.urlparse(resp.url)
base = f"{parsed.scheme}://{parsed.netloc}" base = f"{parsed.scheme}://{parsed.netloc}"
action_url = base + action_url action_url = base + action_url
@ -114,18 +122,19 @@ def debug_sso():
# If it's the cookie check page, there might be specific JS that auto-submits. # If it's the cookie check page, there might be specific JS that auto-submits.
# usually just posting the form works. # usually just posting the form works.
if 'shib_idp_ls_success.shib_idp_session_ss' in data: if "shib_idp_ls_success.shib_idp_session_ss" in data:
print("Detected Shibboleth LocalStorage check.") print("Detected Shibboleth LocalStorage check.")
# Submit to move to next page # Submit to move to next page
print("Submitting form to proceed...") print("Submitting form to proceed...")
# Ensure csrf if not in inputs # Ensure csrf if not in inputs
if 'csrf_token' not in data: if "csrf_token" not in data:
csrf = extract_html_value(resp.text, r'name="csrf_token" value="([^"]+)"') csrf = extract_html_value(resp.text, r'name="csrf_token" value="([^"]+)"')
if csrf: data['csrf_token'] = csrf if csrf:
data["csrf_token"] = csrf
# Add basic things that might be needed # Add basic things that might be needed
data['_eventId_proceed'] = '' data["_eventId_proceed"] = ""
resp = session.post(action_url, data=data) resp = session.post(action_url, data=data)
@ -137,7 +146,7 @@ def debug_sso():
print("\n--- Attempting Login on Final Page ---") print("\n--- Attempting Login on Final Page ---")
action_url = extract_html_value(resp.text, r'form[^>]*action="([^"]+)"') action_url = extract_html_value(resp.text, r'form[^>]*action="([^"]+)"')
if action_url: if action_url:
if action_url.startswith('/'): if action_url.startswith("/"):
parsed = urllib.parse.urlparse(resp.url) parsed = urllib.parse.urlparse(resp.url)
base = f"{parsed.scheme}://{parsed.netloc}" base = f"{parsed.scheme}://{parsed.netloc}"
action_url = base + action_url action_url = base + action_url
@ -145,20 +154,24 @@ def debug_sso():
inputs = re.findall(r'<input[^>]*name="([^"]+)"[^>]*value="([^"]*)"', resp.text) inputs = re.findall(r'<input[^>]*name="([^"]+)"[^>]*value="([^"]*)"', resp.text)
data = {name: val for name, val in inputs} data = {name: val for name, val in inputs}
data['j_username'] = 'dummy_user' data["j_username"] = "dummy_user"
data['j_password'] = 'dummy_pass' data["j_password"] = "dummy_pass"
data['_eventId_proceed'] = '' data["_eventId_proceed"] = ""
data['donotcache'] = '1' data["donotcache"] = "1"
if 'csrf_token' not in data: if "csrf_token" not in data:
csrf = extract_html_value(resp.text, r'name="csrf_token" value="([^"]+)"') csrf = extract_html_value(resp.text, r'name="csrf_token" value="([^"]+)"')
if csrf: data['csrf_token'] = csrf if csrf:
data["csrf_token"] = csrf
print(f"Posting creds to {action_url}") print(f"Posting creds to {action_url}")
r4 = session.post(action_url, data=data) r4 = session.post(action_url, data=data)
print(f"Result Code: {r4.status_code}") print(f"Result Code: {r4.status_code}")
unescaped = html.unescape(r4.text) unescaped = html.unescape(r4.text)
if "Identifizierung gescheitert" in unescaped or "Authentication failed" in unescaped: if (
"Identifizierung gescheitert" in unescaped
or "Authentication failed" in unescaped
):
print("SUCCESS: Got expected 'Authentication failed' message.") print("SUCCESS: Got expected 'Authentication failed' message.")
else: else:
print("Result unknown.") print("Result unknown.")

16
main.py
View file

@ -9,8 +9,9 @@ from utils.helpers import get_future_datetime
import pytz import pytz
from config.constants import RESOURCE_ID, TIMEZONE, SSO_PROVIDER, BOOKING_TIMES from config.constants import RESOURCE_ID, TIMEZONE, SSO_PROVIDER, BOOKING_TIMES
def main(): def main():
load_dotenv('.env', override=True) load_dotenv(".env", override=True)
username = os.getenv("USERNAME") username = os.getenv("USERNAME")
password = os.getenv("PASSWORD") password = os.getenv("PASSWORD")
@ -30,20 +31,22 @@ def main():
# Only wait for midnight if within 10 minutes, otherwise execute immediately # Only wait for midnight if within 10 minutes, otherwise execute immediately
now = datetime.datetime.now(tz) now = datetime.datetime.now(tz)
midnight = (now + datetime.timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0) midnight = (now + datetime.timedelta(days=1)).replace(
hour=0, minute=0, second=0, microsecond=0
)
seconds_until_midnight = (midnight - now).total_seconds() seconds_until_midnight = (midnight - now).total_seconds()
max_wait_seconds = 10 * 60 # 10 minutes max_wait_seconds = 10 * 60 # 10 minutes
if 0 < seconds_until_midnight <= max_wait_seconds: if 0 < seconds_until_midnight <= max_wait_seconds:
print(f"⏳ Waiting {seconds_until_midnight:.0f} seconds until midnight...") print("⏳ Waiting {seconds_until_midnight:.0f} seconds until midnight...")
time.sleep(seconds_until_midnight) time.sleep(seconds_until_midnight)
elif seconds_until_midnight > max_wait_seconds: elif seconds_until_midnight > max_wait_seconds:
print(f"⚡ More than 10 min until midnight, executing immediately...") print("⚡ More than 10 min until midnight, executing immediately...")
for time_ in BOOKING_TIMES: for time_ in BOOKING_TIMES:
try: try:
start = get_future_datetime(hour=time_['start']) start = get_future_datetime(hour=time_["start"])
end = get_future_datetime(hour=time_['end']) end = get_future_datetime(hour=time_["end"])
if RESOURCE_ID: if RESOURCE_ID:
resource_id = RESOURCE_ID resource_id = RESOURCE_ID
@ -57,5 +60,6 @@ def main():
except Exception as e: except Exception as e:
print(f"❌ Error booking slot {time_['start']}-{time_['end']}: {e}") print(f"❌ Error booking slot {time_['start']}-{time_['end']}: {e}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View file

@ -4,6 +4,5 @@ with open("nice.json", "r", encoding="utf-8") as f:
data = json.load(f) data = json.load(f)
for i in data["data"]: for i in data["data"]:
print ("%-32s id: %s" % (i["attributes"]["name"], i["id"])) print("%-32s id: %s" % (i["attributes"]["name"], i["id"]))
print(type(data)) # usually <class 'dict'> print(type(data)) # usually <class 'dict'>

View file

@ -4,10 +4,14 @@ import datetime
import pytz import pytz
from config.constants import TIMEZONE from config.constants import TIMEZONE
def get_future_datetime(days_ahead=4, hour="13:00:00"): def get_future_datetime(days_ahead=4, hour="13:00:00"):
dt = datetime.datetime.now(pytz.timezone(TIMEZONE)) + datetime.timedelta(days=days_ahead) dt = datetime.datetime.now(pytz.timezone(TIMEZONE)) + datetime.timedelta(
days=days_ahead
)
return dt.strftime(f"%Y-%m-%dT{hour}+01:00") return dt.strftime(f"%Y-%m-%dT{hour}+01:00")
def extract_html_value(text, pattern): def extract_html_value(text, pattern):
match = re.search(pattern, text) match = re.search(pattern, text)
if not match: if not match: