diff --git a/auth/providers/kit.py b/auth/providers/kit.py
index 71f06bb..f98c02b 100644
--- a/auth/providers/kit.py
+++ b/auth/providers/kit.py
@@ -10,27 +10,28 @@ class KITProvider(SSOProvider):
domain = "kit.edu"
def authenticate(self) -> str:
- self.session.headers.pop('x-requested-with', None)
- self.session.headers.pop('x-inertia', None)
- self.session.headers.pop('x-inertia-version', None)
+ self.session.headers.pop("x-requested-with", None)
+ self.session.headers.pop("x-inertia", None)
+ self.session.headers.pop("x-inertia-version", None)
csrf_token = extract_html_value(
- self.redirect_response.text,
- r'name="csrf_token" value="([^"]+)"'
+ self.redirect_response.text, r'name="csrf_token" value="([^"]+)"'
)
response = self.session.post(
- 'https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO?execution=e1s1',
+ "https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO?execution=e1s1",
data={
- 'csrf_token': csrf_token,
- 'j_username': self.username,
- 'j_password': self.password,
- '_eventId_proceed': '',
- 'fudis_web_authn_assertion_input': '',
- }
+ "csrf_token": csrf_token,
+ "j_username": self.username,
+ "j_password": self.password,
+ "_eventId_proceed": "",
+ "fudis_web_authn_assertion_input": "",
+ },
)
if "/consume" not in html.unescape(response.text):
- raise ValueError("KIT authentication failed - invalid credentials or SSO error")
+ raise ValueError(
+ "KIT authentication failed - invalid credentials or SSO error"
+ )
return response.text
diff --git a/auth/providers/tum.py b/auth/providers/tum.py
index 2b3598d..a80228d 100644
--- a/auth/providers/tum.py
+++ b/auth/providers/tum.py
@@ -1,4 +1,3 @@
-import html,sys
from auth.providers.base import SSOProvider
from utils.helpers import extract_html_value
@@ -18,58 +17,55 @@ class TUMProvider(SSOProvider):
# 1. do one post to e1s1
# 2. do post to e1s2 with login data
# ->
- self.session.headers.pop('x-requested-with', None)
- self.session.headers.pop('x-inertia', None)
- self.session.headers.pop('x-inertia-version', None)
+ self.session.headers.pop("x-requested-with", None)
+ self.session.headers.pop("x-inertia", None)
+ self.session.headers.pop("x-inertia-version", None)
csrf_token1 = extract_html_value(
- self.redirect_response.text,
- r'name="csrf_token" value="([^"]+)"'
+ self.redirect_response.text, r'name="csrf_token" value="([^"]+)"'
)
response1 = self.session.post(
# 'https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO?execution=e1s1',
- 'https://login.tum.de/idp/profile/SAML2/Redirect/SSO?execution=e1s1',
+ "https://login.tum.de/idp/profile/SAML2/Redirect/SSO?execution=e1s1",
data={
- 'csrf_token': csrf_token1,
- 'shib_idp_ls_exception.shib_idp_session_ss': '',
- 'shib_idp_ls_success.shib_idp_session_ss': 'true',
- 'shib_idp_ls_value.shib_idp_session_ss': '',
- 'shib_idp_ls_exception.shib_idp_persistent_ss': '',
- 'shib_idp_ls_success.shib_idp_persistent_ss': 'true',
- 'shib_idp_ls_value.shib_idp_persistent_ss': '',
- 'shib_idp_ls_supported': 'true',
- '_eventId_proceed': '',
- }
+ "csrf_token": csrf_token1,
+ "shib_idp_ls_exception.shib_idp_session_ss": "",
+ "shib_idp_ls_success.shib_idp_session_ss": "true",
+ "shib_idp_ls_value.shib_idp_session_ss": "",
+ "shib_idp_ls_exception.shib_idp_persistent_ss": "",
+ "shib_idp_ls_success.shib_idp_persistent_ss": "true",
+ "shib_idp_ls_value.shib_idp_persistent_ss": "",
+ "shib_idp_ls_supported": "true",
+ "_eventId_proceed": "",
+ },
)
# print(response1.text)
csrf_token2 = extract_html_value(
- response1.text,
- r'name="csrf_token" value="([^"]+)"'
+ response1.text, r'name="csrf_token" value="([^"]+)"'
)
response2 = self.session.post(
- 'https://login.tum.de/idp/profile/SAML2/Redirect/SSO?execution=e1s2',
+ "https://login.tum.de/idp/profile/SAML2/Redirect/SSO?execution=e1s2",
data={
- 'csrf_token': csrf_token2,
- 'j_username': self.username,
- 'j_password': self.password,
- 'donotcache': '1',
- '_eventId_proceed': '',
- }
+ "csrf_token": csrf_token2,
+ "j_username": self.username,
+ "j_password": self.password,
+ "donotcache": "1",
+ "_eventId_proceed": "",
+ },
)
# print(response2.text)
saml_response = extract_html_value(
- response2.text,
- r'name="SAMLResponse" value="([^"]+)"'
+ response2.text, r'name="SAMLResponse" value="([^"]+)"'
)
- if len(saml_response)<3:
+ if len(saml_response) < 3:
raise ValueError("TUM auth no work:(")
else:
- print("nice we got saml response starting with: "+saml_response[0:49])
+ print("nice we got saml response starting with: " + saml_response[0:49])
return response2.text
# print(saml_response)
# sys.exit()
diff --git a/auth/session.py b/auth/session.py
index 38dd2ea..77d3b57 100644
--- a/auth/session.py
+++ b/auth/session.py
@@ -35,29 +35,39 @@ class AnnySession:
return None
def _init_headers(self):
- self.session.headers.update({
- **DEFAULT_HEADERS,
- 'accept': 'text/html, application/xhtml+xml',
- 'referer': AUTH_BASE_URL + '/',
- 'origin': AUTH_BASE_URL
- })
+ self.session.headers.update(
+ {
+ **DEFAULT_HEADERS,
+ "accept": "text/html, application/xhtml+xml",
+ "referer": AUTH_BASE_URL + "/",
+ "origin": AUTH_BASE_URL,
+ }
+ )
def _sso_login(self):
r1 = self.session.get(f"{AUTH_BASE_URL}/login/sso")
- self.session.headers['X-XSRF-TOKEN'] = urllib.parse.unquote(r1.cookies['XSRF-TOKEN'])
+ self.session.headers["X-XSRF-TOKEN"] = urllib.parse.unquote(
+ r1.cookies["XSRF-TOKEN"]
+ )
page_data = extract_html_value(r1.text, r'data-page="(.*?)"')
version = re.search(r'"version"\s*:\s*"([a-f0-9]{32})"', page_data)
- x_inertia_version = version.group(1) if version else '66b32acea13402d3aef4488ccd239c93'
+ x_inertia_version = (
+ version.group(1) if version else "66b32acea13402d3aef4488ccd239c93"
+ )
- self.session.headers.update({
- 'x-requested-with': 'XMLHttpRequest',
- 'x-inertia': 'true',
- 'x-inertia-version': x_inertia_version
- })
+ self.session.headers.update(
+ {
+ "x-requested-with": "XMLHttpRequest",
+ "x-inertia": "true",
+ "x-inertia-version": x_inertia_version,
+ }
+ )
- r2 = self.session.post(f"{AUTH_BASE_URL}/login/sso", json={"domain": self.provider.domain})
- redirect_url = r2.headers['x-inertia-location']
+ r2 = self.session.post(
+ f"{AUTH_BASE_URL}/login/sso", json={"domain": self.provider.domain}
+ )
+ redirect_url = r2.headers["x-inertia-location"]
redirect_response = self.session.get(redirect_url)
# Pass session and redirect response to provider
@@ -69,13 +79,20 @@ class AnnySession:
self.saml_response_html = self.provider.authenticate()
def _consume_saml(self):
- consume_url = extract_html_value(self.saml_response_html, r'form action="([^"]+)"')
- relay_state = extract_html_value(self.saml_response_html, r'name="RelayState" value="([^"]+)"')
- saml_response = extract_html_value(self.saml_response_html, r'name="SAMLResponse" value="([^"]+)"')
+ consume_url = extract_html_value(
+ self.saml_response_html, r'form action="([^"]+)"'
+ )
+ relay_state = extract_html_value(
+ self.saml_response_html, r'name="RelayState" value="([^"]+)"'
+ )
+ saml_response = extract_html_value(
+ self.saml_response_html, r'name="SAMLResponse" value="([^"]+)"'
+ )
- self.session.post(consume_url, data={
- 'RelayState': relay_state,
- 'SAMLResponse': saml_response
- })
+ self.session.post(
+ consume_url, data={"RelayState": relay_state, "SAMLResponse": saml_response}
+ )
- self.session.get(f"{ANNY_BASE_URL}/en-us/login?target=/en-us/home?withoutIntent=true")
+ self.session.get(
+ f"{ANNY_BASE_URL}/en-us/login?target=/en-us/home?withoutIntent=true"
+ )
diff --git a/booking/client.py b/booking/client.py
index 42b955e..424667c 100644
--- a/booking/client.py
+++ b/booking/client.py
@@ -1,45 +1,57 @@
import requests
from requests.exceptions import JSONDecodeError
-from config.constants import RESOURCE_URL, BOOKING_API_BASE, CHECKOUT_FORM_API, ANNY_BASE_URL, SERVICE_ID
+from config.constants import (
+ RESOURCE_URL,
+ BOOKING_API_BASE,
+ CHECKOUT_FORM_API,
+ ANNY_BASE_URL,
+ SERVICE_ID,
+)
+
class BookingClient:
def __init__(self, cookies):
self.session = requests.Session()
self.session.cookies = cookies
- self.token = cookies.get('anny_shop_jwt')
+ self.token = cookies.get("anny_shop_jwt")
- self.session.headers.update({
- 'authorization': f'Bearer {self.token}',
- 'content-type': 'application/vnd.api+json',
- 'origin': ANNY_BASE_URL,
- 'referer': ANNY_BASE_URL + '/',
- 'user-agent': 'Mozilla/5.0'
- })
+ self.session.headers.update(
+ {
+ "authorization": f"Bearer {self.token}",
+ "content-type": "application/vnd.api+json",
+ "origin": ANNY_BASE_URL,
+ "referer": ANNY_BASE_URL + "/",
+ "user-agent": "Mozilla/5.0",
+ }
+ )
def find_available_resource(self, start, end):
- response = self.session.get(RESOURCE_URL, params={
- 'page[number]': 1,
- 'page[size]': 250,
- 'filter[available_from]': start,
- 'filter[available_to]': end,
- 'filter[availability_exact_match]': 1,
- 'filter[exclude_hidden]': 0,
- 'filter[exclude_child_resources]': 0,
- 'filter[availability_service_id]': int(SERVICE_ID),
- 'filter[include_unavailable]': 0,
- 'filter[pre_order_ids]': '',
- 'sort': 'name'
- })
+ response = self.session.get(
+ RESOURCE_URL,
+ params={
+ "page[number]": 1,
+ "page[size]": 250,
+ "filter[available_from]": start,
+ "filter[available_to]": end,
+ "filter[availability_exact_match]": 1,
+ "filter[exclude_hidden]": 0,
+ "filter[exclude_child_resources]": 0,
+ "filter[availability_service_id]": int(SERVICE_ID),
+ "filter[include_unavailable]": 0,
+ "filter[pre_order_ids]": "",
+ "sort": "name",
+ },
+ )
if not response.ok:
print(f"❌ Failed to fetch resources: HTTP {response.status_code}")
return None
try:
- resources = response.json().get('data', [])
+ resources = response.json().get("data", [])
except (ValueError, JSONDecodeError):
print(f"❌ Invalid JSON response when fetching resources: {response.text}")
# print(f"❌ Invalid JSON response when fetching resources: {response.text[:200]}")
return None
- return resources[-1]['id'] if resources else None
+ return resources[-1]["id"] if resources else None
def reserve(self, resource_id, start, end):
print(start)
@@ -47,8 +59,8 @@ class BookingClient:
booking = self.session.post(
f"{BOOKING_API_BASE}/order/bookings",
params={
- 'stateless': '1',
- 'include': 'customer,voucher,bookings.booking_add_ons.add_on.cover_image,bookings.sub_bookings.resource,bookings.sub_bookings.service,bookings.customer,bookings.service.custom_forms.custom_fields,bookings.service.add_ons.cover_image,bookings.service.add_ons.group,bookings.cancellation_policy,bookings.resource.cover_image,bookings.resource.parent,bookings.resource.location,bookings.resource.category,bookings.reminders,bookings.booking_series,bookings.sequenced_bookings.resource,bookings.sequenced_bookings.service,bookings.sequenced_bookings.service.add_ons.cover_image,bookings.sequenced_bookings.service.add_ons.group,bookings.booking_participants,sub_orders.bookings,sub_orders.organization.legal_documents'
+ "stateless": "1",
+ "include": "customer,voucher,bookings.booking_add_ons.add_on.cover_image,bookings.sub_bookings.resource,bookings.sub_bookings.service,bookings.customer,bookings.service.custom_forms.custom_fields,bookings.service.add_ons.cover_image,bookings.service.add_ons.group,bookings.cancellation_policy,bookings.resource.cover_image,bookings.resource.parent,bookings.resource.location,bookings.resource.category,bookings.reminders,bookings.booking_series,bookings.sequenced_bookings.resource,bookings.sequenced_bookings.service,bookings.sequenced_bookings.service.add_ons.cover_image,bookings.sequenced_bookings.service.add_ons.group,bookings.booking_participants,sub_orders.bookings,sub_orders.organization.legal_documents",
},
json={
"resource_id": [resource_id],
@@ -60,9 +72,9 @@ class BookingClient:
"add_ons_by_service": {SERVICE_ID: [[]]},
"sub_bookings_by_service": {},
# "booking_quota_grant_id":"24735199",
- "booking_quota_grant_id":"24735202",
- "strategy": "multi-resource"
- }
+ "booking_quota_grant_id": "24735202",
+ "strategy": "multi-resource",
+ },
# data = '{"resource_id":["15994"],"service_id":{"601":1},"start_date":"2026-02-03T22:30:00+01:00","end_date":"2026-02-03T23:30:00+01:00","description":"","customer_note":"","add_ons_by_service":{"601":[[]]},"sub_bookings_by_service":{},"booking_quota_grant_id":"24735199","strategy":"multi-resource"}'
)
@@ -85,7 +97,9 @@ class BookingClient:
print("❌ Missing booking ID or access token in response")
return False
- checkout = self.session.get(f"{CHECKOUT_FORM_API}?oid={oid}&oat={oat}&stateless=1")
+ checkout = self.session.get(
+ f"{CHECKOUT_FORM_API}?oid={oid}&oat={oat}&stateless=1"
+ )
if not checkout.ok:
print(f"❌ Checkout form failed: HTTP {checkout.status_code}")
return False
@@ -103,14 +117,14 @@ class BookingClient:
"customer": {
"given_name": customer.get("given_name"),
"family_name": customer.get("family_name"),
- "email": customer.get("email")
+ "email": customer.get("email"),
},
"accept_terms": True,
"payment_method": "",
"success_url": f"{ANNY_BASE_URL}/checkout/success?oids={oid}&oats={oat}",
"cancel_url": f"{ANNY_BASE_URL}/checkout?step=checkout&childResource={resource_id}",
- "meta": {"timezone": "Europe/Berlin"}
- }
+ "meta": {"timezone": "Europe/Berlin"},
+ },
)
if final.ok:
diff --git a/config/constants.py b/config/constants.py
index 945bb83..75bf241 100644
--- a/config/constants.py
+++ b/config/constants.py
@@ -10,16 +10,16 @@ SERVICE_ID = "601"
# RESOURCE_ID =15994 # 91 height adjustable desk
# RESOURCE_ID =16402 # 222 study room
# RESOURCE_ID =15883 #66 second best
-RESOURCE_ID =15502 # 17 cool snipe with view
+RESOURCE_ID = 15502 # 17 cool snipe with view
TIMEZONE = "Europe/Berlin"
# SSO_PROVIDER = "kit" # Available: kit (add more in auth/providers/)
SSO_PROVIDER = "tum" # Available: kit (add more in auth/providers/)
DEFAULT_HEADERS = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0',
- 'accept': 'application/vnd.api+json',
- 'accept-encoding': 'plain'
+ "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0",
+ "accept": "application/vnd.api+json",
+ "accept-encoding": "plain",
}
# Booking time slots (in order of priority)
@@ -39,28 +39,22 @@ DEFAULT_HEADERS = {
BOOKING_TIMES = [
# SNIPE
-# {
-# 'start': '13:00:00',
-# 'end': '18:00:00'
-# },
+ # {
+ # 'start': '13:00:00',
+ # 'end': '18:00:00'
+ # },
# BEST
-# {
-# 'start': '12:00:00',
-# 'end': '22:00:00'
-# },
+ # {
+ # 'start': '12:00:00',
+ # 'end': '22:00:00'
+ # },
# OPTIMAL
# 40/7=5.7
- {
- 'start': '12:00:00',
- 'end': '17:30:00'
- },
-
-
+ {"start": "12:00:00", "end": "17:30:00"},
# {
# 'start': '12:00:00',
# 'end': '22:00:00'
# },
-
# {
# 'start': '22:30:00',
# 'end': '23:30:00'
@@ -73,3 +67,4 @@ BOOKING_TIMES = [
# 'start': '20:00:00',
# 'end': '23:45:00'
# },
+]
diff --git a/debug_sso_redirect.py b/debug_sso_redirect.py
index e72c7b6..4acb275 100644
--- a/debug_sso_redirect.py
+++ b/debug_sso_redirect.py
@@ -5,64 +5,72 @@ import html
AUTH_BASE_URL = "https://auth.anny.eu"
DEFAULT_HEADERS = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0',
- 'accept': 'application/vnd.api+json',
- 'accept-encoding': 'plain'
+ "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0",
+ "accept": "application/vnd.api+json",
+ "accept-encoding": "plain",
}
+
def extract_html_value(text, pattern):
match = re.search(pattern, text)
return match.group(1) if match else None
+
def debug_sso():
session = requests.Session()
# 1. Init Headers like AnnySession
- session.headers.update({
- **DEFAULT_HEADERS,
- 'accept': 'text/html, application/xhtml+xml',
- 'referer': AUTH_BASE_URL + '/',
- 'origin': AUTH_BASE_URL
- })
+ session.headers.update(
+ {
+ **DEFAULT_HEADERS,
+ "accept": "text/html, application/xhtml+xml",
+ "referer": AUTH_BASE_URL + "/",
+ "origin": AUTH_BASE_URL,
+ }
+ )
print("--- 1. Initial Access ---")
r1 = session.get(f"{AUTH_BASE_URL}/login/sso")
-
- session.headers['X-XSRF-TOKEN'] = urllib.parse.unquote(r1.cookies['XSRF-TOKEN'])
-
+
+ session.headers["X-XSRF-TOKEN"] = urllib.parse.unquote(r1.cookies["XSRF-TOKEN"])
+
page_data_match = re.search(r'data-page="(.*?)"', r1.text)
if not page_data_match:
print("Could not find data-page")
return
-
- page_data = page_data_match.group(1).replace('"', '"')
+
+ page_data = page_data_match.group(1).replace(""", '"')
version = re.search(r'"version"\s*:\s*"([a-f0-9]{32})"', page_data)
- x_inertia_version = version.group(1) if version else '66b32acea13402d3aef4488ccd239c93'
-
- session.headers.update({
- 'x-requested-with': 'XMLHttpRequest',
- 'x-inertia': 'true',
- 'x-inertia-version': x_inertia_version
- })
-
- print(f"\n--- 2. SSO Request (TUM) ---")
+ x_inertia_version = (
+ version.group(1) if version else "66b32acea13402d3aef4488ccd239c93"
+ )
+
+ session.headers.update(
+ {
+ "x-requested-with": "XMLHttpRequest",
+ "x-inertia": "true",
+ "x-inertia-version": x_inertia_version,
+ }
+ )
+
+ print("\n--- 2. SSO Request (TUM) ---")
r2 = session.post(f"{AUTH_BASE_URL}/login/sso", json={"domain": "tum.de"})
-
- if 'x-inertia-location' not in r2.headers:
+
+ if "x-inertia-location" not in r2.headers:
print("No x-inertia-location header found")
print(r2.text[:500])
return
- redirect_url = r2.headers['x-inertia-location']
+ redirect_url = r2.headers["x-inertia-location"]
print(f"Redirect URL: {redirect_url}")
-
+
print("\n--- 3. Following Redirect (Simulating Provider Handoff) ---")
- headers_to_remove = ['x-requested-with', 'x-inertia', 'x-inertia-version']
+ headers_to_remove = ["x-requested-with", "x-inertia", "x-inertia-version"]
for h in headers_to_remove:
session.headers.pop(h, None)
print("Cleaned Anny-specific headers.")
current_url = redirect_url
-
+
# Loop to handle intermediate pages (e.g. cookie check)
for step in range(1, 6):
print(f"\n--- Step {step}: GET {current_url} ---")
@@ -70,65 +78,66 @@ def debug_sso():
resp = session.get(current_url)
else:
# We are posting from previous step
- pass
+ pass
# Logic handled inside loop
-
- # We need to render the response from the previous action.
+
+ # We need to render the response from the previous action.
# But wait, the loop structure is easier if we just do "process page".
- pass
+ pass
# Re-writing loop logic for clarity
resp = session.get(redirect_url)
-
+
for step in range(1, 6):
print(f"\n--- Step {step}: Processing Page {resp.url} ---")
-
+
# Check for login fields
- if 'j_username' in resp.text:
+ if "j_username" in resp.text:
print("FOUND: j_username field. This is the LOGIN PAGE.")
break
-
+
# Check for error
if "Cookies" in resp.text and "disabled" in resp.text:
print("ERROR: Page says Cookies disabled.")
-
+
# Extract form action
action_url = extract_html_value(resp.text, r'form[^>]*action="([^"]+)"')
if not action_url:
print("No form action found. End of flow?")
print(resp.text[:500])
break
-
- if action_url.startswith('/'):
+
+ if action_url.startswith("/"):
parsed = urllib.parse.urlparse(resp.url)
base = f"{parsed.scheme}://{parsed.netloc}"
action_url = base + action_url
-
+
print(f"Form Action: {action_url}")
-
+
# Extract inputs
inputs = re.findall(r']*name="([^"]+)"[^>]*value="([^"]*)"', resp.text)
data = {name: val for name, val in inputs}
print(f"Hidden inputs: {list(data.keys())}")
-
+
# If it's the cookie check page, there might be specific JS that auto-submits.
# usually just posting the form works.
-
- if 'shib_idp_ls_success.shib_idp_session_ss' in data:
+
+ if "shib_idp_ls_success.shib_idp_session_ss" in data:
print("Detected Shibboleth LocalStorage check.")
-
+
# Submit to move to next page
print("Submitting form to proceed...")
# Ensure csrf if not in inputs
- if 'csrf_token' not in data:
- csrf = extract_html_value(resp.text, r'name="csrf_token" value="([^"]+)"')
- if csrf: data['csrf_token'] = csrf
-
+ if "csrf_token" not in data:
+ csrf = extract_html_value(resp.text, r'name="csrf_token" value="([^"]+)"')
+ if csrf:
+ data["csrf_token"] = csrf
+
# Add basic things that might be needed
- data['_eventId_proceed'] = ''
-
+ data["_eventId_proceed"] = ""
+
resp = session.post(action_url, data=data)
-
+
else:
print("Max steps reached without finding login page.")
return
@@ -137,32 +146,36 @@ def debug_sso():
print("\n--- Attempting Login on Final Page ---")
action_url = extract_html_value(resp.text, r'form[^>]*action="([^"]+)"')
if action_url:
- if action_url.startswith('/'):
- parsed = urllib.parse.urlparse(resp.url)
- base = f"{parsed.scheme}://{parsed.netloc}"
- action_url = base + action_url
-
+ if action_url.startswith("/"):
+ parsed = urllib.parse.urlparse(resp.url)
+ base = f"{parsed.scheme}://{parsed.netloc}"
+ action_url = base + action_url
+
inputs = re.findall(r']*name="([^"]+)"[^>]*value="([^"]*)"', resp.text)
data = {name: val for name, val in inputs}
-
- data['j_username'] = 'dummy_user'
- data['j_password'] = 'dummy_pass'
- data['_eventId_proceed'] = ''
- data['donotcache'] = '1'
-
- if 'csrf_token' not in data:
- csrf = extract_html_value(resp.text, r'name="csrf_token" value="([^"]+)"')
- if csrf: data['csrf_token'] = csrf
-
+
+ data["j_username"] = "dummy_user"
+ data["j_password"] = "dummy_pass"
+ data["_eventId_proceed"] = ""
+ data["donotcache"] = "1"
+
+ if "csrf_token" not in data:
+ csrf = extract_html_value(resp.text, r'name="csrf_token" value="([^"]+)"')
+ if csrf:
+ data["csrf_token"] = csrf
+
print(f"Posting creds to {action_url}")
r4 = session.post(action_url, data=data)
print(f"Result Code: {r4.status_code}")
unescaped = html.unescape(r4.text)
- if "Identifizierung gescheitert" in unescaped or "Authentication failed" in unescaped:
+ if (
+ "Identifizierung gescheitert" in unescaped
+ or "Authentication failed" in unescaped
+ ):
print("SUCCESS: Got expected 'Authentication failed' message.")
else:
- print("Result unknown.")
- print(unescaped[:500])
+ print("Result unknown.")
+ print(unescaped[:500])
if __name__ == "__main__":
diff --git a/main.py b/main.py
index dc37466..e21798b 100644
--- a/main.py
+++ b/main.py
@@ -9,8 +9,9 @@ from utils.helpers import get_future_datetime
import pytz
from config.constants import RESOURCE_ID, TIMEZONE, SSO_PROVIDER, BOOKING_TIMES
+
def main():
- load_dotenv('.env', override=True)
+ load_dotenv(".env", override=True)
username = os.getenv("USERNAME")
password = os.getenv("PASSWORD")
@@ -30,20 +31,22 @@ def main():
# Only wait for midnight if within 10 minutes, otherwise execute immediately
now = datetime.datetime.now(tz)
- midnight = (now + datetime.timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
+ midnight = (now + datetime.timedelta(days=1)).replace(
+ hour=0, minute=0, second=0, microsecond=0
+ )
seconds_until_midnight = (midnight - now).total_seconds()
max_wait_seconds = 10 * 60 # 10 minutes
if 0 < seconds_until_midnight <= max_wait_seconds:
- print(f"⏳ Waiting {seconds_until_midnight:.0f} seconds until midnight...")
+ print("⏳ Waiting {seconds_until_midnight:.0f} seconds until midnight...")
time.sleep(seconds_until_midnight)
elif seconds_until_midnight > max_wait_seconds:
- print(f"⚡ More than 10 min until midnight, executing immediately...")
+ print("⚡ More than 10 min until midnight, executing immediately...")
for time_ in BOOKING_TIMES:
try:
- start = get_future_datetime(hour=time_['start'])
- end = get_future_datetime(hour=time_['end'])
+ start = get_future_datetime(hour=time_["start"])
+ end = get_future_datetime(hour=time_["end"])
if RESOURCE_ID:
resource_id = RESOURCE_ID
@@ -57,5 +60,6 @@ def main():
except Exception as e:
print(f"❌ Error booking slot {time_['start']}-{time_['end']}: {e}")
+
if __name__ == "__main__":
main()
diff --git a/resource_id_helper.py b/resource_id_helper.py
index d05caaf..f8e9055 100644
--- a/resource_id_helper.py
+++ b/resource_id_helper.py
@@ -4,6 +4,5 @@ with open("nice.json", "r", encoding="utf-8") as f:
data = json.load(f)
for i in data["data"]:
- print ("%-32s id: %s" % (i["attributes"]["name"], i["id"]))
+ print("%-32s id: %s" % (i["attributes"]["name"], i["id"]))
print(type(data)) # usually
-
diff --git a/utils/helpers.py b/utils/helpers.py
index 08b321c..5fd5ad1 100644
--- a/utils/helpers.py
+++ b/utils/helpers.py
@@ -4,10 +4,14 @@ import datetime
import pytz
from config.constants import TIMEZONE
+
def get_future_datetime(days_ahead=4, hour="13:00:00"):
- dt = datetime.datetime.now(pytz.timezone(TIMEZONE)) + datetime.timedelta(days=days_ahead)
+ dt = datetime.datetime.now(pytz.timezone(TIMEZONE)) + datetime.timedelta(
+ days=days_ahead
+ )
return dt.strftime(f"%Y-%m-%dT{hour}+01:00")
+
def extract_html_value(text, pattern):
match = re.search(pattern, text)
if not match: