import sys
import urllib.parse
import base64
import re
import json
import os
from mitmproxy import http
from mitmproxy.tools.main import mitmdump
# --- Configuration ---
# URL prefix of the quiz page whose POST requests are inspected. The addon's
# request hook compares it with str.startswith(), so every character
# (including "*") is matched literally there; the __main__ launcher separately
# embeds the host part of this URL in a "~d ^host$" mitmproxy filter expression.
TARGET_URL = "http://baodao.*.edu.cn/XSSC/Question.aspx?tid="
# Path of the JSON file the question bank is loaded from at start-up and
# rewritten to whenever a new question is captured.
JSON_FILE = "question_bank.json"
# DEBUG_LOG_FILE = "viewstate_dump.txt" # Debug log file (disabled)
# --- End Configuration ---
# def log_debug(message): # Debug log function (disabled)
# """Print debug info"""
# # print(f"[DEBUG] {message}")
# pass
# def dump_to_file(content): # Dump function (disabled)
# """Write raw ViewState to file for analysis"""
# # try:
# # with open(DEBUG_LOG_FILE, 'a', encoding='utf-8') as f:
# # f.write(content + "\n" + "="*20 + "\n")
# # except Exception as e:
# # print(f"[ERROR] Unable to write debug file: {e}")
# pass
def load_question_bank():
    """Load the persisted question bank from ``JSON_FILE``.

    Returns:
        A ``(bank, known_questions)`` tuple. ``bank`` is the mapping parsed
        from the JSON file and ``known_questions`` is a set of its keys (the
        question strings) for quick membership tests. Both are empty when the
        file does not exist, does not contain valid JSON, or any other error
        occurs while reading it; in the error cases a message is printed.
    """
    if not os.path.exists(JSON_FILE):
        return {}, set()

    try:
        with open(JSON_FILE, 'r', encoding='utf-8') as bank_file:
            stored_bank = json.load(bank_file)
        # Kept inside the try: a non-mapping payload has no .keys() and must
        # be reported through the generic handler below.
        question_keys = set(stored_bank.keys())
    except json.JSONDecodeError:
        print(f"[ERROR] {JSON_FILE} file corrupted, creating new file.")
    except Exception as load_error:
        print(f"[ERROR] Error loading question bank: {load_error}")
    else:
        return stored_bank, question_keys

    return {}, set()
def save_question_bank(bank):
    """Persist *bank* to ``JSON_FILE`` as pretty-printed UTF-8 JSON.

    The bank is serialized in memory first and written to a sibling
    ``<JSON_FILE>.tmp`` file that is then renamed over ``JSON_FILE``. The
    existing bank file is therefore never truncated before the new content is
    complete: a serialization error or an interrupted write leaves the
    previous bank intact instead of a partial file that the loader would
    discard as corrupted.

    Args:
        bank: JSON-serializable mapping of question text to question record.

    Raises:
        TypeError, ValueError: if *bank* is not JSON-serializable (propagated
            from ``json.dumps``, as before). File-system errors are reported
            with a printed message and not raised.
    """
    # Serialize before touching the disk so a bad value cannot clobber the file.
    payload = json.dumps(bank, ensure_ascii=False, indent=2)
    tmp_path = f"{JSON_FILE}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            f.write(payload)
        # os.replace swaps the finished file in with a single rename.
        os.replace(tmp_path, JSON_FILE)
    except OSError as e:
        # A leftover .tmp file is harmless; the next save overwrites it.
        print(f"[ERROR] Unable to write question bank file: {e}")
class ViewStateSniffer:
    """mitmproxy addon that extracts quiz questions from ``__VIEWSTATE`` fields.

    For every POST request whose URL starts with ``TARGET_URL`` the addon
    base64-decodes the ``__VIEWSTATE`` form field, heuristically pulls a
    question, four options (A-D) and a four-flag answer array out of the
    decoded bytes, and stores previously unseen questions in the JSON
    question bank.
    """

    # Numbered question stem ending in full-width parentheses. The
    # parentheses are written as \uXXXX escapes so they cannot be silently
    # folded to ASCII "(" / ")", which would turn them into an empty-matching
    # regex group and truncate every question to just its number.
    _QUESTION_FULLWIDTH_RE = re.compile(r'(\d+\.\s.*?\uff08\s*\uff09)')
    # Same stem shape with ASCII parentheses; tried when the pattern above fails.
    _QUESTION_ASCII_RE = re.compile(r'(\d+\..*?\([^\)]*\))')
    # An option such as "A.text", running up to the next NUL, '#', ',' or '<'.
    _OPTION_RE = re.compile(r'([A-D]\.[^\x00#,<]+)')
    # Four single-byte '0'/'1' flags, each preceded by a 0x01 byte.
    _ANSWER_FLAGS_RE = re.compile(
        b'\x01([\x30\x31])\x01([\x30\x31])\x01([\x30\x31])\x01([\x30\x31])')
    # Punctuation/whitespace accepted inside option text: ASCII "().,", the
    # full-width "(", ")", "," and ideographic full stop, space, tab, newline.
    _OPTION_PUNCTUATION = '().,\uff08\uff09\uff0c\u3002 \t\n'

    def __init__(self):
        """Load the existing question bank and print a start-up banner."""
        self.bank, self.known_questions = load_question_bank()
        print(f"[INFO] Starting ViewState sniffer... Loaded {len(self.bank)} questions.")
        print(f"[INFO] Intercept target: {TARGET_URL}")
        print(f"[INFO] Question bank file: {JSON_FILE}")

    def parse_viewstate_bytes(self, data: bytes):
        """Extract ``(question, options, correct_answer)`` from decoded ViewState bytes.

        Args:
            data: The base64-decoded ``__VIEWSTATE`` payload.

        Returns:
            ``(question, options, correct_answer)`` where ``options`` is a list
            of four strings and ``correct_answer`` joins the text of every
            option whose answer flag is ``'1'`` with ``", "``. Returns
            ``(None, [], None)`` when the question, exactly four options, the
            answer flags, or at least one set flag cannot be found, or when any
            exception occurs (the traceback is printed in that case).
        """
        try:
            # Undecodable bytes are dropped; only the readable text matters here.
            text = data.decode('utf-8', errors='ignore')

            q_match = (self._QUESTION_FULLWIDTH_RE.search(text)
                       or self._QUESTION_ASCII_RE.search(text))
            if not q_match:
                return None, [], None
            question = q_match.group(1).strip()

            options = self._extract_options(text[q_match.end():])
            if len(options) != 4:
                return None, [], None
            options = [self._strip_binary_tail(opt) for opt in options]

            # The flags are searched in the raw bytes, not the decoded text.
            ans_match = self._ANSWER_FLAGS_RE.search(data)
            if not ans_match:
                return None, [], None
            answers = [ans_match.group(i).decode() for i in range(1, 5)]

            # Indices are always 0..3 and options has exactly four entries,
            # so every index is valid.
            correct_indices = [i for i, val in enumerate(answers) if val == '1']
            if not correct_indices:
                return None, [], None

            # Multiple correct answers are joined with commas.
            correct_answer = ", ".join(options[i] for i in correct_indices)
            return question, options, correct_answer
        except Exception as e:
            print(f"[ERROR] Severe exception occurred while parsing ViewState: {e}")
            import traceback
            traceback.print_exc()
            return None, [], None

    def _extract_options(self, options_text):
        """Return the four A-D option strings found in *options_text*, else ``[]``."""
        options = [
            opt.strip().rstrip(',').rstrip('#').strip()
            for opt in self._OPTION_RE.findall(options_text)
        ][:4]
        if len(options) == 4:
            return options

        # Fallback: look for "A.", "B.", "C.", "D." in order, each ending at
        # the next option marker, a control byte (0x00/0x14/0x15) or the end.
        options = []
        remaining = options_text
        for opt_char in 'ABCD':
            opt_match = re.search(
                rf'({opt_char}\.[^\x00]+?)(?=[A-D]\.|\x00|\x14|\x15|$)', remaining)
            if opt_match:
                options.append(opt_match.group(1).strip().replace('\x00', ''))
                remaining = remaining[opt_match.end():]
            elif len(options) < 3:
                return []
        return options if len(options) == 4 else []

    @classmethod
    def _is_text_char(cls, char):
        """Return True for CJK ideographs, ASCII letters/digits and allowed punctuation."""
        return ('\u4e00' <= char <= '\u9fff'
                or 'a' <= char.lower() <= 'z'
                or '0' <= char <= '9'
                or char in cls._OPTION_PUNCTUATION)

    @classmethod
    def _strip_binary_tail(cls, option):
        """Cut *option* at the first non-text character after its "A." style prefix."""
        for i, char in enumerate(option):
            # Indices 0-2 are never cut so that at least the prefix survives.
            if i > 2 and not cls._is_text_char(char):
                return option[:i].strip()
        return option.strip()

    def request(self, flow: "http.HTTPFlow"):
        """mitmproxy request hook: record new questions found in target POSTs.

        Requests that are not non-empty POSTs to a URL starting with
        ``TARGET_URL``, that carry no ``__VIEWSTATE`` field, or whose ViewState
        cannot be decoded or parsed are ignored. A question not yet in the
        bank is printed, added to the bank and saved immediately.
        """
        if not flow.request.pretty_url.startswith(TARGET_URL):
            return
        if flow.request.method != "POST" or not flow.request.content:
            return

        try:
            form_data = flow.request.get_text()
            parsed_data = urllib.parse.parse_qs(form_data)
        except Exception as e:
            print(f"[ERROR] Failed to parse POST form data: {e}")
            return

        if '__VIEWSTATE' not in parsed_data:
            return
        viewstate_b64 = parsed_data['__VIEWSTATE'][0]

        try:
            decoded_bytes = base64.b64decode(viewstate_b64.encode('latin-1'))
        except Exception as e:
            print(f"[ERROR] Base64 decoding ViewState failed: {e}")
            return

        question, options, correct_answer = self.parse_viewstate_bytes(decoded_bytes)
        if not question:
            return

        # Known questions are handled silently; only new ones are printed and saved.
        if question in self.known_questions:
            return

        tid = flow.request.query.get('tid', 'unknown')
        print("=" * 40)
        print("✅ [Sniff Success] - Intercepted a new question")
        print(f" TID: {tid}")
        print(f" Question: {question}")
        for opt in options:
            print(f" {opt}")
        print(f" [!!] Correct Answer: {correct_answer}")
        print("=" * 40)

        self.bank[question] = {
            'tid': tid,
            'question': question,
            'options': options,
            'correct_answer': correct_answer,
        }
        self.known_questions.add(question)
        save_question_bank(self.bank)
        print(f"[INFO] New question found! Saved to {JSON_FILE}")
# Addon entry point: the module-level ``addons`` list holds the single sniffer
# instance that mitmproxy picks up when this file is loaded as a script.
addons = [ViewStateSniffer()]
if __name__ == "__main__":
    # The host name of TARGET_URL drives the console filter; abort if it
    # cannot be determined.
    try:
        target_host = urllib.parse.urlparse(TARGET_URL).hostname
        if not target_host:
            raise ValueError("Unable to extract domain from TARGET_URL")
    except Exception as parse_error:
        print(f"[CRITICAL] Unable to parse TARGET_URL: {parse_error}")
        print("Please ensure TARGET_URL format is correct (e.g., http://example.com/path)")
        sys.exit(1)

    for banner_line in (
        "Mitmproxy ViewState sniffing script starting...",
        f"Please point your device proxy to the port mitmproxy is running on (default is 8080)",
        f"[INFO] Console filter set, will only display traffic from <{target_host}>.",
    ):
        print(banner_line)

    # Re-launch through mitmdump: load this file as the addon script, run
    # quietly, and pass a filter expression anchored with ^ and $ so only the
    # target domain is displayed.
    sys.argv.extend(['-s', __file__, '-q', f"~d ^{target_host}$"])
    mitmdump()