# File: OSU-Public-Wi-Fi-Login/osu_wifi_login/portal.py
# (538 lines, 20 KiB, Python)

from __future__ import annotations
import logging
import re
import time
from dataclasses import dataclass, field
from html.parser import HTMLParser
from urllib.parse import urljoin
import requests
import urllib3
# Playwright is imported optionally so this module can still be imported on
# machines where the package is not installed.
try:
    from playwright.sync_api import (
        Error as PlaywrightError,
        TimeoutError as PlaywrightTimeoutError,
        sync_playwright,
    )
except ImportError:  # pragma: no cover - depends on local environment
    # Stand-ins keep the ``except PlaywrightError`` / ``except
    # PlaywrightTimeoutError`` clauses elsewhere in the module valid, and
    # ``sync_playwright is None`` marks browser automation as unavailable.
    sync_playwright = None
    PlaywrightError = RuntimeError
    PlaywrightTimeoutError = TimeoutError
from .config import PortalConfig, SeleniumConfig
class PortalLoginError(RuntimeError):
    """Signal that a captive portal login attempt did not succeed.

    Subclasses ``RuntimeError`` so callers that already handle runtime
    failures also catch portal login failures.
    """
@dataclass(slots=True)
class ParsedPortalForm:
action: str
method: str
inputs: list[dict[str, str | None]] = field(default_factory=list)
textareas: dict[str, str] = field(default_factory=dict)
class _PortalFormParser(HTMLParser):
    """Scan portal HTML for the form that carries the accept-terms control.

    Only ``<form>``, ``<input>`` and ``<textarea>`` elements are inspected.
    After ``feed()`` and ``close()``:

    * ``portal_form`` is the first finished form containing an input whose
      ``name`` equals ``accept_terms_name`` (or ``None`` if there is none);
    * ``build_fallback_form()`` synthesises a form from every input in the
      document, for pages where the accept-terms input is not inside a form.

    Forms still open when ``close()`` is called are finalised then, so a page
    that never emits ``</form>`` keeps its own ``action`` and ``method``.
    """

    def __init__(self, accept_terms_name: str) -> None:
        """Create a parser that looks for an input named *accept_terms_name*."""
        super().__init__(convert_charrefs=True)
        self.accept_terms_name = accept_terms_name
        self.portal_form: ParsedPortalForm | None = None
        # Forms whose start tag has been seen but not yet closed.
        self._form_stack: list[ParsedPortalForm] = []
        # Document-wide collections, used by build_fallback_form().
        self._document_inputs: list[dict[str, str | None]] = []
        self._document_textareas: dict[str, str] = {}
        # State of the <textarea> currently being read, if any.
        self._textarea_name: str | None = None
        self._textarea_buffer: list[str] = []

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        """Record form starts, inputs and textarea starts."""
        attrs_dict = dict(attrs)
        tag_lower = tag.casefold()
        if tag_lower == "form":
            self._form_stack.append(
                ParsedPortalForm(
                    action=attrs_dict.get("action") or "",
                    method=(attrs_dict.get("method") or "post").casefold(),
                ),
            )
        elif tag_lower == "input":
            self._document_inputs.append(attrs_dict)
            if self._form_stack:
                self._form_stack[-1].inputs.append(attrs_dict)
        elif tag_lower == "textarea":
            self._textarea_name = attrs_dict.get("name")
            self._textarea_buffer = []

    def handle_data(self, data: str) -> None:
        """Accumulate text while inside a named ``<textarea>``."""
        if self._textarea_name is not None:
            self._textarea_buffer.append(data)

    def handle_endtag(self, tag: str) -> None:
        """Finish the current textarea or form."""
        tag_lower = tag.casefold()
        if tag_lower == "textarea":
            name = self._textarea_name
            if name:
                text = "".join(self._textarea_buffer)
                self._document_textareas[name] = text
                if self._form_stack:
                    self._form_stack[-1].textareas[name] = text
            # Reset unconditionally: a textarea with an empty name must not
            # leave the parser buffering all text that follows it.
            self._textarea_name = None
            self._textarea_buffer = []
            return
        if tag_lower == "form" and self._form_stack:
            self._finish_form(self._form_stack.pop())

    def close(self) -> None:
        """Flush the parser and finalise forms that were never closed."""
        super().close()
        # Innermost first, mirroring the order explicit </form> tags would
        # have closed them in.
        while self._form_stack:
            self._finish_form(self._form_stack.pop())

    def _finish_form(self, form: ParsedPortalForm) -> None:
        """Keep *form* as the portal form if it is the first one that qualifies."""
        if self.portal_form is None and self._is_portal_form(form):
            self.portal_form = form

    def _is_portal_form(self, form: ParsedPortalForm) -> bool:
        """Return True if *form* has an input named ``accept_terms_name``."""
        return any(
            (input_attrs.get("name") or "").strip() == self.accept_terms_name
            for input_attrs in form.inputs
        )

    def build_fallback_form(self) -> ParsedPortalForm | None:
        """Build a form from all inputs in the document.

        Returns ``None`` unless some input in the document is named
        ``accept_terms_name``. The synthesised form has an empty action and
        uses the ``post`` method.
        """
        has_accept_input = any(
            (input_attrs.get("name") or "").strip() == self.accept_terms_name
            for input_attrs in self._document_inputs
        )
        if not has_accept_input:
            return None
        return ParsedPortalForm(
            action="",
            method="post",
            inputs=list(self._document_inputs),
            textareas=dict(self._document_textareas),
        )
class CaptivePortalLogin:
    """Accept the captive portal terms and submit its login form.

    ``login_if_present`` first drives the portal with a Playwright-controlled
    browser and, when the browser finds no portal controls, falls back to
    fetching and submitting the form over plain HTTP with ``requests``.

    Configuration read from ``portal``: ``trigger_urls``,
    ``accept_terms_name`` and ``login_button_xpath``. Configuration read from
    ``selenium_config``: ``headless``, ``page_load_timeout_seconds``,
    ``element_timeout_seconds`` and ``max_login_retries`` (timeouts are in
    seconds and converted to milliseconds for Playwright calls).

    NOTE(review): when Playwright is not installed, ``_login_with_browser``
    raises before the HTTP fallback is attempted; confirm that is intended.
    """

    # Matches "log in" / "login" in button labels, case-insensitively.
    _LOGIN_LABEL_RE = re.compile(r"log\s*in", re.I)

    # Page text worth logging after a submission.
    _STATUS_TEXT_RE = re.compile(
        "|".join(
            (
                r"success",
                r"authenticated",
                r"logged\s+in",
                r"access\s+granted",
                r"error",
                r"failed",
                r"denied",
                r"must\s+accept",
                r"terms\s+and\s+conditions",
                r"network\s+access\s+login",
            ),
        ),
        re.I,
    )

    _HTTP_USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/135.0.0.0 Safari/537.36 Edg/135.0.0.0"
    )

    def __init__(
        self,
        portal: PortalConfig,
        selenium_config: SeleniumConfig,
        logger: logging.Logger,
    ) -> None:
        """Store the portal settings, automation settings and logger."""
        self.portal = portal
        self.selenium_config = selenium_config
        self.logger = logger

    def login(self) -> None:
        """Log in, raising ``PortalLoginError`` if no portal could be handled."""
        if not self.login_if_present():
            raise PortalLoginError("Could not reach the OSU captive portal")

    def login_if_present(self) -> bool:
        """Submit the portal login if a portal is detected.

        Returns:
            True when a portal form was submitted (by browser or HTTP), False
            when pages were fetched but no portal form was found.

        Raises:
            PortalLoginError: if browser automation fails, if every HTTP
                request fails, or if the final HTTP attempt found the portal
                form but could not submit it.
        """
        if self._login_with_browser():
            return True
        self.logger.info("Playwright-based portal automation did not detect login controls; trying HTTP fallback")

        max_attempts = self.selenium_config.max_login_retries
        request_timeout = self.selenium_config.page_load_timeout_seconds
        last_error: Exception | None = None
        submit_error: PortalLoginError | None = None
        saw_successful_response = False

        # Certificate verification is disabled for the portal session below,
        # so silence the resulting per-request warnings.
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        with requests.Session() as session:
            session.verify = False
            session.headers.update({"User-Agent": self._HTTP_USER_AGENT})
            for attempt in range(1, max_attempts + 1):
                self.logger.info("Portal login attempt %s/%s", attempt, max_attempts)
                # Only a submission failure on the most recent attempt is
                # reported; an earlier failure followed by "no form found"
                # still counts as "portal not present".
                submit_error = None
                for url in self.portal.trigger_urls:
                    self.logger.info("Opening captive portal trigger URL: %s", url)
                    try:
                        response = session.get(
                            url,
                            timeout=request_timeout,
                            allow_redirects=True,
                        )
                    except requests.RequestException as exc:
                        last_error = exc
                        continue
                    saw_successful_response = True
                    if response.url != url:
                        self.logger.info("Portal trigger resolved to %s", response.url)
                    form = self._extract_portal_form(response.text)
                    if form is None:
                        continue
                    self.logger.info("Captive portal form detected at %s", response.url)
                    try:
                        self._submit_form(session, response.url, form)
                    except PortalLoginError as exc:
                        last_error = exc
                        submit_error = exc
                        self.logger.warning("HTTP captive portal submission failed: %s", exc)
                        # Give up on this attempt; the retry loop starts over.
                        break
                    time.sleep(2)
                    return True
                if attempt < max_attempts:
                    time.sleep(2)

        if submit_error is not None:
            # The form was found but could not be submitted: surface that
            # instead of claiming the portal page was not detected.
            raise submit_error
        if saw_successful_response:
            self.logger.info("Captive portal page was not detected")
            return False
        raise PortalLoginError(str(last_error or "Unknown captive portal request error"))

    def _extract_portal_form(self, html: str) -> ParsedPortalForm | None:
        """Parse *html* and return the portal form, or None if none is found."""
        parser = _PortalFormParser(self.portal.accept_terms_name)
        parser.feed(html)
        parser.close()
        return parser.portal_form or parser.build_fallback_form()

    def _submit_form(
        self,
        session: requests.Session,
        page_url: str,
        form: ParsedPortalForm,
    ) -> None:
        """Send *form* with *session*, resolving its action against *page_url*.

        Raises:
            PortalLoginError: if the payload cannot be built, the request
                fails, or the response status is 400 or higher.
        """
        payload = self._build_payload(form)
        # An empty action means "submit to the page itself".
        action_url = urljoin(page_url, form.action or page_url)
        use_get = (form.method or "post") == "get"
        send = session.get if use_get else session.post
        body_kwargs = {"params": payload} if use_get else {"data": payload}
        self.logger.info("Submitting captive portal form to %s", action_url)
        try:
            response = send(
                action_url,
                timeout=self.selenium_config.page_load_timeout_seconds,
                allow_redirects=True,
                **body_kwargs,
            )
        except requests.RequestException as exc:
            raise PortalLoginError(f"Submitting captive portal form failed: {exc}") from exc
        if response.status_code >= 400:
            raise PortalLoginError(
                f"Captive portal submission returned HTTP {response.status_code}",
            )

    def _build_payload(self, form: ParsedPortalForm) -> dict[str, str]:
        """Build the name/value pairs to submit for *form*.

        The accept-terms checkbox is always included; other checkboxes and
        radios only when they carry a ``checked`` attribute. At most one
        submit/button/image input is included (the first whose value is empty
        or mentions "log in"/"login"). File and reset inputs are never sent.
        Textarea values fill in names not already set by an input.

        Raises:
            PortalLoginError: if the accept-terms field is not in the payload.
        """
        accept_name = self.portal.accept_terms_name
        payload: dict[str, str] = {}
        submit_added = False
        for input_attrs in form.inputs:
            name = (input_attrs.get("name") or "").strip()
            if not name:
                continue
            input_type = (input_attrs.get("type") or "text").casefold()
            value = input_attrs.get("value") or ""
            if input_type in {"checkbox", "radio"}:
                if name == accept_name or "checked" in input_attrs:
                    payload[name] = value or "on"
                continue
            if input_type in {"submit", "button", "image"}:
                lowered = value.casefold()
                if not submit_added and ("log in" in lowered or "login" in lowered or not lowered):
                    payload[name] = value
                    submit_added = True
                continue
            if input_type in {"file", "reset"}:
                # Reset buttons are never part of a submitted form; file
                # inputs cannot be reproduced from parsed HTML.
                continue
            payload[name] = value
        for name, text in form.textareas.items():
            payload.setdefault(name, text)
        if accept_name not in payload:
            raise PortalLoginError(
                f"Captive portal form did not contain '{self.portal.accept_terms_name}' payload data",
            )
        return payload

    def _login_with_browser(self) -> bool:
        """Open each trigger URL in Edge via Playwright and submit the portal.

        Returns:
            True once portal controls were found and submitted, False when no
            trigger URL showed portal controls.

        Raises:
            PortalLoginError: if Playwright is not installed or browser
                automation fails outside of page navigation.
        """
        if sync_playwright is None:
            raise PortalLoginError(
                "Playwright is not installed. Run 'uv sync' before using browser-based portal automation.",
            )
        goto_timeout_ms = self.selenium_config.page_load_timeout_seconds * 1000
        try:
            with sync_playwright() as playwright:
                browser = playwright.chromium.launch(
                    channel="msedge",
                    headless=self.selenium_config.headless,
                    args=["--ignore-certificate-errors"],
                )
                try:
                    context = browser.new_context(ignore_https_errors=True)
                    page = context.new_page()
                    for url in self.portal.trigger_urls:
                        self.logger.info("Opening captive portal trigger URL in browser: %s", url)
                        try:
                            page.goto(
                                url,
                                wait_until="domcontentloaded",
                                timeout=goto_timeout_ms,
                            )
                        except PlaywrightTimeoutError:
                            self.logger.warning("Timed out opening portal trigger URL in browser: %s", url)
                            continue
                        except PlaywrightError as exc:
                            # A navigation failure for one URL should not
                            # prevent trying the remaining trigger URLs.
                            self.logger.warning(
                                "Failed to open portal trigger URL in browser: %s (%s)",
                                url,
                                exc,
                            )
                            continue
                        # Short pause before inspecting the page for controls.
                        page.wait_for_timeout(1500)
                        if page.url != url:
                            self.logger.info("Browser portal trigger resolved to %s", page.url)
                        if self._submit_portal_in_browser(page):
                            return True
                finally:
                    browser.close()
        except PlaywrightError as exc:
            raise PortalLoginError(
                f"Playwright browser automation failed: {exc}",
            ) from exc
        self.logger.info("Playwright-based portal page was not detected")
        return False

    def _submit_portal_in_browser(self, page) -> bool:
        """Accept the terms and submit the login in the first frame that has them.

        Returns:
            True after a submission was made, False if no frame contains an
            accept control.

        Raises:
            PortalLoginError: if an accept control exists but no login button
                can be found in the same frame.
        """
        for frame in page.frames:
            accept_control = self._find_accept_control(frame)
            if accept_control is None:
                continue
            self.logger.info("Captive portal controls detected in browser frame: %s", frame.url or page.url)
            self._activate_accept_control(accept_control)
            login_button = self._find_login_button(frame)
            if login_button is None:
                raise PortalLoginError("Found the Agree control, but could not find the login button")
            self.logger.info("Clicking captive portal login button in browser")
            # Prefer submitting from page script; click the button only when
            # that route was not available.
            if not self._submit_osu_guest_form(frame):
                login_button.click(timeout=self.selenium_config.element_timeout_seconds * 1000, force=True)
            try:
                page.wait_for_load_state("networkidle", timeout=10000)
            except PlaywrightTimeoutError:
                # Best effort: the fixed wait below still gives the page time.
                pass
            page.wait_for_timeout(5000)
            self.logger.info("Submitted portal form in browser; current URL is %s", page.url)
            self._log_page_status(page)
            self._log_visible_portal_errors(page)
            return True
        return False

    def _find_accept_control(self, scope):
        """Return a locator for the accept/agree control in *scope*, or None.

        Selectors are tried from most to least specific: the configured
        field name, any checkbox, then text matching "agree" or "accept".
        """
        accept_name = self.portal.accept_terms_name
        candidate_selectors = (
            f'[name="{accept_name}"]',
            f'input[name="{accept_name}"]',
            'input[type="checkbox"]',
            'text=/agree/i',
            'text=/accept/i',
        )
        for selector in candidate_selectors:
            locator = scope.locator(selector).first
            if locator.count() > 0:
                return locator
        return None

    def _activate_accept_control(self, locator) -> None:
        """Tick (or click) the accept control behind *locator*.

        Raises:
            PortalLoginError: if clicking fails and the script-based last
                resort fails as well.
        """
        timeout_ms = self.selenium_config.element_timeout_seconds * 1000
        input_type = (locator.get_attribute("type") or "").casefold()
        if input_type in {"checkbox", "radio"}:
            try:
                locator.check(timeout=timeout_ms, force=True)
            except PlaywrightError:
                # Fall through to the click-based path below.
                pass
            else:
                try:
                    # Re-assert the state and fire the events page scripts
                    # may be listening for.
                    locator.evaluate(
                        """(el) => {
                            el.checked = true;
                            el.dispatchEvent(new Event('input', { bubbles: true }));
                            el.dispatchEvent(new Event('change', { bubbles: true }));
                        }""",
                    )
                except PlaywrightError as exc:
                    self.logger.debug("Could not dispatch events on the Agree control: %s", exc)
                # check() succeeded, so do not click: a click would toggle
                # the box off again.
                return
        try:
            locator.click(timeout=timeout_ms, force=True)
        except PlaywrightError:
            try:
                # Click first and force the state afterwards, so a checkbox
                # always ends up ticked regardless of what the click toggled.
                locator.evaluate(
                    """(el) => {
                        if (typeof el.click === 'function') {
                            el.click();
                        }
                        if ('checked' in el) {
                            el.checked = true;
                        }
                        el.dispatchEvent(new Event('input', { bubbles: true }));
                        el.dispatchEvent(new Event('change', { bubbles: true }));
                    }""",
                )
            except PlaywrightError as inner_exc:
                # Chain from the error named in the message; the click
                # failure remains reachable through its __context__.
                raise PortalLoginError(
                    f"Failed to activate the Agree control: {inner_exc}",
                ) from inner_exc

    def _find_login_button(self, scope):
        """Return a locator for the login button in *scope*, or None.

        Tried in order: a button role labelled "log in", the configured
        XPath (first match, no label check), then generic submit/button
        elements whose value, aria-label or text mentions "log in".
        """
        role_locator = scope.get_by_role("button", name=self._LOGIN_LABEL_RE).first
        if role_locator.count() > 0:
            return role_locator

        configured_xpath = self.portal.login_button_xpath
        if configured_xpath:
            configured = scope.locator(f"xpath={configured_xpath}")
            if configured.count() > 0:
                return configured.nth(0)

        generic_selectors = (
            'input[type="submit"]',
            'input[type="button"]',
            'button',
            'text=/log\\s*in/i',
        )
        for selector in generic_selectors:
            locator = scope.locator(selector)
            for index in range(locator.count()):
                candidate = locator.nth(index)
                label_parts = (
                    candidate.get_attribute("value"),
                    candidate.get_attribute("aria-label"),
                    candidate.text_content(),
                )
                label = " ".join(part for part in label_parts if part).strip()
                if self._LOGIN_LABEL_RE.search(label):
                    return candidate
        return None

    def _submit_osu_guest_form(self, scope) -> bool:
        """Tick the accept control and submit its form from page script.

        The control is looked up by the configured ``accept_terms_name``.
        Submission uses the page's ``Nwa_SubmitForm`` function when it
        exists, else ``form.requestSubmit``, else ``form.submit``.

        Returns:
            True if the script reported a submission, False if the control or
            form was missing or the script raised a Playwright error.
        """
        try:
            result = scope.evaluate(
                """(acceptName) => {
                    const checkbox = document.getElementsByName(acceptName)[0];
                    if (!checkbox) {
                        return { submitted: false, reason: 'checkbox not found' };
                    }
                    checkbox.checked = true;
                    checkbox.value = checkbox.value || '1';
                    checkbox.dispatchEvent(new Event('input', { bubbles: true }));
                    checkbox.dispatchEvent(new Event('change', { bubbles: true }));
                    if (!checkbox.checked) {
                        checkbox.checked = true;
                    }
                    const form = checkbox.closest('form') || document.querySelector('form[name$="_weblogin"], form[id$="_weblogin"]');
                    if (!form) {
                        return { submitted: false, reason: 'form not found' };
                    }
                    const submitButton = form.querySelector('input[type="submit"], button[type="submit"], button');
                    if (submitButton) {
                        submitButton.disabled = false;
                    }
                    const state = {
                        checked: checkbox.checked,
                        value: checkbox.value,
                        formName: form.name || form.id || '',
                        submitId: submitButton ? submitButton.id : '',
                    };
                    if (typeof window.Nwa_SubmitForm === 'function') {
                        const submitId = submitButton ? submitButton.id : '';
                        checkbox.checked = true;
                        checkbox.dispatchEvent(new Event('change', { bubbles: true }));
                        window.Nwa_SubmitForm(form.name || form.id, submitId);
                        return { submitted: true, via: 'Nwa_SubmitForm', state };
                    }
                    if (typeof form.requestSubmit === 'function') {
                        form.requestSubmit(submitButton || undefined);
                        return { submitted: true, via: 'requestSubmit', state };
                    }
                    form.submit();
                    return { submitted: true, via: 'form.submit', state };
                }""",
                self.portal.accept_terms_name,
            )
        except PlaywrightError as exc:
            self.logger.warning("Direct portal form submission failed: %s", exc)
            return False
        if result and result.get("submitted"):
            self.logger.info(
                "Submitted captive portal form via %s with state %s",
                result.get("via"),
                result.get("state"),
            )
            return True
        self.logger.info("Direct portal form submission was not available: %s", result)
        return False

    def _log_visible_portal_errors(self, page) -> None:
        """Log up to three error messages shown on *page* (best effort)."""
        errors: list[str] = []
        try:
            for selector in (".nwaError", ".nwaErrorBorder", "[role='alert']"):
                locator = page.locator(selector)
                for index in range(locator.count()):
                    text = (locator.nth(index).inner_text(timeout=1000) or "").strip()
                    if text:
                        errors.append(text)
        except PlaywrightError:
            # Diagnostics only: never let logging break the login flow.
            return
        if errors:
            self.logger.warning("Portal page reported: %s", " | ".join(errors[:3]))

    def _log_page_status(self, page) -> None:
        """Log the first 500 characters of the page text if it looks relevant."""
        try:
            raw_text = page.locator("body").inner_text(timeout=1500) or ""
        except PlaywrightError:
            return
        # Collapse all runs of whitespace so the log entry stays on one line.
        body_text = " ".join(raw_text.split())
        if body_text and self._STATUS_TEXT_RE.search(body_text):
            self.logger.info("Portal page status text: %s", body_text[:500])