"""Knowledge base: text extraction, MySQL indexing and web search.

Script name : Base de Connaissance.py
Author      : Neil ANDRE
Date        : 27/08/2025
Version     : 1.0

Features:
- Text extraction (.txt, .docx, .pptx, .pdf)
- Indexing into MySQL (MariaDB)
- Web interface (upload + search)
- Authentication required for uploads
- Paginated results
- Keyword highlighting in results
- Opening/downloading of the indexed files
- Intermediate page after upload to choose between opening or saving the file
- Saving files into a dedicated directory (saved_files/)
- Per-file-type statistics
- Automatic indexing without duplicates; re-indexing a file with the same
  title updates the stored content instead of adding a second row
"""

# I - Prerequisites
# pip install flask mysql-connector-python python-docx python-pptx pdfplumber werkzeug flask-login

# II - MySQL (MariaDB) schema
# Commands run in the DBMS:
# CREATE DATABASE knowledge_base;
# USE knowledge_base;
# CREATE TABLE documents (
#     id INT AUTO_INCREMENT PRIMARY KEY,
#     title VARCHAR(255),
#     content LONGTEXT,
#     file_type VARCHAR(10),
#     import_date DATETIME DEFAULT CURRENT_TIMESTAMP,
#     FULLTEXT(content),
#     INDEX(file_type)
# );
# CREATE TABLE users (
#     id INT AUTO_INCREMENT PRIMARY KEY,
#     username VARCHAR(50) UNIQUE NOT NULL,
#     password VARCHAR(255) NOT NULL
# );
# INSERT INTO users (username, password) VALUES ('admin', '$5K7lDMYIH4DR4jAt$42a6512e29cce828a6e84359d40dcc037cf872bb20d2405e711844337bbd61856a79cb8a65bf3432f6d32d7b9d90997a97b23d1b07cf06408460aab31d1a0aa2'); -- Password: admin

# III - Python script
import html
import os
import re
import shutil

import mysql.connector
import pdfplumber
from docx import Document
from flask import (
    Flask,
    flash,
    redirect,
    render_template_string,
    request,
    send_from_directory,
    url_for,
)
from flask_login import (
    LoginManager,
    UserMixin,
    current_user,
    login_required,
    login_user,
    logout_user,
)
from pptx import Presentation
from werkzeug.security import check_password_hash, generate_password_hash
from werkzeug.utils import secure_filename

app = Flask(__name__)
# The fallback value is a placeholder: set KB_SECRET_KEY in production.
app.secret_key = os.environ.get('KB_SECRET_KEY', 'votre_cle_secrete_ici')

UPLOAD_FOLDER = 'uploads'
SAVED_FOLDER = 'saved_files'  # Directory where files are saved permanently
DOCUMENTS_DIR = 'documents'
ALLOWED_EXTENSIONS = {'txt', 'docx', 'pptx', 'pdf'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

# MIME types used when serving a stored file; anything else is sent as text.
_MIMETYPES = {
    'pdf': 'application/pdf',
    'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    'pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
}

# Flask-Login configuration
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'


class User(UserMixin):
    """Minimal Flask-Login user built from a row of the ``users`` table."""

    def __init__(self, id, username):
        self.id = id
        self.username = username


# MySQL connection. Credentials can be overridden through the environment;
# the fallbacks are the values the script originally hard-coded.
db = mysql.connector.connect(
    host=os.environ.get('KB_DB_HOST', 'localhost'),
    user=os.environ.get('KB_DB_USER', 'root'),
    password=os.environ.get('KB_DB_PASSWORD', 'Password@2025'),
    database=os.environ.get('KB_DB_NAME', 'knowledge_base'),
)
# NOTE(review): a single module-level cursor is shared by every request;
# this is only safe while requests are not served concurrently.
cursor = db.cursor()


def allowed_file(filename):
    """Return True when *filename* has one of the ALLOWED_EXTENSIONS."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def save_file(file):
    """Store an uploaded file in the upload folder.

    Returns:
        A ``(filename, path)`` tuple where *filename* is the sanitized name.
    """
    filename = secure_filename(file.filename)
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
    path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(path)
    return filename, path


def save_file_permanently(filename):
    """Copy an uploaded file into the ``saved_files/`` directory.

    Raises:
        OSError: if the source file is missing or the copy fails.
    """
    # The name comes from a URL, so sanitize it before building paths.
    filename = secure_filename(filename)
    os.makedirs(SAVED_FOLDER, exist_ok=True)
    src = os.path.join(UPLOAD_FOLDER, filename)
    dst = os.path.join(SAVED_FOLDER, filename)
    shutil.copy2(src, dst)
    return dst


def extract_content(path, ext):
    """Extract the plain text of the file at *path*.

    Args:
        path: location of the file on disk.
        ext: lower-case extension without the dot (txt, docx, pptx, pdf).

    Returns:
        The extracted text, or an empty string when the extension is not
        supported or the extraction fails (the error is printed).
    """
    try:
        if ext == "txt":
            with open(path, 'r', encoding='utf-8', errors='ignore') as f:
                return f.read()
        if ext == "docx":
            doc = Document(path)
            return "\n".join(p.text for p in doc.paragraphs)
        if ext == "pptx":
            prs = Presentation(path)
            return "\n".join(
                shape.text
                for slide in prs.slides
                for shape in slide.shapes
                if hasattr(shape, "text")
            )
        if ext == "pdf":
            with pdfplumber.open(path) as pdf:
                # extract_text() may return None for a page without text;
                # treat it as empty instead of losing the whole document.
                return "".join(
                    (page.extract_text() or "") + "\n" for page in pdf.pages
                )
    except Exception as e:  # best effort: parsers raise many error types
        print(f"[ERREUR] Extraction du fichier {path}: {e}")
    return ""


def insert_into_db(title, content, file_type):
    """Index a document, updating the existing row when *title* is known.

    Returns:
        True when the row was written, False when the database raised.
    """
    try:
        cursor.execute("SELECT id FROM documents WHERE title = %s LIMIT 1", (title,))
        existing = cursor.fetchone()
        if existing:
            # Same title already indexed: refresh it rather than duplicate.
            cursor.execute(
                "UPDATE documents SET content = %s, file_type = %s WHERE id = %s",
                (content, file_type, existing[0]),
            )
        else:
            cursor.execute(
                "INSERT INTO documents (title, content, file_type) VALUES (%s, %s, %s)",
                (title, content, file_type),
            )
        db.commit()
        return True
    except Exception as e:
        print(f"[ERREUR] Insertion dans la base : {e}")
        return False


def process_directory(directory):
    """Index every supported file found directly inside *directory*."""
    for filename in os.listdir(directory):
        path = os.path.join(directory, filename)
        if not (os.path.isfile(path) and allowed_file(filename)):
            continue
        ext = filename.rsplit('.', 1)[1].lower()
        content = extract_content(path, ext)
        if content and insert_into_db(filename, content, ext):
            print(f"[DEBUG] Fichier indexé : {filename}")


@login_manager.user_loader
def load_user(user_id):
    """Flask-Login callback: load the user with the given id, or None."""
    cursor.execute("SELECT id, username FROM users WHERE id = %s", (user_id,))
    user_data = cursor.fetchone()
    if user_data:
        return User(id=user_data[0], username=user_data[1])
    return None


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate the posted credentials."""
    if request.method == 'POST':
        username = request.form.get('username', '')
        password = request.form.get('password', '')
        cursor.execute(
            "SELECT id, username, password FROM users WHERE username = %s",
            (username,),
        )
        user_data = cursor.fetchone()
        if user_data and check_password_hash(user_data[2], password):
            user = User(id=user_data[0], username=user_data[1])
            login_user(user)
            return redirect(url_for('search'))
        flash('Identifiants incorrects', 'error')
    return render_template_string('''<!DOCTYPE html>
<html lang="fr">
<head>
    <meta charset="utf-8">
    <title>Login</title>
</head>
<body>
    <h1>Connexion</h1>
    {% with messages = get_flashed_messages(with_categories=true) %}
        {% if messages %}
            {% for category, message in messages %}
                <div class="{{ category }}">{{ message }}</div>
            {% endfor %}
        {% endif %}
    {% endwith %}
    <form method="post">
        <input type="text" name="username" placeholder="Nom d'utilisateur" required>
        <input type="password" name="password" placeholder="Mot de passe" required>
        <button type="submit">Se connecter</button>
    </form>
</body>
</html>
''')


@app.route('/logout')
@login_required
def logout():
    """Log the current user out and go back to the search page."""
    logout_user()
    return redirect(url_for('search'))


@app.route('/uploads/<filename>')
def uploaded_file(filename):
    """Serve *filename* from the first storage folder that contains it."""
    extension = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
    mimetype = _MIMETYPES.get(extension, 'text/plain')
    for folder in (app.config['UPLOAD_FOLDER'], SAVED_FOLDER, DOCUMENTS_DIR):
        # Files are written relative to the working directory, so resolve
        # the folder the same way for both the check and the send.
        directory = os.path.abspath(folder)
        if os.path.isfile(os.path.join(directory, filename)):
            return send_from_directory(
                directory, filename, mimetype=mimetype, as_attachment=False
            )
    return "Fichier non trouvé", 404


def highlight_text(text, keyword):
    """Return *text* as HTML with every occurrence of *keyword* in <mark>.

    The text is HTML-escaped because the result is rendered with ``|safe``;
    matching is case-insensitive and keeps the casing found in the text.
    """
    if not keyword:
        return html.escape(text)
    pattern = re.compile(re.escape(keyword), re.IGNORECASE)
    pieces = []
    cursor_pos = 0
    for match in pattern.finditer(text):
        pieces.append(html.escape(text[cursor_pos:match.start()]))
        pieces.append(f'<mark>{html.escape(match.group(0))}</mark>')
        cursor_pos = match.end()
    pieces.append(html.escape(text[cursor_pos:]))
    return "".join(pieces)


@app.route("/upload", methods=["GET", "POST"])
@login_required
def upload_file():
    """Show the upload form; on POST, store and index the uploaded file."""
    if request.method == "POST":
        file = request.files.get("file")
        # Validate the sanitized name: it is the one used from here on, and
        # sanitizing can drop the dot that the extension lookup relies on.
        if file and allowed_file(secure_filename(file.filename)):
            filename, path = save_file(file)
            ext = filename.rsplit('.', 1)[1].lower()
            content = extract_content(path, ext)
            if content:
                insert_into_db(filename, content, ext)
                return redirect(url_for('post_upload', filename=filename))
            flash('Impossible d\'extraire le contenu du fichier.', 'error')
            return redirect(url_for('search'))
        flash('Type de fichier non autorisé.', 'error')
    return render_template_string('''<!DOCTYPE html>
<html lang="fr">
<head>
    <meta charset="utf-8">
    <title>Uploader un fichier</title>
</head>
<body>
    <h1>Uploader un nouveau fichier</h1>
    {% with messages = get_flashed_messages(with_categories=true) %}
        {% if messages %}
            {% for category, message in messages %}
                <div class="{{ category }}">{{ message }}</div>
            {% endfor %}
        {% endif %}
    {% endwith %}
    <form method="post" enctype="multipart/form-data">
        <input type="file" name="file" required>
        <button type="submit">Uploader</button>
    </form>
    <a href="{{ url_for('search') }}">Retour à la recherche</a>
</body>
</html>
''')


@app.route("/post_upload/<filename>")
@login_required
def post_upload(filename):
    """Intermediate page offering to open or save the uploaded file."""
    return render_template_string('''<!DOCTYPE html>
<html lang="fr">
<head>
    <meta charset="utf-8">
    <title>Fichier uploadé</title>
</head>
<body>
    <h1>Fichier "{{ filename }}" uploadé avec succès !</h1>
    <p>Que souhaitez-vous faire ?</p>
    <p>
        <a href="{{ url_for('uploaded_file', filename=filename) }}" target="_blank">Ouvrir le fichier</a>
        |
        <a href="{{ url_for('save_file_route', filename=filename) }}">Enregistrer le fichier</a>
    </p>
    <a href="{{ url_for('search') }}">Retour à la recherche</a>
</body>
</html>
''', filename=filename)


@app.route("/save_file/<filename>")
@login_required
def save_file_route(filename):
    """Copy an uploaded file to ``saved_files/`` and report the outcome."""
    try:
        save_file_permanently(filename)
        flash(f'Fichier "{filename}" enregistré dans le répertoire "saved_files/".', 'success')
    except Exception as e:
        flash(f'Erreur lors de l\'enregistrement du fichier : {e}', 'error')
    return redirect(url_for('search'))


@app.route("/")
def search():
    """Search page: filter documents by keyword/type and paginate them."""
    keyword = request.args.get("search", "")
    file_type = request.args.get("type", "")
    # A non-numeric page falls back to 1; values below 1 would produce a
    # negative OFFSET, so clamp them.
    page = max(request.args.get("page", 1, type=int), 1)
    per_page = 5

    # Filter shared by the three queries below.
    where = "WHERE content LIKE %s"
    params = [f"%{keyword}%"]
    if file_type:
        where += " AND file_type = %s"
        params.append(file_type)

    # Number of matches per file type
    cursor.execute(
        f"SELECT file_type, COUNT(*) FROM documents {where} GROUP BY file_type",
        tuple(params),
    )
    type_counts = {ft: count for ft, count in cursor.fetchall()}

    # Total number of matches
    cursor.execute(f"SELECT COUNT(*) FROM documents {where}", tuple(params))
    total = cursor.fetchone()[0]

    # Current page; the explicit column list fixes the indexes the template
    # uses and ORDER BY keeps pages stable between requests.
    offset = (page - 1) * per_page
    cursor.execute(
        "SELECT id, title, content, file_type, import_date "
        f"FROM documents {where} ORDER BY id LIMIT %s OFFSET %s",
        tuple(params + [per_page, offset]),
    )
    documents = cursor.fetchall()
    total_pages = (total + per_page - 1) // per_page

    return render_template_string('''<!DOCTYPE html>
<html lang="fr">
<head>
    <meta charset="utf-8">
    <title>Recherche</title>
</head>
<body>
    <h1>Rechercher dans la base de connaissance</h1>
    <div>
    {% if current_user.is_authenticated %}
        Connecté en tant que {{ current_user.username }} | <a href="{{ url_for('logout') }}">Se déconnecter</a>
    {% else %}
        <a href="{{ url_for('login') }}">Se connecter pour uploader</a>
    {% endif %}
    </div>
    {% with messages = get_flashed_messages(with_categories=true) %}
        {% if messages %}
            {% for category, message in messages %}
                <div class="{{ category }}">{{ message }}</div>
            {% endfor %}
        {% endif %}
    {% endwith %}
    <form method="get" action="{{ url_for('search') }}">
        <input type="text" name="search" value="{{ keyword }}">
        <select name="type">
            <option value="">Tous</option>
            {% for ft in file_types %}
            <option value="{{ ft }}" {% if ft == file_type %}selected{% endif %}>{{ ft|upper }}</option>
            {% endfor %}
        </select>
        <button type="submit">Rechercher</button>
    </form>
    {% if keyword %}
    <p>Résultats pour "{{ keyword }}" : {% for ft, count in type_counts.items() %} {{ ft|upper }}: {{ count }} {% endfor %} | Total: {{ total }}</p>
    {% endif %}
    {% if current_user.is_authenticated %}
    <p>
        <a href="{{ url_for('upload_file') }}">Uploader un fichier</a>
    </p>
    {% endif %}
    {% if documents %}
        {% for doc in documents %}
        <div>
            <h3><a href="{{ url_for('uploaded_file', filename=doc[1]) }}" target="_blank">{{ doc[1] }}</a> ({{ doc[3] }})</h3>
            <p>{{ highlight_text(doc[2][:500] + ('...' if doc[2]|length > 500 else ''), keyword)|safe }}</p>
            <small>Importé le: {{ doc[4] }}</small>
        </div>
        {% endfor %}
        {% if total_pages > 1 %}
        <div>
            {% for p in range(1, total_pages + 1) %}
                {% if p == page %}
                    <strong>{{ p }}</strong>
                {% else %}
                    <a href="{{ url_for('search', search=keyword, type=file_type, page=p) }}">{{ p }}</a>
                {% endif %}
            {% endfor %}
        </div>
        {% endif %}
    {% else %}
    <p>
        Aucun résultat trouvé pour "{{ keyword }}".
    </p>
    {% endif %}
</body>
</html>
''',
        keyword=keyword,
        file_type=file_type,
        file_types=sorted(ALLOWED_EXTENSIONS),
        documents=documents,
        total=total,
        page=page,
        per_page=per_page,
        total_pages=total_pages,
        type_counts=type_counts,
        highlight_text=highlight_text,
    )


def create_admin_user():
    """Create the ``admin`` account when it does not exist yet."""
    admin_username = "admin"
    # The default password is intentionally overridable: change it (or set
    # KB_ADMIN_PASSWORD) before exposing the application.
    admin_password = os.environ.get("KB_ADMIN_PASSWORD", "admin")
    cursor.execute("SELECT COUNT(*) FROM users WHERE username = %s", (admin_username,))
    if cursor.fetchone()[0] == 0:
        hashed_password = generate_password_hash(admin_password)
        cursor.execute(
            "INSERT INTO users (username, password) VALUES (%s, %s)",
            (admin_username, hashed_password),
        )
        db.commit()
        print("[DEBUG] Utilisateur admin créé.")


if __name__ == "__main__":
    for folder in (UPLOAD_FOLDER, SAVED_FOLDER, DOCUMENTS_DIR):
        os.makedirs(folder, exist_ok=True)
    create_admin_user()
    print(f"[DEBUG] Indexation des fichiers du répertoire {DOCUMENTS_DIR}...")
    process_directory(DOCUMENTS_DIR)
    # The Werkzeug debugger allows arbitrary code execution, so debug mode
    # is opt-in (FLASK_DEBUG=1) instead of always on.
    app.run(debug=os.environ.get("FLASK_DEBUG", "0") == "1")