202 lines
7.5 KiB
Python
202 lines
7.5 KiB
Python
from flask import Flask, render_template, request, jsonify, send_file
|
|
import os
|
|
import time
|
|
import tempfile
|
|
import zipfile
|
|
import io
|
|
from werkzeug.utils import secure_filename
|
|
from req_generator import (
|
|
get_all_imports, get_stdlib_modules, get_import_to_pkg_mapping,
|
|
resolve_package_name, get_package_releases, find_compatible_version,
|
|
get_file_dates, get_file_min_version
|
|
)
|
|
import logging
|
|
from logging.handlers import RotatingFileHandler
|
|
|
|
|
|
# TODO 1: the uploaded file's modification date reflects the upload time,
# not the original file. Preserve the client's original mtime (or fall back
# to a creation date) so the reference date used for version pinning is right.
# TODO 2: a local PyPI mirror (e.g. bandersnatch) would reduce the chance of
# "too many requests" errors while querying PyPI. Alternatively, scrape
# periodically, cache the package/version lists, and refresh the cache with
# a weekly cron job.
|
|
|
|
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024  # 10MB max file size

# Module-level logger: rotating file log plus console output.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
os.makedirs('logs', exist_ok=True)
file_handler = RotatingFileHandler('logs/app.log', maxBytes=1024*1024, backupCount=5)
file_handler.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# BUG FIX: the handlers were created but never attached to the logger, so
# nothing was ever written to logs/app.log.
logger.addHandler(file_handler)
logger.addHandler(console_handler)

# Upload types accepted by /analyze: single Python files or zip archives.
ALLOWED_EXTENSIONS = {'py', 'zip'}
|
|
|
|
def allowed_file(filename):
    """Return True if *filename* carries an extension listed in ALLOWED_EXTENSIONS."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in ALLOWED_EXTENSIONS
|
|
|
|
def process_python_file(file_path):
    '''
    Analyze one Python source file and build pinned requirements for it.

    Uses the file's git date when available (otherwise its filesystem date)
    as the reference date, then pins each resolvable non-stdlib import to a
    release that existed on that date.

    Returns a dict with keys:
      - "requirements": list of "pkg==ver # <pypi url>" lines
      - "unresolved": import names that could not be mapped to a PyPI package
      - "reference_date": ISO-format date used for version selection
      - "interpreter_version": minimum Python version reported by vermin
    On failure (no imports / no non-stdlib imports) returns a dict with
    "error" and "interpreter_version" keys instead.
    '''
    # BUG FIX: compute the interpreter version up front and include it in the
    # error returns too — the /analyze zip handler reads this key on every
    # result, which previously raised KeyError for files with no imports.
    interpreter_version = get_file_min_version(file_path=file_path)

    # Get all imports
    imports = get_all_imports(file_path)
    if not imports:
        return {"error": "No imports found.",
                "interpreter_version": interpreter_version}

    # Filter out stdlib modules — only third-party imports need pinning.
    stdlib_modules = get_stdlib_modules()
    imports = {imp for imp in imports if imp not in stdlib_modules}
    if not imports:
        return {"error": "No non-stdlib imports found.",
                "interpreter_version": interpreter_version}

    # Map import names to PyPI distribution names (they can differ,
    # e.g. "cv2" vs "opencv-python").
    import_mapping = get_import_to_pkg_mapping()

    # Prefer the git commit date over the filesystem date when available.
    file_date, git_date = get_file_dates(file_path)
    reference_date = git_date if git_date else file_date

    # Process each import
    requirements = []
    unresolved_imports = []

    for import_name in sorted(imports):
        # Try to resolve the import to a PyPI package name.
        package_name = resolve_package_name(import_name, import_mapping)
        if not package_name:
            unresolved_imports.append(import_name)
            continue

        releases = get_package_releases(package_name)
        if not releases:
            # Known package but no release data; best-effort, skip it.
            continue

        version = find_compatible_version(releases, reference_date)
        if version:
            req_line = f"{package_name}=={version} # https://pypi.org/project/{package_name}/"
            requirements.append(req_line)

    return {
        "requirements": requirements,
        "unresolved": unresolved_imports,
        "reference_date": reference_date.isoformat(),
        "interpreter_version": interpreter_version
    }
|
|
|
|
@app.route('/')
def index():
    """Serve the single-page upload UI."""
    return render_template('index.html')
|
|
|
|
@app.route('/analyze', methods=['POST'])
def analyze():
    """
    Analyze an uploaded .py file or .zip of .py files.

    Returns JSON with "requirements", "unresolved" and (for single files)
    "reference_date" / "interpreter_version"; for zips also "file_list".
    Responds 400 on missing/invalid uploads or when nothing analyzable is
    found.
    """
    if 'file' not in request.files:
        return jsonify({"error": "No file provided"}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "No file selected"}), 400

    if not allowed_file(file.filename):
        return jsonify({"error": "Invalid file type"}), 400

    with tempfile.TemporaryDirectory() as temp_dir:
        if file.filename.endswith('.zip'):
            scanned_files = []
            # Handle zip file, preserving original file dates from the archive
            zip_path = os.path.join(temp_dir, 'upload.zip')
            file.save(zip_path)

            requirements = []
            unresolved = set()
            min_py_vers = []
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)
                # Map extracted paths to their archive timestamps —
                # extractall() resets mtimes to "now", so we restore them
                # below to get meaningful reference dates.
                file_dates = {
                    os.path.join(temp_dir, info.filename): info.date_time
                    for info in zip_ref.filelist
                }

            for root, _, files in os.walk(temp_dir):
                for filename in files:
                    if not filename.endswith('.py'):
                        continue
                    file_path = os.path.join(root, filename)
                    if file_path in file_dates:
                        # ZipInfo.date_time is (Y, M, D, h, m, s); pad to the
                        # 9-tuple mktime() expects. BUG FIX: the padded triple
                        # is (weekday, yearday, isdst) — not h/m/s as the old
                        # comment claimed — and isdst must be -1 so mktime
                        # resolves DST instead of assuming standard time.
                        timestamp = time.mktime(file_dates[file_path] + (0, 0, -1))
                        os.utime(file_path, (timestamp, timestamp))

                    result = process_python_file(file_path)
                    # BUG FIX: error results may lack "interpreter_version";
                    # .get() avoids a KeyError for files with no imports.
                    scanned_files.append(
                        file_path + " | Vermin reported version: "
                        + result.get("interpreter_version", "unknown"))
                    if "error" not in result:
                        requirements.extend(result["requirements"])
                        unresolved.update(result["unresolved"])
                        min_py_vers.append(result["interpreter_version"])

            if not requirements and not unresolved:
                return jsonify({"error": "No Python files found in zip"}), 400
            # BUG FIX: use the configured module logger (with lazy %-args),
            # not the root logging module, and include the missing "files".
            logger.info("Analyzed %d files with %d requirements. Unresolved: %s",
                        len(scanned_files), len(requirements), unresolved)
            return jsonify({
                "requirements": sorted(set(requirements)),
                "unresolved": sorted(unresolved),
                "file_list": sorted(scanned_files),
                #"interpreter_version": " | ".join(min_py_vers)
            })

        else:
            # Handle single Python file, we need to use our custom header
            # X-Last-Modified-Date (UTC String Format)
            file_path = os.path.join(temp_dir, secure_filename(file.filename))
            file.save(file_path)
            try:
                last_modified_str = request.headers.get("X-Last-Modified-Date")
                modification_time = time.strptime(last_modified_str, "%a, %d %b %Y %H:%M:%S GMT")
                modification_timestamp = time.mktime(modification_time)
                os.utime(file_path, (modification_timestamp, modification_timestamp))
            except (TypeError, ValueError, OverflowError, OSError) as e:
                # Header missing or malformed — best effort, keep the upload
                # mtime. (TypeError: header absent; ValueError: bad format.)
                logger.debug(e)

            result = process_python_file(file_path)

            if "error" in result:
                return jsonify({"error": result["error"]}), 400

            return jsonify(result)
|
|
|
|
@app.route('/download', methods=['POST'])
def download():
    """Assemble the posted requirements into a requirements.txt attachment."""
    payload = request.json
    if not payload or 'requirements' not in payload:
        return jsonify({"error": "No requirements provided"}), 400

    content = "\n".join(payload['requirements'])
    # Append unresolved imports as commented placeholder lines, if any.
    unresolved = payload.get('unresolved')
    if unresolved:
        placeholder_lines = "\n".join(f"#{imp}==???" for imp in unresolved)
        content += "\n\n# Unresolved Requirements:\n" + placeholder_lines

    buffer = io.BytesIO(content.encode('utf-8'))

    return send_file(
        buffer,
        mimetype='text/plain',
        as_attachment=True,
        download_name='requirements.txt'
    )
|
|
|
|
if __name__ == '__main__':
    # Development entry point: Flask's built-in server with the interactive
    # debugger/reloader. NOTE(review): debug=True exposes the Werkzeug
    # debugger console — do not run this way in production; serve via a
    # WSGI server (gunicorn/uwsgi) instead.
    app.run(debug=True)
|