You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
178 lines
6.5 KiB
Python
178 lines
6.5 KiB
Python
from django.shortcuts import render, redirect, get_object_or_404
|
|
from django.http.response import HttpResponseNotAllowed
|
|
from django.contrib.auth.decorators import permission_required, login_required
|
|
from django.http import HttpResponse, JsonResponse, HttpResponseForbidden, HttpResponseBadRequest, HttpResponseNotAllowed
|
|
from django.contrib.auth import logout
|
|
from django.urls import reverse
|
|
from django.views.generic.detail import SingleObjectMixin, DetailView
|
|
from django.contrib.auth.mixins import LoginRequiredMixin
|
|
from django.contrib import messages
|
|
from django.core.exceptions import ValidationError
|
|
from django.utils import timezone
|
|
|
|
import json
|
|
|
|
from .forms import FileUploadForm, UploadCertificateForm, RuleAttributeForm
|
|
from .models import File, Rule, UserAttribute, RuleAttribute, AttributeType, User
|
|
|
|
def create_user(request):
    """Gate user creation behind the ``abac.can_create_users`` permission.

    Args:
        request: The incoming HttpRequest; ``request.user`` is checked for
            the ``abac.can_create_users`` permission.

    Returns:
        HttpResponseForbidden (403) when the caller lacks the permission;
        otherwise an HTTP 501 response, because the actual user creation is
        still a TODO.
    """
    if not request.user.has_perm('abac.can_create_users'):
        # A missing permission is a 403. HttpResponseNotAllowed is a 405 and
        # expects a list of permitted HTTP methods, not the request object.
        return HttpResponseForbidden('You do not have permission to create users.')

    # TODO: Create new User
    # Return an explicit response so the view never falls through with None,
    # which Django rejects as an invalid view return value.
    return HttpResponse('User creation is not implemented yet.', status=501)
|
|
|
|
@permission_required('abac.can_create_users', raise_exception=True)
def create_user_view(request):
    """Placeholder user-creation view guarded by ``abac.can_create_users``.

    The decorator is configured with ``raise_exception=True``. The body only
    returns a fixed confirmation text; no user is actually created here.

    NOTE(review): another ``create_user_view`` is defined later in this
    module; the later ``def`` rebinds the name at import time.
    """
    confirmation = HttpResponse('New user created')
    return confirmation
|
|
|
|
@login_required
def landing_page(request):
    """Render ``landing_page.html`` with all File objects under the ``files`` key."""
    context = {'files': File.objects.all()}
    return render(request, 'landing_page.html', context)
|
|
|
|
def login_view(request):
    """Placeholder login view: always responds with a fixed text body."""
    placeholder_body = 'Login View'
    return HttpResponse(placeholder_body)
|
|
|
|
def logout_view(request):
    """Log the requesting user out, then redirect to the ``abac:login`` URL."""
    logout(request)
    to_login = redirect('abac:login')
    return to_login
|
|
|
|
@login_required
def upload_file_view(request):
    """Show the file upload form and store a submitted file.

    On a valid POST the file is saved with the logged-in user as its owner
    and the client is redirected to ``abac:home``. On any other method an
    empty form is rendered; on an invalid POST the bound form is rendered.
    """
    if request.method != 'POST':
        form = FileUploadForm()
    else:
        form = FileUploadForm(request.POST, request.FILES)
        if form.is_valid():
            # Delay the save so the owner can be set before the row is written.
            new_file = form.save(commit=False)
            new_file.owner = request.user
            new_file.save()
            return redirect('abac:home')

    context = {'form': form}
    return render(request, 'file_upload.html', context)
|
|
|
|
@permission_required('abac.can_create_users', raise_exception=True)
def create_user_view(request):
    """Placeholder user-creation view guarded by ``abac.can_create_users``.

    This is the last ``create_user_view`` defined in the module, so it is the
    one bound to the name at import time. It carries the same permission
    guard as the earlier definition so that the rebinding does not silently
    expose the view to users without the permission.

    Returns:
        An HttpResponse with a fixed placeholder body.
    """
    return HttpResponse('Create User View')
|
|
|
|
@login_required
def upload_certificate_view(request):
    """Import user attributes from an uploaded JSON certificate.

    The certificate is read from the ``certificate`` file field and must be a
    JSON list of objects, each carrying a ``name`` and a ``value``. Every
    entry is stored as a private attribute of the logged-in user.

    Returns:
        * the rendered ``upload_certificate.html`` form for non-POST requests;
        * HttpResponseBadRequest when no file is sent, the file is not valid
          JSON, the payload has the wrong shape, or a value has an
          unsupported type;
        * a redirect to ``abac:user_details`` after a successful import.
    """
    # Function-scope import: the transaction helper is only needed here.
    from django.db import transaction

    if request.method != 'POST':
        form = UploadCertificateForm()
        return render(request, 'upload_certificate.html', {'form': form})

    uploaded_file = request.FILES.get('certificate')
    if not uploaded_file:
        return HttpResponseBadRequest('No file uploaded.')

    # The upload is untrusted input: a malformed file must yield a 400
    # response rather than an unhandled exception.
    try:
        certificate_data = json.load(uploaded_file)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return HttpResponseBadRequest('Certificate is not valid JSON.')

    is_well_formed = isinstance(certificate_data, list) and all(
        isinstance(entry, dict) and entry.get('name')
        for entry in certificate_data
    )
    if not is_well_formed:
        return HttpResponseBadRequest(
            'Certificate must be a JSON list of objects with a name and a value.'
        )

    try:
        # All-or-nothing: a bad entry late in the list must not leave the
        # earlier entries half-imported.
        with transaction.atomic():
            for entry in certificate_data:
                name = entry.get('name')
                value = entry.get('value')

                attribute_type = get_or_create_attribute_type(name, value, private=True)

                UserAttribute.objects.update_or_create(
                    user=request.user,
                    attribute_type=attribute_type,
                    defaults={'value': value, 'last_modified': timezone.now()},
                )
    except ValidationError:
        # Raised by get_or_create_attribute_type for unsupported value types.
        return HttpResponseBadRequest(
            'Certificate contains an attribute with an unsupported value type.'
        )

    messages.success(request, 'Certificate uploaded successfully.')
    return redirect('abac:user_details', username=request.user.username)
|
|
|
|
@login_required
def file_detail(request, file_id):
    """Render ``file_detail.html`` for one File together with its Rules.

    Responds with 404 when no File has the given ``file_id``.
    """
    target_file = get_object_or_404(File, id=file_id)
    context = {
        'file': target_file,
        'rules': Rule.objects.filter(file=target_file),
    }
    return render(request, 'file_detail.html', context)
|
|
|
|
@login_required
def create_rule(request, file_id):
    """Create a Rule for a File from the POSTed ``rule_name``.

    Responds with 404 when the File does not exist. A POST carrying a
    non-empty ``rule_name`` creates the Rule and redirects to its
    ``abac:rule_detail`` page; every other request redirects back to
    ``abac:file_detail``.
    """
    parent_file = get_object_or_404(File, id=file_id)

    submitted_name = None
    if request.method == "POST":
        submitted_name = request.POST.get('rule_name')

    if not submitted_name:
        # Nothing to create (non-POST or missing name): back to the file page.
        return redirect('abac:file_detail', file_id=file_id)

    new_rule = Rule.objects.create(name=submitted_name, file=parent_file)
    detail_url = reverse('abac:rule_detail', args=[file_id, new_rule.id])
    return redirect(detail_url)
|
|
|
|
|
|
@login_required
def rule_detail(request, file_id, rule_id=None):
    """Show a Rule of a File and add attributes to it.

    Args:
        request: The incoming HttpRequest.
        file_id: Primary key of the File; 404 when it does not exist.
        rule_id: Optional primary key of the Rule. When given, the Rule must
            belong to the File identified by ``file_id``, otherwise 404.

    Returns:
        * a redirect back to this page after a valid POST added an attribute;
        * HttpResponseBadRequest for a POST that names no rule;
        * otherwise the rendered ``rule_detail.html`` with the file, the rule,
          the attribute form and the rule's current attributes.
    """
    file = get_object_or_404(File, id=file_id)
    # Scope the lookup to the file so a URL cannot pair a rule with a file
    # it does not belong to.
    rule = get_object_or_404(Rule, id=rule_id, file=file) if rule_id else None

    if request.method == "POST":
        if rule is None:
            # Without a rule there is nothing to attach the attribute to.
            return HttpResponseBadRequest('A rule is required to add an attribute.')

        form = RuleAttributeForm(request.POST)
        if form.is_valid():
            rule_attribute = form.save(commit=False)
            rule_attribute.rule = rule
            rule_attribute.save()
            return redirect('abac:rule_detail', file_id=file_id, rule_id=rule_id)
    else:
        form = RuleAttributeForm()

    rule_attributes = RuleAttribute.objects.filter(rule=rule) if rule else []

    return render(request, 'rule_detail.html', {
        'file': file,
        'rule': rule,
        'form': form,
        'rule_attributes': rule_attributes,
    })
|
|
|
|
@login_required
def delete_rule_attribute(request, file_id, rule_id, rule_attribute_id):
    """Delete one attribute of a rule and return to the rule's detail page.

    The attribute is looked up together with its rule and file, so the three
    ids in the URL must be consistent with each other; a mismatch yields 404
    instead of deleting an attribute that belongs to a different rule.

    NOTE(review): the deletion runs for any HTTP method, including GET.
    Consider restricting this view to POST once the templates submit a form.
    """
    rule_attribute = get_object_or_404(
        RuleAttribute,
        id=rule_attribute_id,
        rule__id=rule_id,
        rule__file__id=file_id,
    )
    rule_attribute.delete()
    return redirect('abac:rule_detail', file_id=file_id, rule_id=rule_id)
|
|
|
|
@login_required
def user_detail_view(request, username):
    """Render ``user_detail.html`` for a user and that user's attributes.

    Responds with 404 for an unknown ``username``. Users may view their own
    page; viewing someone else's requires ``abac.can_create_users``,
    otherwise a 403 response is returned.
    """
    profile_user = get_object_or_404(User, username=username)
    viewer = request.user

    if profile_user != viewer:
        # Someone else's page: only holders of the permission may see it.
        if not viewer.has_perm('abac.can_create_users'):
            return HttpResponseForbidden('You do not have permission to view this page.')

    context = {
        'user': profile_user,
        'attributes': UserAttribute.objects.filter(user=profile_user),
    }
    return render(request, 'user_detail.html', context)
|
|
|
|
|
|
|
|
|
|
def get_or_create_attribute_type(name, value, private=False):
    """Return the AttributeType matching ``name`` and the datatype of ``value``.

    The datatype is derived from the Python type of ``value``: ``bool`` maps
    to ``'BOOLEAN'``, ``int`` to ``'INTEGER'``, ``float`` to ``'FLOAT'`` and
    ``str`` to ``'STRING'``. An AttributeType with that name and datatype is
    fetched, or created when none exists yet.

    Args:
        name: Name of the attribute.
        value: Sample value whose Python type selects the datatype.
        private: Stored as ``is_private`` when a new AttributeType is
            created; an existing AttributeType is returned unchanged.

    Returns:
        The existing or newly created AttributeType.

    Raises:
        ValidationError: If ``value`` is not a bool, int, float or str.
    """
    # bool must be tested before int: bool is a subclass of int, so an
    # int-first check would classify True/False as INTEGER.
    if isinstance(value, bool):
        datatype = 'BOOLEAN'
    elif isinstance(value, int):
        datatype = 'INTEGER'
    elif isinstance(value, float):
        datatype = 'FLOAT'
    elif isinstance(value, str):
        datatype = 'STRING'
    else:
        raise ValidationError('Invalid data type in certificate for attribute: {}'.format(name))

    # The lookup already matches on datatype, so a same-name type with a
    # different datatype is never returned here; a separate type is created
    # for the new (name, datatype) pair. `defaults` only applies on creation.
    attribute_type, _created = AttributeType.objects.get_or_create(
        name=name,
        datatype=datatype,
        defaults={'is_private': private},
    )

    return attribute_type
|