You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

356 lines
12 KiB
Python

from django.shortcuts import render, redirect, get_object_or_404
from django.http.response import HttpResponseNotAllowed
from django.contrib.auth.decorators import permission_required, login_required, user_passes_test
from django.http import HttpResponse, FileResponse, HttpResponseForbidden, HttpResponseBadRequest, HttpResponseNotAllowed, JsonResponse
from django.contrib.auth import logout
from django.urls import reverse
from django.contrib import messages
from django.core.exceptions import ValidationError
from django.utils import timezone
import hashlib
import json
from phe import paillier
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.backends import default_backend
from .forms import FileUploadForm, UploadCertificateForm, RuleAttributeForm, TrustedAuthorityForm
from .models import File, Rule, UserAttribute, RuleAttribute, AttributeType, User, TrustedAuthority
@login_required
def landing_page(request):
files = File.objects.all()
return render(request, 'landing_page.html', {'files': files})
def logout_view(request):
logout(request)
return redirect('abac:login')
@login_required
def upload_file_view(request):
if request.method == 'POST':
form = FileUploadForm(request.POST, request.FILES)
if form.is_valid():
file_instance = form.save(commit=False)
file_instance.owner = request.user
file_instance.save()
return redirect('abac:home')
else:
form = FileUploadForm()
return render(request, 'file_upload.html', {'form': form})
def create_user_view(request):
return HttpResponse('Create User View')
@login_required
def upload_certificate_view(request):
if request.method == 'POST':
form = UploadCertificateForm(request.POST, request.FILES)
if form.is_valid():
certificate = form.cleaned_data['certificate']
signature = form.cleaned_data['signature'].read()
is_verified = False
for authority in TrustedAuthority.objects.all():
pub_key = serialization.load_pem_public_key(
authority.public_key.encode(),
backend=default_backend()
)
try:
pub_key.verify(
signature,
certificate.read(),
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
is_verified = True
break
except:
continue
if is_verified:
certificate_data = json.load(certificate)
if not request.user.public_key:
messages.warning(request, 'Please upload your public key first.')
return redirect(reverse('abac:upload_public_key', args=[request.user.username]))
public_key_native = paillier.PaillierPublicKey(n=int(request.user.public_key))
for attribute_data in certificate_data:
name = attribute_data.get('name')
value = attribute_data.get('value')
attribute_type = get_or_create_attribute_type(name, value, private=True)
encrypted_value = str(public_key_native.encrypt(value).ciphertext())
attribute, created = UserAttribute.objects.update_or_create(
user=request.user,
attribute_type=attribute_type,
defaults={'value': encrypted_value, 'last_modified': timezone.now()}
)
messages.success(request, 'Certificate uploaded successfully.')
return redirect('abac:user_details', username=request.user.username)
else:
messages.error(request, 'Certificate verification failed.')
else:
form = UploadCertificateForm()
return render(request, 'upload_certificate.html', {'form': form})
@login_required
def file_detail(request, file_id):
user = request.user
file = get_object_or_404(File, id=file_id)
if check_permission(user, file):
rules = Rule.objects.filter(file=file)
return render(request, 'file_detail.html', {'file': file, 'rules': rules})
else:
return HttpResponseForbidden("You don't have the permission to access this file")
@login_required
def create_rule(request, file_id):
file = get_object_or_404(File, id=file_id)
if request.method == "POST":
rule_name = request.POST.get('rule_name')
if rule_name:
rule = Rule.objects.create(name=rule_name, file=file)
url = reverse('abac:rule_detail', args=[file_id, rule.id])
return redirect(url)
# If the rule_name is not provided or if the method is GET, redirect to file_detail view.
return redirect('abac:file_detail', file_id=file_id)
@login_required
def rule_detail(request, file_id, rule_id=None):
file = get_object_or_404(File, id=file_id)
rule = get_object_or_404(Rule, id=rule_id) if rule_id else None
if request.method == "POST":
form = RuleAttributeForm(request.POST)
if form.is_valid():
rule_attribute = form.save(commit=False)
rule_attribute.rule = rule
rule_attribute.save()
return redirect('abac:rule_detail', file_id=file_id, rule_id=rule_id)
else:
form = RuleAttributeForm()
rule_attributes = RuleAttribute.objects.filter(rule=rule) if rule else []
return render(request, 'rule_detail.html', {
'file': file,
'rule': rule,
'form': form,
'rule_attributes': rule_attributes
})
@login_required
def delete_rule_attribute(request, file_id, rule_id, rule_attribute_id):
rule_attribute = get_object_or_404(RuleAttribute, id=rule_attribute_id)
rule_attribute.delete()
return redirect('abac:rule_detail', file_id=file_id, rule_id=rule_id)
@login_required
def user_detail_view(request, username):
user = get_object_or_404(User, username=username)
# Check if the requested user is the same as the logged-in user or the logged-in user has the required permission
if user != request.user and not request.user.is_superuser:
return HttpResponseForbidden('You do not have permission to view this page.')
attributes = UserAttribute.objects.filter(user=user)
return render(request, 'user_detail.html', {'user': user, 'attributes': attributes})
def get_or_create_attribute_type(name, value, private=False):
# Determine the datatype from the value
if isinstance(value, int):
datatype = 'INTEGER'
elif isinstance(value, float):
datatype = 'FLOAT'
elif isinstance(value, bool):
datatype = 'BOOLEAN'
elif isinstance(value, str):
datatype = 'STRING'
else:
raise ValidationError('Invalid data type in certificate for attribute: {}'.format(name))
# Try to get the existing AttributeType object with matching name and datatype.
attribute_type, created = AttributeType.objects.get_or_create(
name=name,
datatype=datatype,
defaults={'is_private': private}
)
if not created and attribute_type.datatype != datatype:
# If an AttributeType with the same name but different datatype exists, create a new one.
attribute_type = AttributeType.objects.create(name=name, datatype=datatype, is_private=private)
return attribute_type
@login_required
def download_file(request, file_id):
file = get_object_or_404(File, id=file_id)
user = request.user
for rule in file.rule_set.all():
if rule.is_satisfied_by(user):
return serve_file(file)
return HttpResponseForbidden('You do not have permission to download this file.')
def check_permission(user, file, mode = None):
if mode:
pass
else:
# a) Check if the user is the owner of the file.
if user == file.owner:
return True
# b) Check if the user is a superuser.
if user.is_superuser:
return True
# c) Check the user's attributes against the file's rules.
for rule in file.rule_set.all():
if rule.is_satisfied_by(user):
return True
def serve_file(file):
# Serve the file using FileResponse.
response = FileResponse(file.file.open('rb'))
response['Content-Disposition'] = f'attachment; filename="{file.name}"'
return response
@login_required
@user_passes_test(lambda u: u.is_superuser)
def user_management(request):
if request.method == "POST":
username = request.POST.get('username')
password = request.POST.get('password')
is_superuser = 'is_superuser' in request.POST # The checkbox for superuser will be in request.POST if checked.
if not username or not password:
messages.error(request, 'Username and password are required.')
else:
User.objects.create_user(username=username, password=password, is_superuser=is_superuser)
messages.success(request, f'User {username} created successfully.')
return redirect('abac:user_management')
users = User.objects.all().order_by('-is_superuser', 'username')
return render(request, 'user_management.html', {'users': users})
@login_required
def upload_public_key(request, username):
if request.method == 'POST':
public_key = request.POST.get('public_key')
request.user.public_key = public_key
request.user.save()
messages.success(request, 'Public key uploaded successfully.')
return redirect(reverse('abac:user_details', args=[username]))
return render(request, 'upload_public_key.html')
def get_hierarchy_data():
users = User.objects.all()
data = {
"name": "Users",
"children": []
}
for user in users:
user_data = {
'name': user.username,
'children': []
}
for file in user.file_set.all():
file_data = {
'name': file.name,
'children': []
}
for rule in file.rule_set.all():
rule_data = {
'name': rule.name,
'children': []
}
for rule_attribute in rule.ruleattribute_set.all():
rule_data['children'].append({
'name': f"{rule_attribute.attribute_type.name}; {rule_attribute.operator}; {rule_attribute.value}",
'children': []
})
file_data['children'].append(rule_data)
user_data['children'].append(file_data)
data['children'].append(user_data)
return json.dumps(data)
@login_required
@user_passes_test(lambda u: u.is_superuser)
def hierarchy_view(request):
context = {'data': get_hierarchy_data()}
return render(request, 'vis_tree.html', context=context)
@login_required
@user_passes_test(lambda u: u.is_superuser)
def trusted_authorities(request):
if request.method == "POST":
form = TrustedAuthorityForm(request.POST)
if form.is_valid():
form.save()
messages.success(request, 'Authority successfully added!')
return redirect(reverse('abac:trusted_authorities'))
else:
messages.warning(request, 'Invalid RSA Key')
return redirect(reverse('abac:trusted_authorities'))
authorities = TrustedAuthority.objects.all()
authorities_with_hashes = [(authority, hashlib.sha256(authority.public_key.encode()).hexdigest()) for authority in authorities]
form = TrustedAuthorityForm()
context = {
'authorities_with_hashes': authorities_with_hashes,
'form': form
}
return render(request, 'trusted_authorities.html', context)
@login_required
@user_passes_test(lambda u: u.is_superuser)
def delete_authority(request, authority_id):
authority = get_object_or_404(TrustedAuthority, id=authority_id)
if request.method == 'POST':
authority.delete()
messages.success(request, 'Authority successfully deleted!')
return redirect(reverse('abac:trusted_authorities'))
else:
return redirect(reverse('abac:trusted_authorities'))