import atexit import colorsys import hashlib import os import random import secrets import threading import time from datetime import UTC from datetime import datetime from pathlib import Path from logging import getLogger import logging from logging.config import dictConfig import toml from apscheduler.schedulers.background import BackgroundScheduler from colorthief import ColorThief from flask import (Flask, flash, g, jsonify, redirect, render_template, request, send_from_directory, session, url_for) from flask_limiter import Limiter from flask_limiter.util import get_remote_address from PIL import ExifTags, Image from watchdog.events import FileSystemEventHandler from werkzeug.middleware.proxy_fix import ProxyFix from werkzeug.utils import secure_filename from models import Photo from models import Session as DBSession from models import SiteConfig, init_db from steganography import embed_message, extract_message # Add this function to handle secret key persistence def get_or_create_secret_key(): """Get existing secret key or create a new one""" secret_key_file = Path("secret.key") try: if secret_key_file.exists(): logger.info("Loading existing secret key") return secret_key_file.read_bytes() else: logger.info("Generating new secret key") secret_key = os.urandom(32) # Use 32 bytes for better security secret_key_file.write_bytes(secret_key) return secret_key except Exception as e: logger.error(f"Error handling secret key: {e}") # Fallback to a memory-only key if file operations fail return os.urandom(32) DEFAULT_CONFIG = { "server": {"host": "0.0.0.0", "port": 5000}, "directories": {"upload": "uploads", "thumbnail": "thumbnails"}, "admin": {"password": secrets.token_urlsafe(16)}, # Generate secure random password } # Configure logging dictConfig({ 'version': 1, 'formatters': { 'default': { 'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s', } }, 'handlers': { 'console': { 'class': 'logging.StreamHandler', 'stream': 'ext://sys.stdout', 'formatter': 'default' }, 'file': { 'class': 'logging.FileHandler', 'filename': 'spectra.log', 'formatter': 'default' } }, 'root': { 'level': 'INFO', 'handlers': ['console', 'file'] } }) # Get logger for this module logger = getLogger(__name__) # Create Flask app with persistent secret key app = Flask(__name__) app.secret_key = get_or_create_secret_key() def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def get_highlight_color(image_path): color_thief = ColorThief(image_path) palette = color_thief.get_palette(color_count=6, quality=1) # Convert RGB to HSV and find the color with the highest saturation highlight_color = max(palette, key=lambda rgb: colorsys.rgb_to_hsv(*rgb)[1]) return '#{:02x}{:02x}{:02x}'.format(*highlight_color) def generate_thumbnails(filename): original_path = os.path.join(UPLOAD_FOLDER, filename) thumb_dir = os.path.join(THUMBNAIL_FOLDER, os.path.splitext(filename)[0]) os.makedirs(thumb_dir, exist_ok=True) for size in THUMBNAIL_SIZES: thumb_path = os.path.join(thumb_dir, f"{size}_{filename}") if not os.path.exists(thumb_path): with Image.open(original_path) as img: # Extract EXIF data exif_data = None if "exif" in img.info: exif_data = img.info["exif"] # Resize image img.thumbnail((size, size), Image.LANCZOS) # Save image with EXIF data if exif_data: img.save(thumb_path, exif=exif_data, optimize=True, quality=85) else: img.save(thumb_path, optimize=True, quality=85) def load_or_create_config(): """Load config from file or create with defaults if missing""" config_path = Path("config.toml") logger.info(f"Loading config from {config_path}") try: # If config doesn't exist or is empty, create it with defaults if not config_path.exists() or config_path.stat().st_size == 0: config_data = DEFAULT_CONFIG.copy() with open(config_path, "w") as f: toml.dump(config_data, f) logger.info(f"Created new config file with defaults at {config_path}") logger.warning("Please update the config file with your own values.") return config_data # Load existing config with open(config_path, "r") as f: config_data = toml.load(f) logger.info(f"Loaded config from {config_path}") # Verify required fields required_sections = ["server", "directories", "admin"] required_fields = { "server": ["host", "port"], "directories": ["upload", "thumbnail"], "admin": ["password"], } for section in required_sections: if section not in config_data: raise ValueError(f"Missing required section: {section}") for field in required_fields[section]: if field not in config_data[section]: raise ValueError(f"Missing required field: {section}.{field}") logger.info("Config verification passed") return config_data except Exception as e: logger.error(f"Failed to load config: {str(e)}") raise RuntimeError(f"Failed to load config: {str(e)}") def get_site_config(): """Get the current site configuration""" db_session = DBSession() site_config = db_session.query(SiteConfig).first() if not site_config: site_config = SiteConfig() db_session.add(site_config) db_session.commit() config_dict = { "appearance": { "accent_color": site_config.accent_color, "site_title": site_config.site_title, }, "about": { "name": site_config.author_name, "location": site_config.author_location, "profile_image": site_config.profile_image, "bio": site_config.bio, }, } db_session.close() return config_dict def init_app_config(): """Initialize application configuration""" # Load hard config from file hard_config = load_or_create_config() # Load site config from database site_config = get_site_config() # Merge configs config = hard_config.copy() config.update(site_config) return config # Initialize config before any routes config = init_app_config() app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1) UPLOAD_FOLDER = config["directories"]["upload"] THUMBNAIL_FOLDER = config["directories"]["thumbnail"] ALLOWED_EXTENSIONS = {"jpg", "jpeg", "png", "gif"} THUMBNAIL_SIZES = [256, 512, 768, 1024, 1536, 2048] # Create upload and thumbnail directories if they don't exist os.makedirs(UPLOAD_FOLDER, exist_ok=True) os.makedirs(THUMBNAIL_FOLDER, exist_ok=True) app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER app.config["THUMBNAIL_FOLDER"] = THUMBNAIL_FOLDER app.config["MAX_CONTENT_LENGTH"] = 80 * 1024 * 1024 # 80MB limit scheduler = BackgroundScheduler() scheduler.start() limiter = Limiter( app=app, key_func=get_remote_address, default_limits=["100 per minute"], storage_uri="memory://", ) @app.before_request def before_request(): g.csp_nonce = secrets.token_hex(16) # Update the security headers middleware @app.after_request def add_security_headers(response): nonce = g.csp_nonce # Use the nonce from the before_request hook response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "SAMEORIGIN" response.headers["X-XSS-Protection"] = "1; mode=block" response.headers["Strict-Transport-Security"] = ( "max-age=31536000; includeSubDomains" ) response.headers["Content-Security-Policy"] = ( f"default-src 'self'; " f"script-src 'self' 'nonce-{nonce}' https://cdn.jsdelivr.net; " f"style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; " f"font-src 'self' https://fonts.gstatic.com; " f"img-src 'self' data:; " f"connect-src 'self';" ) # Make nonce available to templates return response # ---------------------------------------------------------------- # Public routes # ---------------------------------------------------------------- @app.route("/") def index(): return render_template( "index.html", accent_color=config["appearance"]["accent_color"], site_title=config["appearance"]["site_title"], about=config["about"], nonce=g.csp_nonce, ) # ---------------------------------------------------------------- # API routes # ---------------------------------------------------------------- @app.route("/api/images") def get_images(): page = int(request.args.get("page", 1)) per_page = 20 db_session = DBSession() photos = ( db_session.query(Photo) .order_by(Photo.date_taken.desc()) .offset((page - 1) * per_page) .limit(per_page) .all() ) images = [] for photo in photos: factor = random.randint(2, 3) if photo.height < 4000 and photo.width < 4000: factor = 1 if photo.orientation == 6 or photo.orientation == 8: width, height = photo.height, photo.width else: width, height = photo.width, photo.height images.append( { "imgSrc": f"/static/thumbnails/{os.path.splitext(photo.input_filename)[0]}/1536_{photo.input_filename}", "width": width / factor, "height": height / factor, "caption": photo.input_filename, "date": photo.date_taken.strftime("%y %m %d"), "technicalInfo": f"{photo.focal_length}MM | F/{photo.aperture} | {photo.shutter_speed} | ISO{photo.iso}", "highlightColor": photo.highlight_color, } ) has_more = db_session.query(Photo).count() > page * per_page db_session.close() return jsonify({"images": images, "hasMore": has_more}) # ---------------------------------------------------------------- # Admin routes # ---------------------------------------------------------------- @app.route("/admin") def admin(): if "logged_in" not in session: return redirect(url_for("admin_login")) db_session = DBSession() photos = db_session.query(Photo).order_by(Photo.date_taken.desc()).all() db_session.close() # Pass the full config to the template return render_template( "admin.html", photos=photos, accent_color=config["appearance"]["accent_color"], config=config, ) @app.route("/admin/logout") def admin_logout(): session.pop("logged_in", None) flash("You have been logged out") return redirect(url_for("admin_login")) @app.route("/admin/update_photo/", methods=["POST"]) def update_photo(photo_id): if "logged_in" not in session: return jsonify({"success": False, "error": "Not logged in"}), 401 data = request.json db_session = DBSession() photo = db_session.query(Photo).get(photo_id) if not photo: db_session.close() return jsonify({"success": False, "error": "Photo not found"}), 404 try: for field, value in data.items(): if field == "date_taken": value = datetime.strptime(value, "%Y-%m-%d %H:%M:%S") elif field == "iso": value = int(value) setattr(photo, field, value) db_session.commit() db_session.close() return jsonify({"success": True}) except Exception as e: db_session.rollback() db_session.close() return jsonify({"success": False, "error": str(e)}), 500 @app.route("/admin/delete_photo/", methods=["POST"]) def delete_photo(photo_id): if "logged_in" not in session: return jsonify({"success": False, "error": "Not logged in"}), 401 db_session = DBSession() photo = db_session.query(Photo).get(photo_id) if not photo: db_session.close() return jsonify({"success": False, "error": "Photo not found"}), 404 try: # Delete the original file original_path = os.path.join(UPLOAD_FOLDER, photo.input_filename) if os.path.exists(original_path): os.remove(original_path) # Delete the thumbnail directory thumb_dir = os.path.join( THUMBNAIL_FOLDER, os.path.splitext(photo.input_filename)[0] ) if os.path.exists(thumb_dir): for thumb_file in os.listdir(thumb_dir): os.remove(os.path.join(thumb_dir, thumb_file)) os.rmdir(thumb_dir) # Delete the database entry db_session.delete(photo) db_session.commit() db_session.close() return jsonify({"success": True}) except Exception as e: db_session.rollback() db_session.close() return jsonify({"success": False, "error": str(e)}), 500 @app.route("/verify/", methods=["GET"]) def verify_image(filename): file_path = os.path.join(app.config["UPLOAD_FOLDER"], filename) if not os.path.exists(file_path): return jsonify({"verified": False, "error": "Image not found"}) try: extracted_key = extract_message(file_path, 16) db_session = DBSession() photo = db_session.query(Photo).filter_by(input_filename=filename).first() db_session.close() if photo and photo.unique_key == extracted_key: return jsonify({"verified": True, "message": "Image ownership verified"}) else: return jsonify( {"verified": False, "message": "Image ownership could not be verified"} ) except Exception as e: return jsonify({"verified": False, "error": str(e)}) # Add rate limiting to sensitive endpoints @app.route("/admin/login", methods=["POST", "GET"]) @limiter.limit("5 per minute") def admin_login(): if request.method == "POST": if request.form["password"] == config["admin"]["password"]: session["logged_in"] = True return redirect(url_for("admin")) else: flash("Invalid password") return render_template( "admin_login.html", accent_color=config["appearance"]["accent_color"] ) @app.route("/admin/upload", methods=["POST"]) @limiter.limit("10 per minute") def admin_upload(): if "logged_in" not in session: return redirect(url_for("admin_login")) if "file" not in request.files: flash("No file part") return redirect(url_for("admin")) file = request.files["file"] if file.filename == "": flash("No selected file") return redirect(url_for("admin")) if file and allowed_file(file.filename): filename = secure_filename(file.filename) file_path = os.path.join(app.config["UPLOAD_FOLDER"], filename) file.save(file_path) # Extract EXIF data with error handling exif = {} exifraw = None width = height = 0 try: with Image.open(file_path) as img: width, height = img.size if hasattr(img, '_getexif') and img._getexif() is not None: exifraw = img.info.get("exif") exif = { ExifTags.TAGS[k]: v for k, v in img._getexif().items() if k in ExifTags.TAGS } except Exception as e: logger.warning(f"Error reading EXIF data for {filename}: {str(e)}") # Generate a unique key for the image unique_key = hashlib.sha256( f"{filename}{datetime.now().isoformat()}".encode() ).hexdigest()[:16] # Embed the unique key into the image try: embed_message(file_path, unique_key, exifraw) except ValueError as e: flash(f"Error embedding key: {str(e)}") os.remove(file_path) return redirect(url_for("admin")) # Generate thumbnails generate_thumbnails(filename) # Handle exposure time with error handling try: exposure_time = exif.get("ExposureTime", 0) if isinstance(exposure_time, tuple): exposure_fraction = f"{exposure_time[0]}/{exposure_time[1]}" else: exposure_fraction = f"1/{int(1/float(exposure_time))}" if exposure_time else "0" except (TypeError, ZeroDivisionError): exposure_fraction = "0" # Create database entry with safe defaults db_session = DBSession() new_photo = Photo( input_filename=filename, thumbnail_filename=f"{os.path.splitext(filename)[0]}/256_{filename}", focal_length=str( exif.get("FocalLengthIn35mmFilm", exif.get("FocalLength", "0")) ), aperture=str(exif.get("FNumber", "0")), shutter_speed=exposure_fraction, date_taken=datetime.strptime( str(exif.get("DateTime", "1970:01:01 00:00:00")), "%Y:%m:%d %H:%M:%S" ), iso=int(exif.get("ISOSpeedRatings", 0)), orientation=int(exif.get("Orientation", 1)), width=width, height=height, highlight_color=get_highlight_color( THUMBNAIL_FOLDER + f"/{os.path.splitext(filename)[0]}/256_{filename}" ), unique_key=unique_key, ) db_session.add(new_photo) db_session.commit() db_session.close() flash("File uploaded successfully") return redirect(url_for("admin")) flash("Invalid file type") return redirect(url_for("admin")) # Add a new route to handle config updates @app.route("/admin/update_config", methods=["POST"]) def update_config(): if "logged_in" not in session: return jsonify({"success": False, "error": "Not logged in"}), 401 try: data = request.json db_session = DBSession() site_config = db_session.query(SiteConfig).first() # Map form fields to SiteConfig attributes field_mapping = { "appearance.site_title": "site_title", "appearance.accent_color": "accent_color", "about.name": "author_name", "about.location": "author_location", "about.profile_image": "profile_image", "about.bio": "bio", } # Update site config for form_field, db_field in field_mapping.items(): if form_field in data: setattr(site_config, db_field, data[form_field]) site_config.updated_at = datetime.now(UTC) db_session.commit() db_session.close() # Reload config site_config = get_site_config() config.update(site_config) return jsonify({"success": True}) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 # Add a new endpoint to view config history @app.route("/admin/config_history") def config_history(): if "logged_in" not in session: return redirect(url_for("admin_login")) db_session = DBSession() site_configs = ( db_session.query(SiteConfig).order_by(SiteConfig.updated_at.desc()).all() ) db_session.close() return render_template( "config_history.html", site_configs=site_configs, accent_color=config["appearance"]["accent_color"], ) # ---------------------------------------------------------------- # Static routes # ---------------------------------------------------------------- @app.route("/static/thumbnails/") def serve_thumbnail(filename): return send_from_directory(THUMBNAIL_FOLDER, filename) # Initialize database tables init_db() if __name__ == "__main__": app.run(debug=False, port=config["server"]["port"], host=config["server"]["host"])