mirror of
https://github.com/frappe/lms.git
synced 2026-05-02 13:39:31 +03:00
Merge pull request #2297 from frappe/main-hotfix
chore: merge 'main-hotfix' into 'main'
This commit is contained in:
+27
-24
@@ -3,14 +3,32 @@ import json
|
||||
import frappe
|
||||
|
||||
from lms.lms.doctype.lms_course.lms_course import update_course_statistics
|
||||
from lms.lms.utils import get_course_progress
|
||||
from lms.lms.utils import create_user, get_course_progress
|
||||
|
||||
|
||||
def create_demo_data(args: dict = None):
|
||||
course = create_course()
|
||||
student = create_user("Ashley", "Ippolito", "ash@ipp.com", "/assets/lms/images/student.jpg")
|
||||
student1 = create_user("John", "Doe", "john.doe@example.com", "/assets/lms/images/student1.jpeg")
|
||||
student2 = create_user("Jane", "Smith", "jane.smith@example.com", "/assets/lms/images/student2.jpeg")
|
||||
student = create_user(
|
||||
email="ash@ipp.com",
|
||||
first_name="Ashley",
|
||||
last_name="Ippolito",
|
||||
full_name="Ashley Ippolito",
|
||||
user_image="/assets/lms/images/student.jpg",
|
||||
)
|
||||
student1 = create_user(
|
||||
email="john.doe@example.com",
|
||||
first_name="John",
|
||||
last_name="Doe",
|
||||
full_name="John Doe",
|
||||
user_image="/assets/lms/images/student1.jpeg",
|
||||
)
|
||||
student2 = create_user(
|
||||
email="jane.smith@example.com",
|
||||
first_name="Jane",
|
||||
last_name="Smith",
|
||||
full_name="Jane Smith",
|
||||
user_image="/assets/lms/images/student2.jpeg",
|
||||
)
|
||||
create_chapter(course)
|
||||
create_lessons(course)
|
||||
enroll_student_in_course(student, course)
|
||||
@@ -93,29 +111,14 @@ def create_instructor():
|
||||
return instructor
|
||||
|
||||
return create_user(
|
||||
"Jannat", "Patel", "jannat@example.com", "/assets/lms/images/instructor.png", ["Moderator"]
|
||||
email="jannat@example.com",
|
||||
first_name="Jannat",
|
||||
last_name="Patel",
|
||||
user_image="/assets/lms/images/instructor.png",
|
||||
roles=["Moderator"],
|
||||
)
|
||||
|
||||
|
||||
def create_user(first_name, last_name, email, user_image, roles=None):
|
||||
if roles is None:
|
||||
roles = ["LMS Student"]
|
||||
|
||||
filters = {"first_name": first_name, "last_name": last_name, "email": email}
|
||||
if frappe.db.exists("User", filters):
|
||||
return frappe.get_doc("User", filters)
|
||||
|
||||
user = frappe.new_doc("User")
|
||||
user.first_name = first_name
|
||||
user.last_name = last_name
|
||||
user.user_image = user_image
|
||||
user.email = email
|
||||
user.send_welcome_email = False
|
||||
user.add_roles(*roles)
|
||||
user.save()
|
||||
return user
|
||||
|
||||
|
||||
def create_chapter(course):
|
||||
prepare_chapter(course, "Introduction")
|
||||
prepare_chapter(course, "Adding content to your lessons")
|
||||
|
||||
+37
-21
@@ -29,17 +29,16 @@ from frappe.utils import (
|
||||
from frappe.utils.response import Response
|
||||
from pypika import functions as fn
|
||||
|
||||
from lms.lms.course_import_export import export_course_zip, import_course_zip
|
||||
from lms.lms.doctype.course_lesson.course_lesson import save_progress
|
||||
from lms.lms.utils import (
|
||||
LMS_ROLES,
|
||||
can_modify_batch,
|
||||
can_modify_course,
|
||||
get_average_rating,
|
||||
get_batch_details,
|
||||
get_course_details,
|
||||
get_field_meta,
|
||||
get_instructors,
|
||||
get_lesson_count,
|
||||
get_lms_route,
|
||||
has_course_instructor_role,
|
||||
has_evaluator_role,
|
||||
@@ -664,7 +663,7 @@ def check_app_permission():
|
||||
def save_evaluation_details(
|
||||
member: str,
|
||||
course: str,
|
||||
date: str,
|
||||
date_value: str,
|
||||
start_time: str,
|
||||
end_time: str,
|
||||
status: str,
|
||||
@@ -680,7 +679,7 @@ def save_evaluation_details(
|
||||
evaluation = frappe.db.exists("LMS Certificate Evaluation", {"member": member, "course": course})
|
||||
|
||||
details = {
|
||||
"date": date,
|
||||
"date": date_value,
|
||||
"start_time": start_time,
|
||||
"end_time": end_time,
|
||||
"status": status,
|
||||
@@ -1217,9 +1216,9 @@ def fetch_activity_data(member: str, start_date: str):
|
||||
|
||||
def count_dates(data: list, date_count: dict):
|
||||
for entry in data:
|
||||
date = format_date(entry.creation, "YYYY-MM-dd")
|
||||
if date in date_count:
|
||||
date_count[date] += 1
|
||||
date_value = format_date(entry.creation, "YYYY-MM-dd")
|
||||
if date_value in date_count:
|
||||
date_count[date_value] += 1
|
||||
|
||||
|
||||
def prepare_heatmap_data(start_date: str, number_of_days: int, date_count: dict):
|
||||
@@ -1230,18 +1229,18 @@ def prepare_heatmap_data(start_date: str, number_of_days: int, date_count: dict)
|
||||
last_seen_month = None
|
||||
sorted_dates = sorted(date_count.keys())
|
||||
|
||||
for date in sorted_dates:
|
||||
activity_count = date_count[date]
|
||||
day_of_week = get_datetime(date).strftime("%a")
|
||||
current_month = get_datetime(date).strftime("%b")
|
||||
column_index = get_week_difference(start_date, date)
|
||||
for date_value in sorted_dates:
|
||||
activity_count = date_count[date_value]
|
||||
day_of_week = get_datetime(date_value).strftime("%a")
|
||||
current_month = get_datetime(date_value).strftime("%b")
|
||||
column_index = get_week_difference(start_date, date_value)
|
||||
|
||||
if 0 <= column_index < week_count:
|
||||
heatmap_data[day_of_week].append(
|
||||
{
|
||||
"date": date,
|
||||
"date": date_value,
|
||||
"count": activity_count,
|
||||
"label": f"{activity_count} activities on {format_date(date, 'dd MMM')}",
|
||||
"label": f"{activity_count} activities on {format_date(date_value, 'dd MMM')}",
|
||||
}
|
||||
)
|
||||
|
||||
@@ -1392,9 +1391,9 @@ def cancel_evaluation(evaluation: dict):
|
||||
|
||||
for event in events:
|
||||
info = frappe.db.get_value("Event", event.parent, ["starts_on", "subject"], as_dict=1)
|
||||
date = str(info.starts_on).split(" ")[0]
|
||||
date_value = str(info.starts_on).split(" ")[0]
|
||||
|
||||
if date == str(evaluation.date.format("YYYY-MM-DD")) and evaluation.member_name in info.subject:
|
||||
if date_value == str(evaluation.date.format("YYYY-MM-DD")) and evaluation.member_name in info.subject:
|
||||
communication = frappe.db.get_value(
|
||||
"Communication",
|
||||
{"reference_doctype": "Event", "reference_name": event.parent},
|
||||
@@ -1551,6 +1550,9 @@ def update_meta_info(meta_type: str, route: str, meta_tags: list):
|
||||
def validate_meta_tags(meta_tags: list):
|
||||
if not isinstance(meta_tags, list):
|
||||
frappe.throw(_("Meta tags should be a list."))
|
||||
for tag in meta_tags:
|
||||
if tag.get("value"):
|
||||
tag["value"] = frappe.utils.strip_html_tags(str(tag["value"]))
|
||||
|
||||
|
||||
def create_meta(parent_name: str, tag_properties: dict):
|
||||
@@ -2271,7 +2273,7 @@ def get_course_programming_exercise_progress(course: str, member: str):
|
||||
return submissions
|
||||
|
||||
|
||||
def get_assessment_from_lesson(course: str, assessmentType: str):
|
||||
def get_assessment_from_lesson(course: str, assessment_type: str):
|
||||
assessments = []
|
||||
lessons = frappe.get_all("Course Lesson", {"course": course}, ["name", "title", "content"])
|
||||
|
||||
@@ -2279,10 +2281,10 @@ def get_assessment_from_lesson(course: str, assessmentType: str):
|
||||
if lesson.content:
|
||||
content = json.loads(lesson.content)
|
||||
for block in content.get("blocks", []):
|
||||
if block.get("type") == assessmentType:
|
||||
data_field = "exercise" if assessmentType == "program" else assessmentType
|
||||
quiz_name = block.get("data", {}).get(data_field)
|
||||
assessments.append(quiz_name)
|
||||
if block.get("type") == assessment_type:
|
||||
data_field = "exercise" if assessment_type == "program" else assessment_type
|
||||
assessment_name = block.get("data", {}).get(data_field)
|
||||
assessments.append(assessment_name)
|
||||
|
||||
return assessments
|
||||
|
||||
@@ -2365,3 +2367,17 @@ def search_users_by_role(txt: str = "", roles: str | list | None = None, page_le
|
||||
{"value": r.name, "description": r.full_name or r.name, "label": r.full_name or r.name}
|
||||
for r in results
|
||||
]
|
||||
|
||||
|
||||
@frappe.whitelist()
|
||||
def export_course_as_zip(course_name: str):
|
||||
if not can_modify_course(course_name):
|
||||
frappe.throw(_("You do not have permission to export this course."), frappe.PermissionError)
|
||||
|
||||
export_course_zip(course_name)
|
||||
|
||||
|
||||
@frappe.whitelist()
|
||||
def import_course_from_zip(zip_file_path: str):
|
||||
frappe.only_for(["Moderator", "Course Creator"])
|
||||
return import_course_zip(zip_file_path)
|
||||
|
||||
@@ -0,0 +1,773 @@
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import secrets
|
||||
import shutil
|
||||
import tempfile
|
||||
import zipfile
|
||||
from datetime import date, datetime, timedelta
|
||||
|
||||
import frappe
|
||||
from frappe import _
|
||||
from frappe.utils import escape_html, validate_email_address
|
||||
from frappe.utils.file_manager import is_safe_path
|
||||
|
||||
from lms.lms.utils import create_user as create_lms_user
|
||||
|
||||
|
||||
def export_course_zip(course_name):
|
||||
course = frappe.get_doc("LMS Course", course_name)
|
||||
chapters = get_chapters_for_export(course.chapters)
|
||||
lessons = get_lessons_for_export(course_name)
|
||||
instructors = get_course_instructors(course)
|
||||
evaluator = get_course_evaluator(course)
|
||||
assets = get_course_assets(course, lessons, instructors, evaluator)
|
||||
assessments, questions, test_cases = get_course_assessments(lessons)
|
||||
safe_time = frappe.utils.now_datetime().strftime("%Y%m%d_%H%M%S")
|
||||
zip_filename = f"{course.name}_{safe_time}_{secrets.token_hex(4)}.zip"
|
||||
create_course_zip(
|
||||
zip_filename,
|
||||
course,
|
||||
chapters,
|
||||
lessons,
|
||||
assets,
|
||||
assessments,
|
||||
questions,
|
||||
test_cases,
|
||||
instructors,
|
||||
evaluator,
|
||||
)
|
||||
|
||||
|
||||
def get_chapters_for_export(chapters: list):
|
||||
chapters_list = []
|
||||
for row in chapters:
|
||||
chapter = frappe.get_doc("Course Chapter", row.chapter)
|
||||
chapters_list.append(chapter)
|
||||
return chapters_list
|
||||
|
||||
|
||||
def get_lessons_for_export(course_name: str):
|
||||
lessons = frappe.get_all("Course Lesson", {"course": course_name}, pluck="name")
|
||||
lessons_list = []
|
||||
for lesson in lessons:
|
||||
lesson_doc = frappe.get_doc("Course Lesson", lesson)
|
||||
lessons_list.append(lesson_doc)
|
||||
return lessons_list
|
||||
|
||||
|
||||
def get_assessment_from_block(block):
|
||||
block_type = block.get("type")
|
||||
data_field = "exercise" if block_type == "program" else block_type
|
||||
name = block.get("data", {}).get(data_field)
|
||||
doctype = get_assessment_map().get(block_type)
|
||||
if frappe.db.exists(doctype, name):
|
||||
return frappe.get_doc(doctype, name)
|
||||
return None
|
||||
|
||||
|
||||
def get_quiz_questions(doc):
|
||||
questions = []
|
||||
for q in doc.questions:
|
||||
question_doc = frappe.get_doc("LMS Question", q.question)
|
||||
questions.append(question_doc.as_dict())
|
||||
return questions
|
||||
|
||||
|
||||
def get_exercise_test_cases(doc):
|
||||
test_cases = []
|
||||
for tc in doc.test_cases:
|
||||
test_case_doc = frappe.get_doc("LMS Test Case", tc.name)
|
||||
test_cases.append(test_case_doc.as_dict())
|
||||
return test_cases
|
||||
|
||||
|
||||
def get_assessments_from_lesson(lesson):
|
||||
assessments, questions, test_cases = [], [], []
|
||||
content = json.loads(lesson.content) if lesson.content else {}
|
||||
for block in content.get("blocks", []):
|
||||
if block.get("type") not in ("quiz", "assignment", "program"):
|
||||
continue
|
||||
doc = get_assessment_from_block(block)
|
||||
if not doc:
|
||||
continue
|
||||
assessments.append(doc.as_dict())
|
||||
if doc.doctype == "LMS Quiz":
|
||||
questions.extend(get_quiz_questions(doc))
|
||||
elif doc.doctype == "LMS Programming Exercise":
|
||||
test_cases.extend(get_exercise_test_cases(doc))
|
||||
return assessments, questions, test_cases
|
||||
|
||||
|
||||
def get_course_assessments(lessons):
|
||||
assessments, questions, test_cases = [], [], []
|
||||
for lesson in lessons:
|
||||
lesson_assessments, lesson_questions, lesson_test_cases = get_assessments_from_lesson(lesson)
|
||||
assessments.extend(lesson_assessments)
|
||||
questions.extend(lesson_questions)
|
||||
test_cases.extend(lesson_test_cases)
|
||||
return assessments, questions, test_cases
|
||||
|
||||
|
||||
def get_course_instructors(course):
|
||||
users = []
|
||||
for instructor in course.instructors:
|
||||
user_info = frappe.db.get_value(
|
||||
"User",
|
||||
instructor.instructor,
|
||||
["name", "full_name", "first_name", "last_name", "email", "user_image"],
|
||||
as_dict=True,
|
||||
)
|
||||
if user_info:
|
||||
users.append(user_info)
|
||||
return users
|
||||
|
||||
|
||||
def get_course_evaluator(course):
|
||||
evaluators = []
|
||||
if course.evaluator and frappe.db.exists("Course Evaluator", course.evaluator):
|
||||
evaluator_info = frappe.get_doc("Course Evaluator", course.evaluator)
|
||||
evaluators.append(evaluator_info)
|
||||
return evaluators
|
||||
|
||||
|
||||
def get_course_assets(course, lessons, instructors, evaluator):
|
||||
assets = []
|
||||
if course.image:
|
||||
assets.append(course.image)
|
||||
for lesson in lessons:
|
||||
content = json.loads(lesson.content) if lesson.content else {}
|
||||
for block in content.get("blocks", []):
|
||||
if block.get("type") == "upload":
|
||||
url = block.get("data", {}).get("file_url")
|
||||
assets.append(url)
|
||||
for instructor in instructors:
|
||||
if instructor.get("user_image"):
|
||||
assets.append(instructor["user_image"])
|
||||
if len(evaluator):
|
||||
assets.append(evaluator[0].user_image)
|
||||
return assets
|
||||
|
||||
|
||||
def read_asset_content(url):
|
||||
try:
|
||||
file_doc = frappe.get_doc("File", {"file_url": url})
|
||||
file_path = file_doc.get_full_path()
|
||||
if not is_safe_path(file_path):
|
||||
return None
|
||||
with open(file_path, "rb") as f:
|
||||
return f.read()
|
||||
except Exception:
|
||||
frappe.log_error(frappe.get_traceback(), f"Could not read asset: {url}")
|
||||
return None
|
||||
|
||||
|
||||
def create_course_zip(
|
||||
zip_filename,
|
||||
course,
|
||||
chapters,
|
||||
lessons,
|
||||
assets,
|
||||
assessments,
|
||||
questions,
|
||||
test_cases,
|
||||
instructors,
|
||||
evaluator,
|
||||
):
|
||||
try:
|
||||
tmp_path = os.path.join(tempfile.gettempdir(), zip_filename)
|
||||
build_course_zip(
|
||||
tmp_path,
|
||||
course,
|
||||
chapters,
|
||||
lessons,
|
||||
assets,
|
||||
assessments,
|
||||
questions,
|
||||
test_cases,
|
||||
instructors,
|
||||
evaluator,
|
||||
)
|
||||
final_path = move_zip_to_private(tmp_path, zip_filename)
|
||||
schedule_file_deletion(final_path, delay_seconds=600) # 10 minutes
|
||||
serve_zip(final_path, zip_filename)
|
||||
except Exception as e:
|
||||
frappe.throw(
|
||||
_("Could not create the course ZIP file. Please try again later. Error: {0}").format(str(e))
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def build_course_zip(
|
||||
tmp_path, course, chapters, lessons, assets, assessments, questions, test_cases, instructors, evaluator
|
||||
):
|
||||
with zipfile.ZipFile(tmp_path, "w", compression=zipfile.ZIP_DEFLATED) as zip_file:
|
||||
write_course_json(zip_file, course)
|
||||
write_chapters_json(zip_file, chapters)
|
||||
write_lessons_json(zip_file, lessons)
|
||||
write_assessments_json(zip_file, assessments, questions, test_cases)
|
||||
write_assets(zip_file, assets)
|
||||
write_instructors_json(zip_file, instructors)
|
||||
write_evaluator_json(zip_file, evaluator)
|
||||
|
||||
|
||||
def write_course_json(zip_file, course):
|
||||
zip_file.writestr("course.json", frappe_json_dumps(course.as_dict()))
|
||||
|
||||
|
||||
def write_chapters_json(zip_file, chapters):
|
||||
for chapter in chapters:
|
||||
chapter_data = chapter.as_dict()
|
||||
chapter_json = frappe_json_dumps(chapter_data)
|
||||
safe_name = sanitize_string(chapter.name)
|
||||
zip_file.writestr(f"chapters/{safe_name}.json", chapter_json)
|
||||
|
||||
|
||||
def write_lessons_json(zip_file, lessons):
|
||||
for lesson in lessons:
|
||||
lesson_data = lesson.as_dict()
|
||||
lesson_json = frappe_json_dumps(lesson_data)
|
||||
safe_name = sanitize_string(lesson.name)
|
||||
zip_file.writestr(f"lessons/{safe_name}.json", lesson_json)
|
||||
|
||||
|
||||
def write_assessments_json(zip_file, assessments, questions, test_cases):
|
||||
for question in questions:
|
||||
question_json = frappe_json_dumps(question)
|
||||
safe_name = sanitize_string(question["name"])
|
||||
zip_file.writestr(f"assessments/questions/{safe_name}.json", question_json)
|
||||
|
||||
for test_case in test_cases:
|
||||
test_case_json = frappe_json_dumps(test_case)
|
||||
safe_name = sanitize_string(test_case["name"])
|
||||
zip_file.writestr(f"assessments/test_cases/{safe_name}.json", test_case_json)
|
||||
|
||||
for assessment in assessments:
|
||||
assessment_json = frappe_json_dumps(assessment)
|
||||
doctype = "_".join(assessment["doctype"].lower().split(" "))
|
||||
safe_name = "_".join(sanitize_string(assessment["name"]).split(" "))
|
||||
zip_file.writestr(f"assessments/{doctype}_{safe_name}.json", assessment_json)
|
||||
|
||||
|
||||
def write_assets(zip_file, assets):
|
||||
assets = list(set(assets))
|
||||
for asset in assets:
|
||||
real_path = frappe.get_site_path(asset.lstrip("/"))
|
||||
if not asset or not isinstance(asset, str) or not is_safe_path(real_path):
|
||||
continue
|
||||
|
||||
file_doc = frappe.get_doc("File", {"file_url": asset})
|
||||
file_path = os.path.abspath(file_doc.get_full_path())
|
||||
|
||||
safe_filename = sanitize_string(os.path.basename(asset))
|
||||
zip_file.write(file_path, f"assets/{safe_filename}")
|
||||
|
||||
|
||||
def move_zip_to_private(tmp_path, zip_filename):
|
||||
final_path = os.path.join(frappe.get_site_path("private", "files"), zip_filename)
|
||||
shutil.move(tmp_path, final_path)
|
||||
return final_path
|
||||
|
||||
|
||||
def write_instructors_json(zip_file, instructors):
|
||||
instructors_json = frappe_json_dumps(instructors)
|
||||
zip_file.writestr("instructors.json", instructors_json)
|
||||
|
||||
|
||||
def write_evaluator_json(zip_file, evaluator):
|
||||
if not len(evaluator):
|
||||
return
|
||||
evaluator_json = frappe_json_dumps(evaluator[0].as_dict())
|
||||
zip_file.writestr("evaluator.json", evaluator_json)
|
||||
|
||||
|
||||
def serve_zip(final_path, zip_filename):
|
||||
if not os.path.exists(final_path) or not os.path.isfile(final_path):
|
||||
frappe.throw(_("File not found"))
|
||||
|
||||
safe_filename = sanitize_string(zip_filename)
|
||||
|
||||
try:
|
||||
with open(final_path, "rb") as f:
|
||||
frappe.local.response.filename = safe_filename
|
||||
frappe.local.response.filecontent = f.read()
|
||||
frappe.local.response.type = "download"
|
||||
frappe.local.response.content_type = "application/zip"
|
||||
except Exception as e:
|
||||
frappe.log_error(f"Error serving ZIP file: {str(e)}")
|
||||
frappe.throw(_("Error downloading file"))
|
||||
|
||||
|
||||
def schedule_file_deletion(file_path, delay_seconds=600):
|
||||
frappe.enqueue(
|
||||
delete_file,
|
||||
file_path=file_path,
|
||||
queue="long",
|
||||
timeout=delay_seconds,
|
||||
at_front=False,
|
||||
enqueue_after_commit=True,
|
||||
)
|
||||
|
||||
|
||||
def delete_file(file_path):
|
||||
try:
|
||||
if os.path.exists(file_path):
|
||||
os.remove(file_path)
|
||||
except Exception as e:
|
||||
frappe.log_error(f"Error deleting exported file {file_path}: {e}")
|
||||
|
||||
|
||||
def frappe_json_dumps(data):
|
||||
def default(obj):
|
||||
try:
|
||||
if isinstance(obj, (datetime | date | timedelta)):
|
||||
return str(obj)
|
||||
except Exception as e:
|
||||
frappe.log_error(f"Error serializing object {obj}: {e}")
|
||||
|
||||
return json.dumps(data, indent=4, default=default)
|
||||
|
||||
|
||||
def import_course_zip(zip_file_path):
|
||||
zip_file_path = zip_file_path.lstrip("/")
|
||||
actual_path = frappe.get_site_path(zip_file_path)
|
||||
validate_zip_file(actual_path)
|
||||
|
||||
with zipfile.ZipFile(actual_path, "r") as zip_file:
|
||||
course_data = read_json_from_zip(zip_file, "course.json")
|
||||
if not course_data:
|
||||
frappe.throw(_("Invalid course ZIP: Missing course.json"))
|
||||
|
||||
create_assets(zip_file)
|
||||
create_user_for_instructors(zip_file)
|
||||
create_evaluator(zip_file)
|
||||
course_doc = create_course_doc(course_data)
|
||||
chapter_docs = create_chapter_docs(zip_file, course_doc.name)
|
||||
create_assessment_docs(zip_file)
|
||||
create_lesson_docs(zip_file, course_doc.name, chapter_docs)
|
||||
save_course_structure(zip_file, course_doc, chapter_docs)
|
||||
return course_doc.name
|
||||
|
||||
|
||||
def read_json_from_zip(zip_file, filename):
|
||||
try:
|
||||
with zip_file.open(filename) as f:
|
||||
return json.load(f)
|
||||
except Exception as e:
|
||||
frappe.log_error(f"Error reading {filename} from ZIP: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def create_user_for_instructors(zip_file):
|
||||
instructors = read_json_from_zip(zip_file, "instructors.json")
|
||||
if not instructors:
|
||||
return
|
||||
for instructor in instructors:
|
||||
if not frappe.db.exists("User", instructor["email"]):
|
||||
create_user(instructor)
|
||||
|
||||
|
||||
def sanitize_string(
|
||||
value,
|
||||
allow_spaces=True,
|
||||
max_length=None,
|
||||
replacement_char=None,
|
||||
escape_html_content=True,
|
||||
strip_whitespace=True,
|
||||
):
|
||||
"""
|
||||
Unified function to sanitize strings for various use cases.
|
||||
|
||||
Args:
|
||||
value: String to sanitize
|
||||
allow_spaces: Whether to allow spaces in the output (True for names, False for filenames)
|
||||
max_length: Maximum length to truncate to (None for no limit)
|
||||
replacement_char: Character to replace invalid chars with (None to remove them)
|
||||
escape_html_content: Whether to escape HTML entities
|
||||
strip_whitespace: Whether to strip leading/trailing whitespace
|
||||
|
||||
Returns:
|
||||
Sanitized string
|
||||
"""
|
||||
if not value:
|
||||
return value
|
||||
|
||||
if strip_whitespace:
|
||||
value = value.strip()
|
||||
if max_length:
|
||||
value = value[:max_length]
|
||||
|
||||
if escape_html_content:
|
||||
value = escape_html(value)
|
||||
|
||||
if allow_spaces:
|
||||
invalid_pattern = r"[^a-zA-Z0-9\s\-\.]"
|
||||
valid_pattern = r"^[a-zA-Z0-9\s\-\.]+$"
|
||||
else:
|
||||
invalid_pattern = r"[^a-zA-Z0-9_\-\.]"
|
||||
valid_pattern = r"^[a-zA-Z0-9_\-\.]+$"
|
||||
|
||||
if replacement_char is None:
|
||||
if not re.match(valid_pattern, value):
|
||||
value = re.sub(invalid_pattern, "", value)
|
||||
else:
|
||||
value = re.sub(invalid_pattern, replacement_char, value)
|
||||
|
||||
return value
|
||||
|
||||
|
||||
def validate_user_email(user):
|
||||
if not user.get("email") or not validate_email_address(user["email"]):
|
||||
frappe.throw(f"Invalid email for user creation: {user.get('email')}")
|
||||
|
||||
|
||||
def get_user_names(user):
|
||||
first_name = sanitize_string(user.get("first_name", ""), max_length=50)
|
||||
last_name = sanitize_string(user.get("last_name", ""), max_length=50)
|
||||
full_name = sanitize_string(user.get("full_name", ""), max_length=100)
|
||||
parts = full_name.split() if full_name else []
|
||||
return (
|
||||
first_name or (parts[0] if parts else "Imported"),
|
||||
last_name or (" ".join(parts[1:]) if len(parts) > 1 else None),
|
||||
full_name,
|
||||
)
|
||||
|
||||
|
||||
def create_user(user):
|
||||
first_name, last_name, full_name = get_user_names(user)
|
||||
user_doc = create_lms_user(
|
||||
email=user["email"],
|
||||
first_name=first_name,
|
||||
last_name=last_name,
|
||||
full_name=full_name,
|
||||
user_image=user.get("user_image"),
|
||||
roles=["Course Creator"],
|
||||
)
|
||||
return user_doc
|
||||
|
||||
|
||||
def create_evaluator(zip_file):
|
||||
evaluator_data = read_json_from_zip(zip_file, "evaluator.json")
|
||||
if not evaluator_data:
|
||||
return
|
||||
|
||||
if not evaluator_data.get("evaluator") or not validate_email_address(evaluator_data.get("evaluator", "")):
|
||||
frappe.log_error(f"Invalid evaluator data: {evaluator_data}")
|
||||
return
|
||||
|
||||
if not frappe.db.exists("User", evaluator_data["evaluator"]):
|
||||
create_user(evaluator_data)
|
||||
|
||||
if not frappe.db.exists("Course Evaluator", evaluator_data["name"]):
|
||||
evaluator_doc = frappe.new_doc("Course Evaluator")
|
||||
evaluator_doc.update(evaluator_data)
|
||||
evaluator_doc.insert(ignore_permissions=True)
|
||||
|
||||
|
||||
def get_course_fields():
|
||||
return [
|
||||
"title",
|
||||
"tags",
|
||||
"image",
|
||||
"video_link",
|
||||
"card_gradient",
|
||||
"short_introduction",
|
||||
"description",
|
||||
"published",
|
||||
"upcoming",
|
||||
"featured",
|
||||
"disable_self_learning",
|
||||
"published_on",
|
||||
"category",
|
||||
"evaluator",
|
||||
"timezone",
|
||||
"paid_course",
|
||||
"paid_certificate",
|
||||
"course_price",
|
||||
"currency",
|
||||
"amount_usd",
|
||||
"enable_certification",
|
||||
]
|
||||
|
||||
|
||||
def add_data_to_course(course_doc, course_data):
|
||||
for field in get_course_fields():
|
||||
if field in course_data:
|
||||
course_doc.set(field, course_data[field])
|
||||
|
||||
|
||||
def add_instructors_to_course(course_doc, course_data):
|
||||
instructors = course_data.get("instructors", [])
|
||||
for instructor in instructors:
|
||||
course_doc.append("instructors", {"instructor": instructor["instructor"]})
|
||||
|
||||
|
||||
def verify_category(category_name):
|
||||
if category_name and not frappe.db.exists("LMS Category", category_name):
|
||||
category = frappe.new_doc("LMS Category")
|
||||
category.category = category_name
|
||||
category.insert(ignore_permissions=True)
|
||||
|
||||
|
||||
def create_course_doc(course_data):
|
||||
course_doc = frappe.new_doc("LMS Course")
|
||||
add_instructors_to_course(course_doc, course_data)
|
||||
verify_category(course_data.get("category"))
|
||||
course_data.pop("instructors", None)
|
||||
course_data.pop("chapters", None)
|
||||
add_data_to_course(course_doc, course_data)
|
||||
course_doc.insert(ignore_permissions=True)
|
||||
return course_doc
|
||||
|
||||
|
||||
def exclude_meta_fields(data):
|
||||
meta_fields = ["name", "owner", "creation", "created_by", "modified", "modified_by", "docstatus"]
|
||||
return {k: v for k, v in data.items() if k not in meta_fields}
|
||||
|
||||
|
||||
def create_chapter_docs(zip_file, course_name):
|
||||
chapter_docs = []
|
||||
for file in zip_file.namelist():
|
||||
if file.startswith("chapters/") and file.endswith(".json"):
|
||||
chapter_data = read_json_from_zip(zip_file, file)
|
||||
chapter_data = exclude_meta_fields(chapter_data)
|
||||
if chapter_data:
|
||||
chapter_doc = frappe.new_doc("Course Chapter")
|
||||
chapter_data.pop("lessons", None)
|
||||
chapter_doc.update(chapter_data)
|
||||
chapter_doc.course = course_name
|
||||
chapter_doc.insert(ignore_permissions=True)
|
||||
chapter_docs.append(chapter_doc)
|
||||
return chapter_docs
|
||||
|
||||
|
||||
def get_chapter_name_for_lesson(zip_file, lesson_data, chapter_docs):
|
||||
for file in zip_file.namelist():
|
||||
if file.startswith("chapters/") and file.endswith(".json"):
|
||||
chapter_data = read_json_from_zip(zip_file, file)
|
||||
if chapter_data.get("name") == lesson_data.get("chapter"):
|
||||
title = chapter_data.get("title")
|
||||
chapter_doc = next((c for c in chapter_docs if c.title == title), None)
|
||||
if chapter_doc:
|
||||
return chapter_doc.name
|
||||
return None
|
||||
|
||||
|
||||
def get_assessment_map():
|
||||
return {"quiz": "LMS Quiz", "assignment": "LMS Assignment", "program": "LMS Programming Exercise"}
|
||||
|
||||
|
||||
def get_assessment_title(zip_file, assessment_name, assessment_type):
|
||||
assessment_map = get_assessment_map()
|
||||
doctype = "_".join(assessment_map.get(assessment_type).lower().split(" "))
|
||||
assessment_name = "_".join(assessment_name.split(" "))
|
||||
file_name = f"assessments/{doctype}_{assessment_name}.json"
|
||||
try:
|
||||
with zip_file.open(file_name) as f:
|
||||
assessment_data = json.load(f)
|
||||
return assessment_data.get("title")
|
||||
except Exception as e:
|
||||
frappe.log_error(f"Error reading {file_name} from ZIP: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def replace_assessment_names(zip_file, content):
|
||||
assessment_types = ["quiz", "assignment", "program"]
|
||||
content = json.loads(content)
|
||||
for block in content.get("blocks", []):
|
||||
if block.get("type") in assessment_types:
|
||||
data_field = "exercise" if block.get("type") == "program" else block.get("type")
|
||||
assessment_name = block.get("data", {}).get(data_field)
|
||||
assessment_title = get_assessment_title(zip_file, assessment_name, block.get("type"))
|
||||
doctype = get_assessment_map().get(block.get("type"))
|
||||
current_assessment_name = frappe.db.get_value(doctype, {"title": assessment_title}, "name")
|
||||
if current_assessment_name:
|
||||
block["data"][data_field] = current_assessment_name
|
||||
return json.dumps(content)
|
||||
|
||||
|
||||
def replace_assets(content):
|
||||
content = json.loads(content)
|
||||
for block in content.get("blocks", []):
|
||||
if block.get("type") == "upload":
|
||||
asset_url = block.get("data", {}).get("file_url")
|
||||
if asset_url:
|
||||
asset_name = asset_url.split("/")[-1]
|
||||
current_asset_url = frappe.db.get_value("LMS Asset", {"file_name": asset_name}, "file_url")
|
||||
if current_asset_url:
|
||||
block["data"]["url"] = current_asset_url
|
||||
|
||||
|
||||
def replace_values_in_content(zip_file, content):
|
||||
return replace_assessment_names(zip_file, content)
|
||||
# replace_assets(content)
|
||||
|
||||
|
||||
def create_lesson_docs(zip_file, course_name, chapter_docs):
|
||||
lesson_docs = []
|
||||
for file in zip_file.namelist():
|
||||
if file.startswith("lessons/") and file.endswith(".json"):
|
||||
lesson_data = read_json_from_zip(zip_file, file)
|
||||
lesson_data = exclude_meta_fields(lesson_data)
|
||||
if lesson_data:
|
||||
lesson_doc = frappe.new_doc("Course Lesson")
|
||||
lesson_doc.update(lesson_data)
|
||||
lesson_doc.course = course_name
|
||||
lesson_doc.chapter = get_chapter_name_for_lesson(zip_file, lesson_data, chapter_docs)
|
||||
lesson_doc.content = (
|
||||
replace_values_in_content(zip_file, lesson_doc.content) if lesson_doc.content else None
|
||||
)
|
||||
lesson_doc.insert(ignore_permissions=True)
|
||||
lesson_docs.append(lesson_doc)
|
||||
return lesson_docs
|
||||
|
||||
|
||||
def create_question_doc(zip_file, file):
|
||||
question_data = read_json_from_zip(zip_file, file)
|
||||
if question_data:
|
||||
doc = frappe.new_doc("LMS Question")
|
||||
doc.update(question_data)
|
||||
doc.insert(ignore_permissions=True)
|
||||
|
||||
|
||||
def create_test_case_doc(zip_file, file):
|
||||
test_case_data = read_json_from_zip(zip_file, file)
|
||||
if test_case_data:
|
||||
doc = frappe.new_doc("LMS Test Case")
|
||||
doc.update(test_case_data)
|
||||
doc.insert(ignore_permissions=True)
|
||||
|
||||
|
||||
def add_questions_to_quiz(quiz_doc, questions):
|
||||
for question in questions:
|
||||
question_detail = question["question_detail"]
|
||||
question_name = frappe.db.get_value("LMS Question", {"question": question_detail}, "name")
|
||||
if question_name:
|
||||
quiz_doc.append("questions", {"question": question_name})
|
||||
|
||||
|
||||
def create_supporting_docs(zip_file):
|
||||
for file in zip_file.namelist():
|
||||
if file.startswith("assessments/questions/") and file.endswith(".json"):
|
||||
create_question_doc(zip_file, file)
|
||||
elif file.startswith("assessments/test_cases/") and file.endswith(".json"):
|
||||
create_test_case_doc(zip_file, file)
|
||||
|
||||
|
||||
def is_assessment_file(file):
|
||||
return (
|
||||
file.startswith("assessments/")
|
||||
and file.endswith(".json")
|
||||
and not file.startswith("assessments/questions/")
|
||||
and not file.startswith("assessments/test_cases/")
|
||||
)
|
||||
|
||||
|
||||
def build_assessment_doc(assessment_data):
|
||||
doctype = assessment_data.get("doctype")
|
||||
if doctype not in ("LMS Quiz", "LMS Assignment", "LMS Programming Exercise"):
|
||||
return
|
||||
if frappe.db.exists(doctype, assessment_data.get("name")):
|
||||
return
|
||||
|
||||
questions = assessment_data.pop("questions", [])
|
||||
test_cases = assessment_data.pop("test_cases", [])
|
||||
doc = frappe.new_doc(doctype)
|
||||
doc.update(assessment_data)
|
||||
|
||||
if doctype == "LMS Quiz":
|
||||
add_questions_to_quiz(doc, questions)
|
||||
elif doctype == "LMS Programming Exercise":
|
||||
for row in test_cases:
|
||||
doc.append("test_cases", {"input": row["input"], "expected_output": row["expected_output"]})
|
||||
|
||||
doc.insert(ignore_permissions=True)
|
||||
|
||||
|
||||
def create_main_assessment_docs(zip_file):
|
||||
for file in zip_file.namelist():
|
||||
if not is_assessment_file(file):
|
||||
continue
|
||||
assessment_data = read_json_from_zip(zip_file, file)
|
||||
if not assessment_data:
|
||||
continue
|
||||
assessment_data.pop("lesson", None)
|
||||
assessment_data.pop("course", None)
|
||||
build_assessment_doc(assessment_data)
|
||||
|
||||
|
||||
def create_assessment_docs(zip_file):
|
||||
create_supporting_docs(zip_file)
|
||||
create_main_assessment_docs(zip_file)
|
||||
|
||||
|
||||
def create_asset_doc(asset_name, content):
|
||||
if frappe.db.exists("File", {"file_name": asset_name}):
|
||||
return
|
||||
asset_doc = frappe.new_doc("File")
|
||||
asset_doc.file_name = asset_name
|
||||
asset_doc.content = content
|
||||
asset_doc.insert()
|
||||
|
||||
|
||||
def process_asset_file(zip_file, file):
|
||||
if not is_safe_path(file):
|
||||
return
|
||||
with zip_file.open(file) as f:
|
||||
create_asset_doc(file.split("/")[-1], f.read())
|
||||
|
||||
|
||||
def create_assets(zip_file):
|
||||
for file in zip_file.namelist():
|
||||
if not file.startswith("assets/") or file.endswith("/"):
|
||||
continue
|
||||
try:
|
||||
process_asset_file(zip_file, file)
|
||||
except Exception as e:
|
||||
frappe.log_error(f"Error processing asset {file}: {e}")
|
||||
|
||||
|
||||
def get_lesson_title(zip_file, lesson_name):
|
||||
for file in zip_file.namelist():
|
||||
if file.startswith("lessons/") and file.endswith(".json"):
|
||||
lesson_data = read_json_from_zip(zip_file, file)
|
||||
if lesson_data.get("name") == lesson_name:
|
||||
return lesson_data.get("title")
|
||||
return None
|
||||
|
||||
|
||||
def add_lessons_to_chapters(zip_file, course_name, chapter_docs):
|
||||
for file in zip_file.namelist():
|
||||
if file.startswith("chapters/") and file.endswith(".json"):
|
||||
chapter_data = read_json_from_zip(zip_file, file)
|
||||
chapter_doc = next((c for c in chapter_docs if c.title == chapter_data.get("title")), None)
|
||||
if not chapter_doc:
|
||||
continue
|
||||
for lesson in chapter_data.get("lessons", []):
|
||||
lesson_title = get_lesson_title(zip_file, lesson["lesson"])
|
||||
lesson_name = frappe.db.get_value(
|
||||
"Course Lesson", {"title": lesson_title, "course": course_name}, "name"
|
||||
)
|
||||
if lesson_name:
|
||||
chapter_doc.append("lessons", {"lesson": lesson_name})
|
||||
chapter_doc.save(ignore_permissions=True)
|
||||
|
||||
|
||||
def add_chapter_to_course(course_doc, chapter_docs):
|
||||
course_doc.reload()
|
||||
for chapter_doc in chapter_docs:
|
||||
course_doc.append("chapters", {"chapter": chapter_doc.name})
|
||||
course_doc.save(ignore_permissions=True)
|
||||
|
||||
|
||||
def save_course_structure(zip_file, course_doc, chapter_docs):
|
||||
add_chapter_to_course(course_doc, chapter_docs)
|
||||
add_lessons_to_chapters(zip_file, course_doc.name, chapter_docs)
|
||||
|
||||
|
||||
def validate_zip_file(zip_file_path):
|
||||
if not os.path.exists(zip_file_path) or not zipfile.is_zipfile(zip_file_path):
|
||||
frappe.throw(_("Invalid ZIP file"))
|
||||
|
||||
if not is_safe_path(zip_file_path):
|
||||
frappe.throw(_("Unsafe file path detected"))
|
||||
@@ -10,9 +10,9 @@
|
||||
"field_order": [
|
||||
"title",
|
||||
"include_in_preview",
|
||||
"is_scorm_package",
|
||||
"column_break_4",
|
||||
"chapter",
|
||||
"is_scorm_package",
|
||||
"course",
|
||||
"section_break_11",
|
||||
"content",
|
||||
@@ -160,11 +160,11 @@
|
||||
],
|
||||
"index_web_pages_for_search": 1,
|
||||
"links": [],
|
||||
"modified": "2026-02-20 13:49:25.599827",
|
||||
"modified_by": "Administrator",
|
||||
"modified": "2026-04-01 12:21:25.050340",
|
||||
"modified_by": "sayali@frappe.io",
|
||||
"module": "LMS",
|
||||
"name": "Course Lesson",
|
||||
"naming_rule": "Expression",
|
||||
"naming_rule": "Expression (old style)",
|
||||
"owner": "Administrator",
|
||||
"permissions": [
|
||||
{
|
||||
|
||||
@@ -224,9 +224,7 @@ def update_course_statistics():
|
||||
|
||||
for course in courses:
|
||||
lessons = get_lesson_count(course.name)
|
||||
|
||||
enrollments = frappe.db.count("LMS Enrollment", {"course": course.name, "member_type": "Student"})
|
||||
|
||||
avg_rating = get_average_rating(course.name) or 0
|
||||
avg_rating = flt(avg_rating, frappe.get_system_settings("float_precision") or 3)
|
||||
|
||||
|
||||
@@ -50,6 +50,11 @@ class LMSEnrollment(Document):
|
||||
)
|
||||
|
||||
if self.enrollment_from_batch:
|
||||
if not frappe.db.exists(
|
||||
"Batch Course", {"parent": self.enrollment_from_batch, "course": self.course}
|
||||
):
|
||||
frappe.throw(_("This batch is not associated with this course."))
|
||||
|
||||
if frappe.db.exists(
|
||||
"LMS Batch Enrollment", {"batch": self.enrollment_from_batch, "member": self.member}
|
||||
):
|
||||
|
||||
@@ -34,8 +34,7 @@
|
||||
"fieldtype": "Data",
|
||||
"in_list_view": 1,
|
||||
"label": "Title",
|
||||
"reqd": 1,
|
||||
"unique": 1
|
||||
"reqd": 1
|
||||
},
|
||||
{
|
||||
"fieldname": "questions",
|
||||
@@ -159,7 +158,7 @@
|
||||
"link_fieldname": "quiz"
|
||||
}
|
||||
],
|
||||
"modified": "2026-03-25 20:22:22.124828",
|
||||
"modified": "2026-04-01 16:56:28.727089",
|
||||
"modified_by": "sayali@frappe.io",
|
||||
"module": "LMS",
|
||||
"name": "LMS Quiz",
|
||||
|
||||
+94
-1
@@ -1,6 +1,17 @@
|
||||
import glob
|
||||
import os
|
||||
import re
|
||||
import zipfile
|
||||
|
||||
import frappe
|
||||
|
||||
from lms.lms.api import get_certified_participants, get_course_assessment_progress
|
||||
from lms.lms.api import (
|
||||
export_course_as_zip,
|
||||
get_certified_participants,
|
||||
get_course_assessment_progress,
|
||||
import_course_from_zip,
|
||||
)
|
||||
from lms.lms.course_import_export import sanitize_string
|
||||
from lms.lms.test_helpers import BaseTestUtils
|
||||
|
||||
|
||||
@@ -83,3 +94,85 @@ class TestLMSAPI(BaseTestUtils):
|
||||
)
|
||||
self.assertEqual(result.is_correct, 1 if index % 2 == 0 else 0)
|
||||
self.assertEqual(result.marks, 5 if index % 2 == 0 else 0)
|
||||
|
||||
def test_export_course_as_zip(self):
|
||||
latest_file = self.get_latest_zip_file()
|
||||
self.assertTrue(latest_file)
|
||||
self.assertTrue(latest_file.endswith(".zip"))
|
||||
expected_name_pattern = re.escape(self.course.name) + r"_\d{8}_\d{6}_[a-f0-9]{8}\.zip"
|
||||
self.assertRegex(latest_file, expected_name_pattern)
|
||||
with zipfile.ZipFile(latest_file, "r") as zip_ref:
|
||||
expected_files = [
|
||||
"course.json",
|
||||
"instructors.json",
|
||||
]
|
||||
for expected_file in expected_files:
|
||||
self.assertIn(expected_file, zip_ref.namelist())
|
||||
chapter_files = [
|
||||
f for f in zip_ref.namelist() if f.startswith("chapters/") and f.endswith(".json")
|
||||
]
|
||||
self.assertEqual(len(chapter_files), 3)
|
||||
lesson_files = [f for f in zip_ref.namelist() if f.startswith("lessons/") and f.endswith(".json")]
|
||||
self.assertEqual(len(lesson_files), 12)
|
||||
assessment_files = [
|
||||
f
|
||||
for f in zip_ref.namelist()
|
||||
if f.startswith("assessments/") and f.endswith(".json") and len(f.split("/")) == 2
|
||||
]
|
||||
self.assertEqual(len(assessment_files), 3)
|
||||
|
||||
def get_latest_zip_file(self):
|
||||
export_course_as_zip(self.course.name)
|
||||
site_path = frappe.get_site_path("private", "files")
|
||||
zip_files = glob.glob(os.path.join(site_path, f"{self.course.name}_*.zip"))
|
||||
latest_file = max(zip_files, key=os.path.getctime) if zip_files else None
|
||||
return latest_file
|
||||
|
||||
def test_import_course_from_zip(self):
|
||||
imported_course = self.get_imported_course()
|
||||
self.assertEqual(imported_course.title, self.course.title)
|
||||
self.assertEqual(imported_course.category, self.course.category)
|
||||
# self.assertEqual(imported_course.lessons, self.course.lessons)
|
||||
self.assertEqual(len(imported_course.instructors), len(self.course.instructors))
|
||||
self.assertEqual(imported_course.instructors[0].instructor, self.course.instructors[0].instructor)
|
||||
imported_first_chapter = frappe.get_doc("Course Chapter", self.course.chapters[0].chapter)
|
||||
original_first_chapter = frappe.get_doc("Course Chapter", self.course.chapters[0].chapter)
|
||||
self.assertEqual(imported_first_chapter.title, original_first_chapter.title)
|
||||
imported_first_lesson = frappe.get_doc("Course Lesson", imported_first_chapter.lessons[0].lesson)
|
||||
original_first_lesson = frappe.get_doc("Course Lesson", original_first_chapter.lessons[0].lesson)
|
||||
self.assertEqual(imported_first_lesson.title, original_first_lesson.title)
|
||||
self.assertEqual(imported_first_lesson.content, original_first_lesson.content)
|
||||
self.cleanup_imported_course(imported_course.name)
|
||||
|
||||
def get_imported_course(self):
|
||||
latest_file = self.get_latest_zip_file()
|
||||
self.assertTrue(latest_file)
|
||||
zip_file_path = f"/{'/'.join(latest_file.split('/')[2:])}"
|
||||
imported_course_name = import_course_from_zip(zip_file_path)
|
||||
imported_course = frappe.get_doc("LMS Course", imported_course_name)
|
||||
return imported_course
|
||||
|
||||
def cleanup_imported_course(self, course_name):
|
||||
self.cleanup_items.append(("LMS Course", course_name))
|
||||
self.cleanup_imported_assessment("LMS Quiz", self.quiz)
|
||||
self.cleanup_imported_assessment("LMS Assignment", self.assignment)
|
||||
self.cleanup_imported_assessment("LMS Programming Exercise", self.programming_exercise)
|
||||
|
||||
def cleanup_imported_assessment(self, doctype, doc):
|
||||
imported_assessment = frappe.db.get_value(
|
||||
doctype, {"title": doc.title, "name": ["!=", doc.name]}, "name"
|
||||
)
|
||||
if imported_assessment:
|
||||
self.cleanup_items.append((doctype, imported_assessment))
|
||||
|
||||
def test_sanitize_string_filename_behavior(self):
|
||||
result = sanitize_string(
|
||||
"my file@name!.txt", allow_spaces=False, replacement_char="_", escape_html_content=False
|
||||
)
|
||||
self.assertEqual(result, "my_file_name_.txt")
|
||||
|
||||
def test_sanitize_string_name_field_behavior(self):
|
||||
result = sanitize_string(
|
||||
"John#Doe$", allow_spaces=True, max_length=50, replacement_char=None, escape_html_content=True
|
||||
)
|
||||
self.assertEqual(result, "JohnDoe")
|
||||
|
||||
@@ -7,6 +7,7 @@ from frappe.utils import getdate, to_timedelta
|
||||
from lms.lms.doctype.lms_certificate.lms_certificate import is_certified
|
||||
from lms.lms.test_helpers import BaseTestUtils
|
||||
from lms.lms.utils import (
|
||||
create_user,
|
||||
get_average_rating,
|
||||
get_batch_details,
|
||||
get_chapters,
|
||||
@@ -157,3 +158,24 @@ class TestLMSUtils(BaseTestUtils):
|
||||
self.assertEqual(batch_details.evaluation_end_date, getdate(self.batch.evaluation_end_date))
|
||||
self.assertEqual(len(batch_details.instructors), len(self.batch.instructors))
|
||||
self.assertEqual(len(batch_details.students), 2)
|
||||
|
||||
def test_create_user(self):
|
||||
user = create_user(
|
||||
email="testuser@example.com", first_name="Test", last_name="User", roles=["LMS Student"]
|
||||
)
|
||||
self.assertEqual(user.email, "testuser@example.com")
|
||||
self.assertEqual(user.first_name, "Test")
|
||||
self.assertEqual(user.last_name, "User")
|
||||
self.assertEqual(user.full_name, "Test User")
|
||||
self.assertIn("LMS Student", [role.role for role in user.roles])
|
||||
self.cleanup_items.append(("User", user.name))
|
||||
|
||||
def test_create_user_with_full_name(self):
|
||||
user = create_user(
|
||||
email="fullnameuser@example.com", full_name="John Michael Doe", roles=["Course Creator"]
|
||||
)
|
||||
self.assertEqual(user.first_name, "John")
|
||||
self.assertEqual(user.last_name, "Michael Doe")
|
||||
self.assertEqual(user.full_name, "John Michael Doe")
|
||||
self.assertIn("Course Creator", [role.role for role in user.roles])
|
||||
self.cleanup_items.append(("User", user.name))
|
||||
|
||||
+5
-3
@@ -1,7 +1,7 @@
|
||||
import frappe
|
||||
from frappe import _
|
||||
from frappe.model.naming import append_number_if_name_exists
|
||||
from frappe.utils import escape_html, random_string
|
||||
from frappe.utils import cint, escape_html, random_string
|
||||
from frappe.website.utils import cleanup_page_name, is_signup_disabled
|
||||
|
||||
from lms.lms.utils import get_country_code, get_lms_route
|
||||
@@ -23,7 +23,7 @@ def after_insert(doc, method):
|
||||
doc.add_roles("LMS Student")
|
||||
|
||||
|
||||
@frappe.whitelist(allow_guest=True)
|
||||
@frappe.whitelist(allow_guest=True) # nosemgrep: frappe-semgrep-rules.rules.security.guest-whitelisted-method
|
||||
def sign_up(email: str, full_name: str, verify_terms: bool, user_category: str):
|
||||
if is_signup_disabled():
|
||||
frappe.throw(_("Sign Up is disabled"), _("Not Allowed"))
|
||||
@@ -35,7 +35,9 @@ def sign_up(email: str, full_name: str, verify_terms: bool, user_category: str):
|
||||
else:
|
||||
return 0, _("Registered but disabled")
|
||||
else:
|
||||
if frappe.db.get_creation_count("User", 60) > 300:
|
||||
max_signups_allowed_per_hour = cint(frappe.get_system_settings("max_signups_allowed_per_hour") or 300)
|
||||
users_created_past_hour = frappe.db.get_creation_count("User", 60)
|
||||
if users_created_past_hour >= max_signups_allowed_per_hour:
|
||||
frappe.respond_as_web_page(
|
||||
_("Temporarily Disabled"),
|
||||
_(
|
||||
|
||||
+44
-1
@@ -23,6 +23,7 @@ from frappe.utils import (
|
||||
nowtime,
|
||||
pretty_date,
|
||||
rounded,
|
||||
validate_email_address,
|
||||
)
|
||||
from pypika import Case
|
||||
from pypika import functions as fn
|
||||
@@ -86,6 +87,49 @@ def generate_slug(title: str, doctype: str):
|
||||
return slugify(title, used_slugs=slugs)
|
||||
|
||||
|
||||
def process_user_names(first_name, last_name, full_name):
|
||||
if not first_name and full_name:
|
||||
name_parts = full_name.split()
|
||||
first_name = name_parts[0] if name_parts else "User"
|
||||
last_name = " ".join(name_parts[1:]) if len(name_parts) > 1 else ""
|
||||
|
||||
if not full_name:
|
||||
full_name = f"{first_name} {last_name or ''}".strip()
|
||||
|
||||
return first_name, last_name or "", full_name
|
||||
|
||||
|
||||
def create_user_document(email, first_name, last_name, full_name, user_image=None, roles=None):
|
||||
user_doc = frappe.new_doc("User")
|
||||
user_doc.email = email
|
||||
user_doc.first_name = first_name
|
||||
user_doc.last_name = last_name
|
||||
user_doc.full_name = full_name
|
||||
user_doc.user_image = user_image
|
||||
user_doc.send_welcome_email = False
|
||||
if not roles:
|
||||
roles = ["LMS Student"]
|
||||
for role in roles:
|
||||
user_doc.append("roles", {"role": role})
|
||||
user_doc.insert()
|
||||
return user_doc
|
||||
|
||||
|
||||
def create_user(email, first_name=None, last_name=None, full_name=None, user_image=None, roles=None):
|
||||
validate_email_address(email, True)
|
||||
print(email)
|
||||
print(frappe.db.exists("User", email))
|
||||
existing_user = frappe.db.exists("User", email)
|
||||
print("existing_user", existing_user)
|
||||
if existing_user:
|
||||
print("User already exists")
|
||||
return frappe.get_doc("User", email)
|
||||
|
||||
first_name, last_name, full_name = process_user_names(first_name, last_name, full_name)
|
||||
user_doc = create_user_document(email, first_name, last_name, full_name, user_image, roles)
|
||||
return user_doc
|
||||
|
||||
|
||||
def get_membership(course: str, member: str = None):
|
||||
if not member:
|
||||
member = frappe.session.user
|
||||
@@ -549,7 +593,6 @@ def get_lesson_count(course: str) -> int:
|
||||
chapters = frappe.get_all("Chapter Reference", {"parent": course}, ["chapter"])
|
||||
for chapter in chapters:
|
||||
lesson_count += frappe.db.count("Lesson Reference", {"parent": chapter.chapter})
|
||||
|
||||
return lesson_count
|
||||
|
||||
|
||||
|
||||
+200
-191
File diff suppressed because it is too large
Load Diff
+201
-192
File diff suppressed because it is too large
Load Diff
+200
-191
File diff suppressed because it is too large
Load Diff
+200
-191
File diff suppressed because it is too large
Load Diff
+200
-191
File diff suppressed because it is too large
Load Diff
+201
-192
File diff suppressed because it is too large
Load Diff
+201
-192
File diff suppressed because it is too large
Load Diff
+201
-192
File diff suppressed because it is too large
Load Diff
+200
-191
File diff suppressed because it is too large
Load Diff
+201
-192
File diff suppressed because it is too large
Load Diff
+200
-191
File diff suppressed because it is too large
Load Diff
+200
-191
File diff suppressed because it is too large
Load Diff
+201
-192
File diff suppressed because it is too large
Load Diff
+200
-191
File diff suppressed because it is too large
Load Diff
+200
-191
File diff suppressed because it is too large
Load Diff
+200
-191
File diff suppressed because it is too large
Load Diff
+201
-192
File diff suppressed because it is too large
Load Diff
+200
-191
File diff suppressed because it is too large
Load Diff
+200
-191
File diff suppressed because it is too large
Load Diff
+200
-191
File diff suppressed because it is too large
Load Diff
+201
-192
File diff suppressed because it is too large
Load Diff
+201
-192
File diff suppressed because it is too large
Load Diff
+252
-243
File diff suppressed because it is too large
Load Diff
+252
-243
File diff suppressed because it is too large
Load Diff
+201
-192
File diff suppressed because it is too large
Load Diff
+200
-191
File diff suppressed because it is too large
Load Diff
+200
-191
File diff suppressed because it is too large
Load Diff
+200
-191
File diff suppressed because it is too large
Load Diff
+201
-192
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user