diff --git a/lms/lms/api.py b/lms/lms/api.py index 36caf16d..245e65e9 100644 --- a/lms/lms/api.py +++ b/lms/lms/api.py @@ -3,12 +3,10 @@ import json import os import re -import secrets import shutil -import tempfile import xml.etree.ElementTree as ET import zipfile -from datetime import date, datetime, timedelta +from datetime import date, timedelta from xml.dom.minidom import parseString import frappe diff --git a/lms/lms/course_import_export.py b/lms/lms/course_import_export.py index 8840eb86..e7e52707 100644 --- a/lms/lms/course_import_export.py +++ b/lms/lms/course_import_export.py @@ -10,7 +10,7 @@ from urllib.parse import urlparse import frappe from frappe import _ -from frappe.utils import validate_email_address +from frappe.utils import escape_html, validate_email_address def export_course_zip(course_name): @@ -54,32 +54,56 @@ def get_lessons_for_export(course_name: str): return lessons_list +def get_assessment_from_block(block): + block_type = block.get("type") + data_field = "exercise" if block_type == "program" else block_type + name = block.get("data", {}).get(data_field) + doctype = get_assessment_map().get(block_type) + if frappe.db.exists(doctype, name): + return frappe.get_doc(doctype, name) + return None + + +def get_quiz_questions(doc): + questions = [] + for q in doc.questions: + question_doc = frappe.get_doc("LMS Question", q.question) + questions.append(question_doc.as_dict()) + return questions + + +def get_exercise_test_cases(doc): + test_cases = [] + for tc in doc.test_cases: + test_case_doc = frappe.get_doc("LMS Test Case", tc.name) + test_cases.append(test_case_doc.as_dict()) + return test_cases + + +def get_assessments_from_lesson(lesson): + assessments, questions, test_cases = [], [], [] + content = json.loads(lesson.content) if lesson.content else {} + for block in content.get("blocks", []): + if block.get("type") not in ("quiz", "assignment", "program"): + continue + doc = get_assessment_from_block(block) + if not doc: + continue + assessments.append(doc.as_dict()) + if doc.doctype == "LMS Quiz": + questions.extend(get_quiz_questions(doc)) + elif doc.doctype == "LMS Programming Exercise": + test_cases.extend(get_exercise_test_cases(doc)) + return assessments, questions, test_cases + + def get_course_assessments(lessons): assessments, questions, test_cases = [], [], [] for lesson in lessons: - content = json.loads(lesson.content) if lesson.content else {} - for block in content.get("blocks", []): - block_type = block.get("type") - if block_type in ("quiz", "assignment", "program"): - data_field = "exercise" if block_type == "program" else block_type - name = block.get("data", {}).get(data_field) - doctype = ( - "LMS Quiz" - if block_type == "quiz" - else ("LMS Assignment" if block_type == "assignment" else "LMS Programming Exercise") - ) - if frappe.db.exists(doctype, name): - doc = frappe.get_doc(doctype, name) - assessments.append(doc.as_dict()) - if doctype == "LMS Quiz": - for q in doc.questions: - question_doc = frappe.get_doc("LMS Question", q.question) - questions.append(question_doc.as_dict()) - if doctype == "LMS Programming Exercise": - for tc in doc.test_cases: - test_case_doc = frappe.get_doc("LMS Test Case", tc.name) - test_cases.append(test_case_doc.as_dict()) - + lesson_assessments, lesson_questions, lesson_test_cases = get_assessments_from_lesson(lesson) + assessments.extend(lesson_assessments) + questions.extend(lesson_questions) + test_cases.extend(lesson_test_cases) return assessments, questions, test_cases @@ -222,25 +246,19 @@ def write_assessments_json(zip_file, assessments, questions, test_cases): def write_assets(zip_file, assets): assets = list(set(assets)) for asset in assets: - try: - # Validate asset URL - if not asset or not isinstance(asset, str): - continue - - # Check if it's a valid file URL - parsed_url = urlparse(asset) - if parsed_url.scheme and parsed_url.scheme not in ["http", "https"]: - continue - - file_doc = frappe.get_doc("File", {"file_url": asset}) - file_path = os.path.abspath(file_doc.get_full_path()) - - safe_filename = sanitize_filename(os.path.basename(asset)) - zip_file.write(file_path, f"assets/{safe_filename}") - except Exception: - frappe.log_error(frappe.get_traceback(), f"Could not add asset: {asset}") + if not asset or not isinstance(asset, str): continue + parsed_url = urlparse(asset) + if parsed_url.scheme and parsed_url.scheme not in ["http", "https"]: + continue + + file_doc = frappe.get_doc("File", {"file_url": asset}) + file_path = os.path.abspath(file_doc.get_full_path()) + + safe_filename = sanitize_filename(os.path.basename(asset)) + zip_file.write(file_path, f"assets/{safe_filename}") + def move_zip_to_public(tmp_path, zip_filename): final_path = os.path.join(frappe.get_site_path("public", "files"), zip_filename) @@ -261,16 +279,13 @@ def write_evaluator_json(zip_file, evaluator): def serve_zip(final_path, zip_filename): - # Security: Validate file path is within site directory site_path = frappe.get_site_path() if not os.path.abspath(final_path).startswith(site_path): frappe.throw(_("Invalid file path")) - # Security: Check if file exists and is readable if not os.path.exists(final_path) or not os.path.isfile(final_path): frappe.throw(_("File not found")) - # Security: Sanitize filename for download safe_filename = sanitize_filename(zip_filename) try: @@ -349,44 +364,45 @@ def create_user_for_instructors(zip_file): create_user(instructor) -def create_user(user): +def sanitize_name_field(value, max_length=50): + if not value: + return value + value = escape_html(value.strip()[:max_length]) + name_pattern = re.compile(r"^[a-zA-Z0-9\s\-\.]+$") + if not name_pattern.match(value): + value = re.sub(r"[^a-zA-Z0-9\s\-\.]", "", value) + return value + + +def validate_user_email(user): if not user.get("email") or not validate_email_address(user["email"]): frappe.throw(f"Invalid email for user creation: {user.get('email')}") - return - email = user["email"].strip().lower() - first_name = frappe.utils.escape_html(user.get("first_name", "").strip()[:50]) - last_name = frappe.utils.escape_html(user.get("last_name", "").strip()[:50]) - full_name = frappe.utils.escape_html(user.get("full_name", "").strip()[:100]) - name_pattern = re.compile(r"^[a-zA-Z0-9\s\-\.]+$") - if first_name and not name_pattern.match(first_name): - first_name = re.sub(r"[^a-zA-Z0-9\s\-\.]", "", first_name) - if last_name and not name_pattern.match(last_name): - last_name = re.sub(r"[^a-zA-Z0-9\s\-\.]", "", last_name) - - user_doc = frappe.new_doc("User") - user_doc.email = email - user_doc.first_name = first_name or full_name.split()[0] if full_name else "Imported" - user_doc.last_name = last_name or ( - " ".join(full_name.split()[1:]) if len(full_name.split()) > 1 else "User" +def get_user_names(user): + first_name = sanitize_name_field(user.get("first_name", ""), 50) + last_name = sanitize_name_field(user.get("last_name", ""), 50) + full_name = sanitize_name_field(user.get("full_name", ""), 100) + parts = full_name.split() if full_name else [] + return ( + first_name or (parts[0] if parts else "Imported"), + last_name or (" ".join(parts[1:]) if len(parts) > 1 else None), + full_name, ) - user_doc.full_name = full_name or f"{user_doc.first_name} {user_doc.last_name}".strip() - user_doc.add_roles(["Course Creator"]) - user_doc.send_welcome_email = False - user_image = user.get("user_image") - if user_image: - try: - parsed_url = urlparse(user_image) - if parsed_url.scheme in ["http", "https"] and len(user_image) < 500: - user_doc.user_image = user_image - except Exception: - pass - try: - user_doc.insert() - except Exception as e: - frappe.log_error(f"Error creating user {email}: {str(e)}") + +def create_user(user): + validate_user_email(user) + first_name, last_name, full_name = get_user_names(user) + user_doc = frappe.new_doc("User") + user_doc.email = user["email"].strip().lower() + user_doc.first_name = first_name + user_doc.last_name = last_name + user_doc.full_name = full_name or f"{first_name} {last_name}".strip() + user_doc.send_welcome_email = False + user_doc.user_image = user.get("user_image") + user_doc.insert() + user_doc.add_roles("Course Creator") def create_evaluator(zip_file): @@ -394,7 +410,6 @@ def create_evaluator(zip_file): if not evaluator_data: return - # Validate evaluator data if not evaluator_data.get("evaluator") or not validate_email_address(evaluator_data.get("evaluator", "")): frappe.log_error(f"Invalid evaluator data: {evaluator_data}") return @@ -581,65 +596,86 @@ def add_questions_to_quiz(quiz_doc, questions): quiz_doc.append("questions", {"question": question_name}) -def create_assessment_docs(zip_file): +def create_supporting_docs(zip_file): for file in zip_file.namelist(): if file.startswith("assessments/questions/") and file.endswith(".json"): create_question_doc(zip_file, file) elif file.startswith("assessments/test_cases/") and file.endswith(".json"): create_test_case_doc(zip_file, file) + +def is_assessment_file(file): + return ( + file.startswith("assessments/") + and file.endswith(".json") + and not file.startswith("assessments/questions/") + and not file.startswith("assessments/test_cases/") + ) + + +def build_assessment_doc(assessment_data): + doctype = assessment_data.get("doctype") + if doctype not in ("LMS Quiz", "LMS Assignment", "LMS Programming Exercise"): + return + if frappe.db.exists(doctype, assessment_data.get("name")): + return + + questions = assessment_data.pop("questions", []) + test_cases = assessment_data.pop("test_cases", []) + doc = frappe.new_doc(doctype) + doc.update(assessment_data) + + if doctype == "LMS Quiz": + add_questions_to_quiz(doc, questions) + elif doctype == "LMS Programming Exercise": + for row in test_cases: + doc.append("test_cases", {"input": row["input"], "expected_output": row["expected_output"]}) + + doc.insert(ignore_permissions=True) + + +def create_main_assessment_docs(zip_file): for file in zip_file.namelist(): - if ( - file.startswith("assessments/") - and file.endswith(".json") - and not file.startswith("assessments/questions/") - and not file.startswith("assessments/test_cases/") - ): - assessment_data = read_json_from_zip(zip_file, file) - if not assessment_data: - continue - assessment_data.pop("lesson", None) - assessment_data.pop("course", None) - doctype = assessment_data.get("doctype") - if doctype in ("LMS Quiz", "LMS Assignment", "LMS Programming Exercise") and not frappe.db.exists( - doctype, assessment_data.get("name") - ): - questions = assessment_data.pop("questions", []) - test_cases = assessment_data.pop("test_cases", []) - doc = frappe.new_doc(doctype) - doc.update(assessment_data) - if doctype == "LMS Quiz": - add_questions_to_quiz(doc, questions) - elif doctype == "LMS Programming Exercise": - for row in test_cases: - doc.append( - "test_cases", {"input": row["input"], "expected_output": row["expected_output"]} - ) - doc.insert(ignore_permissions=True) + if not is_assessment_file(file): + continue + assessment_data = read_json_from_zip(zip_file, file) + if not assessment_data: + continue + assessment_data.pop("lesson", None) + assessment_data.pop("course", None) + build_assessment_doc(assessment_data) + + +def create_assessment_docs(zip_file): + create_supporting_docs(zip_file) + create_main_assessment_docs(zip_file) + + +def create_asset_doc(asset_name, content): + if frappe.db.exists("File", {"file_name": asset_name}): + return + asset_doc = frappe.new_doc("File") + asset_doc.file_name = asset_name + asset_doc.content = content + asset_doc.insert() + + +def process_asset_file(zip_file, file): + if is_unsafe_path(file): + frappe.log_error(f"Unsafe asset path: {file}") + return + with zip_file.open(file) as f: + create_asset_doc(file.split("/")[-1], f.read()) def create_assets(zip_file): for file in zip_file.namelist(): - if file.startswith("assets/") and not file.endswith("/"): - try: - # Validate file path - if is_unsafe_path(file): - frappe.log_error(f"Unsafe asset path: {file}") - continue - - with zip_file.open(file) as f: - content = f.read() - asset_name = file.split("/")[-1] - if not frappe.db.exists("File", {"file_name": asset_name}): - try: - asset_doc = frappe.new_doc("File") - asset_doc.file_name = asset_name - asset_doc.content = content - asset_doc.insert() - except Exception as e: - frappe.log_error(f"Error creating asset {asset_name}: {str(e)}") - except Exception as e: - frappe.log_error(f"Error processing asset {file}: {e}") + if not file.startswith("assets/") or file.endswith("/"): + continue + try: + process_asset_file(zip_file, file) + except Exception as e: + frappe.log_error(f"Error processing asset {file}: {e}") def get_lesson_title(zip_file, lesson_name):