You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

210 lines
7.5 KiB
Python

from django.shortcuts import render, redirect, get_object_or_404
from django.http.response import HttpResponseNotAllowed
from django.contrib.auth.decorators import permission_required, login_required, user_passes_test
from django.http import HttpResponse, FileResponse, HttpResponseForbidden, HttpResponseBadRequest, HttpResponseNotAllowed
from django.contrib.auth import logout
from django.urls import reverse
from django.contrib import messages
from django.core.exceptions import ValidationError
from django.utils import timezone
import json
from .forms import FileUploadForm, UploadCertificateForm, RuleAttributeForm
from .models import File, Rule, UserAttribute, RuleAttribute, AttributeType, User
@login_required
def landing_page(request):
files = File.objects.all()
return render(request, 'landing_page.html', {'files': files})
def logout_view(request):
logout(request)
return redirect('abac:login')
@login_required
def upload_file_view(request):
if request.method == 'POST':
form = FileUploadForm(request.POST, request.FILES)
if form.is_valid():
file_instance = form.save(commit=False)
file_instance.owner = request.user
file_instance.save()
return redirect('abac:home')
else:
form = FileUploadForm()
return render(request, 'file_upload.html', {'form': form})
def create_user_view(request):
return HttpResponse('Create User View')
@login_required
def upload_certificate_view(request):
if request.method == 'POST':
uploaded_file = request.FILES.get('certificate')
if not uploaded_file:
return HttpResponseBadRequest('No file uploaded.')
certificate_data = json.load(uploaded_file)
for attribute_data in certificate_data:
name = attribute_data.get('name')
value = attribute_data.get('value')
attribute_type = get_or_create_attribute_type(name, value, private=True)
attribute, created = UserAttribute.objects.update_or_create(
user=request.user,
attribute_type=attribute_type,
defaults={'value': value, 'last_modified': timezone.now()}
)
messages.success(request, 'Certificate uploaded successfully.')
return redirect('abac:user_details', username=request.user.username)
else:
form = UploadCertificateForm()
return render(request, 'upload_certificate.html', {'form': form})
@login_required
def file_detail(request, file_id):
file = get_object_or_404(File, id=file_id)
rules = Rule.objects.filter(file=file)
return render(request, 'file_detail.html', {'file': file, 'rules': rules})
@login_required
def create_rule(request, file_id):
file = get_object_or_404(File, id=file_id)
if request.method == "POST":
rule_name = request.POST.get('rule_name')
if rule_name:
rule = Rule.objects.create(name=rule_name, file=file)
url = reverse('abac:rule_detail', args=[file_id, rule.id])
return redirect(url)
# If the rule_name is not provided or if the method is GET, redirect to file_detail view.
return redirect('abac:file_detail', file_id=file_id)
@login_required
def rule_detail(request, file_id, rule_id=None):
file = get_object_or_404(File, id=file_id)
rule = get_object_or_404(Rule, id=rule_id) if rule_id else None
if request.method == "POST":
form = RuleAttributeForm(request.POST)
if form.is_valid():
rule_attribute = form.save(commit=False)
rule_attribute.rule = rule
rule_attribute.save()
return redirect('abac:rule_detail', file_id=file_id, rule_id=rule_id)
else:
form = RuleAttributeForm()
rule_attributes = RuleAttribute.objects.filter(rule=rule) if rule else []
return render(request, 'rule_detail.html', {
'file': file,
'rule': rule,
'form': form,
'rule_attributes': rule_attributes
})
@login_required
def delete_rule_attribute(request, file_id, rule_id, rule_attribute_id):
rule_attribute = get_object_or_404(RuleAttribute, id=rule_attribute_id)
rule_attribute.delete()
return redirect('abac:rule_detail', file_id=file_id, rule_id=rule_id)
@login_required
def user_detail_view(request, username):
user = get_object_or_404(User, username=username)
# Check if the requested user is the same as the logged-in user or the logged-in user has the required permission
if user != request.user and not request.user.has_perm('abac.can_create_users'):
return HttpResponseForbidden('You do not have permission to view this page.')
attributes = UserAttribute.objects.filter(user=user)
return render(request, 'user_detail.html', {'user': user, 'attributes': attributes})
def get_or_create_attribute_type(name, value, private=False):
# Determine the datatype from the value
if isinstance(value, int):
datatype = 'INTEGER'
elif isinstance(value, float):
datatype = 'FLOAT'
elif isinstance(value, bool):
datatype = 'BOOLEAN'
elif isinstance(value, str):
datatype = 'STRING'
else:
raise ValidationError('Invalid data type in certificate for attribute: {}'.format(name))
# Try to get the existing AttributeType object with matching name and datatype.
attribute_type, created = AttributeType.objects.get_or_create(
name=name,
datatype=datatype,
defaults={'is_private': private}
)
if not created and attribute_type.datatype != datatype:
# If an AttributeType with the same name but different datatype exists, create a new one.
attribute_type = AttributeType.objects.create(name=name, datatype=datatype, is_private=private)
return attribute_type
@login_required
def download_file(request, file_id):
file = get_object_or_404(File, id=file_id)
user = request.user
# a) Check if the user is the owner of the file.
if user == file.owner:
print("User is Owner")
return serve_file(file)
# b) Check if the user is a superuser.
if user.is_superuser:
print("User is superuser")
return serve_file(file)
# c) Check the user's attributes against the file's rules.
for rule in file.rule_set.all():
if rule.is_satisfied_by(user):
print(f"user satisfies rule {rule.name}")
return serve_file(file)
return HttpResponseForbidden('You do not have permission to download this file.')
def serve_file(file):
# Serve the file using FileResponse.
response = FileResponse(file.file.open('rb'))
response['Content-Disposition'] = f'attachment; filename="{file.name}"'
return response
@login_required
@user_passes_test(lambda u: u.is_superuser)
def user_management(request):
if request.method == "POST":
username = request.POST.get('username')
password = request.POST.get('password')
is_superuser = 'is_superuser' in request.POST # The checkbox for superuser will be in request.POST if checked.
if not username or not password:
messages.error(request, 'Username and password are required.')
else:
User.objects.create_user(username=username, password=password, is_superuser=is_superuser)
messages.success(request, f'User {username} created successfully.')
return redirect('abac:user_management')
users = User.objects.all().order_by('-is_superuser', 'username')
return render(request, 'user_management.html', {'users': users})