# GameKeyManager/steam-gift-manager/app.py
#
# 671 lines
# 22 KiB
# Python

import os
import warnings
from sqlalchemy.exc import LegacyAPIWarning
warnings.simplefilter("ignore", category=LegacyAPIWarning)
from flask import Flask, render_template, request, redirect, url_for, flash, make_response, session, abort, send_file, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime, timedelta
from flask_wtf import CSRFProtect
from flask import abort
from flask import request, redirect
import io
import warnings
import re
import io
import csv
import secrets
import requests
from dotenv import load_dotenv
load_dotenv(override=True)
from sqlalchemy.exc import IntegrityError
from apscheduler.schedulers.background import BackgroundScheduler
import atexit
from flask_migrate import Migrate
from sqlalchemy import MetaData
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import A4, landscape, letter
from reportlab.platypus import (
SimpleDocTemplate,
Table,
TableStyle,
Paragraph,
Image,
Spacer
)
from reportlab.lib import colors
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.utils import ImageReader
from reportlab.lib.units import cm, inch, mm
from io import BytesIO
import reportlab.lib
import logging
logging.basicConfig()
# Flask application object; the config, extensions and routes below are attached to it.
app = Flask(__name__)
# Load the translation catalogues: one JSON file per supported language,
# stored in the "translations" directory next to this module.
import os
import json
TRANSLATION_DIR = os.path.join(os.path.dirname(__file__), 'translations')
SUPPORTED_LANGUAGES = ['de', 'en']
TRANSLATIONS = {}
for lang in SUPPORTED_LANGUAGES:
    catalogue_path = os.path.join(TRANSLATION_DIR, f'{lang}.json')
    try:
        with open(catalogue_path, encoding='utf-8') as f:
            TRANSLATIONS[lang] = json.load(f)
    except Exception as exc:
        # Best effort: a missing or broken catalogue must not stop the app,
        # but it should be visible in the logs instead of failing silently.
        logging.getLogger(__name__).warning(
            "Could not load translations for '%s' from %s: %s",
            lang, catalogue_path, exc,
        )
        TRANSLATIONS[lang] = {}
def translate(key, lang=None, **kwargs):
    """Return the translation of *key*, falling back to English, then to *key*.

    Args:
        key: Message identifier (the English source text).
        lang: Language code; when falsy, the session's ``lang`` entry is used
            (default ``'en'``).
        **kwargs: Values for placeholders in the message. Both ``{name}`` and
            ``%(name)s`` placeholder styles are supported, because callers in
            this file use both.

    Returns:
        The translated and formatted message. If formatting fails (missing
        placeholder, stray brace or percent sign) the unformatted message is
        returned instead of raising.
    """
    if not lang:
        lang = session.get('lang', 'en')
    value = TRANSLATIONS.get(lang, {}).get(key)
    if value is None and lang != 'en':
        value = TRANSLATIONS.get('en', {}).get(key, key)
    else:
        value = value or key
    if not kwargs or not isinstance(value, str):
        return value
    try:
        # Apply exactly one formatting style so substituted values that
        # happen to contain braces are never re-interpreted.
        if '%(' in value:
            return value % kwargs
        return value.format(**kwargs)
    except (KeyError, IndexError, TypeError, ValueError):
        return value
## DEBUG Translations
# Dumps the German catalogue to stdout when the app is in debug mode at import time.
if app.debug:
    german_catalogue = TRANSLATIONS.get('de', {})
    print(f"Loaded translations for 'de': {german_catalogue}")
# Attach Flask-WTF's CSRFProtect extension to the app.
csrf = CSRFProtect(app)
# Naming templates for indexes (ix), unique (uq), check (ck), foreign-key (fk)
# and primary-key (pk) constraints; passed to MetaData below.
convention = {
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s"
}
metadata = MetaData(naming_convention=convention)
load_dotenv(override=True)
# Load environment variables from .env; values in the file override existing ones.
# NOTE(review): load_dotenv(override=True) is already called earlier in this file
# and once more directly above; the repeated calls look redundant.
load_dotenv(override=True)
# App configuration
app.config.update(
# NOTE(review): SECRET_KEY is None when the environment variable is unset.
SECRET_KEY=os.getenv('SECRET_KEY'),
SQLALCHEMY_DATABASE_URI='sqlite:////app/data/games.db',
SQLALCHEMY_TRACK_MODIFICATIONS=False,
# Boolean flags compared with == 'True' are case-sensitive: only the exact string 'True' enables them.
SESSION_COOKIE_SECURE=os.getenv('SESSION_COOKIE_SECURE', 'False') == 'True',
SESSION_COOKIE_SAMESITE='Lax',
PERMANENT_SESSION_LIFETIME=timedelta(days=30),
SESSION_REFRESH_EACH_REQUEST=False,
WTF_CSRF_ENABLED=os.getenv('CSRF_ENABLED', 'True') == 'True',
# REGISTRATION_ENABLED is compared case-insensitively ('true', 'True', ...).
REGISTRATION_ENABLED=os.getenv('REGISTRATION_ENABLED', 'True').lower() == 'true',
SEND_FILE_MAX_AGE_DEFAULT=int(os.getenv('SEND_FILE_MAX_AGE_DEFAULT', 0)),
TEMPLATES_AUTO_RELOAD=os.getenv('TEMPLATES_AUTO_RELOAD', 'True') == 'True'
)
# Interval in hours for the expiring-key check job (default 12).
interval_hours = int(os.getenv('CHECK_EXPIRING_KEYS_INTERVAL_HOURS', 12))
# Initialisation of the extensions: database, migrations and login handling.
db = SQLAlchemy(app, metadata=metadata)
migrate = Migrate(app, db)
login_manager = LoginManager(app)
# Endpoint name of the login view, handed to the login manager.
login_manager.login_view = 'login'
# Logging: add a stream handler to the app logger and log from INFO upwards.
app.logger.addHandler(logging.StreamHandler())
app.logger.setLevel(logging.INFO)
@app.before_request
def enforce_https():
    """Redirect plain-HTTP requests to HTTPS when FORCE_HTTPS is 'true'.

    A request counts as secure when the X-Forwarded-Proto header equals
    'https' or when Flask itself reports the request as secure. Returns a
    301 redirect for insecure requests, otherwise None.
    """
    https_forced = os.getenv('FORCE_HTTPS', 'False').lower() == 'true'
    if not https_forced:
        return None
    forwarded_proto = request.headers.get('X-Forwarded-Proto', 'http')
    if forwarded_proto == 'https' or request.is_secure:
        return None
    url = request.url.replace('http://', 'https://', 1)
    app.logger.info(f"Redirecting to HTTPS: {url}")
    return redirect(url, code=301)
@app.context_processor
def inject_template_vars():
    """Expose the ``_`` translation helper and the current ``theme`` to templates."""

    def _(key, **kwargs):
        # Resolve the language per call so it follows the session.
        return translate(key, session.get('lang', 'en'), **kwargs)

    # The theme comes from the 'theme' cookie and defaults to 'light'.
    return {'_': _, 'theme': request.cookies.get('theme', 'light')}
# DB Models
class User(db.Model, UserMixin):
    """Login account; owns the games reachable through ``games``."""

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), nullable=False, unique=True)
    # The register and change-password views store a werkzeug password hash here.
    password = db.Column(db.String(256), nullable=False)
    # Counterpart of Game.owner.
    games = db.relationship('Game', lazy=True, back_populates='owner')
class Game(db.Model):
    """A game key entry belonging to one user."""

    id = db.Column(db.Integer, primary_key=True)
    # Counterpart of User.games.
    owner = db.relationship('User', back_populates='games')
    name = db.Column(db.String(100), nullable=False)
    # Keys are unique across the whole table, not just per user.
    steam_key = db.Column(db.String(100), unique=True, nullable=False)
    status = db.Column(db.String(50), nullable=False)
    recipient = db.Column(db.String(100))
    notes = db.Column(db.Text)
    url = db.Column(db.String(200))
    # The callable is passed (not called) so each row gets its own timestamp.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    redeem_date = db.Column(db.DateTime)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    steam_appid = db.Column(db.String(20))
class RedeemToken(db.Model):
    """Token that gives access to the public redeem page of one game."""

    id = db.Column(db.Integer, primary_key=True)
    token = db.Column(db.String(17), nullable=False, unique=True)
    game_id = db.Column(db.Integer, db.ForeignKey('game.id'), nullable=False)
    # Compared against datetime.utcnow() by the views that read it.
    expires = db.Column(db.DateTime, nullable=False)
    used = db.Column(db.Boolean, default=False)
    total_hours = db.Column(db.Integer, nullable=False)
# Create any missing tables at import time; needs an application context.
startup_context = app.app_context()
with startup_context:
    db.create_all()
@login_manager.user_loader
def load_user(user_id):
    """Return the User for the id stored in the session, or None.

    Flask-Login expects None (not an exception) for ids that cannot be
    resolved, so a malformed session value is treated as "no such user".
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return db.session.get(User, primary_key)
def extract_steam_appid(url):
    """Return the numeric app id from a Steam store URL, or '' if there is none.

    A falsy *url* (None or '') is treated as an empty string.
    """
    found = re.search(r'store\.steampowered\.com/app/(\d+)', url or '')
    if found is None:
        return ''
    return found.group(1)
# 404
def get_or_404(model, id):
    """Return the *model* row with primary key *id*, or abort with 404."""
    instance = db.session.get(model, id)
    if instance:
        return instance
    abort(404)
@app.route('/')
@login_required
def index():
    """List the current user's games, newest first, optionally filtered by ?q=."""
    search_query = request.args.get('q', '')
    games_query = Game.query.filter_by(user_id=current_user.id)
    if search_query:
        # Case-insensitive substring match on the game name.
        name_pattern = f'%{search_query}%'
        games_query = games_query.filter(Game.name.ilike(name_pattern))

    def format_date(dt):
        # Template helper: empty string for missing dates.
        return dt.strftime('%d.%m.%Y') if dt else ''

    return render_template(
        'index.html',
        games=games_query.order_by(Game.created_at.desc()).all(),
        format_date=format_date,
        search_query=search_query,
    )
@app.route('/set-lang/<lang>')
def set_lang(lang):
    """Store *lang* in the session if supported, then go back to the referrer.

    Unsupported codes are ignored. Without a referrer the index page is used.
    """
    language_supported = lang in SUPPORTED_LANGUAGES
    if language_supported:
        session['lang'] = lang
    target = request.referrer or url_for('index')
    return redirect(target)
@app.route('/set-theme/<theme>')
def set_theme(theme):
    """Remember *theme* in a one-year 'theme' cookie; replies 204 with no body."""
    one_year_seconds = 60 * 60 * 24 * 365
    response = make_response('', 204)
    # The cookie was renamed from 'dark_mode' to 'theme'.
    response.set_cookie('theme', theme, max_age=one_year_seconds)
    return response
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the posted credentials (POST).

    On success the user is logged in and redirected to the index page; on
    failure an error is flashed and the form is rendered again.
    """
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        user = User.query.filter_by(username=username).first()
        if user and check_password_hash(user.password, password):
            login_user(user)
            return redirect(url_for('index'))
        # `_` only exists inside templates (context processor); Python code
        # must call translate() directly, otherwise this raises NameError.
        flash(translate('Invalid credentials'), 'danger')
    return render_template('login.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Create a new account and log it in, if registration is enabled.

    When REGISTRATION_ENABLED is off, a message is flashed and the visitor is
    sent to the login page. A username that is already taken flashes an error
    and reloads the form.
    """
    # translate() is used instead of `_`, which is only defined for templates.
    if not app.config['REGISTRATION_ENABLED']:
        flash(translate('No new registrations. They are deactivated!'), 'danger')
        return redirect(url_for('login'))
    if request.method == 'POST':
        username = request.form['username']
        password = generate_password_hash(request.form['password'])
        if User.query.filter_by(username=username).first():
            flash(translate('Username already exists'), 'danger')
            return redirect(url_for('register'))
        new_user = User(username=username, password=password)
        db.session.add(new_user)
        try:
            db.session.commit()
        except IntegrityError:
            # Another request created the same username between the check
            # above and the commit; report it like the ordinary duplicate case.
            db.session.rollback()
            flash(translate('Username already exists'), 'danger')
            return redirect(url_for('register'))
        login_user(new_user)
        return redirect(url_for('index'))
    return render_template('register.html')
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and send them to the login page."""
    logout_user()
    login_url = url_for('login')
    return redirect(login_url)
@app.route('/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
    """Let the logged-in user change their password.

    The current password must match and the new password must be entered
    twice identically; otherwise an error is flashed and the form reloads.
    """
    if request.method == 'POST':
        current_password = request.form['current_password']
        new_password = request.form['new_password']
        confirm_password = request.form['confirm_password']
        # Message keys are left untouched (typo included) because they are
        # the lookup keys of the translation catalogues. translate() replaces
        # `_`, which is only defined inside templates.
        if not check_password_hash(current_user.password, current_password):
            flash(translate('Current passwort is wrong'), 'danger')
            return redirect(url_for('change_password'))
        if new_password != confirm_password:
            flash(translate('New Passwords are not matching'), 'danger')
            return redirect(url_for('change_password'))
        current_user.password = generate_password_hash(new_password)
        db.session.commit()
        flash(translate('Password changed successfully'), 'success')
        return redirect(url_for('index'))
    return render_template('change_password.html')
@app.route('/add', methods=['GET', 'POST'])
@login_required
def add_game():
    """Show the add-game form (GET) or create a game for the current user (POST).

    The Steam app id is taken from the form or, if empty, extracted from the
    shop URL. A duplicate key or any other error is flashed and the form is
    rendered again.
    """
    if request.method == 'POST':
        try:
            url = request.form.get('url', '')
            steam_appid = request.form.get('steam_appid', '').strip()
            if not steam_appid:
                steam_appid = extract_steam_appid(url)
            # A missing redeem_date field is treated like an empty one.
            redeem_raw = request.form.get('redeem_date', '')
            redeem_date = datetime.strptime(redeem_raw, '%Y-%m-%d') if redeem_raw else None
            new_game = Game(
                name=request.form['name'],
                steam_key=request.form['steam_key'],
                status=request.form['status'],
                recipient=request.form.get('recipient', ''),
                notes=request.form.get('notes', ''),
                url=url,
                steam_appid=steam_appid,
                redeem_date=redeem_date,
                user_id=current_user.id,
            )
            db.session.add(new_game)
            db.session.commit()
            # translate() replaces `_`, which is only defined inside templates.
            flash(translate('Game added successfully!'), 'success')
            return redirect(url_for('index'))
        except IntegrityError:
            # steam_key is unique, so this is a duplicate key.
            db.session.rollback()
            flash(translate('Steam Key already exists!'), 'danger')
        except Exception as e:
            db.session.rollback()
            flash(translate('Error: ') + str(e), 'danger')
    return render_template('add_game.html')
@app.route('/edit/<int:game_id>', methods=['GET', 'POST'])
@login_required
def edit_game(game_id):
    """Show or update one of the current user's games.

    Responds with 404 when the game does not exist or belongs to someone
    else. The template also receives the redeem URL of a token for this game
    that has not expired yet, if there is one.
    """
    game = db.session.get(Game, game_id)
    # One check covers both "missing" and "not yours"; the former second
    # check with abort(403) could never be reached.
    if not game or game.user_id != current_user.id:
        abort(404)
    active_redeem = RedeemToken.query.filter(
        RedeemToken.game_id == game_id,
        RedeemToken.expires > datetime.utcnow()
    ).first()
    redeem_url = url_for('redeem_page', token=active_redeem.token, _external=True) if active_redeem else None
    if request.method == 'POST':
        try:
            url = request.form.get('url', '')
            steam_appid = request.form.get('steam_appid', '').strip()
            if not steam_appid:
                steam_appid = extract_steam_appid(url)
            # A missing redeem_date field is treated like an empty one.
            redeem_raw = request.form.get('redeem_date', '')
            game.name = request.form['name']
            game.steam_key = request.form['steam_key']
            game.status = request.form['status']
            game.recipient = request.form.get('recipient', '')
            game.notes = request.form.get('notes', '')
            game.url = url
            game.steam_appid = steam_appid
            game.redeem_date = datetime.strptime(redeem_raw, '%Y-%m-%d') if redeem_raw else None
            db.session.commit()
            # translate() replaces `_`, which is only defined inside templates.
            flash(translate('Changes saved!'), 'success')
            return redirect(url_for('index'))
        except IntegrityError:
            # Same message as add_game for a key that is already in use.
            db.session.rollback()
            flash(translate('Steam Key already exists!'), 'danger')
        except Exception as e:
            db.session.rollback()
            flash(translate('Error: ') + str(e), 'danger')
    return render_template('edit_game.html',
                           game=game,
                           redeem_url=redeem_url,
                           active_redeem=active_redeem,
                           redeem_date=game.redeem_date.strftime('%Y-%m-%d') if game.redeem_date else '')
@app.route('/delete/<int:game_id>', methods=['POST'])
@login_required
def delete_game(game_id):
    """Delete one of the current user's games together with its redeem tokens.

    Responds with 404 when the game does not exist or belongs to someone
    else. Failures are logged and flashed instead of being dropped silently.
    """
    game = db.session.get(Game, game_id)
    # The former second check with abort(403) could never be reached.
    if not game or game.user_id != current_user.id:
        abort(404)
    try:
        # Tokens reference the game through a non-nullable foreign key, so
        # they are removed first to avoid orphaned rows.
        RedeemToken.query.filter_by(game_id=game_id).delete()
        db.session.delete(game)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        app.logger.error(f"Delete error for game {game_id}: {e}")
        # translate() is used because `_` is only defined inside templates.
        flash(translate('Error: ') + str(e), 'danger')
    return redirect(url_for('index'))
@app.route('/export', methods=['GET'])
@login_required
def export_games():
    """Download all games of the current user as a UTF-8 CSV attachment."""
    header = ['Name', 'Steam Key', 'Status', 'Recipient', 'Notes', 'URL', 'Created', 'Redeem by', 'Steam AppID']

    def as_row(game):
        # Dates are written as text; missing dates become empty cells.
        created = game.created_at.strftime('%Y-%m-%d %H:%M:%S') if game.created_at else ''
        redeem_by = game.redeem_date.strftime('%Y-%m-%d') if game.redeem_date else ''
        return [
            game.name,
            game.steam_key,
            game.status,
            game.recipient,
            game.notes,
            game.url,
            created,
            redeem_by,
            game.steam_appid,
        ]

    games = Game.query.filter_by(user_id=current_user.id).all()
    text_buffer = io.StringIO()
    writer = csv.writer(text_buffer)
    writer.writerow(header)
    writer.writerows(as_row(game) for game in games)
    payload = text_buffer.getvalue().encode('utf-8')
    return send_file(
        io.BytesIO(payload),
        mimetype='text/csv',
        as_attachment=True,
        download_name='games_export.csv',
    )
def _pdf_cover_cell(game, styles, img_height):
    """Return the cover cell for *game*: an Image, an empty Paragraph on failure, or ''."""
    try:
        if game.steam_appid:
            img_url = f"https://cdn.cloudflare.steamstatic.com/steam/apps/{game.steam_appid}/header.jpg"
            response = requests.get(img_url, timeout=5)
            # Do not feed an HTTP error page to the image decoder.
            response.raise_for_status()
            return Image(io.BytesIO(response.content), width=3*cm, height=img_height)
        if game.url and 'gog.com' in game.url:
            img_path = os.path.join(app.root_path, 'static', 'gog_logo.webp')
            return Image(img_path, width=3*cm, height=img_height)
    except Exception as exc:
        # The cover is optional; a failed download or decode must not break
        # the whole export.
        app.logger.warning(f"Cover image unavailable for '{game.name}': {exc}")
        return Paragraph('', styles['Normal'])
    return ''


@app.route('/export_pdf')
@login_required
def export_pdf():
    """Download a landscape A4 PDF listing the user's games without their keys.

    Games with status 'eingelöst' or 'verschenkt' are left out. Each row has
    a cover image (if available), the name, the shop link and the redeem date.
    """
    # Paragraph text is parsed as markup, so '&' and '<' in user data must be escaped.
    from xml.sax.saxutils import escape

    excluded_statuses = ['eingelöst', 'verschenkt']
    games = Game.query.filter(
        Game.user_id == current_user.id,
        Game.status.notin_(excluded_statuses)
    ).order_by(Game.created_at.desc()).all()
    buffer = io.BytesIO()
    doc = SimpleDocTemplate(buffer,
                            pagesize=landscape(A4),
                            leftMargin=40,
                            rightMargin=40,
                            topMargin=40,
                            bottomMargin=40)
    styles = getSampleStyleSheet()
    elements = []
    img_height = 2*cm
    # Title; translate() is used because `_` is only defined inside templates.
    elements.append(Paragraph(translate("Game List (without Keys)"), styles['Title']))
    elements.append(Spacer(1, 12))
    # Table header
    col_widths = [5*cm, 10*cm, 6*cm, 3*cm]
    data = [[
        Paragraph('<b>Cover</b>', styles['Normal']),
        Paragraph('<b>Name</b>', styles['Normal']),
        Paragraph('<b>Shop-Link</b>', styles['Normal']),
        Paragraph('<b>Einlösen bis</b>', styles['Normal'])
    ]]
    for game in games:
        data.append([
            _pdf_cover_cell(game, styles, img_height),
            Paragraph(escape(game.name), styles['Normal']),
            Paragraph(escape(game.url or ''), styles['Normal']),
            game.redeem_date.strftime('%d.%m.%y') if game.redeem_date else ''
        ])
    # Table format; the header row repeats on every page.
    table = Table(data, colWidths=col_widths, repeatRows=1)
    table.setStyle(TableStyle([
        ('FONTNAME', (0,0), (-1,0), 'Helvetica-Bold'),
        ('FONTSIZE', (0,0), (-1,0), 8),
        ('FONTSIZE', (0,1), (-1,-1), 8),
        ('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
        ('ALIGN', (0,0), (-1,-1), 'LEFT'),
        ('GRID', (0,0), (-1,-1), 0.5, colors.lightgrey),
        ('WORDWRAP', (1,1), (1,-1), 'CJK'),
    ]))
    elements.append(table)
    doc.build(elements)
    buffer.seek(0)
    return send_file(
        buffer,
        mimetype='application/pdf',
        as_attachment=True,
        download_name=f'game_export_{datetime.now().strftime("%Y%m%d")}.pdf'
    )
def _fill_placeholders(template, **params):
    """Fill ``%(name)s`` placeholders in *template*, falling back to ``{name}`` style.

    Returns the template unchanged if neither style can be applied.
    """
    try:
        filled = template % params
    except (KeyError, TypeError, ValueError):
        filled = template
    if filled == template:
        try:
            filled = template.format(**params)
        except (KeyError, IndexError, ValueError):
            pass
    return filled


@app.route('/import', methods=['GET', 'POST'])
@login_required
def import_games():
    """Show the import form (GET) or import games from an uploaded CSV (POST).

    The CSV must have the columns written by the CSV export ('Name',
    'Steam Key', 'Status', ...). Rows whose key already exists are counted as
    duplicates and skipped. Any error rolls the whole import back.
    """
    if request.method == 'POST':
        file = request.files.get('file')
        if file and file.filename.lower().endswith('.csv'):
            new_games = 0
            duplicates = 0
            try:
                # utf-8-sig drops a leading BOM, which would otherwise end up
                # in the first header name and break the 'Name' lookup.
                stream = io.StringIO(file.stream.read().decode('utf-8-sig'))
                reader = csv.DictReader(stream)
                with db.session.begin_nested():
                    for row in reader:
                        steam_key = row['Steam Key'].strip()
                        if Game.query.filter_by(steam_key=steam_key).first():
                            duplicates += 1
                            continue
                        game = Game(
                            name=row['Name'],
                            steam_key=steam_key,
                            status=row['Status'],
                            recipient=row.get('Recipient', ''),
                            notes=row.get('Notes', ''),
                            url=row.get('URL', ''),
                            created_at=datetime.strptime(row['Created'], '%Y-%m-%d %H:%M:%S') if row.get('Created') else datetime.utcnow(),
                            redeem_date=datetime.strptime(row['Redeem by'], '%Y-%m-%d') if row.get('Redeem by') else None,
                            steam_appid=row.get('Steam AppID', ''),
                            user_id=current_user.id
                        )
                        db.session.add(game)
                        new_games += 1
                db.session.commit()
                # translate() replaces `_` (templates only). The placeholders
                # are filled here because the keys use %(name)s style.
                summary = translate('%(new)d new games imported, %(dup)d skipped duplicates')
                flash(_fill_placeholders(summary, new=new_games, dup=duplicates), 'success')
            except Exception as e:
                db.session.rollback()
                failure = translate('Import error: %(error)s')
                flash(_fill_placeholders(failure, error=str(e)), 'danger')
            return redirect(url_for('index'))
        flash(translate('Please upload a valid CSV file.'), 'danger')
    return render_template('import.html')
@app.route('/generate_redeem/<int:game_id>', methods=['POST'])
@login_required
def generate_redeem(game_id):
    """Create a fresh 24-hour redeem link for one of the user's gifted games.

    Responds with 403 unless the game exists, belongs to the current user and
    has status 'verschenkt'. Existing tokens for the game are replaced.

    Returns:
        JSON ``{'url': ...}`` on success, or ``{'error': ...}`` with status 500.
    """
    game = db.session.get(Game, game_id)
    # All rejection cases answered 403 before; they are merged into one check.
    if not game or game.owner != current_user or game.status != 'verschenkt':
        abort(403)
    try:
        # 12 random bytes give 16 URL-safe characters; the slice keeps the
        # value within the 17-character token column.
        token = secrets.token_urlsafe(12)[:17]
        total_hours = 24
        expires = datetime.utcnow() + timedelta(hours=total_hours)
        RedeemToken.query.filter_by(game_id=game_id).delete()
        new_token = RedeemToken(
            token=token,
            game_id=game_id,
            expires=expires,
            total_hours=total_hours
        )
        db.session.add(new_token)
        db.session.commit()
        redeem_url = url_for('redeem_page', token=token, _external=True)
        return jsonify({'url': redeem_url})
    except Exception as e:
        # Leave the session usable after a failed delete/insert.
        db.session.rollback()
        app.logger.error(f"Redeem error: {str(e)}")
        return jsonify({'error': str(e)}), 500
@app.route('/redeem/<token>')
def redeem_page(token):
    """Public page showing the game behind a redeem token.

    Responds with 404 for unknown tokens, for expired tokens (which are
    deleted on the spot) and for tokens whose game no longer exists. A
    successful view marks the token as used.
    """
    redeem_token = RedeemToken.query.filter_by(token=token).first()
    if not redeem_token:
        abort(404)
    if redeem_token.expires < datetime.utcnow():
        db.session.delete(redeem_token)
        db.session.commit()
        abort(404)
    game = db.session.get(Game, redeem_token.game_id)
    if game is None:
        # The game was deleted while the token was still around.
        abort(404)
    redeem_token.used = True
    db.session.commit()
    return render_template('redeem.html',
                           game=game,
                           redeem_token=redeem_token,
                           platform_link='https://store.steampowered.com/account/registerkey?key=' if game.steam_appid else 'https://www.gog.com/redeem')
# Apprise Notifications
import apprise
def send_apprise_notification(user, game):
    """Send an "expiring key" notification for *game* through Apprise.

    The targets come from the APPRISE_URLS environment variable (separated by
    commas or newlines). *user* is accepted for interface compatibility and
    is not used.

    Returns:
        False when no targets are configured, otherwise the result of
        ``Apprise.notify``.
    """
    apprise_urls = os.getenv('APPRISE_URLS', '').strip()
    if not apprise_urls:
        app.logger.error("No APPRISE_URLS configured")
        return False
    apobj = apprise.Apprise()
    for target in apprise_urls.replace(',', '\n').splitlines():
        target = target.strip()
        if target:
            apobj.add(target)
    body = f"Dein Key für '{game.name}' läuft in weniger als 48 Stunden ab!"
    try:
        edit_url = url_for('edit_game', game_id=game.id, _external=True)
    except RuntimeError as exc:
        # Outside a request (e.g. in the scheduler job) Flask can only build
        # external URLs when SERVER_NAME is configured; send without the link.
        app.logger.warning(f"Cannot build edit link for notification: {exc}")
    else:
        body += f"\n\nLink: {edit_url}"
    result = apobj.notify(
        title="Steam-Key läuft ab!",
        body=body,
    )
    return result
def send_notification(user, game):
    """Notify about *game*; currently a thin wrapper around the Apprise sender."""
    outcome = send_apprise_notification(user, game)
    return outcome
def check_expiring_keys():
    """Scheduler job: notify about keys whose redeem date is within 48 hours.

    Considers games that are not 'eingelöst' and whose redeem date lies
    between now and now + 48 hours (UTC, naive datetimes).
    """
    # `select` is not imported at module level; import it here so the job
    # does not fail with NameError.
    from sqlalchemy import select

    with app.app_context():
        now = datetime.utcnow()
        expiry_threshold = now + timedelta(hours=48)
        stmt = select(Game).where(
            Game.status != 'eingelöst',
            Game.redeem_date <= expiry_threshold,
            Game.redeem_date > now
        )
        expiring_games = db.session.execute(stmt).scalars().all()
        for game in expiring_games:
            user = db.session.get(User, game.user_id)
            if user is None:
                continue
            # The User model in this file has no `notification_service`
            # column. Honour the preference only when the attribute exists;
            # otherwise notify, since targets are configured globally.
            if hasattr(user, 'notification_service'):
                service = user.notification_service
                if not service or service == 'none':
                    continue
            try:
                send_notification(user, game)
            except Exception:
                # One failing notification must not stop the remaining ones.
                app.logger.exception(f"Notification failed for game {game.id}")
# Optional: cleaning up old tokens
def cleanup_expired_tokens():
    """Scheduler job: delete all redeem tokens whose expiry time has passed."""
    # The scheduler calls this from a background thread, where no Flask
    # application context exists; database access needs one.
    with app.app_context():
        now = datetime.utcnow()
        try:
            RedeemToken.query.filter(RedeemToken.expires < now).delete(synchronize_session=False)
            db.session.commit()
        except Exception:
            db.session.rollback()
            app.logger.exception("Cleanup of expired redeem tokens failed")
# Scheduler start
scheduler = BackgroundScheduler()
# (job function, interval in hours) for every periodic job.
for job_func, every_hours in (
    (check_expiring_keys, interval_hours),
    (cleanup_expired_tokens, 1),
):
    scheduler.add_job(func=job_func, trigger="interval", hours=every_hours)
scheduler.start()
# Shut the scheduler down when the interpreter exits.
atexit.register(lambda: scheduler.shutdown())
if __name__ == '__main__':
    # Direct execution: make sure the tables exist, then serve on all interfaces.
    main_context = app.app_context()
    with main_context:
        db.create_all()
    app.run(port=5000, host='0.0.0.0')