# Standard library imports import atexit import csv import io import locale import logging import os import random import re import secrets import sqlite3 import time import traceback from datetime import datetime, timedelta from functools import wraps from io import BytesIO from time import sleep from urllib.parse import urlparse from zoneinfo import ZoneInfo import warnings # 3rd-Provider-Modules import pytz import requests from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.schedulers.background import BackgroundScheduler from dotenv import load_dotenv from flask import ( Flask, Markup, abort, flash, g, jsonify, make_response, redirect, render_template, request, send_file, session, url_for ) from flask_login import ( LoginManager, UserMixin, current_user, login_required, login_user, logout_user ) from flask_migrate import Migrate from flask_session import Session from flask_sqlalchemy import SQLAlchemy from flask_wtf import CSRFProtect, FlaskForm from redis import Redis from reportlab.lib import colors from reportlab.lib.pagesizes import A4, landscape, letter from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet from reportlab.lib.units import cm, inch, mm from reportlab.lib.utils import ImageReader from reportlab.pdfgen import canvas from reportlab.platypus import ( Image, Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle ) from sqlalchemy import MetaData, UniqueConstraint, event from sqlalchemy.engine import Engine from sqlalchemy.exc import IntegrityError, LegacyAPIWarning from sqlalchemy.orm import joinedload from werkzeug.security import check_password_hash, generate_password_hash from wtforms import SelectField, StringField, TextAreaField, validators # Config load_dotenv(override=True) warnings.simplefilter("ignore", category=LegacyAPIWarning) # Logging-Config logging.basicConfig(level=logging.INFO) logging.getLogger('apscheduler').setLevel(logging.WARNING) @event.listens_for(Engine, "connect") def enable_foreign_keys(dbapi_connection, connection_record): if isinstance(dbapi_connection, sqlite3.Connection): cursor = dbapi_connection.cursor() cursor.execute("PRAGMA foreign_keys=ON;") cursor.close() ITAD_API_KEY_PLACEHOLDER = "your_api_key_here" TZ = os.getenv('TZ', 'UTC') os.environ['TZ'] = TZ app = Flask(__name__) app.jinja_env.globals['getattr'] = getattr @app.errorhandler(404) def not_found_error(error): return render_template('404.html'), 404 # UNIX-Systems (Linux, Docker) try: time.tzset() except AttributeError: pass # tzset not availabe on Windows local_tz = pytz.timezone(TZ) # Load Languages import os import json TRANSLATION_DIR = os.path.join(os.getcwd(), 'translations') SUPPORTED_LANGUAGES = ['de', 'en'] TRANSLATIONS = {} for lang in SUPPORTED_LANGUAGES: try: with open(os.path.join(TRANSLATION_DIR, f'{lang}.json'), encoding='utf-8') as f: TRANSLATIONS[lang] = json.load(f) print(f"✅ Loaded {lang} translations") except Exception: print(f"❌ Failed loading {lang}.json: {str(e)}") TRANSLATIONS[lang] = {} def translate(key, lang=None, **kwargs): lang = lang or session.get('lang', 'en') fallback_lang = app.config.get('DEFAULT_LANGUAGE', 'en') translations = TRANSLATIONS.get(lang, {}) fallback_translations = TRANSLATIONS.get(fallback_lang, {}) value = translations.get(key) or fallback_translations.get(key) or key return value.format(**kwargs) if isinstance(value, str) else value ## DEBUG Translations if app.debug: print(f"Loaded translations for 'de': {TRANSLATIONS.get('de', {})}") ### Admin decorator def admin_required(f): @wraps(f) def decorated_function(*args, **kwargs): if not current_user.is_authenticated: abort(403) if not current_user.is_admin: abort(403) return f(*args, **kwargs) return decorated_function csrf = CSRFProtect(app) convention = { "ix": "ix_%(column_0_label)s", "uq": "uq_%(table_name)s_%(column_0_name)s", "ck": "ck_%(table_name)s_%(constraint_name)s", "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", "pk": "pk_%(table_name)s" } metadata = MetaData(naming_convention=convention) load_dotenv(override=True) # load variables from .env with override load_dotenv(override=True) # App-Configuration app.config.update( # Most Important SECRET_KEY=os.getenv('SECRET_KEY'), SQLALCHEMY_DATABASE_URI = 'sqlite:////app/data/games.db', SQLALCHEMY_TRACK_MODIFICATIONS = False, DEFAULT_LANGUAGE='en', ITAD_COUNTRY = os.getenv("ITAD_COUNTRY", "DE"), # SESSION-HANDLING (In Production: Use Redis!) SESSION_TYPE='redis', SESSION_PERMANENT = False, SESSION_USE_SIGNER = True, SESSION_REDIS=Redis.from_url(os.getenv("REDIS_URL", "redis://redis:6379/0")), SESSION_FILE_DIR = '/app/data/flask-sessions', SESSION_COOKIE_NAME = 'gamekeys_session', SESSION_COOKIE_SECURE = os.getenv('SESSION_COOKIE_SECURE', 'False').lower() == 'true', SESSION_COOKIE_HTTPONLY = True, SESSION_COOKIE_SAMESITE = 'Lax', PERMANENT_SESSION_LIFETIME = timedelta(days=30), # LOGIN COOKIE STUFF REMEMBER_COOKIE_DURATION=timedelta(days=30), REMEMBER_COOKIE_HTTPONLY=True, REMEMBER_COOKIE_SECURE=True if os.getenv('FORCE_HTTPS', 'False').lower() == 'true' else False, REMEMBER_COOKIE_SAMESITE='Lax', # CSRF-PROTECTION WTF_CSRF_ENABLED = True, WTF_CSRF_SECRET_KEY = os.getenv('CSRF_SECRET_KEY', os.urandom(32).hex()), WTF_CSRF_TIME_LIMIT = 3600, # SECURITYsa & PERFORMANCE REGISTRATION_ENABLED = os.getenv('REGISTRATION_ENABLED', 'True').lower() == 'true', SEND_FILE_MAX_AGE_DEFAULT = int(os.getenv('SEND_FILE_MAX_AGE_DEFAULT', 0)), TEMPLATES_AUTO_RELOAD = os.getenv('TEMPLATES_AUTO_RELOAD', 'True').lower() == 'true', PREFERRED_URL_SCHEME = 'https' if os.getenv('FORCE_HTTPS') else 'http' ) Session(app) interval_hours = int(os.getenv('CHECK_EXPIRING_KEYS_INTERVAL_HOURS', 12)) # Init db = SQLAlchemy(app, metadata=metadata) migrate = Migrate(app, db) login_manager = LoginManager(app) login_manager.login_view = 'login' # Logging app.logger.addHandler(logging.StreamHandler()) app.logger.setLevel(logging.DEBUG) @app.errorhandler(403) def forbidden_error(error): return render_template('403.html'), 403 @app.before_request def set_language(): if 'lang' not in session or not session['lang']: session['lang'] = app.config.get('DEFAULT_LANGUAGE', 'en') g.lang = session['lang'] def enforce_https(): if os.getenv('FORCE_HTTPS', 'False').lower() == 'true' and not app.debug: proto = request.headers.get('X-Forwarded-Proto', 'http') if proto != 'https' and not request.is_secure: url = request.url.replace('http://', 'https://', 1) return redirect(url, code=301) def debug_translations(): if app.debug: app.logger.debug(f"Lang: {session.get('lang')}") app.before_request(enforce_https) @app.context_processor def inject_template_globals(): return { '_': lambda key, **kwargs: translate(key, lang=session.get('lang', 'en'), **kwargs), 'now': datetime.now(local_tz), 'app_version': os.getenv('APP_VERSION', '1.0.0'), 'local_tz': local_tz } @app.template_filter('strftime') def _jinja2_filter_datetime(date, fmt='%d.%m.%Y'): if date is None: return '' return date.strftime(fmt) @app.errorhandler(403) def forbidden(e): return render_template('403.html'), 403 # DB Models class ActivityLog(db.Model): __tablename__ = 'activity_logs' id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('users.id')) action = db.Column(db.String(100), nullable=False) details = db.Column(db.Text) timestamp = db.Column(db.DateTime, default=lambda: datetime.now(local_tz)) user = db.relationship('User', backref='activities') class User(UserMixin, db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) password = db.Column(db.String(256), nullable=False) is_admin = db.Column(db.Boolean, default=False) games = db.relationship( 'Game', back_populates='owner', cascade='all, delete-orphan', passive_deletes=True ) class Game(db.Model): __tablename__ = 'games' __table_args__ = ( UniqueConstraint('steam_key', 'user_id', name='uq_steam_key_user'), ) id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(100), nullable=False) steam_key = db.Column(db.String(100), nullable=False, unique=True) status = db.Column(db.String(50), nullable=False) recipient = db.Column(db.String(100)) notes = db.Column(db.Text) url = db.Column(db.String(200)) created_at = db.Column(db.DateTime, default=lambda: datetime.now(local_tz)) redeem_date = db.Column(db.DateTime) steam_appid = db.Column(db.String(20)) platform = db.Column(db.String(50), default='pc') current_price = db.Column(db.Float) current_price_shop = db.Column(db.String(100)) historical_low = db.Column(db.Float) release_date = db.Column(db.DateTime) release_date = db.Column(db.DateTime) itad_slug = db.Column(db.String(200)) steam_description_en = db.Column(db.Text) steam_description_de = db.Column(db.Text) # with users.id user_id = db.Column(db.Integer, db.ForeignKey('users.id', ondelete='CASCADE'), nullable=False) owner = db.relationship( 'User', back_populates='games' ) redeem_tokens = db.relationship( 'RedeemToken', back_populates='game', cascade='all, delete-orphan', passive_deletes=True ) class RedeemToken(db.Model): __tablename__ = 'redeem_tokens' id = db.Column(db.Integer, primary_key=True) token = db.Column(db.String(17), unique=True, nullable=False) expires = db.Column(db.DateTime(timezone=True), nullable=False) total_hours = db.Column(db.Integer, nullable=False) # ForeignKey with CASCADE game_id = db.Column( db.Integer, db.ForeignKey('games.id', ondelete='CASCADE'), nullable=False ) game = db.relationship('Game', back_populates='redeem_tokens') def is_expired(self): # use timeszone (from .env) local_tz = pytz.timezone(os.getenv('TZ', 'UTC')) now = datetime.now(local_tz) return now > self.expires.astimezone(local_tz) class GameForm(FlaskForm): name = StringField('Name', [validators.DataRequired()]) steam_key = StringField('Steam Key') status = SelectField('Status', choices=[ ('nicht eingelöst', 'Nicht eingelöst'), ('eingelöst', 'Eingelöst'), ('geschenkt', 'Geschenkt') ]) recipient = StringField('Empfänger') notes = TextAreaField('Notizen') url = StringField('Store URL') redeem_date = StringField('Einlösedatum') steam_appid = StringField('Steam App ID') PLATFORM_CHOICES = [ ('steam', 'Steam'), ('gog', 'GOG'), ('xbox', 'XBox'), ('playstation', 'PlayStation'), ('switch', 'Nintendo Switch'), ('other', 'Other'), ('pc', 'PC') ] STATUS_CHOICES = [ ('nicht eingelöst', 'Nicht eingelöst'), ('eingelöst', 'Eingelöst'), ('geschenkt', 'Geschenkt') ] with app.app_context(): db.create_all() @login_manager.user_loader def load_user(user_id): return db.session.get(User, int(user_id)) def extract_steam_appid(url): match = re.search(r'store\.steampowered\.com/app/(\d+)', url or '') return match.group(1) if match else '' # 404 def get_or_404(model, id): instance = db.session.get(model, id) if not instance: abort(404) return instance # Admin Audit Helper def log_activity(user_id, action, details=None): """ Store an activity log entry for auditing purposes. """ log = ActivityLog( user_id=user_id, action=action, details=details ) db.session.add(log) db.session.commit() # Game Infos Helper def fetch_steam_data(appid, lang='en'): lang_map = { 'en': 'english', 'de': 'german' } steam_lang = lang_map.get(lang, 'english') try: response = requests.get( "https://store.steampowered.com/api/appdetails", params={"appids": appid, "l": steam_lang}, timeout=15 ) data = response.json().get(str(appid), {}) if data.get("success"): return { "name": data["data"].get("name"), "detailed_description": data["data"].get("detailed_description"), "release_date": data["data"].get("release_date", {}).get("date"), } except Exception as e: app.logger.error(f"Steam API error: {str(e)}") return None def parse_steam_release_date(date_str): """Parsing Steam-Release-Date (the german us thingy, you know)""" import locale from datetime import datetime # try german format try: locale.setlocale(locale.LC_TIME, "de_DE.UTF-8") return datetime.strptime(date_str, "%d. %b. %Y") except Exception: pass # Fallback: okay lets try the english one try: locale.setlocale(locale.LC_TIME, "en_US.UTF-8") return datetime.strptime(date_str, "%d %b, %Y") except Exception: pass return None def fetch_itad_slug(steam_appid: int) -> str | None: api_key = os.getenv("ITAD_API_KEY") if not api_key or api_key.strip() == "your-secret-key-here": app.logger.warning("ITAD-API-Key ist nicht gesetzt oder ist ein Platzhalter.") return None try: response = requests.get( "https://api.isthereanydeal.com/games/lookup/v1", params={"key": api_key, "appid": steam_appid, "platform": "steam"}, timeout=10 ) data = response.json() return data.get("game", {}).get("slug") except Exception as e: app.logger.error(f"ITAD Error: {str(e)}") return None def fetch_itad_game_id(steam_appid: int) -> str | None: api_key = os.getenv("ITAD_API_KEY") if not api_key or api_key.strip() == "your-secret-key-here": app.logger.warning("ITAD-API-Key ist nicht gesetzt oder ist ein Platzhalter.") return None try: response = requests.get( "https://api.isthereanydeal.com/games/lookup/v1", params={"key": api_key, "appid": steam_appid, "platform": "steam"}, timeout=10 ) response.raise_for_status() data = response.json() if data.get("found") and data.get("game") and data["game"].get("id"): return data["game"]["id"] app.logger.error(f"ITAD Response Error: {data}") return None except Exception as e: app.logger.error(f"ITAD Error: {str(e)}") return None def fetch_itad_prices(game_id: str) -> dict | None: api_key = os.getenv("ITAD_API_KEY") country = os.getenv("ITAD_COUNTRY", "DE") if not api_key or api_key.strip() == "your-secret-key-here": app.logger.warning("ITAD-API-Key ist nicht gesetzt oder ist ein Platzhalter.") return None try: response = requests.post( "https://api.isthereanydeal.com/games/prices/v3", params={ "key": api_key, "country": country, "shops": "steam", "vouchers": "false" }, json=[game_id], headers={"Content-Type": "application/json"}, timeout=15 ) response.raise_for_status() return response.json()[0] except Exception as e: app.logger.error(f"ITAD-Preisabfrage fehlgeschlagen: {str(e)}") return None @app.route('/') @login_required def index(): search_query = request.args.get('q', '') query = Game.query.filter_by(user_id=current_user.id) if search_query: query = query.filter(Game.name.ilike(f'%{search_query}%')) games = query.order_by(Game.created_at.desc()).all() return render_template('index.html', games=games, format_date=lambda dt: dt.strftime('%d.%m.%Y') if dt else '', search_query=search_query) @app.route('/set-lang/') def set_lang(lang): if lang in SUPPORTED_LANGUAGES: session['lang'] = lang session.permanent = True return redirect(request.referrer or url_for('index')) @app.route('/set-theme/') def set_theme(theme): resp = make_response('', 204) resp.set_cookie('theme', theme, max_age=60*60*24*365) return resp @app.route('/login', methods=['GET', 'POST']) def login(): if current_user.is_authenticated: # Prevent already logged-in users from accessing login page return redirect(url_for('index')) if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') remember = request.form.get('remember_me') == 'true' user = User.query.filter_by(username=username).first() if user and check_password_hash(user.password, password): # Pass remember=True to login_user and set duration # The duration will be taken from app.config['REMEMBER_COOKIE_DURATION'] login_user(user, remember=remember) # Log activity log_activity(user.id, 'user_login', f"User '{user.username}' logged in.") next_page = request.args.get('next') # Add security check for next_page to prevent open redirect if not next_page or urlparse(next_page).netloc != '': next_page = url_for('index') flash(translate('Logged in successfully.'), 'success') return redirect(next_page) else: flash(translate('Invalid username or password.'), 'danger') return render_template('login.html') @app.route('/register', methods=['GET', 'POST']) def register(): if not app.config['REGISTRATION_ENABLED']: abort(403) if request.method == 'POST': username = request.form['username'] password = request.form['password'] existing_user = User.query.filter_by(username=username).first() if existing_user: flash(translate('Username already exists'), 'error') return redirect(url_for('register')) # make the first user admin is_admin = User.query.count() == 0 new_user = User( username=username, password=generate_password_hash(password), is_admin=is_admin ) db.session.add(new_user) db.session.commit() login_user(new_user) flash(translate('Registration successful'), 'success') return redirect(url_for('index')) return render_template('register.html') @app.route('/logout') @login_required def logout(): logout_user() return redirect(url_for('login')) @app.route('/change-password', methods=['GET', 'POST']) @login_required def change_password(): if request.method == 'POST': current_password = request.form['current_password'] new_password = request.form['new_password'] confirm_password = request.form['confirm_password'] if not check_password_hash(current_user.password, current_password): flash(translate('Current passwort is wrong'), 'danger') return redirect(url_for('change_password')) if new_password != confirm_password: flash(translate('New Passwords are not matching'), 'danger') return redirect(url_for('change_password')) current_user.password = generate_password_hash(new_password) db.session.commit() flash(translate('Password changed successfully', session.get('lang', 'en')), 'success') return redirect(url_for('index')) return render_template('change_password.html') @app.route('/add', methods=['GET', 'POST']) @login_required def add_game(): if request.method == 'POST': try: url = request.form.get('url', '') steam_appid = request.form.get('steam_appid', '').strip() if not steam_appid: steam_appid = extract_steam_appid(url) steam_key = request.form['steam_key'] if Game.query.filter_by(steam_key=steam_key).first(): flash(translate('Steam Key already exists!'), 'error') return redirect(url_for('add_game')) new_game = Game( name=request.form['name'], steam_key=steam_key, status=request.form['status'], recipient=request.form.get('recipient', ''), notes=request.form.get('notes', ''), url=url, steam_appid=steam_appid, redeem_date=datetime.strptime(request.form['redeem_date'], '%Y-%m-%d') if request.form['redeem_date'] else None, user_id=current_user.id ) db.session.add(new_game) db.session.commit() flash(translate('Game added successfully!'), 'success') return redirect(url_for('index')) except IntegrityError as e: db.session.rollback() if "UNIQUE constraint failed: game.steam_key" in str(e): flash(translate('Steam Key already exists!'), 'error') else: flash(translate('Database error: %(error)s', error=str(e)), 'error') except Exception as e: db.session.rollback() flash(translate('Error: %(error)s', error=str(e)), 'error') return render_template( 'add_game.html', platforms=PLATFORM_CHOICES, statuses=STATUS_CHOICES ) @app.route('/edit/', methods=['GET', 'POST']) @login_required def edit_game(game_id): # Eager Loading für Tokens game = Game.query.options(joinedload(Game.redeem_tokens)).get_or_404(game_id) def safe_parse_date(date_str): try: naive = datetime.strptime(date_str, '%Y-%m-%d') if date_str else None return local_tz.localize(naive) if naive else None except ValueError: return None if request.method == 'POST': try: # Validation if not request.form.get('name') or not request.form.get('steam_key'): flash(translate('Name and Steam Key are required'), 'error') return redirect(url_for('edit_game', game_id=game_id)) # Duplicate check existing = Game.query.filter( Game.steam_key == request.form['steam_key'], Game.id != game.id, Game.user_id == current_user.id ).first() if existing: flash(translate('Steam Key already exists'), 'error') return redirect(url_for('edit_game', game_id=game_id)) # Update fields game.name = request.form['name'] game.steam_key = request.form['steam_key'] game.status = request.form['status'] game.platform = request.form.get('platform', 'pc') game.recipient = request.form.get('recipient', '') game.notes = request.form.get('notes', '') game.url = request.form.get('url', '') game.steam_appid = request.form.get('steam_appid', '') game.redeem_date = safe_parse_date(request.form.get('redeem_date', '')) # Token-Logic if game.status == 'geschenkt': # Vorhandene Tokens löschen RedeemToken.query.filter_by(game_id=game.id).delete() # Generate new Token token = secrets.token_urlsafe(12)[:17] expires = datetime.now(local_tz) + timedelta(hours=24) new_token = RedeemToken( token=token, game_id=game.id, expires=expires, total_hours=24 ) db.session.add(new_token) db.session.commit() flash(translate('Changes saved successfully'), 'success') return redirect(url_for('index')) except IntegrityError as e: db.session.rollback() app.logger.error(f"IntegrityError: {traceback.format_exc()}") flash(translate('Database error: {error}', error=str(e.orig)), 'error') except Exception as e: db.session.rollback() app.logger.error(f"Unexpected error: {traceback.format_exc()}") flash(translate('Unexpected error: {error}', error=str(e)), 'error') return render_template( 'edit_game.html', game=game, platforms=PLATFORM_CHOICES, statuses=STATUS_CHOICES, redeem_date=game.redeem_date.strftime('%Y-%m-%d') if game.redeem_date else '' ) @app.route('/delete/', methods=['POST']) @login_required def delete_game(game_id): game = Game.query.get_or_404(game_id) db.session.delete(game) db.session.commit() flash(translate('Game deleted successfully'), 'success') return redirect(url_for('index')) @app.route('/export', methods=['GET']) @login_required def export_games(): games = Game.query.filter_by(user_id=current_user.id).all() output = io.StringIO() writer = csv.writer(output) writer.writerow(['Name', 'Steam Key', 'Status', 'Recipient', 'Notes', 'URL', 'Created', 'Redeem by', 'Steam AppID']) for game in games: writer.writerow([ game.name, game.steam_key, game.status, game.recipient, game.notes, game.url, game.created_at.strftime('%Y-%m-%d %H:%M:%S') if game.created_at else '', game.redeem_date.strftime('%Y-%m-%d') if game.redeem_date else '', game.steam_appid ]) output.seek(0) return send_file( io.BytesIO(output.getvalue().encode('utf-8')), mimetype='text/csv', as_attachment=True, download_name='games_export.csv' ) @app.route('/export_pdf') @login_required def export_pdf(): excluded_statuses = ['eingelöst', 'verschenkt'] games = Game.query.filter( Game.user_id == current_user.id, Game.status.notin_(excluded_statuses) ).order_by(Game.created_at.desc()).all() buffer = io.BytesIO() doc = SimpleDocTemplate(buffer, pagesize=landscape(A4), leftMargin=40, rightMargin=40, topMargin=40, bottomMargin=40 ) styles = getSampleStyleSheet() elements = [] img_height = 2*cm # Title elements.append(Paragraph( translate("Game List (without Keys)", lang=session.get('lang', 'en')), styles['Title'] )) elements.append(Spacer(1, 12)) # Table header col_widths = [ 5*cm, 10*cm, 6*cm, 3*cm ] data = [[ Paragraph('Cover', styles['Normal']), Paragraph('Name', styles['Normal']), Paragraph('Shop-Link', styles['Normal']), Paragraph('Einlösen bis', styles['Normal']) ]] for game in games: img = None if game.steam_appid: try: img_url = f"https://cdn.cloudflare.steamstatic.com/steam/apps/{game.steam_appid}/header.jpg" img_data = io.BytesIO(requests.get(img_url, timeout=5).content) img = Image(img_data, width=3*cm, height=img_height) except Exception: img = Paragraph('', styles['Normal']) elif game.url and 'gog.com' in game.url: try: img_path = os.path.join(app.root_path, 'static', 'gog_logo.webp') img = Image(img_path, width=3*cm, height=img_height) except Exception: img = Paragraph('', styles['Normal']) data.append([ img or '', Paragraph(game.name, styles['Normal']), Paragraph(game.url or '', styles['Normal']), game.redeem_date.strftime('%d.%m.%y') if game.redeem_date else '' ]) # Table format table = Table(data, colWidths=col_widths, repeatRows=1) table.setStyle(TableStyle([ ('FONTNAME', (0,0), (-1,0), 'Helvetica-Bold'), ('FONTSIZE', (0,0), (-1,0), 8), ('FONTSIZE', (0,1), (-1,-1), 8), ('VALIGN', (0,0), (-1,-1), 'MIDDLE'), ('ALIGN', (0,0), (-1,-1), 'LEFT'), ('GRID', (0,0), (-1,-1), 0.5, colors.lightgrey), ('WORDWRAP', (1,1), (1,-1), 'CJK'), ])) elements.append(table) doc.build(elements) buffer.seek(0) return send_file( buffer, mimetype='application/pdf', as_attachment=True, download_name=f'game_export_{datetime.now().strftime("%Y%m%d")}.pdf' ) @app.route('/import', methods=['GET', 'POST']) @login_required def import_games(): if request.method == 'POST': file = request.files.get('file') if file and file.filename.endswith('.csv'): stream = io.StringIO(file.stream.read().decode("UTF8")) reader = csv.DictReader(stream) new_games = 0 duplicates = 0 try: with db.session.begin_nested(): for row in reader: steam_key = row['Steam Key'].strip() if Game.query.filter_by(steam_key=steam_key).first(): duplicates += 1 continue game = Game( name=row['Name'], steam_key=steam_key, status=row['Status'], recipient=row.get('Recipient', ''), notes=row.get('Notes', ''), url=row.get('URL', ''), created_at=datetime.strptime(row['Created'], '%Y-%m-%d %H:%M:%S') if row.get('Created') else datetime.utcnow(), redeem_date=datetime.strptime(row['Redeem by'], '%Y-%m-%d') if row.get('Redeem by') else None, steam_appid=row.get('Steam AppID', ''), user_id=current_user.id ) db.session.add(game) new_games += 1 db.session.commit() flash(translate("new_games_imported", new=new_games, dup=duplicates), 'success') except Exception as e: db.session.rollback() flash(translate('Import error: {error}', error=str(e)), 'danger') return redirect(url_for('index')) flash(translate('Please upload a valid CSV file.'), 'danger') return render_template('import.html') @app.route('/generate_redeem/', methods=['POST']) @login_required def generate_redeem(game_id): game = Game.query.get_or_404(game_id) if game.user_id != current_user.id or game.status != 'geschenkt': return jsonify({'error': translate('Forbidden')}), 403 try: RedeemToken.query.filter_by(game_id=game_id).delete() token = secrets.token_urlsafe(12)[:17] expires = datetime.now(local_tz) + timedelta(hours=24) new_token = RedeemToken( token=token, game_id=game_id, expires=expires, total_hours=24 ) db.session.add(new_token) db.session.commit() redeem_url = url_for('redeem', token=token, _external=True) message = translate( 'Redeem link generated: {url}', url=redeem_url ) return jsonify({'url': redeem_url, 'message': message}) except Exception as e: db.session.rollback() return jsonify({'error': str(e)}), 500 @app.route('/redeem/', endpoint='redeem') def redeem_page(token): redeem_token = RedeemToken.query.filter_by(token=token).first() if not redeem_token: abort(404) expires_utc = redeem_token.expires.astimezone(pytz.UTC) if datetime.now(pytz.UTC) > expires_utc: db.session.delete(redeem_token) db.session.commit() abort(404) game = Game.query.get(redeem_token.game_id) redeem_token.used = True db.session.commit() # which Plattform if game.platform == "steam" or game.steam_appid: platform_link = 'https://store.steampowered.com/account/registerkey?key=' platform_label = "Steam" elif game.platform == "gog": platform_link = 'https://www.gog.com/redeem/' platform_label = "GOG" elif game.platform == "xbox": platform_link = 'https://redeem.microsoft.com/' platform_label = "XBOX" elif game.platform == "playstation": platform_link = 'https://store.playstation.com/redeem' platform_label = "PlayStation" else: platform_link = '#' platform_label = game.platform.capitalize() if game.platform else "Unknown" return render_template( 'redeem.html', game=game, redeem_token=redeem_token, expires_timestamp=int(expires_utc.timestamp() * 1000), platform_link=platform_link, platform_label=platform_label ) @app.route('/admin/users') @login_required @admin_required def admin_users(): users = User.query.all() return render_template('admin_users.html', users=users) @app.route('/admin/users/delete/', methods=['POST']) @login_required @admin_required def admin_delete_user(user_id): if current_user.id == user_id: flash(translate('You cannot delete yourself'), 'error') return redirect(url_for('admin_users')) user = User.query.get_or_404(user_id) db.session.delete(user) db.session.commit() log_activity( current_user.id, 'user_deleted', f"Deleted user: {user.username} (ID: {user.id})" ) flash(translate('User deleted successfully'), 'success') return redirect(url_for('admin_users')) @app.route('/admin/users/reset_password/', methods=['POST']) @login_required @admin_required def admin_reset_password(user_id): user = User.query.get_or_404(user_id) new_password = secrets.token_urlsafe(8) user.password = generate_password_hash(new_password) db.session.commit() log_activity( current_user.id, 'user_newpassword', f"New password for user: {user.username} (ID: {user.id})" ) flash( translate('New password for {username}: {password}', username=user.username, password=new_password), 'info' ) return redirect(url_for('admin_users')) @app.route('/admin/audit-logs') @login_required @admin_required def admin_audit_logs(): page = request.args.get('page', 1, type=int) logs = ActivityLog.query.order_by(ActivityLog.timestamp.desc()).paginate(page=page, per_page=20) return render_template('admin_audit_logs.html', logs=logs) @app.route('/game//update', methods=['POST']) @login_required def update_game_data(game_id): game = Game.query.get_or_404(game_id) # 1. Getting Steam AppID steam_appid = request.form.get('steam_appid', '').strip() app.logger.info(f"🚀 Update gestartet für Game {game_id} mit AppID: {steam_appid}") # 2. Steam-Data (Multilingual) if steam_appid: try: app.logger.debug(f"🔍 Fetching Steam data for AppID: {steam_appid}") for lang in ['en', 'de']: steam_data = fetch_steam_data(steam_appid, lang=lang) if steam_data: if lang == 'en' and steam_data.get("name"): game.name = steam_data.get("name", game.name) setattr(game, f'steam_description_{lang}', steam_data.get("detailed_description") or "No Infos available") if lang == 'en': date_str = steam_data.get("release_date", {}) if date_str: parsed_date = parse_steam_release_date(date_str) if parsed_date: game.release_date = local_tz.localize(parsed_date) else: app.logger.warning(f"Could not parse Steam release date: {date_str}") app.logger.info("✅ Steam data successfully updated") except Exception as e: app.logger.error(f"💥 Kritischer Steam-Fehler: {str(e)}", exc_info=True) flash(translate('Error during Steam query'), 'danger') else: app.logger.warning("⚠️ Keine Steam-AppID vorhanden, Steam-Daten werden nicht aktualisiert") flash(translate('Steam-AppID missing, no Steam Data transferred'), 'warning') # ITAD-Slug doings and such itad_slug = fetch_itad_slug(steam_appid) if itad_slug: game.itad_slug = itad_slug # 4. ITAD-Prices price_data = None if steam_appid: try: app.logger.debug("🔄 Starte ITAD-Abfrage...") game.itad_game_id = fetch_itad_game_id(steam_appid) if game.itad_game_id: app.logger.info(f"🔑 ITAD Game ID: {game.itad_game_id}") price_data = fetch_itad_prices(game.itad_game_id) if price_data: # Best price right now all_deals = price_data.get("deals", []) if all_deals: best_deal = min( all_deals, key=lambda deal: deal.get("price", {}).get("amount", float('inf')) ) game.current_price = best_deal.get("price", {}).get("amount") game.current_price_shop = best_deal.get("shop", {}).get("name") app.logger.info(f"💶 Current Best: {game.current_price}€ at {game.current_price_shop}") else: game.current_price = None game.current_price_shop = None app.logger.info(f"💶 Current Best: {game.current_price}€") game.historical_low = price_data.get("historyLow", {}).get("all", {}).get("amount") app.logger.info(f"📉 Historical Low: {game.historical_low}€") else: app.logger.warning("⚠️ Keine ITAD-Preisdaten erhalten") else: app.logger.warning("⚠️ Keine ITAD Game ID erhalten") except Exception as e: app.logger.error(f"💥 ITAD-API-Fehler: {str(e)}", exc_info=True) flash(translate('Fehler bei Preisabfrage'), 'danger') try: db.session.commit() flash(translate('Externe Daten erfolgreich aktualisiert!'), 'success') app.logger.info("💾 Datenbank-Update erfolgreich") except Exception as e: db.session.rollback() app.logger.error(f"💥 Datenbank-Fehler: {str(e)}", exc_info=True) flash(translate('Fehler beim Speichern der Daten'), 'danger') return redirect(url_for('edit_game', game_id=game_id)) @app.route('/game/') @login_required def game_details(game_id): game = Game.query.get_or_404(game_id) return render_template('game_details.html', game=game) @app.route('/debug-session') def debug_session(): return jsonify(dict(session)) # Apprise Notifications import apprise def send_apprise_notification(user, game): apprise_urls = os.getenv('APPRISE_URLS', '').strip() if not apprise_urls: app.logger.error("No APPRISE_URLS configured") return False apobj = apprise.Apprise() for url in apprise_urls.replace(',', '\n').splitlines(): if url.strip(): apobj.add(url.strip()) edit_url = url_for('edit_game', game_id=game.id, _external=True) result = apobj.notify( title="Steam-Key läuft ab!", body=f"Dein Key für '{game.name}' läuft in weniger als 48 Stunden ab!\n\nLink: {edit_url}", ) return result def send_notification(user, game): return send_apprise_notification(user, game) def check_expiring_keys(): now = datetime.now(local_tz) expiry_threshold = now + timedelta(hours=48) stmt = select(Game).where( Game.status != 'eingelöst', Game.redeem_date <= expiry_threshold, Game.redeem_date > now ) expiring_games = db.session.execute(stmt).scalars().all() for game in expiring_games: user = User.query.get(game.user_id) if user.notification_service and user.notification_service != 'none': send_notification(user, game) # Optional: cleaning up old tokens def cleanup_expired_tokens(): with app.app_context(): try: now = datetime.now(local_tz) expired = RedeemToken.query.filter(RedeemToken.expires < now).all() for token in expired: db.session.delete(token) db.session.commit() app.logger.info(f"Cleaned up {len(expired)} expired tokens.") except Exception as e: app.logger.error(f"Error during cleanup_expired_tokens: {e}") db.session.rollback() # Scheduler start scheduler = BackgroundScheduler(timezone=str(local_tz)) def check_expiring_keys_job(): with app.app_context(): check_expiring_keys() def cleanup_expired_tokens_job(): with app.app_context(): cleanup_expired_tokens() # Add Jobs scheduler.add_job( check_expiring_keys_job, 'interval', hours=int(os.getenv('CHECK_EXPIRING_KEYS_INTERVAL_HOURS', 12)), id='check_expiring_keys' ) scheduler.add_job( cleanup_expired_tokens_job, 'interval', hours=1, id='cleanup_expired_tokens' ) # price updates def update_prices_job(): with app.app_context(): games = Game.query.filter(Game.steam_appid.isnot(None)).all() for game in games: # just update prices itad_data = fetch_itad_data(f"app/{game.steam_appid}") if itad_data: game.current_price = itad_data.get('price_new') game.historical_low = itad_data.get('price_low', {}).get('amount') db.session.commit() scheduler.add_job( update_prices_job, 'interval', hours=12, id='update_prices' ) def update_missing_steam_descriptions_job(): with app.app_context(): games = Game.query.filter( (Game.steam_description_en == None) | (Game.steam_description_en == '') | (Game.steam_description_de == None) | (Game.steam_description_de == '') ).all() for game in games: for lang in ['en', 'de']: if not getattr(game, f'steam_description_{lang}', None): steam_data = fetch_steam_data(game.steam_appid, lang=lang) if steam_data: setattr(game, f'steam_description_{lang}', steam_data.get('detailed_description')) db.session.commit() scheduler.add_job( update_missing_steam_descriptions_job, 'interval', hours=24, id='update_missing_steam_descriptions' ) # start Scheduler scheduler.start() atexit.register(lambda: scheduler.shutdown(wait=False)) if __name__ == '__main__': with app.app_context(): db.create_all() app.run(debug=True, host='0.0.0.0', port=5000)