Image security (needs to be improved)
This commit is contained in:
		
							
								
								
									
										45
									
								
								app.py
									
									
									
									
									
								
							
							
						
						
									
										45
									
								
								app.py
									
									
									
									
									
								
							| @ -9,6 +9,9 @@ from apscheduler.schedulers.background import BackgroundScheduler | ||||
| import random | ||||
| from colorthief import ColorThief | ||||
| import colorsys | ||||
| from steganography import embed_message, extract_message | ||||
| import hashlib | ||||
|  | ||||
| app = Flask(__name__) | ||||
| app.secret_key = os.urandom(24) | ||||
| config = load_or_create_config() | ||||
| @ -147,12 +150,11 @@ def admin_upload(): | ||||
|         file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) | ||||
|         file.save(file_path) | ||||
|  | ||||
|         # Generate thumbnails | ||||
|         generate_thumbnails(filename) | ||||
|          | ||||
|         # Extract EXIF data | ||||
|         exif = None | ||||
|         exifraw = None | ||||
|         with Image.open(file_path) as img: | ||||
|             exifraw = img.info['exif'] | ||||
|             width, height = img.size | ||||
|             exif = { | ||||
|                 ExifTags.TAGS[k]: v | ||||
| @ -160,6 +162,21 @@ def admin_upload(): | ||||
|                 if k in ExifTags.TAGS | ||||
|             } | ||||
|          | ||||
|         # Generate a unique key for the image | ||||
|         unique_key = hashlib.sha256(f"{filename}{datetime.now().isoformat()}".encode()).hexdigest()[:16] | ||||
|          | ||||
|         # Embed the unique key into the image | ||||
|         try: | ||||
|             embed_message(file_path, unique_key, exifraw) | ||||
|         except ValueError as e: | ||||
|             flash(f"Error embedding key: {str(e)}") | ||||
|             os.remove(file_path) | ||||
|             return redirect(url_for('admin')) | ||||
|  | ||||
|          | ||||
|         # Generate thumbnails | ||||
|         generate_thumbnails(filename) | ||||
|          | ||||
|         # Get image dimensions | ||||
|         with Image.open(file_path) as img: | ||||
|             width, height = img.size | ||||
| @ -183,7 +200,8 @@ def admin_upload(): | ||||
|             orientation=int(exif.get('Orientation', 1)), | ||||
|             width=width, | ||||
|             height=height, | ||||
|             highlight_color=get_highlight_color(THUMBNAIL_FOLDER + f"/{os.path.splitext(filename)[0]}/256_{filename}") | ||||
|             highlight_color=get_highlight_color(THUMBNAIL_FOLDER + f"/{os.path.splitext(filename)[0]}/256_{filename}"), | ||||
|             unique_key=unique_key | ||||
|         ) | ||||
|         db_session.add(new_photo) | ||||
|         db_session.commit() | ||||
| @ -270,6 +288,25 @@ def delete_photo(photo_id): | ||||
|         db_session.close() | ||||
|         return jsonify({'success': False, 'error': str(e)}), 500 | ||||
|  | ||||
| @app.route('/verify/<filename>', methods=['GET']) | ||||
| def verify_image(filename): | ||||
|     file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) | ||||
|     if not os.path.exists(file_path): | ||||
|         return jsonify({'verified': False, 'error': 'Image not found'}) | ||||
|      | ||||
|     try: | ||||
|         extracted_key = extract_message(file_path, 16) | ||||
|         db_session = DBSession() | ||||
|         photo = db_session.query(Photo).filter_by(input_filename=filename).first() | ||||
|         db_session.close() | ||||
|          | ||||
|         if photo and photo.unique_key == extracted_key: | ||||
|             return jsonify({'verified': True, 'message': 'Image ownership verified'}) | ||||
|         else: | ||||
|             return jsonify({'verified': False, 'message': 'Image ownership could not be verified'}) | ||||
|     except Exception as e: | ||||
|         return jsonify({'verified': False, 'error': str(e)}) | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     app.run( | ||||
|         debug=True,  | ||||
|  | ||||
| @ -19,6 +19,7 @@ class Photo(Base): | ||||
|     height = Column(Integer) | ||||
|     highlight_color = Column(String) | ||||
|     orientation = Column(Integer) | ||||
|     unique_key = Column(String(16), nullable=False)  # Add this line | ||||
|  | ||||
| engine = create_engine('sqlite:///photos.db') | ||||
| Base.metadata.create_all(engine) | ||||
|  | ||||
							
								
								
									
										51
									
								
								steganography.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										51
									
								
								steganography.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,51 @@ | ||||
| from PIL import Image | ||||
| import numpy as np | ||||
|  | ||||
| def string_to_binary(message): | ||||
|     return ''.join(format(ord(char), '08b') for char in message) | ||||
|  | ||||
| def embed_message(image_path, message, exifraw): | ||||
|     # Open the image | ||||
|     img = Image.open(image_path) | ||||
|     # Convert image to numpy array | ||||
|     img_array = np.array(img) | ||||
|  | ||||
|     # Flatten the array | ||||
|     flat_array = img_array.flatten() | ||||
|  | ||||
|     # Convert message to binary | ||||
|     binary_message = string_to_binary(message) | ||||
|      | ||||
|     # Check if the message can fit in the image | ||||
|     if len(binary_message) > len(flat_array): | ||||
|         raise ValueError("Message is too long to be embedded in this image") | ||||
|  | ||||
|     # Embed the message | ||||
|     for i, bit in enumerate(binary_message): | ||||
|         flat_array[i] = (flat_array[i] & 0xFE) | int(bit) | ||||
|  | ||||
|     # Reshape the array back to the original image shape | ||||
|     stego_array = flat_array.reshape(img_array.shape) | ||||
|  | ||||
|     # Create a new image from the modified array | ||||
|     stego_img = Image.fromarray(stego_array.astype('uint8'), img.mode) | ||||
|      | ||||
|     # Save the image | ||||
|     stego_img.save(image_path, exif=exifraw) | ||||
|  | ||||
| def extract_message(image_path, message_length): | ||||
|     # Open the image | ||||
|     img = Image.open(image_path) | ||||
|     # Convert image to numpy array | ||||
|     img_array = np.array(img) | ||||
|  | ||||
|     # Flatten the array | ||||
|     flat_array = img_array.flatten() | ||||
|  | ||||
|     # Extract the binary message | ||||
|     binary_message = ''.join([str(pixel & 1) for pixel in flat_array[:message_length * 8]]) | ||||
|  | ||||
|     # Convert binary to string | ||||
|     message = ''.join([chr(int(binary_message[i:i+8], 2)) for i in range(0, len(binary_message), 8)]) | ||||
|  | ||||
|     return message | ||||
		Reference in New Issue
	
	Block a user