# Mirror of https://github.com/arthur-pbty/QCM_physique.git
# Synced 2026-06-03 23:36:21 +02:00 (173 lines, 5.8 KiB, Python)
import hashlib
import json
import logging
import re
import signal
import sqlite3
import sys
import time
import unicodedata
from datetime import datetime, timezone
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# --- CONFIGURATION ---
# Page that hosts the practice quiz to scrape.
URL = "https://alienor.myds.me/~cahierlabo/tmp/qcm_entrainement.html"
# SQLite database file, relative to the working directory.
DB_FILE = "qcm.db"
# Delay between two scrapes: 10 minutes, expressed in seconds.
INTERVAL = 10 * 60

# --- DATABASE SETUP ---
# A single module-level connection and cursor are shared by the whole script.
conn = sqlite3.connect(DB_FILE, timeout=10)
c = conn.cursor()

# Create the table on first run. Both the question text and its normalized
# hash are declared unique so the same question is never stored twice.
c.execute('''
    CREATE TABLE IF NOT EXISTS questions (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        question TEXT,
        question_hash TEXT UNIQUE,
        answers TEXT,
        last_scraped TEXT,
        UNIQUE(question)
    )
''')
# Explicit unique index on the hash. The column-level UNIQUE above already
# enforces this for tables created by this script; the index is kept so that
# existing databases retain the same schema objects.
c.execute('CREATE UNIQUE INDEX IF NOT EXISTS idx_question_hash ON questions(question_hash)')
conn.commit()

# --- FUNCTIONS ---

# One answer definition in the page's inline JavaScript, for example:
#   I[0][3][0]=new Array('réponse', '', 1, 100, 1);
# Groups: question index, answer index, correctness flag ('1' means correct).
# The first argument is matched as a proper single-quoted literal (escaped
# quotes allowed) so a match can never run past the closing quote, and
# optional whitespace is accepted around every separator.
_ANSWER_FLAG_RE = re.compile(
    r"I\[(\d+)\]\[3\]\[(\d+)\]\s*=\s*new Array\(\s*"
    r"'(?:[^'\\]|\\.)*'\s*,\s*''\s*,\s*(\d)\s*,\s*\d+\s*,\s*\d+\s*\)\s*;"
)

# Leading '?' characters and whitespace: the site renders a '?' button in
# front of every answer, which ends up at the start of the extracted text.
_LEADING_MARKER_RE = re.compile(r'^[\?\s]+')


def _correct_answer_flags(soup):
    """Map ``(question_index, answer_index)`` to the correctness flag found in the page's JS."""
    script_tag = soup.find("script", string=lambda s: s and "I[" in s)
    js_text = (script_tag.string if script_tag else None) or ""

    flags = {}
    for found in _ANSWER_FLAG_RE.finditer(js_text):
        key = (int(found.group(1)), int(found.group(2)))
        # Keep the first definition seen for a given answer.
        flags.setdefault(key, found.group(3) == '1')
    return flags


def _absolutize_images(fragment):
    """Rewrite every ``<img src>`` in *fragment* to an absolute URL and return those URLs."""
    images = []
    for img in fragment.find_all('img'):
        src = img.get('src')
        if src:
            src_abs = urljoin(URL, src)
            img['src'] = src_abs
            images.append(src_abs)
    return images


def fetch_qcm(session):
    """Download the quiz page and extract its questions and answers.

    Args:
        session: object exposing ``get(url, timeout=...)`` whose result has
            ``raise_for_status()`` and ``text`` (the script passes a
            ``requests.Session``).

    Returns:
        A list with one dict per ``li.QuizQuestion`` element, with keys:
        ``question`` (question HTML with absolute image URLs),
        ``question_text`` (plain text of the question), ``images`` (list of
        absolute image URLs), ``answers`` (list of ``{"text", "correct"}``
        dicts) and ``last_scraped`` (naive UTC ISO-8601 timestamp, identical
        for every question of one scrape).

    Raises:
        Whatever ``session.get`` or ``raise_for_status`` raise on network or
        HTTP errors.
    """
    logging.info("Récupération du QCM...")
    response = session.get(URL, timeout=10)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")

    # The answer key lives in one inline script; parse it once per page
    # instead of once per answer.
    correct_flags = _correct_answer_flags(soup)
    # Naive UTC timestamp, same textual format as datetime.utcnow().isoformat().
    scraped_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    questions_data = []
    for qi, q in enumerate(soup.select("li.QuizQuestion")):
        qt_elem = q.select_one(".QuestionText")
        question_html = qt_elem.decode_contents() if qt_elem else ""

        # Work on a separate parse of the question so the page tree is left
        # untouched while image sources are made absolute.
        q_soup = BeautifulSoup(question_html, "html.parser")
        images = _absolutize_images(q_soup)

        answers_list = []
        for ai, li in enumerate(q.select("ol li")):
            text = _LEADING_MARKER_RE.sub('', li.get_text(strip=True))
            # An answer missing from the JS key is treated as incorrect.
            answers_list.append({
                "text": text,
                "correct": correct_flags.get((qi, ai), False),
            })

        questions_data.append({
            "question": str(q_soup),
            # Plain text, used downstream for normalization and hashing.
            "question_text": q_soup.get_text(" ", strip=True),
            "images": images,
            "answers": answers_list,
            "last_scraped": scraped_at,
        })

    return questions_data


def save_to_db(questions):
    """Insert or update the scraped questions in the ``questions`` table.

    Each question is keyed by the SHA-256 of its normalized text; a row with
    the same hash is updated in place, otherwise a new row is inserted. The
    whole batch is written in one transaction: if any statement fails, the
    batch is rolled back and the error is re-raised, so a later commit cannot
    persist a half-written batch.

    Args:
        questions: iterable-with-length of dicts as produced by
            ``fetch_qcm`` (keys ``question``, ``answers`` and optionally
            ``question_text`` and ``last_scraped``).

    Raises:
        sqlite3.Error: if the database rejects the batch.
    """
    # NOTE(review): when the question has no text at all (e.g. image-only),
    # normalize_question returns '' and every such question shares the hash
    # of the empty string, so they overwrite each other. Changing the hash
    # input would alter the identity of rows already stored; needs a
    # migration decision before fixing.
    rows = []
    for q in questions:
        answers_json = json.dumps(q["answers"], ensure_ascii=False)
        q_norm = normalize_question(q.get("question_text") or q.get("question"))
        qhash = hashlib.sha256(q_norm.encode('utf-8')).hexdigest()
        # Same naive-UTC ISO format as the timestamps produced while scraping.
        last_scraped = (
            q.get('last_scraped')
            or datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )
        rows.append((q["question"], qhash, answers_json, last_scraped))

    try:
        c.executemany('''
            INSERT INTO questions(question, question_hash, answers, last_scraped)
            VALUES(?, ?, ?, ?)
            ON CONFLICT(question_hash) DO UPDATE SET question=excluded.question, answers=excluded.answers, last_scraped=excluded.last_scraped
        ''', rows)
    except sqlite3.Error:
        # Only database errors trigger the rollback; SystemExit raised by the
        # signal handler must pass through untouched.
        conn.rollback()
        raise
    conn.commit()
    logging.info("%d questions sauvegardées / mises à jour dans la DB.", len(questions))


def normalize_question(text):
    """Return a canonical form of a question, suitable for hashing.

    The text is reduced to plain text when it looks like HTML (contains both
    ``<`` and ``>``), Unicode-normalized with NFKC, has every run of
    whitespace collapsed to a single space, and is lower-cased.

    Args:
        text: question as plain text or HTML; may be ``None`` or empty.

    Returns:
        The normalized string, or ``''`` when *text* is falsy.
    """
    if not text:
        return ''

    # Heuristic HTML detection: both angle brackets present.
    if '<' in text and '>' in text:
        try:
            text = BeautifulSoup(text, 'html.parser').get_text(' ', strip=True)
        except Exception:
            # Best effort: fall back to normalizing the raw input, but leave
            # a trace instead of failing silently.
            logging.warning(
                "Extraction du texte HTML impossible, normalisation du texte brut.",
                exc_info=True,
            )

    normalized = unicodedata.normalize('NFKC', text)
    # split()/join collapses all whitespace runs and trims both ends.
    return ' '.join(normalized.split()).lower()


# --- LOGGING AND SIGNALS ---
# Root logger at INFO level, each record prefixed with a timestamp and level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')


def shutdown(signum, frame):
    """Signal handler: commit and close the database, then exit with status 0.

    Args:
        signum: number of the received signal (unused).
        frame: interrupted stack frame (unused).

    Raises:
        SystemExit: always, with exit code 0.
    """
    logging.info("Arrêt demandé, fermeture de la DB.")
    try:
        conn.commit()
        conn.close()
    except Exception:
        # Closing is best effort and must never prevent the exit below, but
        # the failure is logged rather than silently discarded.
        logging.exception("Erreur lors de la fermeture de la DB.")
    sys.exit(0)


# Close the database cleanly on Ctrl-C (SIGINT) and on termination (SIGTERM).
for _sig in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_sig, shutdown)

# --- MAIN LOOP ---
# HTTP session that retries transient failures (rate limiting and 5xx
# responses) up to three times with exponential backoff.
session = requests.Session()
retries = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
for _scheme in ('https://', 'http://'):
    session.mount(_scheme, HTTPAdapter(max_retries=retries))

# Scrape, store, sleep, repeat. The loop only ends through the signal handler.
while True:
    try:
        data = fetch_qcm(session)
        save_to_db(data)
    except Exception:
        # Top-level boundary: log the traceback and retry on the next cycle
        # rather than letting one bad scrape stop the script.
        logging.exception("Erreur lors de la récupération ou sauvegarde:")

    logging.info("Attente %d minutes...", INTERVAL // 60)
    time.sleep(INTERVAL)