mirror of
https://github.com/arthur-pbty/QCM_physique.git
synced 2026-06-21 05:44:43 +02:00
first commit
This commit is contained in:
@@ -0,0 +1,172 @@
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
import sqlite3
|
||||
import time
|
||||
import hashlib
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
import unicodedata
|
||||
from datetime import datetime
|
||||
|
||||
# --- CONFIGURATION ---
# Page that is downloaded and parsed on every pass.
URL = "https://alienor.myds.me/~cahierlabo/tmp/qcm_entrainement.html"
# SQLite database file the questions are written to.
DB_FILE = "qcm.db"
# Pause between two passes, in seconds (10 minutes).
INTERVAL = 10 * 60
|
||||
|
||||
# --- DATABASE SETUP ---
# One connection and one cursor are shared by the whole script.
conn = sqlite3.connect(DB_FILE, timeout=10)
c = conn.cursor()

# Schema statements, run in order. Both are idempotent (IF NOT EXISTS), so
# they are safe to replay on every start.
_SCHEMA_STATEMENTS = (
    # Table of scraped questions; question_hash is unique to avoid duplicates.
    '''
    CREATE TABLE IF NOT EXISTS questions (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        question TEXT,
        question_hash TEXT UNIQUE,
        answers TEXT,
        last_scraped TEXT,
        UNIQUE(question)
    )
''',
    # Unique index on the hash, so uniqueness holds even when the question
    # text differs slightly.
    'CREATE UNIQUE INDEX IF NOT EXISTS idx_question_hash ON questions(question_hash)',
)
for _statement in _SCHEMA_STATEMENTS:
    c.execute(_statement)
conn.commit()
|
||||
|
||||
# --- FONCTIONS ---
|
||||
def fetch_qcm(session):
    """Download the quiz page and extract every question with its answers.

    Args:
        session: HTTP client exposing ``get(url, timeout=...)``; the script
            passes a ``requests.Session``.

    Returns:
        A list with one dict per ``li.QuizQuestion`` element, holding:
        ``"question"`` (question HTML with image ``src`` made absolute),
        ``"question_text"`` (plain text of the question),
        ``"images"`` (list of absolute image URLs),
        ``"answers"`` (list of ``{"text": str, "correct": bool}``) and
        ``"last_scraped"`` (naive UTC timestamp in ISO format).

    Raises:
        Whatever ``session.get`` or ``raise_for_status`` raise on network or
        HTTP errors; nothing is caught here.
    """
    import re
    from urllib.parse import urljoin

    logging.info("Récupération du QCM...")
    response = session.get(URL, timeout=10)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")

    # The answer key lives in an inline script that fills a JS array named I.
    # It is the same for every question and answer, so locate it once instead
    # of re-scanning the document inside the inner loop.
    script_tag = soup.find("script", string=lambda t: t and "I[" in t)
    js_text = (script_tag.string or "") if script_tag else ""

    # The site renders a '?' button in front of each answer; strip it.
    leading_marker = re.compile(r'^[\?\s]+')

    questions_data = []
    for qi, q in enumerate(soup.select("li.QuizQuestion")):
        qt_elem = q.select_one(".QuestionText")
        question_html = qt_elem.decode_contents() if qt_elem else ""

        # Re-parse the fragment so image sources can be rewritten to absolute
        # URLs without touching the main document.
        q_soup = BeautifulSoup(question_html, "html.parser")
        images = []
        for img in q_soup.find_all('img'):
            src = img.get('src')
            if src:
                src_abs = urljoin(URL, src)
                img['src'] = src_abs
                images.append(src_abs)

        # HTML with absolute sources, and plain text used later for hashing.
        question_html = str(q_soup)
        question_text = q_soup.get_text(" ", strip=True)

        answers_list = []
        for ai, li in enumerate(q.select("ol li")):
            text = leading_marker.sub('', li.get_text(strip=True))

            # Expected shape: I[0][3][0]=new Array('answer', '', 1, 100, 1);
            # the first number is the "correct" flag. Optional whitespace is
            # tolerated around '=' and after commas.
            pattern = re.compile(
                rf"I\[{qi}\]\[3\]\[{ai}\]\s*=\s*new Array\('.*?',\s*'',\s*(\d),\s*\d+,\s*\d+\);"
            )
            match = pattern.search(js_text)
            # No script or no matching entry means "not known to be correct".
            correct = bool(match) and match.group(1) == '1'

            answers_list.append({"text": text, "correct": correct})

        questions_data.append({
            "question": question_html,
            "question_text": question_text,
            "images": images,
            "answers": answers_list,
            "last_scraped": datetime.utcnow().isoformat(),
        })

    return questions_data
|
||||
|
||||
def save_to_db(questions):
    """Insert or refresh the scraped questions in the ``questions`` table.

    Each question is keyed by the SHA-256 of its normalized plain text. A row
    with the same hash is updated in place (question, answers, last_scraped);
    otherwise a new row is inserted.

    Args:
        questions: list of dicts as produced by ``fetch_qcm``. ``"question"``
            and ``"answers"`` are required; ``"question_text"`` and
            ``"last_scraped"`` are optional.

    Raises:
        KeyError: if a question lacks ``"question"`` or ``"answers"``.
        sqlite3.Error: if the write fails; the batch is rolled back.
    """
    import json

    # Build every row before touching the database, so a malformed question
    # cannot leave a half-written batch behind.
    rows = []
    for q in questions:
        answers_json = json.dumps(q["answers"], ensure_ascii=False)

        # Hash the normalized plain text (falling back to the HTML) so that
        # cosmetic differences do not create duplicates.
        q_norm = normalize_question(q.get("question_text") or q.get("question"))
        qhash = hashlib.sha256(q_norm.encode('utf-8')).hexdigest()
        last_scraped = q.get('last_scraped') or datetime.utcnow().isoformat()

        rows.append((q["question"], qhash, answers_json, last_scraped))

    # The connection context manager commits on success and rolls back on
    # error, so a failed batch never lingers in an open transaction.
    with conn:
        c.executemany('''
            INSERT INTO questions(question, question_hash, answers, last_scraped)
            VALUES(?, ?, ?, ?)
            ON CONFLICT(question_hash) DO UPDATE SET question=excluded.question, answers=excluded.answers, last_scraped=excluded.last_scraped
        ''', rows)

    logging.info(f"{len(questions)} questions sauvegardées / mises à jour dans la DB.")
|
||||
|
||||
|
||||
def normalize_question(text):
    """Return a canonical form of a question, suitable for hashing.

    Empty or missing input yields ``''``. Text containing both ``<`` and
    ``>`` is treated as HTML and reduced to its plain text first (best
    effort: on any parsing failure the raw text is kept). The result is
    NFKC-normalized, has whitespace runs collapsed to single spaces, and is
    lower-cased.
    """
    if not text:
        return ''

    looks_like_html = '<' in text and '>' in text
    if looks_like_html:
        try:
            text = BeautifulSoup(text, 'html.parser').get_text(' ', strip=True)
        except Exception:
            # Deliberate best effort: fall back to the unparsed text.
            pass

    # split() with no argument drops leading/trailing whitespace and collapses
    # inner runs, so the joined string needs no further stripping.
    words = unicodedata.normalize('NFKC', text).split()
    return ' '.join(words).lower()
|
||||
|
||||
# --- LOGGING AND SIGNALS ---
# Root logger: INFO and above, each record prefixed with timestamp and level.
logging.basicConfig(
    format='%(asctime)s %(levelname)s: %(message)s',
    level=logging.INFO,
)
|
||||
|
||||
def shutdown(signum, frame):
    """Signal handler: flush and close the database, then exit with status 0.

    Args:
        signum: number of the received signal (unused).
        frame: interrupted stack frame (unused).

    Raises:
        SystemExit: always, with exit code 0.
    """
    logging.info("Arrêt demandé, fermeture de la DB.")
    try:
        # Commit pending work first, then release the connection.
        for finalize in (conn.commit, conn.close):
            finalize()
    except Exception:
        # Best effort: a failing commit/close must not block the exit.
        pass
    raise SystemExit(0)
|
||||
|
||||
# Ctrl-C (SIGINT) and termination requests (SIGTERM) both go through the same
# shutdown handler.
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, shutdown)
|
||||
|
||||
# --- BOUCLE PRINCIPALE ---
|
||||
from requests.adapters import HTTPAdapter
|
||||
from urllib3.util.retry import Retry
|
||||
|
||||
# One HTTP session for the whole run; retried up to 3 times with backoff on
# the listed status codes.
session = requests.Session()
retries = Retry(total=3, backoff_factor=1, status_forcelist=[429,500,502,503,504])
for _scheme in ('https://', 'http://'):
    session.mount(_scheme, HTTPAdapter(max_retries=retries))

# Scrape, store, sleep, forever. A failing pass is logged with its traceback
# and the loop continues with the next one.
while True:
    try:
        save_to_db(fetch_qcm(session))
    except Exception:
        logging.exception("Erreur lors de la récupération ou sauvegarde:")

    logging.info(f"Attente {INTERVAL//60} minutes...")
    time.sleep(INTERVAL)
|
||||
Reference in New Issue
Block a user