fix: cleanup of import functionality

This commit is contained in:
Jannat Patel
2026-04-02 16:48:33 +05:30
parent e1e2c08493
commit 564d10feb6
2 changed files with 162 additions and 128 deletions

View File

@@ -3,12 +3,10 @@
import json
import os
import re
import secrets
import shutil
import tempfile
import xml.etree.ElementTree as ET
import zipfile
from datetime import date, datetime, timedelta
from datetime import date, timedelta
from xml.dom.minidom import parseString
import frappe

View File

@@ -10,7 +10,7 @@ from urllib.parse import urlparse
import frappe
from frappe import _
from frappe.utils import validate_email_address
from frappe.utils import escape_html, validate_email_address
def export_course_zip(course_name):
@@ -54,32 +54,56 @@ def get_lessons_for_export(course_name: str):
return lessons_list
def get_assessment_from_block(block):
block_type = block.get("type")
data_field = "exercise" if block_type == "program" else block_type
name = block.get("data", {}).get(data_field)
doctype = get_assessment_map().get(block_type)
if frappe.db.exists(doctype, name):
return frappe.get_doc(doctype, name)
return None
def get_quiz_questions(doc):
questions = []
for q in doc.questions:
question_doc = frappe.get_doc("LMS Question", q.question)
questions.append(question_doc.as_dict())
return questions
def get_exercise_test_cases(doc):
test_cases = []
for tc in doc.test_cases:
test_case_doc = frappe.get_doc("LMS Test Case", tc.name)
test_cases.append(test_case_doc.as_dict())
return test_cases
def get_assessments_from_lesson(lesson):
assessments, questions, test_cases = [], [], []
content = json.loads(lesson.content) if lesson.content else {}
for block in content.get("blocks", []):
if block.get("type") not in ("quiz", "assignment", "program"):
continue
doc = get_assessment_from_block(block)
if not doc:
continue
assessments.append(doc.as_dict())
if doc.doctype == "LMS Quiz":
questions.extend(get_quiz_questions(doc))
elif doc.doctype == "LMS Programming Exercise":
test_cases.extend(get_exercise_test_cases(doc))
return assessments, questions, test_cases
def get_course_assessments(lessons):
assessments, questions, test_cases = [], [], []
for lesson in lessons:
content = json.loads(lesson.content) if lesson.content else {}
for block in content.get("blocks", []):
block_type = block.get("type")
if block_type in ("quiz", "assignment", "program"):
data_field = "exercise" if block_type == "program" else block_type
name = block.get("data", {}).get(data_field)
doctype = (
"LMS Quiz"
if block_type == "quiz"
else ("LMS Assignment" if block_type == "assignment" else "LMS Programming Exercise")
)
if frappe.db.exists(doctype, name):
doc = frappe.get_doc(doctype, name)
assessments.append(doc.as_dict())
if doctype == "LMS Quiz":
for q in doc.questions:
question_doc = frappe.get_doc("LMS Question", q.question)
questions.append(question_doc.as_dict())
if doctype == "LMS Programming Exercise":
for tc in doc.test_cases:
test_case_doc = frappe.get_doc("LMS Test Case", tc.name)
test_cases.append(test_case_doc.as_dict())
lesson_assessments, lesson_questions, lesson_test_cases = get_assessments_from_lesson(lesson)
assessments.extend(lesson_assessments)
questions.extend(lesson_questions)
test_cases.extend(lesson_test_cases)
return assessments, questions, test_cases
@@ -222,25 +246,19 @@ def write_assessments_json(zip_file, assessments, questions, test_cases):
def write_assets(zip_file, assets):
assets = list(set(assets))
for asset in assets:
try:
# Validate asset URL
if not asset or not isinstance(asset, str):
continue
# Check if it's a valid file URL
parsed_url = urlparse(asset)
if parsed_url.scheme and parsed_url.scheme not in ["http", "https"]:
continue
file_doc = frappe.get_doc("File", {"file_url": asset})
file_path = os.path.abspath(file_doc.get_full_path())
safe_filename = sanitize_filename(os.path.basename(asset))
zip_file.write(file_path, f"assets/{safe_filename}")
except Exception:
frappe.log_error(frappe.get_traceback(), f"Could not add asset: {asset}")
if not asset or not isinstance(asset, str):
continue
parsed_url = urlparse(asset)
if parsed_url.scheme and parsed_url.scheme not in ["http", "https"]:
continue
file_doc = frappe.get_doc("File", {"file_url": asset})
file_path = os.path.abspath(file_doc.get_full_path())
safe_filename = sanitize_filename(os.path.basename(asset))
zip_file.write(file_path, f"assets/{safe_filename}")
def move_zip_to_public(tmp_path, zip_filename):
final_path = os.path.join(frappe.get_site_path("public", "files"), zip_filename)
@@ -261,16 +279,13 @@ def write_evaluator_json(zip_file, evaluator):
def serve_zip(final_path, zip_filename):
# Security: Validate file path is within site directory
site_path = frappe.get_site_path()
if not os.path.abspath(final_path).startswith(site_path):
frappe.throw(_("Invalid file path"))
# Security: Check if file exists and is readable
if not os.path.exists(final_path) or not os.path.isfile(final_path):
frappe.throw(_("File not found"))
# Security: Sanitize filename for download
safe_filename = sanitize_filename(zip_filename)
try:
@@ -349,44 +364,45 @@ def create_user_for_instructors(zip_file):
create_user(instructor)
def create_user(user):
def sanitize_name_field(value, max_length=50):
if not value:
return value
value = escape_html(value.strip()[:max_length])
name_pattern = re.compile(r"^[a-zA-Z0-9\s\-\.]+$")
if not name_pattern.match(value):
value = re.sub(r"[^a-zA-Z0-9\s\-\.]", "", value)
return value
def validate_user_email(user):
if not user.get("email") or not validate_email_address(user["email"]):
frappe.throw(f"Invalid email for user creation: {user.get('email')}")
return
email = user["email"].strip().lower()
first_name = frappe.utils.escape_html(user.get("first_name", "").strip()[:50])
last_name = frappe.utils.escape_html(user.get("last_name", "").strip()[:50])
full_name = frappe.utils.escape_html(user.get("full_name", "").strip()[:100])
name_pattern = re.compile(r"^[a-zA-Z0-9\s\-\.]+$")
if first_name and not name_pattern.match(first_name):
first_name = re.sub(r"[^a-zA-Z0-9\s\-\.]", "", first_name)
if last_name and not name_pattern.match(last_name):
last_name = re.sub(r"[^a-zA-Z0-9\s\-\.]", "", last_name)
user_doc = frappe.new_doc("User")
user_doc.email = email
user_doc.first_name = first_name or full_name.split()[0] if full_name else "Imported"
user_doc.last_name = last_name or (
" ".join(full_name.split()[1:]) if len(full_name.split()) > 1 else "User"
def get_user_names(user):
first_name = sanitize_name_field(user.get("first_name", ""), 50)
last_name = sanitize_name_field(user.get("last_name", ""), 50)
full_name = sanitize_name_field(user.get("full_name", ""), 100)
parts = full_name.split() if full_name else []
return (
first_name or (parts[0] if parts else "Imported"),
last_name or (" ".join(parts[1:]) if len(parts) > 1 else None),
full_name,
)
user_doc.full_name = full_name or f"{user_doc.first_name} {user_doc.last_name}".strip()
user_doc.add_roles(["Course Creator"])
user_doc.send_welcome_email = False
user_image = user.get("user_image")
if user_image:
try:
parsed_url = urlparse(user_image)
if parsed_url.scheme in ["http", "https"] and len(user_image) < 500:
user_doc.user_image = user_image
except Exception:
pass
try:
user_doc.insert()
except Exception as e:
frappe.log_error(f"Error creating user {email}: {str(e)}")
def create_user(user):
validate_user_email(user)
first_name, last_name, full_name = get_user_names(user)
user_doc = frappe.new_doc("User")
user_doc.email = user["email"].strip().lower()
user_doc.first_name = first_name
user_doc.last_name = last_name
user_doc.full_name = full_name or f"{first_name} {last_name}".strip()
user_doc.send_welcome_email = False
user_doc.user_image = user.get("user_image")
user_doc.insert()
user_doc.add_roles("Course Creator")
def create_evaluator(zip_file):
@@ -394,7 +410,6 @@ def create_evaluator(zip_file):
if not evaluator_data:
return
# Validate evaluator data
if not evaluator_data.get("evaluator") or not validate_email_address(evaluator_data.get("evaluator", "")):
frappe.log_error(f"Invalid evaluator data: {evaluator_data}")
return
@@ -581,65 +596,86 @@ def add_questions_to_quiz(quiz_doc, questions):
quiz_doc.append("questions", {"question": question_name})
def create_assessment_docs(zip_file):
def create_supporting_docs(zip_file):
for file in zip_file.namelist():
if file.startswith("assessments/questions/") and file.endswith(".json"):
create_question_doc(zip_file, file)
elif file.startswith("assessments/test_cases/") and file.endswith(".json"):
create_test_case_doc(zip_file, file)
def is_assessment_file(file):
return (
file.startswith("assessments/")
and file.endswith(".json")
and not file.startswith("assessments/questions/")
and not file.startswith("assessments/test_cases/")
)
def build_assessment_doc(assessment_data):
doctype = assessment_data.get("doctype")
if doctype not in ("LMS Quiz", "LMS Assignment", "LMS Programming Exercise"):
return
if frappe.db.exists(doctype, assessment_data.get("name")):
return
questions = assessment_data.pop("questions", [])
test_cases = assessment_data.pop("test_cases", [])
doc = frappe.new_doc(doctype)
doc.update(assessment_data)
if doctype == "LMS Quiz":
add_questions_to_quiz(doc, questions)
elif doctype == "LMS Programming Exercise":
for row in test_cases:
doc.append("test_cases", {"input": row["input"], "expected_output": row["expected_output"]})
doc.insert(ignore_permissions=True)
def create_main_assessment_docs(zip_file):
for file in zip_file.namelist():
if (
file.startswith("assessments/")
and file.endswith(".json")
and not file.startswith("assessments/questions/")
and not file.startswith("assessments/test_cases/")
):
assessment_data = read_json_from_zip(zip_file, file)
if not assessment_data:
continue
assessment_data.pop("lesson", None)
assessment_data.pop("course", None)
doctype = assessment_data.get("doctype")
if doctype in ("LMS Quiz", "LMS Assignment", "LMS Programming Exercise") and not frappe.db.exists(
doctype, assessment_data.get("name")
):
questions = assessment_data.pop("questions", [])
test_cases = assessment_data.pop("test_cases", [])
doc = frappe.new_doc(doctype)
doc.update(assessment_data)
if doctype == "LMS Quiz":
add_questions_to_quiz(doc, questions)
elif doctype == "LMS Programming Exercise":
for row in test_cases:
doc.append(
"test_cases", {"input": row["input"], "expected_output": row["expected_output"]}
)
doc.insert(ignore_permissions=True)
if not is_assessment_file(file):
continue
assessment_data = read_json_from_zip(zip_file, file)
if not assessment_data:
continue
assessment_data.pop("lesson", None)
assessment_data.pop("course", None)
build_assessment_doc(assessment_data)
def create_assessment_docs(zip_file):
create_supporting_docs(zip_file)
create_main_assessment_docs(zip_file)
def create_asset_doc(asset_name, content):
if frappe.db.exists("File", {"file_name": asset_name}):
return
asset_doc = frappe.new_doc("File")
asset_doc.file_name = asset_name
asset_doc.content = content
asset_doc.insert()
def process_asset_file(zip_file, file):
if is_unsafe_path(file):
frappe.log_error(f"Unsafe asset path: {file}")
return
with zip_file.open(file) as f:
create_asset_doc(file.split("/")[-1], f.read())
def create_assets(zip_file):
for file in zip_file.namelist():
if file.startswith("assets/") and not file.endswith("/"):
try:
# Validate file path
if is_unsafe_path(file):
frappe.log_error(f"Unsafe asset path: {file}")
continue
with zip_file.open(file) as f:
content = f.read()
asset_name = file.split("/")[-1]
if not frappe.db.exists("File", {"file_name": asset_name}):
try:
asset_doc = frappe.new_doc("File")
asset_doc.file_name = asset_name
asset_doc.content = content
asset_doc.insert()
except Exception as e:
frappe.log_error(f"Error creating asset {asset_name}: {str(e)}")
except Exception as e:
frappe.log_error(f"Error processing asset {file}: {e}")
if not file.startswith("assets/") or file.endswith("/"):
continue
try:
process_asset_file(zip_file, file)
except Exception as e:
frappe.log_error(f"Error processing asset {file}: {e}")
def get_lesson_title(zip_file, lesson_name):