|
|
|
from django.shortcuts import render, redirect, get_object_or_404
|
|
|
|
from django.http.response import HttpResponseNotAllowed
|
|
|
|
from django.contrib.auth.decorators import permission_required, login_required, user_passes_test
|
|
|
|
from django.http import HttpResponse, FileResponse, HttpResponseForbidden, HttpResponseBadRequest, HttpResponseNotAllowed, JsonResponse
|
|
|
|
from django.contrib.auth import logout
|
|
|
|
from django.urls import reverse
|
|
|
|
from django.contrib import messages
|
|
|
|
from django.core.exceptions import ValidationError
|
|
|
|
from django.utils import timezone
|
|
|
|
|
|
|
|
import hashlib
|
|
|
|
import json
|
|
|
|
from phe import paillier
|
|
|
|
from cryptography.hazmat.primitives import serialization, hashes
|
|
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
|
|
|
|
|
|
|
|
from .forms import FileUploadForm, UploadCertificateForm, RuleAttributeForm, TrustedAuthorityForm
|
|
|
|
from .models import File, Rule, UserAttribute, RuleAttribute, AttributeType, User, TrustedAuthority
|
|
|
|
|
|
|
|
|
|
|
|
@login_required
|
|
|
|
def landing_page(request):
|
|
|
|
files = File.objects.all()
|
|
|
|
return render(request, 'landing_page.html', {'files': files})
|
|
|
|
|
|
|
|
def logout_view(request):
|
|
|
|
logout(request)
|
|
|
|
return redirect('abac:login')
|
|
|
|
|
|
|
|
@login_required
|
|
|
|
def upload_file_view(request):
|
|
|
|
if request.method == 'POST':
|
|
|
|
form = FileUploadForm(request.POST, request.FILES)
|
|
|
|
if form.is_valid():
|
|
|
|
file_instance = form.save(commit=False)
|
|
|
|
file_instance.owner = request.user
|
|
|
|
file_instance.save()
|
|
|
|
return redirect('abac:home')
|
|
|
|
else:
|
|
|
|
form = FileUploadForm()
|
|
|
|
return render(request, 'file_upload.html', {'form': form})
|
|
|
|
|
|
|
|
def create_user_view(request):
|
|
|
|
return HttpResponse('Create User View')
|
|
|
|
|
|
|
|
@login_required
|
|
|
|
def upload_certificate_view(request):
|
|
|
|
if request.method == 'POST':
|
|
|
|
form = UploadCertificateForm(request.POST, request.FILES)
|
|
|
|
if form.is_valid():
|
|
|
|
certificate = form.cleaned_data['certificate']
|
|
|
|
signature = form.cleaned_data['signature'].read()
|
|
|
|
|
|
|
|
is_verified = False
|
|
|
|
for authority in TrustedAuthority.objects.all():
|
|
|
|
pub_key = serialization.load_pem_public_key(
|
|
|
|
authority.public_key.encode(),
|
|
|
|
backend=default_backend()
|
|
|
|
)
|
|
|
|
|
|
|
|
try:
|
|
|
|
pub_key.verify(
|
|
|
|
signature,
|
|
|
|
certificate.read(),
|
|
|
|
padding.PSS(
|
|
|
|
mgf=padding.MGF1(hashes.SHA256()),
|
|
|
|
salt_length=padding.PSS.MAX_LENGTH
|
|
|
|
),
|
|
|
|
hashes.SHA256()
|
|
|
|
)
|
|
|
|
is_verified = True
|
|
|
|
break
|
|
|
|
except:
|
|
|
|
continue
|
|
|
|
|
|
|
|
if is_verified:
|
|
|
|
certificate.seek(0)
|
|
|
|
certificate_data = json.load(certificate)
|
|
|
|
|
|
|
|
if not request.user.public_key:
|
|
|
|
messages.warning(request, 'Please upload your public key first.')
|
|
|
|
return redirect(reverse('abac:upload_public_key', args=[request.user.username]))
|
|
|
|
|
|
|
|
public_key_native = paillier.PaillierPublicKey(n=int(request.user.public_key))
|
|
|
|
|
|
|
|
|
|
|
|
for attribute_data in certificate_data:
|
|
|
|
name = attribute_data.get('name')
|
|
|
|
value = attribute_data.get('value')
|
|
|
|
attribute_type = get_or_create_attribute_type(name, value, private=True)
|
|
|
|
encrypted_value = str(public_key_native.encrypt(value).ciphertext())
|
|
|
|
|
|
|
|
attribute, created = UserAttribute.objects.update_or_create(
|
|
|
|
user=request.user,
|
|
|
|
attribute_type=attribute_type,
|
|
|
|
defaults={'value': encrypted_value, 'last_modified': timezone.now()}
|
|
|
|
)
|
|
|
|
|
|
|
|
messages.success(request, 'Certificate uploaded successfully.')
|
|
|
|
return redirect('abac:user_details', username=request.user.username)
|
|
|
|
else:
|
|
|
|
messages.error(request, 'Certificate verification failed.')
|
|
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
form = UploadCertificateForm()
|
|
|
|
|
|
|
|
return render(request, 'upload_certificate.html', {'form': form})
|
|
|
|
|
|
|
|
@login_required
|
|
|
|
def file_detail(request, file_id):
|
|
|
|
user = request.user
|
|
|
|
file = get_object_or_404(File, id=file_id)
|
|
|
|
if check_permission(user, file):
|
|
|
|
rules = Rule.objects.filter(file=file)
|
|
|
|
return render(request, 'file_detail.html', {'file': file, 'rules': rules})
|
|
|
|
else:
|
|
|
|
return HttpResponseForbidden("You don't have the permission to access this file")
|
|
|
|
|
|
|
|
|
|
|
|
@login_required
|
|
|
|
def create_rule(request, file_id):
|
|
|
|
file = get_object_or_404(File, id=file_id)
|
|
|
|
|
|
|
|
if request.method == "POST":
|
|
|
|
rule_name = request.POST.get('rule_name')
|
|
|
|
if rule_name:
|
|
|
|
rule = Rule.objects.create(name=rule_name, file=file)
|
|
|
|
url = reverse('abac:rule_detail', args=[file_id, rule.id])
|
|
|
|
return redirect(url)
|
|
|
|
|
|
|
|
# If the rule_name is not provided or if the method is GET, redirect to file_detail view.
|
|
|
|
return redirect('abac:file_detail', file_id=file_id)
|
|
|
|
|
|
|
|
|
|
|
|
@login_required
|
|
|
|
def rule_detail(request, file_id, rule_id=None):
|
|
|
|
file = get_object_or_404(File, id=file_id)
|
|
|
|
rule = get_object_or_404(Rule, id=rule_id) if rule_id else None
|
|
|
|
|
|
|
|
if request.method == "POST":
|
|
|
|
form = RuleAttributeForm(request.POST)
|
|
|
|
if form.is_valid():
|
|
|
|
rule_attribute = form.save(commit=False)
|
|
|
|
rule_attribute.rule = rule
|
|
|
|
rule_attribute.save()
|
|
|
|
return redirect('abac:rule_detail', file_id=file_id, rule_id=rule_id)
|
|
|
|
else:
|
|
|
|
form = RuleAttributeForm()
|
|
|
|
|
|
|
|
rule_attributes = RuleAttribute.objects.filter(rule=rule) if rule else []
|
|
|
|
|
|
|
|
return render(request, 'rule_detail.html', {
|
|
|
|
'file': file,
|
|
|
|
'rule': rule,
|
|
|
|
'form': form,
|
|
|
|
'rule_attributes': rule_attributes
|
|
|
|
})
|
|
|
|
|
|
|
|
@login_required
|
|
|
|
def delete_rule_attribute(request, file_id, rule_id, rule_attribute_id):
|
|
|
|
rule_attribute = get_object_or_404(RuleAttribute, id=rule_attribute_id)
|
|
|
|
rule_attribute.delete()
|
|
|
|
return redirect('abac:rule_detail', file_id=file_id, rule_id=rule_id)
|
|
|
|
|
|
|
|
@login_required
|
|
|
|
def user_detail_view(request, username):
|
|
|
|
user = get_object_or_404(User, username=username)
|
|
|
|
|
|
|
|
# Check if the requested user is the same as the logged-in user or the logged-in user has the required permission
|
|
|
|
if user != request.user and not request.user.is_superuser:
|
|
|
|
return HttpResponseForbidden('You do not have permission to view this page.')
|
|
|
|
|
|
|
|
attributes = UserAttribute.objects.filter(user=user)
|
|
|
|
return render(request, 'user_detail.html', {'user': user, 'attributes': attributes})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_or_create_attribute_type(name, value, private=False):
|
|
|
|
# Determine the datatype from the value
|
|
|
|
if isinstance(value, int):
|
|
|
|
datatype = 'INTEGER'
|
|
|
|
elif isinstance(value, float):
|
|
|
|
datatype = 'FLOAT'
|
|
|
|
elif isinstance(value, bool):
|
|
|
|
datatype = 'BOOLEAN'
|
|
|
|
elif isinstance(value, str):
|
|
|
|
datatype = 'STRING'
|
|
|
|
else:
|
|
|
|
raise ValidationError('Invalid data type in certificate for attribute: {}'.format(name))
|
|
|
|
|
|
|
|
# Try to get the existing AttributeType object with matching name and datatype.
|
|
|
|
attribute_type, created = AttributeType.objects.get_or_create(
|
|
|
|
name=name,
|
|
|
|
datatype=datatype,
|
|
|
|
defaults={'is_private': private}
|
|
|
|
)
|
|
|
|
|
|
|
|
if not created and attribute_type.datatype != datatype:
|
|
|
|
# If an AttributeType with the same name but different datatype exists, create a new one.
|
|
|
|
attribute_type = AttributeType.objects.create(name=name, datatype=datatype, is_private=private)
|
|
|
|
|
|
|
|
return attribute_type
|
|
|
|
|
|
|
|
|
|
|
|
@login_required
|
|
|
|
def download_file(request, file_id):
|
|
|
|
file = get_object_or_404(File, id=file_id)
|
|
|
|
|
|
|
|
user = request.user
|
|
|
|
|
|
|
|
for rule in file.rule_set.all():
|
|
|
|
if rule.is_satisfied_by(user):
|
|
|
|
return serve_file(file)
|
|
|
|
|
|
|
|
return HttpResponseForbidden('You do not have permission to download this file.')
|
|
|
|
|
|
|
|
def check_permission(user, file, mode = None):
|
|
|
|
if mode:
|
|
|
|
pass
|
|
|
|
else:
|
|
|
|
# a) Check if the user is the owner of the file.
|
|
|
|
if user == file.owner:
|
|
|
|
return True
|
|
|
|
|
|
|
|
# b) Check if the user is a superuser.
|
|
|
|
if user.is_superuser:
|
|
|
|
return True
|
|
|
|
|
|
|
|
# c) Check the user's attributes against the file's rules.
|
|
|
|
for rule in file.rule_set.all():
|
|
|
|
if rule.is_satisfied_by(user):
|
|
|
|
return True
|
|
|
|
|
|
|
|
def serve_file(file):
|
|
|
|
# Serve the file using FileResponse.
|
|
|
|
response = FileResponse(file.file.open('rb'))
|
|
|
|
response['Content-Disposition'] = f'attachment; filename="{file.name}"'
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
|
|
@login_required
|
|
|
|
@user_passes_test(lambda u: u.is_superuser)
|
|
|
|
def user_management(request):
|
|
|
|
if request.method == "POST":
|
|
|
|
username = request.POST.get('username')
|
|
|
|
password = request.POST.get('password')
|
|
|
|
is_superuser = 'is_superuser' in request.POST # The checkbox for superuser will be in request.POST if checked.
|
|
|
|
|
|
|
|
if not username or not password:
|
|
|
|
messages.error(request, 'Username and password are required.')
|
|
|
|
else:
|
|
|
|
User.objects.create_user(username=username, password=password, is_superuser=is_superuser)
|
|
|
|
messages.success(request, f'User {username} created successfully.')
|
|
|
|
|
|
|
|
return redirect('abac:user_management')
|
|
|
|
|
|
|
|
users = User.objects.all().order_by('-is_superuser', 'username')
|
|
|
|
return render(request, 'user_management.html', {'users': users})
|
|
|
|
|
|
|
|
|
|
|
|
@login_required
|
|
|
|
def upload_public_key(request, username):
|
|
|
|
if request.method == 'POST':
|
|
|
|
public_key = request.POST.get('public_key')
|
|
|
|
request.user.public_key = public_key
|
|
|
|
request.user.save()
|
|
|
|
messages.success(request, 'Public key uploaded successfully.')
|
|
|
|
return redirect(reverse('abac:user_details', args=[username]))
|
|
|
|
|
|
|
|
return render(request, 'upload_public_key.html')
|
|
|
|
|
|
|
|
|
|
|
|
def get_hierarchy_data():
|
|
|
|
users = User.objects.all()
|
|
|
|
data = {
|
|
|
|
"name": "Users",
|
|
|
|
"children": []
|
|
|
|
}
|
|
|
|
|
|
|
|
for user in users:
|
|
|
|
user_data = {
|
|
|
|
'name': user.username,
|
|
|
|
'children': []
|
|
|
|
}
|
|
|
|
|
|
|
|
for file in user.file_set.all():
|
|
|
|
file_data = {
|
|
|
|
'name': file.name,
|
|
|
|
'children': []
|
|
|
|
}
|
|
|
|
|
|
|
|
for rule in file.rule_set.all():
|
|
|
|
rule_data = {
|
|
|
|
'name': rule.name,
|
|
|
|
'children': []
|
|
|
|
}
|
|
|
|
|
|
|
|
for rule_attribute in rule.ruleattribute_set.all():
|
|
|
|
rule_data['children'].append({
|
|
|
|
'name': f"{rule_attribute.attribute_type.name}; {rule_attribute.operator}; {rule_attribute.value}",
|
|
|
|
'children': []
|
|
|
|
})
|
|
|
|
|
|
|
|
file_data['children'].append(rule_data)
|
|
|
|
|
|
|
|
user_data['children'].append(file_data)
|
|
|
|
|
|
|
|
data['children'].append(user_data)
|
|
|
|
|
|
|
|
return json.dumps(data)
|
|
|
|
|
|
|
|
|
|
|
|
@login_required
|
|
|
|
@user_passes_test(lambda u: u.is_superuser)
|
|
|
|
def hierarchy_view(request):
|
|
|
|
context = {'data': get_hierarchy_data()}
|
|
|
|
return render(request, 'vis_tree.html', context=context)
|
|
|
|
|
|
|
|
@login_required
|
|
|
|
@user_passes_test(lambda u: u.is_superuser)
|
|
|
|
def trusted_authorities(request):
|
|
|
|
if request.method == "POST":
|
|
|
|
form = TrustedAuthorityForm(request.POST)
|
|
|
|
if form.is_valid():
|
|
|
|
form.save()
|
|
|
|
messages.success(request, 'Authority successfully added!')
|
|
|
|
return redirect(reverse('abac:trusted_authorities'))
|
|
|
|
else:
|
|
|
|
messages.warning(request, 'Invalid RSA Key')
|
|
|
|
return redirect(reverse('abac:trusted_authorities'))
|
|
|
|
|
|
|
|
authorities = TrustedAuthority.objects.all()
|
|
|
|
authorities_with_hashes = [(authority, hashlib.sha256(authority.public_key.encode()).hexdigest()) for authority in authorities]
|
|
|
|
|
|
|
|
form = TrustedAuthorityForm()
|
|
|
|
context = {
|
|
|
|
'authorities_with_hashes': authorities_with_hashes,
|
|
|
|
'form': form
|
|
|
|
}
|
|
|
|
return render(request, 'trusted_authorities.html', context)
|
|
|
|
|
|
|
|
@login_required
|
|
|
|
@user_passes_test(lambda u: u.is_superuser)
|
|
|
|
def delete_authority(request, authority_id):
|
|
|
|
authority = get_object_or_404(TrustedAuthority, id=authority_id)
|
|
|
|
if request.method == 'POST':
|
|
|
|
authority.delete()
|
|
|
|
messages.success(request, 'Authority successfully deleted!')
|
|
|
|
return redirect(reverse('abac:trusted_authorities'))
|
|
|
|
|
|
|
|
else:
|
|
|
|
return redirect(reverse('abac:trusted_authorities'))
|