Switch to gunicorn , need to add config file and proper password loading
All checks were successful
Datadog Software Composition Analysis / Datadog SBOM Generation and Upload (push) Successful in 16s
Datadog Static Analysis / Datadog Static Analyzer (push) Successful in 28s

This commit is contained in:
Tanishq Dubey 2025-02-28 23:23:16 -05:00
parent c5caa0848b
commit 4d79f86df4
7 changed files with 429 additions and 28 deletions

View File

@ -3,6 +3,7 @@ from src.routes.routes import RouteManager
from src.config.args import create_parser from src.config.args import create_parser
from src.config.config import Configuration from src.config.config import Configuration
from src.rendering.helpers import TemplateHelpers from src.rendering.helpers import TemplateHelpers
from src.server.file_manager import create_filemanager_blueprint
def main(): def main():
@ -15,6 +16,7 @@ def main():
r = RouteManager(c) r = RouteManager(c)
t = TemplateHelpers(c) t = TemplateHelpers(c)
print("here")
server = Server() server = Server()
server.register_template_function("get_sibling_content_files", t.get_sibling_content_files) server.register_template_function("get_sibling_content_files", t.get_sibling_content_files)
@ -27,7 +29,13 @@ def main():
server.register_route("/", r.default_route, defaults={"path": ""}) server.register_route("/", r.default_route, defaults={"path": ""})
server.register_route("/<path:path>", r.default_route) server.register_route("/<path:path>", r.default_route)
file_manager_bp = create_filemanager_blueprint(c.content_dir, url_prefix='/admin', auth_password="password")
server.app.register_blueprint(file_manager_bp)
try:
server.run() server.run()
except Exception as e:
print(e)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,10 +1,9 @@
from dataclasses import dataclass from dataclasses import dataclass
from src.config.config import Configuration from src.config.config import Configuration
from src.rendering import GENERIC_FILE_MAPPING from src.rendering import GENERIC_FILE_MAPPING
from src.rendering.markdown import render_markdown from src.rendering.markdown import render_markdown, read_raw_markdown, rendered_markdown_to_plain_text
from enum import Enum from enum import Enum
from thumbhash import image_to_thumbhash
from PIL import Image from PIL import Image
from datetime import datetime from datetime import datetime
import frontmatter import frontmatter
@ -78,9 +77,9 @@ class TemplateHelpers:
ret.typeMeta = MarkdownMetadata({}, "", "") ret.typeMeta = MarkdownMetadata({}, "", "")
ret.typeMeta.fontmatter = frontmatter.load(file_path) ret.typeMeta.fontmatter = frontmatter.load(file_path)
ret.typeMeta.content = render_markdown(file_path) ret.typeMeta.content = render_markdown(file_path)
ret.typeMeta.preview = ret.typeMeta.content[:100] ret.typeMeta.rawContent = read_raw_markdown(file_path)
if "#" in ret.typeMeta.preview: ret.typeMeta.rawText = rendered_markdown_to_plain_text(ret.typeMeta.content)
ret.typeMeta.preview = ret.typeMeta.preview.split("#")[0] ret.typeMeta.preview = ret.typeMeta.rawText[:500] + "..."
return ret return ret
return None return None

View File

@ -1,6 +1,7 @@
import frontmatter import frontmatter
import mistune import mistune
from pathlib import Path from pathlib import Path
from bs4 import BeautifulSoup
mistune_render = mistune.create_markdown( mistune_render = mistune.create_markdown(
@ -10,7 +11,18 @@ mistune_render = mistune.create_markdown(
def render_markdown(path: Path) -> str: def render_markdown(path: Path) -> str:
with open(path, "r") as f: with open(path, "r", encoding="utf-8") as f:
obj = frontmatter.load(f) obj = frontmatter.load(f)
res = mistune_render(obj.content) res = mistune_render(obj.content)
return res return res
def read_raw_markdown(path: Path) -> str:
with open(path, "r") as f:
obj = frontmatter.load(f)
return obj.content
def rendered_markdown_to_plain_text(html):
text = "\n\n".join(BeautifulSoup(html, features="html.parser").stripped_strings)
return text

View File

@ -1,6 +1,6 @@
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional
from pprint import pprint as pp from pprint import pprint
from flask import render_template_string, send_file from flask import render_template_string, send_file
from src.rendering import GENERIC_FILE_MAPPING from src.rendering import GENERIC_FILE_MAPPING
@ -23,15 +23,15 @@ def count_file_extensions(path):
def determine_type(path: Path) -> tuple[str, str, Optional[str]]: def determine_type(path: Path) -> tuple[str, str, Optional[str]]:
if path.is_file(): if path.is_file():
# Find extension in GENERIC_FILE_MAPPING, it may match multiple # Find extension in GENERIC_FILE_MAPPING, it may match multiple
generic_mapping = None generic_mapping = []
for file_type, extensions in GENERIC_FILE_MAPPING.items(): for file_type, extensions in GENERIC_FILE_MAPPING.items():
if path.suffix[1:] in extensions: if path.suffix[1:] in extensions:
if generic_mapping is None: if not generic_mapping:
generic_mapping = [file_type] generic_mapping = [file_type]
else: else:
generic_mapping.append(file_type) generic_mapping.append(file_type)
generic_mapping.reverse() generic_mapping.reverse()
if generic_mapping is None: if not generic_mapping:
generic_mapping = ["other"] generic_mapping = ["other"]
extension = path.suffix[1:] extension = path.suffix[1:]
return "file", generic_mapping, extension return "file", generic_mapping, extension
@ -39,7 +39,7 @@ def determine_type(path: Path) -> tuple[str, str, Optional[str]]:
files_map = count_file_extensions(path) files_map = count_file_extensions(path)
if files_map: if files_map:
most_seen_extension = max(files_map, key=files_map.get) most_seen_extension = max(files_map, key=files_map.get)
generic_mapping = None generic_mapping = []
for file_type, extensions in GENERIC_FILE_MAPPING.items(): for file_type, extensions in GENERIC_FILE_MAPPING.items():
if most_seen_extension[1:] in extensions: if most_seen_extension[1:] in extensions:
if generic_mapping is None: if generic_mapping is None:
@ -97,10 +97,10 @@ def render_page(
): ):
if not path.exists(): if not path.exists():
return render_error_page( return render_error_page(
404, error_code=404,
"Not Found", error_message="Not Found",
"The requested resource was not found on this server.", error_description="The requested resource was not found on this server.",
template_path, template_path=template_path,
) )
target_path = path target_path = path
target_file = path target_file = path

View File

@ -51,7 +51,6 @@ class RouteManager:
Any illegal path should raise an exception Any illegal path should raise an exception
""" """
file_path: Path = self.config.content_dir / (path if path else "index.md") file_path: Path = self.config.content_dir / (path if path else "index.md")
print(file_path)
if file_path < self.config.content_dir: if file_path < self.config.content_dir:
raise Exception("Illegal path") raise Exception("Illegal path")
@ -65,11 +64,7 @@ class RouteManager:
def default_route(self, path: str): def default_route(self, path: str):
try: try:
self._ensure_route(path) self._ensure_route(path)
print("all good")
print(path)
print("=============")
except Exception as e: except Exception as e:
print(e)
return render_error_page( return render_error_page(
404, 404,
"Not Found", "Not Found",

357
src/server/file_manager.py Normal file
View File

@ -0,0 +1,357 @@
import os
import shutil
from flask import Blueprint, request, render_template_string, send_from_directory, redirect, url_for, flash, session
from werkzeug.utils import secure_filename
def create_filemanager_blueprint(base_dir, url_prefix='/files', auth_password=None):
"""
Creates a Flask blueprint providing a simple file manager with a clipboard-style
move operation (cut/paste) for the given base directory.
"""
base_dir = os.path.abspath(base_dir)
os.makedirs(base_dir, exist_ok=True)
filemanager = Blueprint('filemanager', __name__, url_prefix=url_prefix)
def secure_path(path):
"""Ensure that the provided relative path stays within the base_dir."""
safe_path = os.path.abspath(os.path.join(base_dir, path))
if not safe_path.startswith(base_dir):
raise Exception("Invalid path")
return safe_path
@filemanager.before_request
def require_login():
if auth_password is not None:
# Allow access to login and logout pages without being authenticated.
if request.endpoint in ['filemanager.login', 'filemanager.logout']:
return None
if not session.get('filemanager_authenticated'):
return redirect(url_for('filemanager.login', next=request.url))
return None
@filemanager.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
password = request.form.get('password', '')
if password == auth_password:
session['filemanager_authenticated'] = True
flash("Logged in successfully")
next_url = request.args.get('next') or url_for('filemanager.index')
return redirect(next_url)
else:
flash("Incorrect password")
return render_template_string('''
<!doctype html>
<html>
<head><title>Login</title></head>
<body>
<h1>Login</h1>
{% with messages = get_flashed_messages() %}
{% if messages %}
<ul>
{% for message in messages %}
<li>{{ message }}</li>
{% endfor %}
</ul>
{% endif %}
{% endwith %}
<form method="post">
<input type="password" name="password" placeholder="Password">
<input type="submit" value="Login">
</form>
</body>
</html>
''')
@filemanager.route('/logout', methods=['POST'])
def logout():
session.pop('filemanager_authenticated', None)
flash("Logged out")
return redirect(url_for('filemanager.login'))
@filemanager.route('/')
def index():
# Determine current directory from query parameter; defaults to the base.
rel_path = request.args.get('path', '')
try:
abs_path = secure_path(rel_path)
except Exception:
return "Invalid path", 400
if not os.path.isdir(abs_path):
return "Not a directory", 400
# Build a list of items (files and folders) in the current directory.
items = []
for entry in os.listdir(abs_path):
entry_path = os.path.join(abs_path, entry)
rel_entry_path = os.path.join(rel_path, entry) if rel_path else entry
items.append({
'name': entry,
'is_dir': os.path.isdir(entry_path),
'path': rel_entry_path
})
items.sort(key=lambda x: (not x['is_dir'], x['name'].lower()))
parent = os.path.dirname(rel_path) if rel_path else ''
# Minimal HTML template that displays the file tree along with a clipboard if active.
template = """
<!doctype html>
<html>
<head>
<title>File Manager</title>
</head>
<body>
<h1>File Manager</h1>
<p>Current Directory: /{{ rel_path }}</p>
{% if rel_path %}
<p><a href="{{ url_for('filemanager.index', path=parent) }}">Go up</a></p>
{% endif %}
{% if session.clipboard %}
<div style="border:1px solid #ccc; padding:10px; margin:10px 0;">
<h3>Clipboard</h3>
<ul>
{% for item in session.clipboard %}
<li>{{ item }}</li>
{% endfor %}
</ul>
<!-- Complete Move: paste the clipboard items into the current folder -->
<form method="post" action="{{ url_for('filemanager.paste') }}">
<input type="hidden" name="dest" value="{{ rel_path }}">
<button type="submit">Complete Move Here</button>
</form>
<!-- Cancel the move -->
<form method="post" action="{{ url_for('filemanager.cancel_move') }}">
<button type="submit">Cancel Move</button>
</form>
</div>
{% endif %}
<ul>
{% for item in items %}
<li>
<!-- Checkbox for bulk operations -->
<input type="checkbox" class="bulk" value="{{ item.path }}">
{% if item.is_dir %}
📁 <a href="{{ url_for('filemanager.index', path=item.path) }}">{{ item.name }}</a>
[<a href="#" onclick="deleteItem('{{ item.path }}'); return false;">Delete</a>]
[<a href="#" onclick="renameItem('{{ item.path }}'); return false;">Rename</a>]
[<a href="#" onclick="cutItem('{{ item.path }}'); return false;">Cut</a>]
{% else %}
📄 {{ item.name }}
[<a href="{{ url_for('filemanager.download', path=item.path) }}">Download</a>]
[<a href="#" onclick="deleteItem('{{ item.path }}'); return false;">Delete</a>]
[<a href="#" onclick="renameItem('{{ item.path }}'); return false;">Rename</a>]
[<a href="#" onclick="cutItem('{{ item.path }}'); return false;">Cut</a>]
{% endif %}
</li>
{% endfor %}
</ul>
<button onclick="bulkCut()">Bulk Cut Selected</button>
<hr>
<h2>Upload File</h2>
<form action="{{ url_for('filemanager.upload') }}" method="post" enctype="multipart/form-data">
<input type="hidden" name="path" value="{{ rel_path }}">
<input type="file" name="file">
<input type="submit" value="Upload">
</form>
<hr>
<h2>Create New Directory</h2>
<form action="{{ url_for('filemanager.mkdir') }}" method="post">
<!-- Pass the current directory as a hidden value -->
<input type="hidden" name="path" value="{{ rel_path }}">
<input type="text" name="dirname" placeholder="New Directory Name">
<input type="submit" value="Create Directory">
</form>
<script>
function deleteItem(path) {
if(confirm("Are you sure you want to delete " + path + "?")) {
var form = document.createElement("form");
form.method = "post";
form.action = "{{ url_for('filemanager.delete') }}";
var input = document.createElement("input");
input.type = "hidden";
input.name = "path";
input.value = path;
form.appendChild(input);
document.body.appendChild(form);
form.submit();
}
}
function renameItem(path) {
var newName = prompt("Enter new name for " + path);
if(newName) {
var form = document.createElement("form");
form.method = "post";
form.action = "{{ url_for('filemanager.rename') }}";
var input1 = document.createElement("input");
input1.type = "hidden";
input1.name = "path";
input1.value = path;
form.appendChild(input1);
var input2 = document.createElement("input");
input2.type = "hidden";
input2.name = "new_name";
input2.value = newName;
form.appendChild(input2);
document.body.appendChild(form);
form.submit();
}
}
function cutItem(path) {
// Submit a form to add a single item to the clipboard.
var form = document.createElement("form");
form.method = "post";
form.action = "{{ url_for('filemanager.cut') }}";
var input = document.createElement("input");
input.type = "hidden";
input.name = "paths";
input.value = path;
form.appendChild(input);
document.body.appendChild(form);
form.submit();
}
function bulkCut() {
// Gather all selected items and submit them to the clipboard.
var checkboxes = document.querySelectorAll('.bulk:checked');
if(checkboxes.length === 0) {
alert("No items selected");
return;
}
var form = document.createElement("form");
form.method = "post";
form.action = "{{ url_for('filemanager.cut') }}";
checkboxes.forEach(function(box) {
var input = document.createElement("input");
input.type = "hidden";
input.name = "paths";
input.value = box.value;
form.appendChild(input);
});
document.body.appendChild(form);
form.submit();
}
</script>
</body>
</html>
"""
return render_template_string(template, items=items, rel_path=rel_path, parent=parent)
@filemanager.route('/download')
def download():
rel_path = request.args.get('path', '')
try:
abs_path = secure_path(rel_path)
except Exception:
return "Invalid path", 400
if not os.path.isfile(abs_path):
return "File not found", 404
directory = os.path.dirname(abs_path)
filename = os.path.basename(abs_path)
return send_from_directory(directory, filename, as_attachment=True)
@filemanager.route('/delete', methods=['POST'])
def delete():
rel_path = request.form.get('path', '')
try:
abs_path = secure_path(rel_path)
except Exception:
return "Invalid path", 400
if os.path.isfile(abs_path):
os.remove(abs_path)
elif os.path.isdir(abs_path):
shutil.rmtree(abs_path)
else:
return "Not found", 404
flash("Deleted successfully")
parent = os.path.dirname(rel_path)
return redirect(url_for('filemanager.index', path=parent))
@filemanager.route('/upload', methods=['POST'])
def upload():
rel_path = request.form.get('path', '')
try:
abs_path = secure_path(rel_path)
except Exception:
return "Invalid path", 400
if not os.path.isdir(abs_path):
return "Not a directory", 400
file = request.files.get('file')
if file:
filename = secure_filename(file.filename)
file.save(os.path.join(abs_path, filename))
flash("Uploaded successfully")
return redirect(url_for('filemanager.index', path=rel_path))
@filemanager.route('/rename', methods=['POST'])
def rename():
rel_path = request.form.get('path', '')
new_name = request.form.get('new_name', '')
try:
abs_path = secure_path(rel_path)
new_rel_path = os.path.join(os.path.dirname(rel_path), new_name)
new_abs_path = secure_path(new_rel_path)
except Exception:
return "Invalid path", 400
os.rename(abs_path, new_abs_path)
flash("Renamed successfully")
parent = os.path.dirname(rel_path)
return redirect(url_for('filemanager.index', path=parent))
@filemanager.route('/cut', methods=['POST'])
def cut():
# Add one or more items to the clipboard.
paths = request.form.getlist('paths')
session['clipboard'] = paths
flash("Item(s) added to clipboard")
return redirect(request.referrer or url_for('filemanager.index'))
@filemanager.route('/paste', methods=['POST'])
def paste():
# Move clipboard items to the destination (current folder).
dest = request.form.get('dest', '')
try:
dest_abs = secure_path(dest)
except Exception:
return "Invalid destination", 400
if not os.path.isdir(dest_abs):
return "Destination not a directory", 400
clipboard = session.get('clipboard', [])
for rel_path in clipboard:
try:
abs_path = secure_path(rel_path)
filename = os.path.basename(abs_path)
shutil.move(abs_path, os.path.join(dest_abs, filename))
except Exception as e:
flash(f"Error moving {rel_path}: {str(e)}")
session.pop('clipboard', None)
flash("Moved items successfully")
return redirect(url_for('filemanager.index', path=dest))
@filemanager.route('/mkdir', methods=['POST'])
def mkdir():
# The current directory is passed as a hidden form field.
rel_path = request.form.get('path', '')
# The new directory name is passed from the form.
dirname = request.form.get('dirname', '')
if not dirname:
flash("Directory name cannot be empty")
return redirect(url_for('filemanager.index', path=rel_path))
try:
# Use secure_filename to sanitize the new directory name.
safe_dirname = secure_filename(dirname)
# Build the target path relative to the current directory.
new_dir_path = secure_path(os.path.join(rel_path, safe_dirname))
os.makedirs(new_dir_path, exist_ok=False)
flash(f"Directory '{dirname}' created successfully")
except Exception as e:
flash(f"Error creating directory: {str(e)}")
return redirect(url_for('filemanager.index', path=rel_path))
@filemanager.route('/cancel_move', methods=['POST'])
def cancel_move():
session.pop('clipboard', None)
flash("Move cancelled")
return redirect(request.referrer or url_for('filemanager.index'))
return filemanager

View File

@ -1,28 +1,58 @@
from flask import Flask from flask import Flask
from typing import Callable, Dict from typing import Callable, Dict
from src.server.file_manager import create_filemanager_blueprint
from gunicorn.app.base import BaseApplication
import multiprocessing
class Server: class Server(BaseApplication):
def __init__( def __init__(
self, self,
debug: bool = True, debug: bool = True,
host: str = "0.0.0.0", host: str = "0.0.0.0",
port: int = 8080, port: int = 8080,
template_functions: Dict[str, Callable] = {}, template_functions: Dict[str, Callable] = None,
enable_admin_browser: bool = False,
workers: int = multiprocessing.cpu_count() // 2 + 1,
options=None,
): ):
if template_functions is None:
template_functions = {}
self.debug = debug self.debug = debug
self.host = host self.host = host
self.port = port self.port = port
self.app = Flask(__name__) self.app = Flask(__name__)
self.application = self.app
self.app.secret_key = "your_secret_key"
self.options = options or {
"bind": f"{self.host}:{self.port}",
"reload": True, # Enable automatic reloading
"threads": workers,
"accesslog": "-",
}
for name, func in template_functions.items(): for name, func in template_functions.items():
self.register_template_function(name, func) self.register_template_function(name, func)
super().__init__()
def run(self): for name, func in template_functions.items():
self.app.debug = self.debug self.register_template_function(name, func)
self.app.run(host=self.host, port=self.port) super(Server, self).__init__()
def register_template_function(self, name, func): def register_template_function(self, name, func):
self.app.jinja_env.globals.update({name: func}) self.app.jinja_env.globals.update({name: func})
def load_config(self):
config = {
key: value
for key, value in self.options.items()
if key in self.cfg.settings and value is not None
}
for key, value in config.items():
self.cfg.set(key.lower(), value)
def load(self):
return self.application
def register_route(self, route, func, defaults=None): def register_route(self, route, func, defaults=None):
self.app.add_url_rule(route, func.__name__, func, defaults=defaults) self.app.add_url_rule(route, func.__name__, func, defaults=defaults)