spectra/app.py
Tanishq Dubey 4c993ebacd
All checks were successful
Docker Build and Publish / build (push) Successful in 6s
Secret Key
2024-11-05 19:36:44 -05:00

598 lines
19 KiB
Python

import atexit
import colorsys
import hashlib
import os
import random
import secrets
import threading
import time
from datetime import UTC
from datetime import datetime
from pathlib import Path
from logging import getLogger
import logging
from logging.config import dictConfig
import toml
from apscheduler.schedulers.background import BackgroundScheduler
from colorthief import ColorThief
from flask import (Flask, flash, g, jsonify, redirect, render_template,
request, send_from_directory, session, url_for)
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from PIL import ExifTags, Image
from watchdog.events import FileSystemEventHandler
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.utils import secure_filename
from models import Photo
from models import Session as DBSession
from models import SiteConfig, init_db
from steganography import embed_message, extract_message
# Add this function to handle secret key persistence
def get_or_create_secret_key():
secret_key_file = Path("secret.key")
if secret_key_file.exists():
return secret_key_file.read_bytes()
else:
secret_key = os.urandom(24)
secret_key_file.write_bytes(secret_key)
return secret_key
DEFAULT_CONFIG = {
"server": {"host": "0.0.0.0", "port": 5000},
"directories": {"upload": "uploads", "thumbnail": "thumbnails"},
"admin": {"password": secrets.token_urlsafe(16)}, # Generate secure random password
}
# Configure logging
dictConfig({
'version': 1,
'formatters': {
'default': {
'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s',
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'stream': 'ext://sys.stdout',
'formatter': 'default'
},
'file': {
'class': 'logging.FileHandler',
'filename': 'spectra.log',
'formatter': 'default'
}
},
'root': {
'level': 'INFO',
'handlers': ['console', 'file']
}
})
# Get logger for this module
logger = getLogger(__name__)
# Create Flask app with persistent secret key
app = Flask(__name__)
app.secret_key = get_or_create_secret_key()
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def get_highlight_color(image_path):
color_thief = ColorThief(image_path)
palette = color_thief.get_palette(color_count=6, quality=1)
# Convert RGB to HSV and find the color with the highest saturation
highlight_color = max(palette, key=lambda rgb: colorsys.rgb_to_hsv(*rgb)[1])
return '#{:02x}{:02x}{:02x}'.format(*highlight_color)
def generate_thumbnails(filename):
original_path = os.path.join(UPLOAD_FOLDER, filename)
thumb_dir = os.path.join(THUMBNAIL_FOLDER, os.path.splitext(filename)[0])
os.makedirs(thumb_dir, exist_ok=True)
for size in THUMBNAIL_SIZES:
thumb_path = os.path.join(thumb_dir, f"{size}_{filename}")
if not os.path.exists(thumb_path):
with Image.open(original_path) as img:
# Extract EXIF data
exif_data = None
if "exif" in img.info:
exif_data = img.info["exif"]
# Resize image
img.thumbnail((size, size), Image.LANCZOS)
# Save image with EXIF data
if exif_data:
img.save(thumb_path, exif=exif_data, optimize=True, quality=85)
else:
img.save(thumb_path, optimize=True, quality=85)
def load_or_create_config():
"""Load config from file or create with defaults if missing"""
config_path = Path("config.toml")
logger.info(f"Loading config from {config_path}")
try:
# If config doesn't exist or is empty, create it with defaults
if not config_path.exists() or config_path.stat().st_size == 0:
config_data = DEFAULT_CONFIG.copy()
with open(config_path, "w") as f:
toml.dump(config_data, f)
logger.info(f"Created new config file with defaults at {config_path}")
logger.warning("Please update the config file with your own values.")
return config_data
# Load existing config
with open(config_path, "r") as f:
config_data = toml.load(f)
logger.info(f"Loaded config from {config_path}")
# Verify required fields
required_sections = ["server", "directories", "admin"]
required_fields = {
"server": ["host", "port"],
"directories": ["upload", "thumbnail"],
"admin": ["password"],
}
for section in required_sections:
if section not in config_data:
raise ValueError(f"Missing required section: {section}")
for field in required_fields[section]:
if field not in config_data[section]:
raise ValueError(f"Missing required field: {section}.{field}")
logger.info("Config verification passed")
return config_data
except Exception as e:
logger.error(f"Failed to load config: {str(e)}")
raise RuntimeError(f"Failed to load config: {str(e)}")
def get_site_config():
"""Get the current site configuration"""
db_session = DBSession()
site_config = db_session.query(SiteConfig).first()
if not site_config:
site_config = SiteConfig()
db_session.add(site_config)
db_session.commit()
config_dict = {
"appearance": {
"accent_color": site_config.accent_color,
"site_title": site_config.site_title,
},
"about": {
"name": site_config.author_name,
"location": site_config.author_location,
"profile_image": site_config.profile_image,
"bio": site_config.bio,
},
}
db_session.close()
return config_dict
def init_app_config():
"""Initialize application configuration"""
# Load hard config from file
hard_config = load_or_create_config()
# Load site config from database
site_config = get_site_config()
# Merge configs
config = hard_config.copy()
config.update(site_config)
return config
# Initialize config before any routes
config = init_app_config()
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)
UPLOAD_FOLDER = config["directories"]["upload"]
THUMBNAIL_FOLDER = config["directories"]["thumbnail"]
ALLOWED_EXTENSIONS = {"jpg", "jpeg", "png", "gif"}
THUMBNAIL_SIZES = [256, 512, 768, 1024, 1536, 2048]
# Create upload and thumbnail directories if they don't exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(THUMBNAIL_FOLDER, exist_ok=True)
app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER
app.config["THUMBNAIL_FOLDER"] = THUMBNAIL_FOLDER
app.config["MAX_CONTENT_LENGTH"] = 80 * 1024 * 1024 # 80MB limit
scheduler = BackgroundScheduler()
scheduler.start()
limiter = Limiter(
app=app,
key_func=get_remote_address,
default_limits=["100 per minute"],
storage_uri="memory://",
)
# Generate a strong secret key at startup
app.secret_key = secrets.token_hex(32)
@app.before_request
def before_request():
g.csp_nonce = secrets.token_hex(16)
# Update the security headers middleware
@app.after_request
def add_security_headers(response):
nonce = g.csp_nonce # Use the nonce from the before_request hook
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "SAMEORIGIN"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Strict-Transport-Security"] = (
"max-age=31536000; includeSubDomains"
)
response.headers["Content-Security-Policy"] = (
f"default-src 'self'; "
f"script-src 'self' 'nonce-{nonce}' https://cdn.jsdelivr.net; "
f"style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; "
f"font-src 'self' https://fonts.gstatic.com; "
f"img-src 'self' data:; "
f"connect-src 'self';"
)
# Make nonce available to templates
return response
# ----------------------------------------------------------------
# Public routes
# ----------------------------------------------------------------
@app.route("/")
def index():
return render_template(
"index.html",
accent_color=config["appearance"]["accent_color"],
site_title=config["appearance"]["site_title"],
about=config["about"],
nonce=g.csp_nonce,
)
# ----------------------------------------------------------------
# API routes
# ----------------------------------------------------------------
@app.route("/api/images")
def get_images():
page = int(request.args.get("page", 1))
per_page = 20
db_session = DBSession()
photos = (
db_session.query(Photo)
.order_by(Photo.date_taken.desc())
.offset((page - 1) * per_page)
.limit(per_page)
.all()
)
images = []
for photo in photos:
factor = random.randint(2, 3)
if photo.height < 4000 or photo.width < 4000:
factor = 1
if photo.orientation == 6 or photo.orientation == 8:
width, height = photo.height, photo.width
else:
width, height = photo.width, photo.height
images.append(
{
"imgSrc": f"/static/thumbnails/{os.path.splitext(photo.input_filename)[0]}/1536_{photo.input_filename}",
"width": width / factor,
"height": height / factor,
"caption": photo.input_filename,
"date": photo.date_taken.strftime("%y %m %d"),
"technicalInfo": f"{photo.focal_length}MM | F/{photo.aperture} | {photo.shutter_speed} | ISO{photo.iso}",
"highlightColor": photo.highlight_color,
}
)
has_more = db_session.query(Photo).count() > page * per_page
db_session.close()
return jsonify({"images": images, "hasMore": has_more})
# ----------------------------------------------------------------
# Admin routes
# ----------------------------------------------------------------
@app.route("/admin")
def admin():
if "logged_in" not in session:
return redirect(url_for("admin_login"))
db_session = DBSession()
photos = db_session.query(Photo).order_by(Photo.date_taken.desc()).all()
db_session.close()
# Pass the full config to the template
return render_template(
"admin.html",
photos=photos,
accent_color=config["appearance"]["accent_color"],
config=config,
)
@app.route("/admin/logout")
def admin_logout():
session.pop("logged_in", None)
flash("You have been logged out")
return redirect(url_for("admin_login"))
@app.route("/admin/update_photo/<int:photo_id>", methods=["POST"])
def update_photo(photo_id):
if "logged_in" not in session:
return jsonify({"success": False, "error": "Not logged in"}), 401
data = request.json
db_session = DBSession()
photo = db_session.query(Photo).get(photo_id)
if not photo:
db_session.close()
return jsonify({"success": False, "error": "Photo not found"}), 404
try:
for field, value in data.items():
if field == "date_taken":
value = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
elif field == "iso":
value = int(value)
setattr(photo, field, value)
db_session.commit()
db_session.close()
return jsonify({"success": True})
except Exception as e:
db_session.rollback()
db_session.close()
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/admin/delete_photo/<int:photo_id>", methods=["POST"])
def delete_photo(photo_id):
if "logged_in" not in session:
return jsonify({"success": False, "error": "Not logged in"}), 401
db_session = DBSession()
photo = db_session.query(Photo).get(photo_id)
if not photo:
db_session.close()
return jsonify({"success": False, "error": "Photo not found"}), 404
try:
# Delete the original file
original_path = os.path.join(UPLOAD_FOLDER, photo.input_filename)
if os.path.exists(original_path):
os.remove(original_path)
# Delete the thumbnail directory
thumb_dir = os.path.join(
THUMBNAIL_FOLDER, os.path.splitext(photo.input_filename)[0]
)
if os.path.exists(thumb_dir):
for thumb_file in os.listdir(thumb_dir):
os.remove(os.path.join(thumb_dir, thumb_file))
os.rmdir(thumb_dir)
# Delete the database entry
db_session.delete(photo)
db_session.commit()
db_session.close()
return jsonify({"success": True})
except Exception as e:
db_session.rollback()
db_session.close()
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/verify/<filename>", methods=["GET"])
def verify_image(filename):
file_path = os.path.join(app.config["UPLOAD_FOLDER"], filename)
if not os.path.exists(file_path):
return jsonify({"verified": False, "error": "Image not found"})
try:
extracted_key = extract_message(file_path, 16)
db_session = DBSession()
photo = db_session.query(Photo).filter_by(input_filename=filename).first()
db_session.close()
if photo and photo.unique_key == extracted_key:
return jsonify({"verified": True, "message": "Image ownership verified"})
else:
return jsonify(
{"verified": False, "message": "Image ownership could not be verified"}
)
except Exception as e:
return jsonify({"verified": False, "error": str(e)})
# Add rate limiting to sensitive endpoints
@app.route("/admin/login", methods=["POST", "GET"])
@limiter.limit("5 per minute")
def admin_login():
if request.method == "POST":
if request.form["password"] == config["admin"]["password"]:
session["logged_in"] = True
return redirect(url_for("admin"))
else:
flash("Invalid password")
return render_template(
"admin_login.html", accent_color=config["appearance"]["accent_color"]
)
@app.route("/admin/upload", methods=["POST"])
@limiter.limit("10 per minute")
def admin_upload():
if "logged_in" not in session:
return redirect(url_for("admin_login"))
if "file" not in request.files:
flash("No file part")
return redirect(url_for("admin"))
file = request.files["file"]
if file.filename == "":
flash("No selected file")
return redirect(url_for("admin"))
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
file_path = os.path.join(app.config["UPLOAD_FOLDER"], filename)
file.save(file_path)
# Extract EXIF data
exif = None
exifraw = None
with Image.open(file_path) as img:
exifraw = img.info["exif"]
width, height = img.size
exif = {
ExifTags.TAGS[k]: v
for k, v in img._getexif().items()
if k in ExifTags.TAGS
}
# Generate a unique key for the image
unique_key = hashlib.sha256(
f"{filename}{datetime.now().isoformat()}".encode()
).hexdigest()[:16]
# Embed the unique key into the image
try:
embed_message(file_path, unique_key, exifraw)
except ValueError as e:
flash(f"Error embedding key: {str(e)}")
os.remove(file_path)
return redirect(url_for("admin"))
# Generate thumbnails
generate_thumbnails(filename)
# Get image dimensions
with Image.open(file_path) as img:
width, height = img.size
exposure_time = exif["ExposureTime"]
if isinstance(exposure_time, tuple):
exposure_fraction = f"{exposure_time[0]}/{exposure_time[1]}"
else:
exposure_fraction = f"1/{int(1/float(exposure_time))}"
# Create database entry
db_session = DBSession()
new_photo = Photo(
input_filename=filename,
thumbnail_filename=f"{os.path.splitext(filename)[0]}/256_{filename}",
focal_length=str(
exif.get("FocalLengthIn35mmFilm", exif.get("FocalLength", ""))
),
aperture=str(exif.get("FNumber", "")),
shutter_speed=exposure_fraction,
date_taken=datetime.strptime(
str(exif.get("DateTime", "1970:01:01 00:00:00")), "%Y:%m:%d %H:%M:%S"
),
iso=int(exif.get("ISOSpeedRatings", 0)),
orientation=int(exif.get("Orientation", 1)),
width=width,
height=height,
highlight_color=get_highlight_color(
THUMBNAIL_FOLDER + f"/{os.path.splitext(filename)[0]}/256_{filename}"
),
unique_key=unique_key,
)
db_session.add(new_photo)
db_session.commit()
db_session.close()
flash("File uploaded successfully")
return redirect(url_for("admin"))
flash("Invalid file type")
return redirect(url_for("admin"))
# Add a new route to handle config updates
@app.route("/admin/update_config", methods=["POST"])
def update_config():
if "logged_in" not in session:
return jsonify({"success": False, "error": "Not logged in"}), 401
try:
data = request.json
db_session = DBSession()
site_config = db_session.query(SiteConfig).first()
# Map form fields to SiteConfig attributes
field_mapping = {
"appearance.site_title": "site_title",
"appearance.accent_color": "accent_color",
"about.name": "author_name",
"about.location": "author_location",
"about.profile_image": "profile_image",
"about.bio": "bio",
}
# Update site config
for form_field, db_field in field_mapping.items():
if form_field in data:
setattr(site_config, db_field, data[form_field])
site_config.updated_at = datetime.now(UTC)
db_session.commit()
db_session.close()
# Reload config
site_config = get_site_config()
config.update(site_config)
return jsonify({"success": True})
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500
# Add a new endpoint to view config history
@app.route("/admin/config_history")
def config_history():
if "logged_in" not in session:
return redirect(url_for("admin_login"))
db_session = DBSession()
site_configs = (
db_session.query(SiteConfig).order_by(SiteConfig.updated_at.desc()).all()
)
db_session.close()
return render_template(
"config_history.html",
site_configs=site_configs,
accent_color=config["appearance"]["accent_color"],
)
# ----------------------------------------------------------------
# Static routes
# ----------------------------------------------------------------
@app.route("/static/thumbnails/<path:filename>")
def serve_thumbnail(filename):
return send_from_directory(THUMBNAIL_FOLDER, filename)
# Initialize database tables
init_db()
if __name__ == "__main__":
app.run(debug=False, port=config["server"]["port"], host=config["server"]["host"])