-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathapi.py
3646 lines (3110 loc) · 140 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Instructor Dashboard API views
JSON views which the instructor dashboard requests.
Many of these GETs may become PUTs in the future.
"""
import csv
import datetime
import json
import logging
import string
import random
import re
import dateutil
import pytz
import edx_api_doc_tools as apidocs
from django.conf import settings
from django.contrib.auth.models import User # lint-amnesty, pylint: disable=imported-auth-user
from django.core.exceptions import MultipleObjectsReturned, ObjectDoesNotExist, PermissionDenied, ValidationError
from django.core.validators import validate_email
from django.db import IntegrityError, transaction
from django.http import HttpResponse, HttpResponseBadRequest, HttpResponseForbidden, HttpResponseNotFound
from django.shortcuts import redirect
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.utils.html import strip_tags
from django.utils.translation import gettext as _
from django.views.decorators.cache import cache_control
from django.views.decorators.csrf import ensure_csrf_cookie
from django.views.decorators.http import require_POST, require_http_methods
from edx_rest_framework_extensions.auth.jwt.authentication import JwtAuthentication
from edx_rest_framework_extensions.auth.session.authentication import SessionAuthenticationAllowInactiveUser
from edx_when.api import get_date_for_block
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey, UsageKey
from openedx.core.djangoapps.course_groups.cohorts import get_cohort_by_name
from rest_framework import serializers, status # lint-amnesty, pylint: disable=wrong-import-order
from rest_framework.permissions import IsAdminUser, IsAuthenticated # lint-amnesty, pylint: disable=wrong-import-order
from rest_framework.response import Response # lint-amnesty, pylint: disable=wrong-import-order
from rest_framework.views import APIView # lint-amnesty, pylint: disable=wrong-import-order
from submissions import api as sub_api # installed from the edx-submissions repository # lint-amnesty, pylint: disable=wrong-import-order
from xmodule.modulestore.django import modulestore # lint-amnesty, pylint: disable=wrong-import-order
from xmodule.modulestore.exceptions import ItemNotFoundError # lint-amnesty, pylint: disable=wrong-import-order
from common.djangoapps.course_modes.models import CourseMode
from common.djangoapps.student import auth
from common.djangoapps.student.api import is_user_enrolled_in_course
from common.djangoapps.student.models import (
ALLOWEDTOENROLL_TO_ENROLLED,
ALLOWEDTOENROLL_TO_UNENROLLED,
CourseEnrollment,
CourseEnrollmentAllowed,
DEFAULT_TRANSITION_STATE,
ENROLLED_TO_ENROLLED,
ENROLLED_TO_UNENROLLED,
EntranceExamConfiguration,
ManualEnrollmentAudit,
Registration,
UNENROLLED_TO_ALLOWEDTOENROLL,
UNENROLLED_TO_ENROLLED,
UNENROLLED_TO_UNENROLLED,
UserProfile,
get_user_by_username_or_email,
is_email_retired,
)
from common.djangoapps.student.roles import CourseFinanceAdminRole, CourseSalesAdminRole
from common.djangoapps.util.file import (
FileValidationException,
course_and_time_based_filename_generator,
store_uploaded_file,
)
from common.djangoapps.util.json_request import JsonResponse, JsonResponseBadRequest
from common.djangoapps.util.views import require_global_staff # pylint: disable=unused-import
from lms.djangoapps.bulk_email.api import is_bulk_email_feature_enabled, create_course_email
from lms.djangoapps.certificates import api as certs_api
from lms.djangoapps.certificates.models import (
CertificateStatuses
)
from lms.djangoapps.course_home_api.toggles import course_home_mfe_progress_tab_is_active
from lms.djangoapps.courseware.access import has_access
from lms.djangoapps.courseware.courses import get_course_with_access
from lms.djangoapps.courseware.models import StudentModule
from lms.djangoapps.discussion.django_comment_client.utils import (
get_group_id_for_user,
get_group_name,
has_forum_access,
)
from lms.djangoapps.instructor import enrollment
from lms.djangoapps.instructor.access import ROLES, allow_access, list_with_level, revoke_access, update_forum_role
from lms.djangoapps.instructor.constants import INVOICE_KEY
from lms.djangoapps.instructor.enrollment import (
enroll_email,
get_email_params,
get_user_email_language,
send_beta_role_email,
send_mail_to_student,
unenroll_email,
)
from lms.djangoapps.instructor.views.instructor_task_helpers import extract_email_features, extract_task_features
from lms.djangoapps.instructor_analytics import basic as instructor_analytics_basic, csvs as instructor_analytics_csvs
from lms.djangoapps.instructor_task import api as task_api
from lms.djangoapps.instructor_task.api_helper import AlreadyRunningError, QueueConnectionError
from lms.djangoapps.instructor_task.data import InstructorTaskTypes
from lms.djangoapps.instructor_task.models import ReportStore
from lms.djangoapps.instructor.views.serializer import RoleNameSerializer, UserSerializer
from openedx.core.djangoapps.content.course_overviews.models import CourseOverview
from openedx.core.djangoapps.course_groups.cohorts import add_user_to_cohort, is_course_cohorted
from openedx.core.djangoapps.course_groups.models import CourseUserGroup
from openedx.core.djangoapps.django_comment_common.models import (
CourseDiscussionSettings,
FORUM_ROLE_ADMINISTRATOR,
FORUM_ROLE_COMMUNITY_TA,
FORUM_ROLE_GROUP_MODERATOR,
FORUM_ROLE_MODERATOR,
Role,
)
from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers
from openedx.core.djangoapps.user_api.preferences.api import get_user_preference
from openedx.core.djangolib.markup import HTML, Text
from openedx.core.lib.api.authentication import BearerAuthenticationAllowInactiveUser
from openedx.core.lib.api.view_utils import DeveloperErrorViewMixin, view_auth_classes
from openedx.core.lib.courses import get_course_by_id
from openedx.core.lib.api.serializers import CourseKeyField
from openedx.features.course_experience.url_helpers import get_learning_mfe_home_url
from .tools import (
dump_block_extensions,
dump_student_extensions,
find_unit,
get_student_from_identifier,
handle_dashboard_error,
keep_field_private,
parse_datetime,
require_student_from_identifier,
set_due_date_extension,
strip_if_string,
)
from .. import permissions
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Status marker string for a successfully submitted task.
# NOTE(review): no consumer of this constant is visible in this part of the file — confirm usage.
TASK_SUBMISSION_OK = 'created'
# User-facing confirmation template; the `{report_type}` placeholder is meant to be
# filled in with `str.format` by whoever uses it.
# NOTE(review): `_` is the eager `gettext` (see the imports), so this string is translated
# once at import time rather than per request — confirm that this is intended.
SUCCESS_MESSAGE_TEMPLATE = _("The {report_type} report is being created. "
"To view the status of the report, see Pending Tasks below.")
def common_exceptions_400(func):
    """
    Catches common exceptions and renders matching 400 errors.
    (decorator without arguments)

    The wrapped view is called as ``func(request, *args, **kwargs)``. If it
    raises one of the handled exceptions, a 400 response carrying a short
    message is returned instead:

    - ``User.DoesNotExist`` -> 'User does not exist.'
    - ``MultipleObjectsReturned`` -> conflict message
    - ``AlreadyRunningError`` / ``QueueConnectionError`` / ``AttributeError``
      -> ``str()`` of the exception

    The response is JSON (``JsonResponseBadRequest``) when the request looks
    like an AJAX/JSON request, otherwise a plain ``HttpResponseBadRequest``.
    Any other exception propagates unchanged.
    """
    # Function-scope import keeps this block self-contained.
    import functools

    @functools.wraps(func)  # preserve the view's __name__/__doc__ for debugging and introspection
    def wrapped(request, *args, **kwargs):
        # Decide the error format up front from the request headers.
        use_json = (request.headers.get('x-requested-with') == 'XMLHttpRequest' or
                    request.META.get("HTTP_ACCEPT", "").startswith("application/json"))
        try:
            return func(request, *args, **kwargs)
        except User.DoesNotExist:
            message = _('User does not exist.')
        except MultipleObjectsReturned:
            message = _('Found a conflict with given identifier. Please try an alternative identifier')
        except (AlreadyRunningError, QueueConnectionError, AttributeError) as err:
            message = str(err)

        # Only reached when one of the handled exceptions was raised.
        if use_json:
            return JsonResponseBadRequest(message)
        else:
            return HttpResponseBadRequest(message)

    return wrapped
def require_post_params(*args, **kwargs):
    """
    Checks for required parameters or renders a 400 error.
    (decorator with arguments)

    `args` is a *list of required POST parameter names.
    `kwargs` is a **dict of required POST parameter names
        to string explanations of the parameter

    The decorated view must receive the request as its first positional
    argument. If any required parameter is absent from ``request.POST``, a
    JSON 400 response is returned listing the missing parameters (and their
    explanations, where given); otherwise the view is called unchanged.
    """
    # Function-scope import keeps this block self-contained.
    import functools

    # required_params = e.g. [('action', 'enroll or unenroll'), ('emails', None)]
    required_params = [(arg, None) for arg in args]
    required_params += list(kwargs.items())

    # Unique sentinel: lets us tell "parameter absent" apart from any real
    # value (including the empty string) with an identity check.
    missing = object()

    def decorator(func):
        @functools.wraps(func)  # preserve the view's __name__/__doc__
        def wrapped(*view_args, **view_kwargs):
            request = view_args[0]

            error_response_data = {
                'error': 'Missing required query parameter(s)',
                'parameters': [],
                'info': {},
            }

            for (param, extra) in required_params:
                if request.POST.get(param, missing) is missing:
                    error_response_data['parameters'].append(param)
                    error_response_data['info'][param] = extra

            if error_response_data['parameters']:
                return JsonResponse(error_response_data, status=400)
            else:
                return func(*view_args, **view_kwargs)
        return wrapped
    return decorator
def require_course_permission(permission):
    """
    Decorator with argument that requires a specific permission of the requesting
    user. If the requirement is not satisfied, returns an
    HttpResponseForbidden (403).

    Assumes that request is in args[0].
    Assumes that course_id is in kwargs['course_id'].

    The course is looked up from the course id string on every call and passed
    as the object to ``request.user.has_perm(permission, course)``.
    """
    # Function-scope import keeps this block self-contained.
    import functools

    def decorator(func):
        @functools.wraps(func)  # preserve the view's __name__/__doc__
        def wrapped(*args, **kwargs):
            request = args[0]
            course = get_course_by_id(CourseKey.from_string(kwargs['course_id']))

            if request.user.has_perm(permission, course):
                return func(*args, **kwargs)
            else:
                return HttpResponseForbidden()
        return wrapped
    return decorator
def require_sales_admin(func):
    """
    Decorator for checking sales administrator access before executing an HTTP endpoint. This decorator
    is designed to be used for a request based action on a course. It assumes that there will be a
    request object as well as a course_id attribute to leverage to check course level privileges.

    If the course id cannot be parsed, this will return HttpResponseNotFound (404).
    If the user does not have privileges for this operation, this will return HttpResponseForbidden (403).
    """
    # Function-scope import keeps this block self-contained.
    import functools

    @functools.wraps(func)  # preserve the view's __name__/__doc__
    def wrapped(request, course_id):
        try:
            course_key = CourseKey.from_string(course_id)
        except InvalidKeyError:
            log.error("Unable to find course with course key %s", course_id)
            return HttpResponseNotFound()

        access = auth.user_has_role(request.user, CourseSalesAdminRole(course_key))

        if access:
            return func(request, course_id)
        else:
            return HttpResponseForbidden()
    return wrapped
def require_finance_admin(func):
    """
    Decorator for checking finance administrator access before executing an HTTP endpoint. This decorator
    is designed to be used for a request based action on a course. It assumes that there will be a
    request object as well as a course_id attribute to leverage to check course level privileges.

    If the course id cannot be parsed, this will return HttpResponseNotFound (404).
    If the user does not have privileges for this operation, this will return HttpResponseForbidden (403).
    """
    # Function-scope import keeps this block self-contained.
    import functools

    @functools.wraps(func)  # preserve the view's __name__/__doc__
    def wrapped(request, course_id):
        try:
            course_key = CourseKey.from_string(course_id)
        except InvalidKeyError:
            log.error("Unable to find course with course key %s", course_id)
            return HttpResponseNotFound()

        access = auth.user_has_role(request.user, CourseFinanceAdminRole(course_key))

        if access:
            return func(request, course_id)
        else:
            return HttpResponseForbidden()
    return wrapped
@require_POST
@ensure_csrf_cookie
@cache_control(no_cache=True, no_store=True, must_revalidate=True)
@require_course_permission(permissions.CAN_ENROLL)
def register_and_enroll_students(request, course_id):  # pylint: disable=too-many-statements
    """
    Create new account and Enroll students in this course.
    Passing a csv file that contains a list of students.
    Order in csv should be the following email = 0; username = 1; name = 2; country = 3.
    If there are more than 4 columns in the csv: cohort = 4, course mode = 5.
    Requires the CAN_ENROLL course permission, and the ALLOW_AUTOMATED_SIGNUPS
    setting to be enabled (otherwise a 403 is returned).

    -If the email address and username already exists and the user is enrolled in the course,
    do nothing (including no email gets sent out)

    -If the email address already exists, but the username is different,
    match on the email address only and continue to enroll the user in the course using the email address
    as the matching criteria. Note the change of username as a warning message (but not a failure).
    Send a standard enrollment email which is the same as the existing manual enrollment

    -If the username already exists (but not the email), assume it is a different user and fail
    to create the new account.
    The failure will be messaged in a response in the browser.

    Returns a JsonResponse with three lists of ``{'username', 'email', 'response'}``
    dicts: ``row_errors``, ``general_errors`` and ``warnings``.
    """
    if not configuration_helpers.get_value(
            'ALLOW_AUTOMATED_SIGNUPS',
            settings.FEATURES.get('ALLOW_AUTOMATED_SIGNUPS', False),
    ):
        return HttpResponseForbidden()

    course_id = CourseKey.from_string(course_id)
    warnings = []
    row_errors = []
    general_errors = []

    # email-students is a checkbox input type; will be present in POST if checked, absent otherwise
    notify_by_email = 'email-students' in request.POST

    # for white labels we use 'shopping cart' which uses CourseMode.HONOR as
    # course mode for creating course enrollments.
    if CourseMode.is_white_label(course_id):
        default_course_mode = CourseMode.HONOR
    else:
        default_course_mode = None

    # Allow bulk enrollments in all non-expired course modes including "credit" (which is non-selectable)
    valid_course_modes = {
        mode.slug
        for mode in CourseMode.modes_for_course(
            course_id=course_id,
            only_selectable=False,
            include_expired=False,
        )
    }

    if 'students_list' in request.FILES:  # lint-amnesty, pylint: disable=too-many-nested-blocks
        students = []

        # Bound before the `try` so the `finally` clause can always close it.
        upload_file = request.FILES.get('students_list')
        try:
            if upload_file.name.endswith('.csv'):
                parsed_rows = list(csv.reader(upload_file.read().decode('utf-8-sig').splitlines()))
                course = get_course_by_id(course_id)
                # Only expose the rows once the course has loaded as well, so the
                # loop below never runs with `course` unbound.
                students = parsed_rows
            else:
                general_errors.append({
                    'username': '', 'email': '',
                    'response': _(
                        'Make sure that the file you upload is in CSV format with no extraneous characters or rows.')
                })

        except Exception:  # pylint: disable=broad-except
            general_errors.append({
                'username': '', 'email': '', 'response': _('Could not read uploaded file.')
            })
        finally:
            upload_file.close()

        generated_passwords = []
        # To skip fetching cohorts from the DB while iterating on students,
        # {<cohort name>: CourseUserGroup}
        cohorts_cache = {}
        already_warned_not_cohorted = False
        extra_fields_is_enabled = configuration_helpers.get_value(
            'ENABLE_AUTOMATED_SIGNUPS_EXTRA_FIELDS',
            settings.FEATURES.get('ENABLE_AUTOMATED_SIGNUPS_EXTRA_FIELDS', False),
        )

        # Iterate each student in the uploaded csv file.
        for row_num, student in enumerate(students, 1):

            # Verify that we have the expected number of columns in every row
            # but allow for blank lines.
            if not student:
                continue

            if extra_fields_is_enabled:
                is_valid_csv = 4 <= len(student) <= 6
                error = _('Data in row #{row_num} must have between four and six columns: '
                          'email, username, full name, country, cohort, and course mode. '
                          'The last two columns are optional.').format(row_num=row_num)
            else:
                is_valid_csv = len(student) == 4
                error = _('Data in row #{row_num} must have exactly four columns: '
                          'email, username, full name, and country.').format(row_num=row_num)

            if not is_valid_csv:
                general_errors.append({
                    'username': '',
                    'email': '',
                    'response': error
                })
                continue

            # Extract each column, handle optional columns if they exist.
            email, username, name, country, *optional_cols = student
            if optional_cols:
                # Pad with the default mode so the unpacking below works when
                # only the cohort column is present.
                optional_cols.append(default_course_mode)
                cohort_name, course_mode, *_tail = optional_cols
            else:
                cohort_name = None
                course_mode = None

            # Validate cohort name, and get the cohort object. Skip if course
            # is not cohorted.
            cohort = None

            if cohort_name and not already_warned_not_cohorted:
                if not is_course_cohorted(course_id):
                    row_errors.append({
                        'username': username,
                        'email': email,
                        'response': _('Course is not cohorted but cohort provided. '
                                      'Ignoring cohort assignment for all users.')
                    })
                    already_warned_not_cohorted = True
                elif cohort_name in cohorts_cache:
                    cohort = cohorts_cache[cohort_name]
                else:
                    # Don't attempt to create cohort or assign student if cohort
                    # does not exist.
                    try:
                        cohort = get_cohort_by_name(course_id, cohort_name)
                    except CourseUserGroup.DoesNotExist:
                        row_errors.append({
                            'username': username,
                            'email': email,
                            'response': _('Cohort name not found: {cohort}. '
                                          'Ignoring cohort assignment for '
                                          'all users.').format(cohort=cohort_name)
                        })
                    # A missing cohort is cached as None, so it is reported only once.
                    cohorts_cache[cohort_name] = cohort

            # Validate course mode.
            if not course_mode:
                course_mode = default_course_mode

            if (course_mode is not None
                    and course_mode not in valid_course_modes):
                # If `default is None` and the user is already enrolled,
                # `CourseEnrollment.change_mode()` will not update the mode,
                # hence two error messages.
                if default_course_mode is None:
                    err_msg = _(
                        'Invalid course mode: {mode}. Falling back to the '
                        'default mode, or keeping the current mode in case the '
                        'user is already enrolled.'
                    ).format(mode=course_mode)
                else:
                    err_msg = _(
                        'Invalid course mode: {mode}. Failling back to '
                        '{default_mode}, or resetting to {default_mode} in case '
                        'the user is already enrolled.'
                    ).format(mode=course_mode, default_mode=default_course_mode)
                row_errors.append({
                    'username': username,
                    'email': email,
                    'response': err_msg,
                })
                course_mode = default_course_mode

            # Built fresh for every row: downstream helpers may add per-student
            # values to this dict, and those must not carry over between rows.
            email_params = get_email_params(course, True, secure=request.is_secure())
            try:
                validate_email(email)  # Raises ValidationError if invalid
            except ValidationError:
                row_errors.append({
                    'username': username,
                    'email': email,
                    'response': _('Invalid email {email_address}.').format(email_address=email)
                })
            else:
                if User.objects.filter(email=email).exists():
                    # Email address already exists. assume it is the correct user
                    # and just register the user in the course and send an enrollment email.
                    user = User.objects.get(email=email)

                    # see if it is an exact match with email and username
                    # if it's not an exact match then just display a warning message, but continue onwards
                    if not User.objects.filter(email=email, username=username).exists():
                        warning_message = _(
                            'An account with email {email} exists but the provided username {username} '
                            'is different. Enrolling anyway with {email}.'
                        ).format(email=email, username=username)

                        warnings.append({
                            'username': username, 'email': email, 'response': warning_message
                        })
                        log.warning('email %s already exist', email)
                    else:
                        log.info(
                            "user already exists with username '%s' and email '%s'",
                            username,
                            email
                        )

                    # enroll a user if it is not already enrolled.
                    if not is_user_enrolled_in_course(user, course_id):
                        # Enroll user to the course and add manual enrollment audit trail
                        create_manual_course_enrollment(
                            user=user,
                            course_id=course_id,
                            mode=course_mode,
                            enrolled_by=request.user,
                            reason='Enrolling via csv upload',
                            state_transition=UNENROLLED_TO_ENROLLED,
                        )
                        enroll_email(course_id=course_id,
                                     student_email=email,
                                     auto_enroll=True,
                                     email_students=notify_by_email,
                                     email_params=email_params)
                    else:
                        # update the course mode if already enrolled
                        existing_enrollment = CourseEnrollment.get_enrollment(user, course_id)
                        if existing_enrollment.mode != course_mode:
                            existing_enrollment.change_mode(mode=course_mode)
                    if cohort:
                        try:
                            add_user_to_cohort(cohort, user)
                        except ValueError:
                            # user already in this cohort; ignore
                            pass
                elif is_email_retired(email):
                    # We are either attempting to enroll a retired user or create a new user with an email which is
                    # already associated with a retired account.  Simply block these attempts.
                    row_errors.append({
                        'username': username,
                        'email': email,
                        'response': _('Invalid email {email_address}.').format(email_address=email),
                    })
                    log.warning(
                        'Email address %s is associated with a retired user, so course enrollment was blocked.',
                        email,
                    )
                else:
                    # This email does not yet exist, so we need to create a new account
                    # If username already exists in the database, then create_and_enroll_user
                    # will raise an IntegrityError exception.
                    password = generate_unique_password(generated_passwords)
                    errors = create_and_enroll_user(
                        email,
                        username,
                        name,
                        country,
                        password,
                        course_id,
                        course_mode,
                        request.user,
                        email_params,
                        email_user=notify_by_email,
                    )
                    row_errors.extend(errors)
                    if cohort:
                        try:
                            add_user_to_cohort(cohort, email)
                        except ValueError:
                            # user already in this cohort; ignore
                            # NOTE: Checking this here may be unnecessary if we can prove that a new user will never be
                            # automatically assigned to a cohort from the above.
                            pass
                        except ValidationError:
                            row_errors.append({
                                'username': username,
                                'email': email,
                                'response': _('Invalid email {email_address}.').format(email_address=email),
                            })

    else:
        general_errors.append({
            'username': '', 'email': '', 'response': _('File is not attached.')
        })

    results = {
        'row_errors': row_errors,
        'general_errors': general_errors,
        'warnings': warnings
    }
    return JsonResponse(results)
def generate_random_string(length):
    """
    Create a string of random characters of specified length.

    Characters are drawn from ASCII letters and digits, excluding vowels and
    the characters '1' and 'l'.

    The result is used for generated account passwords, so characters are
    picked with the operating system's cryptographically secure random source
    (`random.SystemRandom`) rather than the default pseudo-random generator.

    :param length: number of characters to generate; 0 yields ''.
    :return: the generated string.
    """
    excluded_chars = 'aAeEiIoOuU1l'
    chars = [
        char for char in string.ascii_uppercase + string.digits + string.ascii_lowercase
        if char not in excluded_chars
    ]
    secure_random = random.SystemRandom()
    # `_` is the gettext alias in this module, so use a distinct throwaway name.
    return ''.join(secure_random.choice(chars) for _unused in range(length))
def generate_unique_password(generated_passwords, password_length=12):
    """
    Return a random password that is not already in `generated_passwords`.

    The new password is appended to `generated_passwords` (the list is
    modified in place) before being returned, so repeated calls with the same
    list never hand out the same password twice.

    :param generated_passwords: list of passwords handed out so far.
    :param password_length: number of characters in the password.
    :return: the newly generated password.
    """
    while True:
        candidate = generate_random_string(password_length)
        if candidate not in generated_passwords:
            break

    generated_passwords.append(candidate)
    return candidate
def create_user_and_user_profile(email, username, name, country, password):
    """
    Create a user together with its Registration record and UserProfile.

    The Registration instance is what lets the user verify their identity.

    :param email: user's email address
    :param username: user's username
    :param name: user's name
    :param country: user's country
    :param password: user's password

    :return: User instance of the new user.
    """
    new_user = User.objects.create_user(username, email, password)

    # Register the freshly created account.
    Registration().register(new_user)

    # Store the name and country on the user's profile.
    user_profile = UserProfile(user=new_user)
    user_profile.name = name
    user_profile.country = country
    user_profile.save()

    return new_user
def create_manual_course_enrollment(user, course_id, mode, enrolled_by, reason, state_transition):
    """
    Enroll the given student in a course and record a manual enrollment audit entry.

    :param user: User who is to enroll in course
    :param course_id: course identifier of the course in which to enroll the user.
    :param mode: mode for user enrollment, e.g. 'honor', 'audit' etc.
    :param enrolled_by: User who made the manual enrollment entry (usually instructor or support)
    :param reason: Reason behind manual enrollment
    :param state_transition: state transition denoting whether student enrolled from un-enrolled,
        un-enrolled from enrolled etc.
    :return CourseEnrollment instance.
    """
    enrollment = CourseEnrollment.enroll(user, course_id, mode=mode)

    # Audit trail: who enrolled whom, the transition, and the reason given.
    ManualEnrollmentAudit.create_manual_enrollment_audit(
        enrolled_by,
        user.email,
        state_transition,
        reason,
        enrollment,
    )

    log.info('user %s enrolled in the course %s', user.username, course_id)
    return enrollment
def create_and_enroll_user(
    email,
    username,
    name,
    country,
    password,
    course_id,
    course_mode,
    enrolled_by,
    email_params,
    email_user=True,
):
    """
    Create a new user and enroll them in the given course; return a list of errors.

    User creation and enrollment run inside a single database transaction, so a
    failure in either step leaves no partially created account behind. The
    account-creation email is sent only after that transaction succeeded.

    Error format:
        each error is a key-value pair dict with the following keys.
            1. username: username of the user to enroll
            2. email: email of the user to enroll
            3. response: readable error message

    :param email: user's email address
    :param username: user's username
    :param name: user's name
    :param country: user's country
    :param password: user's password
    :param course_id: course identifier of the course in which to enroll the user.
    :param course_mode: mode for user enrollment, e.g. 'honor', 'audit' etc.
    :param enrolled_by: User who made the manual enrollment entry (usually instructor or support)
    :param email_params: information to send to the user via email. This dict is
        not modified; the per-user values are added to a copy.
    :param email_user: If True and it's a new user, an email will be sent to
        them upon account creation.

    :return: list of errors
    """
    errors = []
    try:
        with transaction.atomic():
            # Create a new user
            user = create_user_and_user_profile(email, username, name, country, password)

            # Enroll user to the course and add manual enrollment audit trail
            create_manual_course_enrollment(
                user=user,
                course_id=course_id,
                mode=course_mode,
                enrolled_by=enrolled_by,
                reason='Enrolling via csv upload',
                state_transition=UNENROLLED_TO_ENROLLED,
            )
    except IntegrityError:
        errors.append({
            'username': username,
            'email': email,
            'response': _('Username {user} already exists.').format(user=username)
        })
    except Exception as ex:  # pylint: disable=broad-except
        log.exception(type(ex).__name__)
        errors.append({
            'username': username, 'email': email, 'response': type(ex).__name__,
        })
    else:
        if email_user:
            try:
                # It's a new user, an email will be sent to each newly created user.
                # Work on a copy so the plaintext password and user id never end
                # up in the dict owned by the caller.
                user_email_params = dict(email_params)
                user_email_params.update({
                    'message_type': 'account_creation_and_enrollment',
                    'email_address': email,
                    'user_id': user.id,
                    'password': password,
                    'platform_name': configuration_helpers.get_value('platform_name', settings.PLATFORM_NAME),
                })
                send_mail_to_student(email, user_email_params)
            except Exception as ex:  # pylint: disable=broad-except
                log.exception(
                    "Exception '%s' raised while sending email to new user.",
                    type(ex).__name__,
                )
                errors.append({
                    'username': username,
                    'email': email,
                    'response':
                        _("Error '{error}' while sending email to new user (user email={email}). "
                          "Without the email student would not be able to login. "
                          "Please contact support for further information.").format(
                              error=type(ex).__name__, email=email
                        ),
                })
            else:
                log.info('email sent to new created user at %s', email)

    return errors
@require_POST
@ensure_csrf_cookie
@cache_control(no_cache=True, no_store=True, must_revalidate=True)
@require_course_permission(permissions.CAN_ENROLL)
@require_post_params(action="enroll or unenroll", identifiers="stringified list of emails and/or usernames")
def students_update_enrollment(request, course_id):  # lint-amnesty, pylint: disable=too-many-statements
    """
    Enroll or unenroll students by email.
    Requires the CAN_ENROLL course permission.

    POST Parameters:
    - action in ['enroll', 'unenroll']
    - identifiers is string containing a list of emails and/or usernames separated by anything split_input_list can handle.  # lint-amnesty, pylint: disable=line-too-long
    - auto_enroll is a boolean (defaults to false)
        If auto_enroll is false, students will be allowed to enroll.
        If auto_enroll is true, students will be enrolled as soon as they register.
    - email_students is a boolean (defaults to false)
        If email_students is true, students will be sent email notification
        If email_students is false, students will not be sent email notification
    - reason (optional) is recorded in the manual enrollment audit trail.

    An identifier with an invalid email is reported with "invalidIdentifier";
    any other per-identifier failure is logged and reported with "error", so a
    single bad identifier does not fail the whole request. An unrecognized
    action returns a 400 response.

    Returns an analog to this JSON structure: {
        "action": "enroll",
        "auto_enroll": false,
        "results": [
            {
                "email": "testemail@test.org",
                "before": {
                    "enrollment": false,
                    "auto_enroll": false,
                    "user": true,
                    "allowed": false
                },
                "after": {
                    "enrollment": true,
                    "auto_enroll": false,
                    "user": true,
                    "allowed": false
                }
            }
        ]
    }
    """
    course_id = CourseKey.from_string(course_id)
    action = request.POST.get('action')
    identifiers_raw = request.POST.get('identifiers')
    identifiers = _split_input_list(identifiers_raw)
    auto_enroll = _get_boolean_param(request, 'auto_enroll')
    email_students = _get_boolean_param(request, 'email_students')
    reason = request.POST.get('reason')

    email_params = {}
    if email_students:
        course = get_course_by_id(course_id)
        email_params = get_email_params(course, auto_enroll, secure=request.is_secure())

    results = []
    for identifier in identifiers:  # lint-amnesty, pylint: disable=too-many-nested-blocks
        # Reset for every identifier so an audit record can never inherit the
        # enrollment or transition computed for the previous identifier.
        enrollment_obj = None
        state_transition = DEFAULT_TRANSITION_STATE

        # First try to get a user object from the identifier
        user = None
        email = None
        language = None
        try:
            user = get_student_from_identifier(identifier)
        except User.DoesNotExist:
            # Not a known user: treat the identifier itself as an email address.
            email = identifier
        else:
            email = user.email
            language = get_user_email_language(user)

        try:
            # Use django.core.validators.validate_email to check email address
            # validity (obviously, cannot check if email actually /exists/,
            # simply that it is plausibly valid)
            validate_email(email)  # Raises ValidationError if invalid
            if action == 'enroll':
                before, after, enrollment_obj = enroll_email(
                    course_id, email, auto_enroll, email_students, email_params, language=language
                )
                before_state = before.to_dict()
                after_state = after.to_dict()
                before_enrollment = before_state['enrollment']
                before_user_registered = before_state['user']
                before_allowed = before_state['allowed']
                after_enrollment = after_state['enrollment']
                after_allowed = after_state['allowed']

                if before_user_registered:
                    if after_enrollment:
                        if before_enrollment:
                            state_transition = ENROLLED_TO_ENROLLED
                        else:
                            if before_allowed:
                                state_transition = ALLOWEDTOENROLL_TO_ENROLLED
                            else:
                                state_transition = UNENROLLED_TO_ENROLLED
                else:
                    if after_allowed:
                        state_transition = UNENROLLED_TO_ALLOWEDTOENROLL

            elif action == 'unenroll':
                before, after = unenroll_email(
                    course_id, email, email_students, email_params, language=language
                )
                before_state = before.to_dict()
                before_enrollment = before_state['enrollment']
                before_allowed = before_state['allowed']
                enrollment_obj = CourseEnrollment.get_enrollment(user, course_id) if user else None

                if before_enrollment:
                    state_transition = ENROLLED_TO_UNENROLLED
                else:
                    if before_allowed:
                        state_transition = ALLOWEDTOENROLL_TO_UNENROLLED
                    else:
                        state_transition = UNENROLLED_TO_UNENROLLED

            else:
                return HttpResponseBadRequest(strip_tags(
                    f"Unrecognized action '{action}'"
                ))

        except ValidationError:
            # Flag this email as an error if invalid, but continue checking
            # the remaining in the list
            results.append({
                'identifier': identifier,
                'invalidIdentifier': True,
            })

        except Exception:  # pylint: disable=broad-except
            # catch and log any exceptions
            # so that one error doesn't cause a 500.
            # log.exception already records the active exception and traceback.
            log.exception("Error while %sing student", action)
            results.append({
                'identifier': identifier,
                'error': True,
            })

        else:
            ManualEnrollmentAudit.create_manual_enrollment_audit(
                request.user, email, state_transition, reason, enrollment_obj
            )
            results.append({
                'identifier': identifier,
                'before': before.to_dict(),
                'after': after.to_dict(),
            })

    response_payload = {
        'action': action,
        'results': results,
        'auto_enroll': auto_enroll,
    }
    return JsonResponse(response_payload)
@require_POST
@ensure_csrf_cookie
@cache_control(no_cache=True, no_store=True, must_revalidate=True)
@require_course_permission(permissions.CAN_BETATEST)
@common_exceptions_400
@require_post_params(
    identifiers="stringified list of emails and/or usernames",
    action="add or remove",
)
def bulk_beta_modify_access(request, course_id):
    """
    Enroll or unenroll users in beta testing program.

    POST parameters:
    - identifiers is string containing a list of emails and/or usernames separated by
        anything split_input_list can handle.
    - action is one of ['add', 'remove']
    - email_students (optional boolean): when true, a beta-role email is sent
        to every user whose access was changed successfully.
    - auto_enroll (optional boolean): when true, every successfully processed
        user who is not yet enrolled in the course gets enrolled.

    Returns a JsonResponse of the form
        {'action': <action>, 'results': [<per-identifier dict>, ...]}
    where each per-identifier dict carries the keys 'identifier', 'error',
    'userDoesNotExist' and 'is_active' ('is_active' is None whenever the user
    could not be resolved).

    Returns HttpResponseBadRequest when `action` is neither 'add' nor 'remove';
    note that this check is only reached once an identifier has been resolved
    to an existing user.
    """
    course_id = CourseKey.from_string(course_id)
    action = request.POST.get('action')
    identifiers_raw = request.POST.get('identifiers')
    identifiers = _split_input_list(identifiers_raw)
    email_students = _get_boolean_param(request, 'email_students')
    auto_enroll = _get_boolean_param(request, 'auto_enroll')
    results = []
    rolename = 'beta'
    course = get_course_by_id(course_id)

    email_params = {}
    if email_students:
        secure = request.is_secure()
        email_params = get_email_params(course, auto_enroll=auto_enroll, secure=secure)

    for identifier in identifiers:
        # Reset the per-identifier state *before* the try block so that the
        # `finally` clause never reads an unbound name or a stale value left
        # over from the previous identifier when the user lookup itself fails.
        error = False
        user_does_not_exist = False
        user_active = None
        try:
            user = get_student_from_identifier(identifier)
            user_active = user.is_active

            if action == 'add':
                allow_access(course, user, rolename)
            elif action == 'remove':
                revoke_access(course, user, rolename)
            else:
                return HttpResponseBadRequest(strip_tags(
                    f"Unrecognized action '{action}'"
                ))
        except User.DoesNotExist:
            error = True
            user_does_not_exist = True
        # catch and log any unexpected exceptions
        # so that one error doesn't cause a 500.
        except Exception:  # pylint: disable=broad-except
            # log.exception already records the active traceback, so a single
            # call with lazy %-formatting is sufficient.
            log.exception("Error while processing beta access action '%s' for student", action)
            error = True
        else:
            # If no exception thrown, see if we should send an email
            if email_students:
                send_beta_role_email(action, user, email_params)
            # See if we should autoenroll the student
            if auto_enroll:
                # Check if student is already enrolled
                if not is_user_enrolled_in_course(user, course_id):
                    CourseEnrollment.enroll(user, course_id)
        finally:
            # Tabulate the action result of this email address
            results.append({
                'identifier': identifier,
                'error': error,
                'userDoesNotExist': user_does_not_exist,
                'is_active': user_active,
            })

    response_payload = {
        'action': action,
        'results': results,
    }
    return JsonResponse(response_payload)
@require_POST
@ensure_csrf_cookie
@cache_control(no_cache=True, no_store=True, must_revalidate=True)
@require_course_permission(permissions.EDIT_COURSE_ACCESS)
@require_post_params(
unique_student_identifier="email or username of user to change access",
rolename="'instructor', 'staff', 'beta', or 'ccx_coach'",
action="'allow' or 'revoke'"
)
@common_exceptions_400
def modify_access(request, course_id):
"""
Modify staff/instructor access of other user.
Requires instructor access.
NOTE: instructors cannot remove their own instructor access.