import os import logging import warnings from sqlalchemy.exc import LegacyAPIWarning warnings.simplefilter("ignore", category=LegacyAPIWarning) from flask import Flask, render_template, request, redirect, url_for, flash, make_response, session, abort, send_file, jsonify from flask_sqlalchemy import SQLAlchemy from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user from flask_babel import Babel, _ from werkzeug.security import generate_password_hash, check_password_hash from datetime import datetime, timedelta from flask_wtf import CSRFProtect from flask import abort import io import warnings import re import io import csv import secrets import requests from dotenv import load_dotenv load_dotenv(override=True) from sqlalchemy.exc import IntegrityError from apscheduler.schedulers.background import BackgroundScheduler import atexit from flask_migrate import Migrate from sqlalchemy import MetaData from reportlab.pdfgen import canvas from reportlab.lib.pagesizes import A4, landscape, letter from reportlab.platypus import ( SimpleDocTemplate, Table, TableStyle, Paragraph, Image, Spacer ) from reportlab.lib import colors from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.lib.utils import ImageReader from reportlab.lib.units import cm, inch, mm from io import BytesIO import reportlab.lib app = Flask(__name__) csrf = CSRFProtect(app) convention = { "ix": "ix_%(column_0_label)s", "uq": "uq_%(table_name)s_%(column_0_name)s", "ck": "ck_%(table_name)s_%(constraint_name)s", "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", "pk": "pk_%(table_name)s" } metadata = MetaData(naming_convention=convention) load_dotenv(override=True) # Lade Umgebungsvariablen aus .env mit override load_dotenv(override=True) # App-Configuration app.config.update( SECRET_KEY=os.getenv('SECRET_KEY'), SQLALCHEMY_DATABASE_URI=('sqlite:////app/data/games.db'), SQLALCHEMY_TRACK_MODIFICATIONS=False, BABEL_DEFAULT_LOCALE=os.getenv('BABEL_DEFAULT_LOCALE'), BABEL_SUPPORTED_LOCALES=os.getenv('BABEL_SUPPORTED_LOCALES').split(','), BABEL_TRANSLATION_DIRECTORIES=os.getenv('BABEL_TRANSLATION_DIRECTORIES'), SESSION_COOKIE_SECURE=os.getenv('SESSION_COOKIE_SECURE') == 'True', WTF_CSRF_ENABLED=os.getenv('CSRF_ENABLED') == 'True', REGISTRATION_ENABLED=os.getenv('REGISTRATION_ENABLED', 'True').lower() == 'true' ) interval_hours = int(os.getenv('CHECK_EXPIRING_KEYS_INTERVAL_HOURS', 12)) # Initialisation db = SQLAlchemy(app, metadata=metadata) migrate = Migrate(app, db) login_manager = LoginManager(app) login_manager.login_view = 'login' babel = Babel(app) # Logging app.logger.addHandler(logging.StreamHandler()) app.logger.setLevel(logging.INFO) @babel.localeselector def get_locale(): if 'lang' in session and session['lang'] in app.config['BABEL_SUPPORTED_LOCALES']: return session['lang'] return request.accept_languages.best_match(app.config['BABEL_SUPPORTED_LOCALES']) @app.context_processor def inject_template_vars(): return dict( get_locale=get_locale, theme='dark' if request.cookies.get('dark_mode') == 'true' else 'light' ) # DB Models class User(db.Model, UserMixin): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) password = db.Column(db.String(256), nullable=False) games = db.relationship('Game', back_populates='owner', lazy=True) class Game(db.Model): id = db.Column(db.Integer, primary_key=True) owner = db.relationship('User', back_populates='games') name = db.Column(db.String(100), nullable=False) steam_key = db.Column(db.String(100), nullable=False, unique=True) status = db.Column(db.String(50), nullable=False) recipient = db.Column(db.String(100)) notes = db.Column(db.Text) url = db.Column(db.String(200)) created_at = db.Column(db.DateTime, default=datetime.utcnow) redeem_date = db.Column(db.DateTime) user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False) steam_appid = db.Column(db.String(20)) class RedeemToken(db.Model): id = db.Column(db.Integer, primary_key=True) token = db.Column(db.String(17), unique=True, nullable=False) game_id = db.Column(db.Integer, db.ForeignKey('game.id'), nullable=False) expires = db.Column(db.DateTime, nullable=False) used = db.Column(db.Boolean, default=False) total_hours = db.Column(db.Integer, nullable=False) with app.app_context(): db.create_all() @login_manager.user_loader def load_user(user_id): return db.session.get(User, int(user_id)) def extract_steam_appid(url): match = re.search(r'store\.steampowered\.com/app/(\d+)', url or '') return match.group(1) if match else '' # 404 def get_or_404(model, id): instance = db.session.get(model, id) if not instance: abort(404) return instance @app.route('/') @login_required def index(): search_query = request.args.get('q', '') query = Game.query.filter_by(user_id=current_user.id) if search_query: query = query.filter(Game.name.ilike(f'%{search_query}%')) games = query.order_by(Game.created_at.desc()).all() return render_template('index.html', games=games, format_date=lambda dt: dt.strftime('%d.%m.%Y') if dt else '', search_query=search_query) @app.route('/set-lang/') def set_lang(lang): if lang in app.config['BABEL_SUPPORTED_LOCALES']: session['lang'] = lang return redirect(request.referrer or url_for('index')) @app.route('/set-theme/') def set_theme(theme): resp = make_response('', 204) resp.set_cookie('dark_mode', 'true' if theme == 'dark' else 'false', max_age=60*60*24*365) return resp @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form['username'] password = request.form['password'] user = User.query.filter_by(username=username).first() if user and check_password_hash(user.password, password): login_user(user) return redirect(url_for('index')) flash(_('Invalid credentials'), 'danger') return render_template('login.html') @app.route('/register', methods=['GET', 'POST']) def register(): if not app.config['REGISTRATION_ENABLED']: flash(_('Registrierungen sind deaktiviert'), 'danger') return redirect(url_for('login')) if request.method == 'POST': username = request.form['username'] password = generate_password_hash(request.form['password']) if User.query.filter_by(username=username).first(): flash(_('Username already exists'), 'danger') return redirect(url_for('register')) new_user = User(username=username, password=password) db.session.add(new_user) db.session.commit() login_user(new_user) return redirect(url_for('index')) return render_template('register.html') @app.route('/logout') @login_required def logout(): logout_user() return redirect(url_for('login')) @app.route('/change-password', methods=['GET', 'POST']) @login_required def change_password(): if request.method == 'POST': current_password = request.form['current_password'] new_password = request.form['new_password'] confirm_password = request.form['confirm_password'] if not check_password_hash(current_user.password, current_password): flash(_('Aktuelles Passwort ist falsch'), 'danger') return redirect(url_for('change_password')) if new_password != confirm_password: flash(_('Neue Passwörter stimmen nicht überein'), 'danger') return redirect(url_for('change_password')) current_user.password = generate_password_hash(new_password) db.session.commit() flash(_('Passwort erfolgreich geändert'), 'success') return redirect(url_for('index')) return render_template('change_password.html') @app.route('/add', methods=['GET', 'POST']) @login_required def add_game(): if request.method == 'POST': try: url = request.form.get('url', '') steam_appid = request.form.get('steam_appid', '').strip() if not steam_appid: steam_appid = extract_steam_appid(url) new_game = Game( name=request.form['name'], steam_key=request.form['steam_key'], status=request.form['status'], recipient=request.form.get('recipient', ''), notes=request.form.get('notes', ''), url=url, steam_appid=steam_appid, redeem_date=datetime.strptime(request.form['redeem_date'], '%Y-%m-%d') if request.form['redeem_date'] else None, user_id=current_user.id ) db.session.add(new_game) db.session.commit() flash(_('Game added successfully!'), 'success') return redirect(url_for('index')) except IntegrityError: db.session.rollback() flash(_('Steam Key already exists!'), 'danger') except Exception as e: db.session.rollback() flash(_('Error: ') + str(e), 'danger') return render_template('add_game.html') @app.route('/edit/', methods=['GET', 'POST']) @login_required def edit_game(game_id): game = db.session.get(Game, game_id) if not game or game.owner != current_user: abort(404) if not game or game.owner != current_user: abort(403) active_redeem = RedeemToken.query.filter( RedeemToken.game_id == game_id, RedeemToken.expires > datetime.utcnow() ).first() redeem_url = url_for('redeem_page', token=active_redeem.token, _external=True) if active_redeem else None if request.method == 'POST': try: url = request.form.get('url', '') steam_appid = request.form.get('steam_appid', '').strip() if not steam_appid: steam_appid = extract_steam_appid(url) game.name = request.form['name'] game.steam_key = request.form['steam_key'] game.status = request.form['status'] game.recipient = request.form.get('recipient', '') game.notes = request.form.get('notes', '') game.url = url game.steam_appid = steam_appid game.redeem_date = datetime.strptime(request.form['redeem_date'], '%Y-%m-%d') if request.form['redeem_date'] else None db.session.commit() flash(_('Changes saved!'), 'success') return redirect(url_for('index')) except Exception as e: db.session.rollback() flash(_('Error: ') + str(e), 'danger') return render_template('edit_game.html', game=game, redeem_url=redeem_url, active_redeem=active_redeem, redeem_date=game.redeem_date.strftime('%Y-%m-%d') if game.redeem_date else '') @app.route('/delete/', methods=['POST']) @login_required def delete_game(game_id): game = db.session.get(Game, game_id) if not game or game.owner != current_user: abort(404) if game.owner != current_user: abort(403) try: db.session.delete(game) db.session.commit() except Exception as e: db.session.rollback() return redirect(url_for('index')) @app.route('/export', methods=['GET']) @login_required def export_games(): games = Game.query.filter_by(user_id=current_user.id).all() output = io.StringIO() writer = csv.writer(output) writer.writerow(['Name', 'Steam Key', 'Status', 'Recipient', 'Notes', 'URL', 'Created', 'Redeem by', 'Steam AppID']) for game in games: writer.writerow([ game.name, game.steam_key, game.status, game.recipient, game.notes, game.url, game.created_at.strftime('%Y-%m-%d %H:%M:%S') if game.created_at else '', game.redeem_date.strftime('%Y-%m-%d') if game.redeem_date else '', game.steam_appid ]) output.seek(0) return send_file( io.BytesIO(output.getvalue().encode('utf-8')), mimetype='text/csv', as_attachment=True, download_name='games_export.csv' ) @app.route('/export_pdf') @login_required def export_pdf(): excluded_statuses = ['eingelöst', 'verschenkt'] games = Game.query.filter( Game.user_id == current_user.id, Game.status.notin_(excluded_statuses) ).order_by(Game.created_at.desc()).all() buffer = io.BytesIO() doc = SimpleDocTemplate(buffer, pagesize=landscape(A4), leftMargin=40, rightMargin=40, topMargin=40, bottomMargin=40 ) styles = getSampleStyleSheet() elements = [] img_height = 2*cm # Titel elements.append(Paragraph(_("Game List (without Keys)"), styles['Title'])) elements.append(Spacer(1, 12)) # Tabellenkopf col_widths = [ 5*cm, 10*cm, 6*cm, 3*cm ] data = [[ Paragraph('Cover', styles['Normal']), Paragraph('Name', styles['Normal']), Paragraph('Shop-Link', styles['Normal']), Paragraph('Einlösen bis', styles['Normal']) ]] for game in games: img = None if game.steam_appid: try: img_url = f"https://cdn.cloudflare.steamstatic.com/steam/apps/{game.steam_appid}/header.jpg" img_data = io.BytesIO(requests.get(img_url, timeout=5).content) img = Image(img_data, width=3*cm, height=img_height) except Exception: img = Paragraph('', styles['Normal']) data.append([ img or '', Paragraph(game.name, styles['Normal']), Paragraph(game.url or '', styles['Normal']), game.redeem_date.strftime('%d.%m.%y') if game.redeem_date else '' ]) # Table format table = Table(data, colWidths=col_widths, repeatRows=1) table.setStyle(TableStyle([ ('FONTNAME', (0,0), (-1,0), 'Helvetica-Bold'), ('FONTSIZE', (0,0), (-1,0), 8), ('FONTSIZE', (0,1), (-1,-1), 8), ('VALIGN', (0,0), (-1,-1), 'MIDDLE'), ('ALIGN', (0,0), (-1,-1), 'LEFT'), ('GRID', (0,0), (-1,-1), 0.5, colors.lightgrey), ('WORDWRAP', (1,1), (1,-1), 'CJK'), ])) elements.append(table) doc.build(elements) buffer.seek(0) return send_file( buffer, mimetype='application/pdf', as_attachment=True, download_name=f'game_export_{datetime.now().strftime("%Y%m%d")}.pdf' ) @app.route('/import', methods=['GET', 'POST']) @login_required def import_games(): if request.method == 'POST': file = request.files.get('file') if file and file.filename.endswith('.csv'): stream = io.StringIO(file.stream.read().decode("UTF8")) reader = csv.DictReader(stream) new_games = 0 duplicates = 0 try: with db.session.begin_nested(): for row in reader: steam_key = row['Steam Key'].strip() if Game.query.filter_by(steam_key=steam_key).first(): duplicates += 1 continue game = Game( name=row['Name'], steam_key=steam_key, status=row['Status'], recipient=row.get('Recipient', ''), notes=row.get('Notes', ''), url=row.get('URL', ''), created_at=datetime.strptime(row['Created'], '%Y-%m-%d %H:%M:%S') if row.get('Created') else datetime.utcnow(), redeem_date=datetime.strptime(row['Redeem by'], '%Y-%m-%d') if row.get('Redeem by') else None, steam_appid=row.get('Steam AppID', ''), user_id=current_user.id ) db.session.add(game) new_games += 1 db.session.commit() flash(_('%(new)d neue Spiele importiert, %(dup)d Duplikate übersprungen', new=new_games, dup=duplicates), 'success') except Exception as e: db.session.rollback() flash(_('Importfehler: %(error)s', error=str(e)), 'danger') return redirect(url_for('index')) flash(_('Bitte eine gültige CSV-Datei hochladen.'), 'danger') return render_template('import.html') @app.route('/generate_redeem/', methods=['POST']) @login_required def generate_redeem(game_id): game = db.session.get(Game, game_id) if not game or game.owner != current_user: abort(403) if game.owner != current_user or game.status != 'verschenkt': abort(403) try: token = secrets.token_urlsafe(12)[:17] expires = datetime.utcnow() + timedelta(hours=24) total_hours = 24 RedeemToken.query.filter_by(game_id=game_id).delete() new_token = RedeemToken( token=token, game_id=game_id, expires=expires, total_hours=24 ) db.session.add(new_token) db.session.commit() redeem_url = url_for('redeem_page', token=token, _external=True) return jsonify({'url': redeem_url}) except Exception as e: app.logger.error(f"Redeem error: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/redeem/') def redeem_page(token): redeem_token = RedeemToken.query.filter_by(token=token).first() if not redeem_token: abort(404) if redeem_token.expires < datetime.utcnow(): db.session.delete(redeem_token) db.session.commit() abort(404) game = Game.query.get(redeem_token.game_id) redeem_token.used = True db.session.commit() return render_template('redeem.html', game=game, redeem_token=redeem_token, platform_link='https://store.steampowered.com/account/registerkey?key=' if game.steam_appid else 'https://www.gog.com/redeem') # Benachrichtigungsfunktionen def send_pushover_notification(user, game): """Sendet Pushover-Benachrichtigung für ablaufenden Key""" if not app.config['PUSHOVER_APP_TOKEN'] or not app.config['PUSHOVER_USER_KEY']: return False payload = { "token": os.getenv('PUSHOVER_APP_TOKEN'), "user": os.getenv('PUSHOVER_USER_KEY'), "title": "Steam-Key läuft ab!", "message": f"Dein Key für '{game.name}' läuft in weniger als 48 Stunden ab!", "url": url_for('edit_game', game_id=game.id, _external=True), "url_title": "Zum Spiel", "priority": 1 } try: response = requests.post( 'https://api.pushover.net/1/messages.json', data=payload ) return response.status_code == 200 except Exception as e: app.logger.error(f"Pushover error: {str(e)}") return False def send_gotify_notification(user, game): """Sendet Gotify-Benachrichtigung für ablaufenden Key""" if not GOTIFY_URL or not GOTIFY_TOKEN: return False payload = { "title": "Steam-Key läuft ab!", "message": f"Dein Key für '{game.name}' läuft in weniger als 48 Stunden ab!", "priority": 5 } try: response = requests.post( f"{GOTIFY_URL}/message?token={GOTIFY_TOKEN}", json=payload ) return response.status_code == 200 except Exception as e: app.logger.error(f"Gotify error: {str(e)}") return False def send_matrix_notification(user, game): """Sendet Matrix-Benachrichtigung für ablaufenden Key""" if not MATRIX_HOMESERVER or not MATRIX_ACCESS_TOKEN or not MATRIX_ROOM_ID: return False try: from matrix_client.client import MatrixClient client = MatrixClient(MATRIX_HOMESERVER, token=MATRIX_ACCESS_TOKEN) room = client.join_room(MATRIX_ROOM_ID) message = f"🎮 Dein Key für '{game.name}' läuft in weniger als 48 Stunden ab!" room.send_text(message) return True except Exception as e: app.logger.error(f"Matrix error: {str(e)}") return False def send_notification(user, game): """Sendet Benachrichtigung über den bevorzugten Dienst des Benutzers""" if user.notification_service == 'pushover': return send_pushover_notification(user, game) elif user.notification_service == 'gotify': return send_gotify_notification(user, game) elif user.notification_service == 'matrix': return send_matrix_notification(user, game) return False def check_expiring_keys(): with app.app_context(): now = datetime.utcnow() expiry_threshold = now + timedelta(hours=48) # Moderner Select-Aufruf stmt = select(Game).where( Game.status != 'eingelöst', Game.redeem_date <= expiry_threshold, Game.redeem_date > now ) expiring_games = db.session.execute(stmt).scalars().all() for game in expiring_games: user = User.query.get(game.user_id) if user.notification_service and user.notification_service != 'none': send_notification(user, game) # Optional: cleaning up old tokens def cleanup_expired_tokens(): now = datetime.utcnow() expired = RedeemToken.query.filter(RedeemToken.expires < now).all() for token in expired: db.session.delete(token) db.session.commit() # Scheduler start scheduler = BackgroundScheduler() scheduler.add_job(func=check_expiring_keys, trigger="interval", hours=interval_hours) scheduler.add_job(func=cleanup_expired_tokens, trigger="interval", hours=1) scheduler.start() # Shutdown of the Schedulers when stopping the app atexit.register(lambda: scheduler.shutdown()) if __name__ == '__main__': with app.app_context(): db.create_all() app.run(host='0.0.0.0', port=5000)