import requests from bs4 import BeautifulSoup import sqlite3 import time import hashlib import logging import signal import sys import unicodedata from datetime import datetime # --- CONFIGURATION --- URL = "https://alienor.myds.me/~cahierlabo/tmp/qcm_entrainement.html" DB_FILE = "qcm.db" INTERVAL = 10 * 60 # 10 minutes en secondes # --- SETUP DE LA DB --- conn = sqlite3.connect(DB_FILE, timeout=10) c = conn.cursor() # Crée la table si elle n'existe pas et ajoute un hash unique pour éviter les doublons c.execute(''' CREATE TABLE IF NOT EXISTS questions ( id INTEGER PRIMARY KEY AUTOINCREMENT, question TEXT, question_hash TEXT UNIQUE, answers TEXT, last_scraped TEXT, UNIQUE(question) ) ''') # Index unique sur le hash pour garantir unicité même si le texte a de légères différences c.execute('CREATE UNIQUE INDEX IF NOT EXISTS idx_question_hash ON questions(question_hash)') conn.commit() # --- FONCTIONS --- def fetch_qcm(session): logging.info("Récupération du QCM...") r = session.get(URL, timeout=10) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") questions_data = [] # Récupère toutes les questions li_questions = soup.select("li.QuizQuestion") from urllib.parse import urljoin for qi, q in enumerate(li_questions): qt_elem = q.select_one(".QuestionText") question_html = qt_elem.decode_contents() if qt_elem else "" # convertir les src d'images en URLs absolues et récupérer la liste des images q_soup = BeautifulSoup(question_html, "html.parser") images = [] for img in q_soup.find_all('img'): src = img.get('src') if src: src_abs = urljoin(URL, src) img['src'] = src_abs images.append(src_abs) # question_html modifié avec src absolues question_html = str(q_soup) # texte brut pour le hash/normalisation question_text = q_soup.get_text(" ", strip=True) # Récupère les réponses li_answers = q.select("ol li") answers_list = [] for ai, li in enumerate(li_answers): text = li.get_text(strip=True) # enlever le ou les '?' de début (le site utilise un bouton avec '?') import re text = re.sub(r'^[\?\s]+', '', text) # On essaie de trouver si la réponse est bonne depuis le JS I # On récupère l'array I depuis le script try: # Cherche le script contenant I = [...] script_tag = soup.find("script", text=lambda t: t and "I[" in t) js_text = script_tag.string if script_tag else "" # Cherche le pattern correspondant à la bonne réponse # Exemple : I[0][3][0]=new Array('réponse', '', 1, 100, 1); import re pattern = re.compile(rf"I\[{qi}\]\[3\]\[{ai}\]=new Array\('.*?','',(\d),\d+,\d+\);") match = pattern.search(js_text) correct = match.group(1) == '1' if match else False except Exception: correct = False answers_list.append({"text": text, "correct": correct}) questions_data.append({ "question": question_html, "question_text": question_text, "images": images, "answers": answers_list, "last_scraped": datetime.utcnow().isoformat() }) return questions_data def save_to_db(questions): import json for q in questions: answers_json = json.dumps(q["answers"], ensure_ascii=False) # Calcule un hash normalisé de la question (utilise le texte brut) pour empêcher doublons q_norm = normalize_question(q.get("question_text") or q.get("question")) qhash = hashlib.sha256(q_norm.encode('utf-8')).hexdigest() last_scraped = q.get('last_scraped') or datetime.utcnow().isoformat() # Insert ou update selon si le hash existe déjà c.execute(''' INSERT INTO questions(question, question_hash, answers, last_scraped) VALUES(?, ?, ?, ?) ON CONFLICT(question_hash) DO UPDATE SET question=excluded.question, answers=excluded.answers, last_scraped=excluded.last_scraped ''', (q["question"], qhash, answers_json, last_scraped)) conn.commit() logging.info(f"{len(questions)} questions sauvegardées / mises à jour dans la DB.") def normalize_question(text): # Normalise unicode, retire espaces multiples et passe en minuscule if not text: return '' # si le texte contient du HTML, extraire le texte brut if '<' in text and '>' in text: try: text = BeautifulSoup(text, 'html.parser').get_text(' ', strip=True) except Exception: pass s = unicodedata.normalize('NFKC', text) s = ' '.join(s.split()) return s.strip().lower() # --- LOGGING ET SIGNALS --- logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') def shutdown(signum, frame): logging.info("Arrêt demandé, fermeture de la DB.") try: conn.commit() conn.close() except Exception: pass sys.exit(0) signal.signal(signal.SIGINT, shutdown) signal.signal(signal.SIGTERM, shutdown) # --- BOUCLE PRINCIPALE --- from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry session = requests.Session() retries = Retry(total=3, backoff_factor=1, status_forcelist=[429,500,502,503,504]) session.mount('https://', HTTPAdapter(max_retries=retries)) session.mount('http://', HTTPAdapter(max_retries=retries)) while True: try: data = fetch_qcm(session) save_to_db(data) except Exception as e: logging.exception("Erreur lors de la récupération ou sauvegarde:") logging.info(f"Attente {INTERVAL//60} minutes...") time.sleep(INTERVAL)