1328 lines
43 KiB
Python
1328 lines
43 KiB
Python
# Standard library imports
|
|
import atexit
|
|
import csv
|
|
import io
|
|
import locale
|
|
import logging
|
|
import os
|
|
import random
|
|
import re
|
|
import secrets
|
|
import sqlite3
|
|
import time
|
|
import traceback
|
|
from datetime import datetime, timedelta
|
|
from functools import wraps
|
|
from io import BytesIO
|
|
from time import sleep
|
|
from urllib.parse import urlparse
|
|
from zoneinfo import ZoneInfo
|
|
import warnings
|
|
|
|
# 3rd-Provider-Modules
|
|
import pytz
|
|
import requests
|
|
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
from dotenv import load_dotenv
|
|
from flask import (
|
|
Flask,
|
|
Markup,
|
|
abort,
|
|
flash,
|
|
g,
|
|
jsonify,
|
|
make_response,
|
|
redirect,
|
|
render_template,
|
|
request,
|
|
send_file,
|
|
session,
|
|
url_for
|
|
)
|
|
from flask_login import (
|
|
LoginManager,
|
|
UserMixin,
|
|
current_user,
|
|
login_required,
|
|
login_user,
|
|
logout_user
|
|
)
|
|
from flask_migrate import Migrate
|
|
from flask_session import Session
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_wtf import CSRFProtect, FlaskForm
|
|
from redis import Redis
|
|
from reportlab.lib import colors
|
|
from reportlab.lib.pagesizes import A4, landscape, letter
|
|
from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
|
|
from reportlab.lib.units import cm, inch, mm
|
|
from reportlab.lib.utils import ImageReader
|
|
from reportlab.pdfgen import canvas
|
|
from reportlab.platypus import (
|
|
Image,
|
|
Paragraph,
|
|
SimpleDocTemplate,
|
|
Spacer,
|
|
Table,
|
|
TableStyle
|
|
)
|
|
from sqlalchemy import MetaData, UniqueConstraint, event
|
|
from sqlalchemy.engine import Engine
|
|
from sqlalchemy.exc import IntegrityError, LegacyAPIWarning
|
|
from sqlalchemy.orm import joinedload
|
|
from werkzeug.security import check_password_hash, generate_password_hash
|
|
from wtforms import SelectField, StringField, TextAreaField, validators
|
|
|
|
# Config
|
|
load_dotenv(override=True)
|
|
warnings.simplefilter("ignore", category=LegacyAPIWarning)
|
|
|
|
# Logging-Config
|
|
logging.basicConfig(level=logging.INFO)
|
|
logging.getLogger('apscheduler').setLevel(logging.WARNING)
|
|
|
|
|
|
@event.listens_for(Engine, "connect")
|
|
def enable_foreign_keys(dbapi_connection, connection_record):
|
|
if isinstance(dbapi_connection, sqlite3.Connection):
|
|
cursor = dbapi_connection.cursor()
|
|
cursor.execute("PRAGMA foreign_keys=ON;")
|
|
cursor.close()
|
|
|
|
ITAD_API_KEY_PLACEHOLDER = "your_api_key_here"
|
|
TZ = os.getenv('TZ', 'UTC')
|
|
os.environ['TZ'] = TZ
|
|
app = Flask(__name__)
|
|
app.jinja_env.globals['getattr'] = getattr
|
|
|
|
@app.errorhandler(404)
|
|
def not_found_error(error):
|
|
return render_template('404.html'), 404
|
|
|
|
|
|
# UNIX-Systems (Linux, Docker)
|
|
try:
|
|
time.tzset()
|
|
except AttributeError:
|
|
pass # tzset not availabe on Windows
|
|
local_tz = pytz.timezone(TZ)
|
|
|
|
# Load Languages
|
|
import os
|
|
import json
|
|
|
|
|
|
TRANSLATION_DIR = os.path.join(os.getcwd(), 'translations')
|
|
SUPPORTED_LANGUAGES = ['de', 'en']
|
|
TRANSLATIONS = {}
|
|
|
|
for lang in SUPPORTED_LANGUAGES:
|
|
try:
|
|
with open(os.path.join(TRANSLATION_DIR, f'{lang}.json'), encoding='utf-8') as f:
|
|
TRANSLATIONS[lang] = json.load(f)
|
|
print(f"✅ Loaded {lang} translations")
|
|
except Exception:
|
|
print(f"❌ Failed loading {lang}.json: {str(e)}")
|
|
TRANSLATIONS[lang] = {}
|
|
|
|
def translate(key, lang=None, **kwargs):
|
|
lang = lang or session.get('lang', 'en')
|
|
fallback_lang = app.config.get('DEFAULT_LANGUAGE', 'en')
|
|
|
|
translations = TRANSLATIONS.get(lang, {})
|
|
fallback_translations = TRANSLATIONS.get(fallback_lang, {})
|
|
|
|
value = translations.get(key) or fallback_translations.get(key) or key
|
|
return value.format(**kwargs) if isinstance(value, str) else value
|
|
|
|
## DEBUG Translations
|
|
if app.debug:
|
|
print(f"Loaded translations for 'de': {TRANSLATIONS.get('de', {})}")
|
|
|
|
### Admin decorator
|
|
def admin_required(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if not current_user.is_authenticated:
|
|
abort(403)
|
|
if not current_user.is_admin:
|
|
abort(403)
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
|
|
csrf = CSRFProtect(app)
|
|
|
|
convention = {
|
|
"ix": "ix_%(column_0_label)s",
|
|
"uq": "uq_%(table_name)s_%(column_0_name)s",
|
|
"ck": "ck_%(table_name)s_%(constraint_name)s",
|
|
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
|
|
"pk": "pk_%(table_name)s"
|
|
}
|
|
|
|
metadata = MetaData(naming_convention=convention)
|
|
load_dotenv(override=True)
|
|
|
|
# load variables from .env with override
|
|
load_dotenv(override=True)
|
|
|
|
# App-Configuration
|
|
app.config.update(
|
|
# Most Important
|
|
SECRET_KEY=os.getenv('SECRET_KEY'),
|
|
SQLALCHEMY_DATABASE_URI = 'sqlite:////app/data/games.db',
|
|
SQLALCHEMY_TRACK_MODIFICATIONS = False,
|
|
DEFAULT_LANGUAGE='en',
|
|
ITAD_COUNTRY = os.getenv("ITAD_COUNTRY", "DE"),
|
|
|
|
# SESSION-HANDLING (In Production: Use Redis!)
|
|
SESSION_TYPE='redis',
|
|
SESSION_PERMANENT = False,
|
|
SESSION_USE_SIGNER = True,
|
|
SESSION_REDIS=Redis.from_url(os.getenv("REDIS_URL", "redis://redis:6379/0")),
|
|
SESSION_FILE_DIR = '/app/data/flask-sessions',
|
|
SESSION_COOKIE_NAME = 'gamekeys_session',
|
|
SESSION_COOKIE_SECURE = os.getenv('SESSION_COOKIE_SECURE', 'False').lower() == 'true',
|
|
SESSION_COOKIE_HTTPONLY = True,
|
|
SESSION_COOKIE_SAMESITE = 'Lax',
|
|
PERMANENT_SESSION_LIFETIME = timedelta(days=30),
|
|
|
|
|
|
# LOGIN COOKIE STUFF
|
|
REMEMBER_COOKIE_DURATION=timedelta(days=30),
|
|
REMEMBER_COOKIE_HTTPONLY=True,
|
|
REMEMBER_COOKIE_SECURE=True if os.getenv('FORCE_HTTPS', 'False').lower() == 'true' else False,
|
|
REMEMBER_COOKIE_SAMESITE='Lax',
|
|
|
|
# CSRF-PROTECTION
|
|
WTF_CSRF_ENABLED = True,
|
|
WTF_CSRF_SECRET_KEY = os.getenv('CSRF_SECRET_KEY', os.urandom(32).hex()),
|
|
WTF_CSRF_TIME_LIMIT = 3600,
|
|
|
|
# SECURITYsa & PERFORMANCE
|
|
REGISTRATION_ENABLED = os.getenv('REGISTRATION_ENABLED', 'True').lower() == 'true',
|
|
SEND_FILE_MAX_AGE_DEFAULT = int(os.getenv('SEND_FILE_MAX_AGE_DEFAULT', 0)),
|
|
TEMPLATES_AUTO_RELOAD = os.getenv('TEMPLATES_AUTO_RELOAD', 'True').lower() == 'true',
|
|
PREFERRED_URL_SCHEME = 'https' if os.getenv('FORCE_HTTPS') else 'http'
|
|
)
|
|
|
|
|
|
Session(app)
|
|
|
|
interval_hours = int(os.getenv('CHECK_EXPIRING_KEYS_INTERVAL_HOURS', 12))
|
|
|
|
# Init
|
|
db = SQLAlchemy(app, metadata=metadata)
|
|
migrate = Migrate(app, db)
|
|
login_manager = LoginManager(app)
|
|
login_manager.login_view = 'login'
|
|
|
|
# Logging
|
|
app.logger.addHandler(logging.StreamHandler())
|
|
app.logger.setLevel(logging.DEBUG)
|
|
|
|
@app.errorhandler(403)
|
|
def forbidden_error(error):
|
|
return render_template('403.html'), 403
|
|
|
|
|
|
@app.before_request
|
|
def set_language():
|
|
if 'lang' not in session or not session['lang']:
|
|
session['lang'] = app.config.get('DEFAULT_LANGUAGE', 'en')
|
|
g.lang = session['lang']
|
|
|
|
def enforce_https():
|
|
if os.getenv('FORCE_HTTPS', 'False').lower() == 'true' and not app.debug:
|
|
proto = request.headers.get('X-Forwarded-Proto', 'http')
|
|
if proto != 'https' and not request.is_secure:
|
|
url = request.url.replace('http://', 'https://', 1)
|
|
return redirect(url, code=301)
|
|
|
|
def debug_translations():
|
|
if app.debug:
|
|
app.logger.debug(f"Lang: {session.get('lang')}")
|
|
|
|
app.before_request(enforce_https)
|
|
|
|
|
|
|
|
@app.context_processor
|
|
def inject_template_globals():
|
|
return {
|
|
'_': lambda key, **kwargs: translate(key, lang=session.get('lang', 'en'), **kwargs),
|
|
'now': datetime.now(local_tz),
|
|
'app_version': os.getenv('APP_VERSION', '1.0.0'),
|
|
'local_tz': local_tz
|
|
}
|
|
|
|
@app.template_filter('strftime')
|
|
def _jinja2_filter_datetime(date, fmt='%d.%m.%Y'):
|
|
if date is None:
|
|
return ''
|
|
return date.strftime(fmt)
|
|
|
|
@app.errorhandler(403)
|
|
def forbidden(e):
|
|
return render_template('403.html'), 403
|
|
|
|
|
|
# DB Models
|
|
class ActivityLog(db.Model):
|
|
__tablename__ = 'activity_logs'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
|
|
action = db.Column(db.String(100), nullable=False)
|
|
details = db.Column(db.Text)
|
|
timestamp = db.Column(db.DateTime, default=lambda: datetime.now(local_tz))
|
|
|
|
user = db.relationship('User', backref='activities')
|
|
|
|
|
|
class User(UserMixin, db.Model):
|
|
__tablename__ = 'users'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
username = db.Column(db.String(80), unique=True, nullable=False)
|
|
password = db.Column(db.String(256), nullable=False)
|
|
is_admin = db.Column(db.Boolean, default=False)
|
|
games = db.relationship(
|
|
'Game',
|
|
back_populates='owner',
|
|
cascade='all, delete-orphan',
|
|
passive_deletes=True
|
|
)
|
|
|
|
|
|
class Game(db.Model):
|
|
__tablename__ = 'games'
|
|
__table_args__ = (
|
|
UniqueConstraint('steam_key', 'user_id', name='uq_steam_key_user'),
|
|
)
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
name = db.Column(db.String(100), nullable=False)
|
|
steam_key = db.Column(db.String(100), nullable=False, unique=True)
|
|
status = db.Column(db.String(50), nullable=False)
|
|
recipient = db.Column(db.String(100))
|
|
notes = db.Column(db.Text)
|
|
url = db.Column(db.String(200))
|
|
created_at = db.Column(db.DateTime, default=lambda: datetime.now(local_tz))
|
|
redeem_date = db.Column(db.DateTime)
|
|
steam_appid = db.Column(db.String(20))
|
|
platform = db.Column(db.String(50), default='pc')
|
|
current_price = db.Column(db.Float)
|
|
current_price_shop = db.Column(db.String(100))
|
|
historical_low = db.Column(db.Float)
|
|
release_date = db.Column(db.DateTime)
|
|
release_date = db.Column(db.DateTime)
|
|
itad_slug = db.Column(db.String(200))
|
|
steam_description_en = db.Column(db.Text)
|
|
steam_description_de = db.Column(db.Text)
|
|
|
|
# with users.id
|
|
user_id = db.Column(db.Integer, db.ForeignKey('users.id', ondelete='CASCADE'), nullable=False)
|
|
|
|
owner = db.relationship(
|
|
'User',
|
|
back_populates='games'
|
|
)
|
|
|
|
redeem_tokens = db.relationship(
|
|
'RedeemToken',
|
|
back_populates='game',
|
|
cascade='all, delete-orphan',
|
|
passive_deletes=True
|
|
)
|
|
|
|
class RedeemToken(db.Model):
|
|
__tablename__ = 'redeem_tokens'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
token = db.Column(db.String(17), unique=True, nullable=False)
|
|
expires = db.Column(db.DateTime(timezone=True), nullable=False)
|
|
total_hours = db.Column(db.Integer, nullable=False)
|
|
|
|
# ForeignKey with CASCADE
|
|
game_id = db.Column(
|
|
db.Integer,
|
|
db.ForeignKey('games.id', ondelete='CASCADE'),
|
|
nullable=False
|
|
)
|
|
|
|
game = db.relationship('Game', back_populates='redeem_tokens')
|
|
|
|
def is_expired(self):
|
|
# use timeszone (from .env)
|
|
local_tz = pytz.timezone(os.getenv('TZ', 'UTC'))
|
|
now = datetime.now(local_tz)
|
|
return now > self.expires.astimezone(local_tz)
|
|
|
|
class GameForm(FlaskForm):
|
|
name = StringField('Name', [validators.DataRequired()])
|
|
steam_key = StringField('Steam Key')
|
|
status = SelectField('Status', choices=[
|
|
('nicht eingelöst', 'Nicht eingelöst'),
|
|
('eingelöst', 'Eingelöst'),
|
|
('geschenkt', 'Geschenkt')
|
|
])
|
|
recipient = StringField('Empfänger')
|
|
notes = TextAreaField('Notizen')
|
|
url = StringField('Store URL')
|
|
redeem_date = StringField('Einlösedatum')
|
|
steam_appid = StringField('Steam App ID')
|
|
|
|
PLATFORM_CHOICES = [
|
|
('steam', 'Steam'),
|
|
('gog', 'GOG'),
|
|
('xbox', 'XBox'),
|
|
('playstation', 'PlayStation'),
|
|
('switch', 'Nintendo Switch'),
|
|
('other', 'Other'),
|
|
('pc', 'PC')
|
|
]
|
|
|
|
STATUS_CHOICES = [
|
|
('nicht eingelöst', 'Nicht eingelöst'),
|
|
('eingelöst', 'Eingelöst'),
|
|
('geschenkt', 'Geschenkt')
|
|
]
|
|
|
|
with app.app_context():
|
|
db.create_all()
|
|
|
|
@login_manager.user_loader
|
|
def load_user(user_id):
|
|
return db.session.get(User, int(user_id))
|
|
|
|
def extract_steam_appid(url):
|
|
match = re.search(r'store\.steampowered\.com/app/(\d+)', url or '')
|
|
return match.group(1) if match else ''
|
|
|
|
# 404
|
|
def get_or_404(model, id):
|
|
instance = db.session.get(model, id)
|
|
if not instance:
|
|
abort(404)
|
|
return instance
|
|
|
|
# Admin Audit Helper
|
|
def log_activity(user_id, action, details=None):
|
|
"""
|
|
Store an activity log entry for auditing purposes.
|
|
"""
|
|
log = ActivityLog(
|
|
user_id=user_id,
|
|
action=action,
|
|
details=details
|
|
)
|
|
db.session.add(log)
|
|
db.session.commit()
|
|
|
|
# Game Infos Helper
|
|
def fetch_steam_data(appid, lang='en'):
|
|
lang_map = {
|
|
'en': 'english',
|
|
'de': 'german'
|
|
}
|
|
steam_lang = lang_map.get(lang, 'english')
|
|
try:
|
|
response = requests.get(
|
|
"https://store.steampowered.com/api/appdetails",
|
|
params={"appids": appid, "l": steam_lang},
|
|
timeout=15
|
|
)
|
|
data = response.json().get(str(appid), {})
|
|
if data.get("success"):
|
|
return {
|
|
"name": data["data"].get("name"),
|
|
"detailed_description": data["data"].get("detailed_description"),
|
|
"release_date": data["data"].get("release_date", {}).get("date"),
|
|
}
|
|
except Exception as e:
|
|
app.logger.error(f"Steam API error: {str(e)}")
|
|
return None
|
|
|
|
def parse_steam_release_date(date_str):
|
|
"""Parsing Steam-Release-Date (the german us thingy, you know)"""
|
|
import locale
|
|
from datetime import datetime
|
|
|
|
# try german format
|
|
try:
|
|
locale.setlocale(locale.LC_TIME, "de_DE.UTF-8")
|
|
return datetime.strptime(date_str, "%d. %b. %Y")
|
|
except Exception:
|
|
pass
|
|
# Fallback: okay lets try the english one
|
|
try:
|
|
locale.setlocale(locale.LC_TIME, "en_US.UTF-8")
|
|
return datetime.strptime(date_str, "%d %b, %Y")
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
def fetch_itad_slug(steam_appid: int) -> str | None:
|
|
api_key = os.getenv("ITAD_API_KEY")
|
|
if not api_key or api_key.strip() == "your-secret-key-here":
|
|
app.logger.warning("ITAD-API-Key ist nicht gesetzt oder ist ein Platzhalter.")
|
|
return None
|
|
try:
|
|
response = requests.get(
|
|
"https://api.isthereanydeal.com/games/lookup/v1",
|
|
params={"key": api_key, "appid": steam_appid, "platform": "steam"},
|
|
timeout=10
|
|
)
|
|
data = response.json()
|
|
return data.get("game", {}).get("slug")
|
|
except Exception as e:
|
|
app.logger.error(f"ITAD Error: {str(e)}")
|
|
return None
|
|
|
|
|
|
def fetch_itad_game_id(steam_appid: int) -> str | None:
|
|
api_key = os.getenv("ITAD_API_KEY")
|
|
if not api_key or api_key.strip() == "your-secret-key-here":
|
|
app.logger.warning("ITAD-API-Key ist nicht gesetzt oder ist ein Platzhalter.")
|
|
return None
|
|
|
|
try:
|
|
response = requests.get(
|
|
"https://api.isthereanydeal.com/games/lookup/v1",
|
|
params={"key": api_key, "appid": steam_appid, "platform": "steam"},
|
|
timeout=10
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
if data.get("found") and data.get("game") and data["game"].get("id"):
|
|
return data["game"]["id"]
|
|
app.logger.error(f"ITAD Response Error: {data}")
|
|
return None
|
|
except Exception as e:
|
|
app.logger.error(f"ITAD Error: {str(e)}")
|
|
return None
|
|
|
|
def fetch_itad_prices(game_id: str) -> dict | None:
|
|
api_key = os.getenv("ITAD_API_KEY")
|
|
country = os.getenv("ITAD_COUNTRY", "DE")
|
|
|
|
if not api_key or api_key.strip() == "your-secret-key-here":
|
|
app.logger.warning("ITAD-API-Key ist nicht gesetzt oder ist ein Platzhalter.")
|
|
return None
|
|
|
|
try:
|
|
response = requests.post(
|
|
"https://api.isthereanydeal.com/games/prices/v3",
|
|
params={
|
|
"key": api_key,
|
|
"country": country,
|
|
"shops": "steam",
|
|
"vouchers": "false"
|
|
},
|
|
json=[game_id],
|
|
headers={"Content-Type": "application/json"},
|
|
timeout=15
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()[0]
|
|
|
|
except Exception as e:
|
|
app.logger.error(f"ITAD-Preisabfrage fehlgeschlagen: {str(e)}")
|
|
return None
|
|
|
|
|
|
@app.route('/')
|
|
@login_required
|
|
def index():
|
|
search_query = request.args.get('q', '')
|
|
query = Game.query.filter_by(user_id=current_user.id)
|
|
|
|
if search_query:
|
|
query = query.filter(Game.name.ilike(f'%{search_query}%'))
|
|
|
|
games = query.order_by(Game.created_at.desc()).all()
|
|
return render_template('index.html',
|
|
games=games,
|
|
format_date=lambda dt: dt.strftime('%d.%m.%Y') if dt else '',
|
|
search_query=search_query)
|
|
|
|
@app.route('/set-lang/<lang>')
|
|
def set_lang(lang):
|
|
if lang in SUPPORTED_LANGUAGES:
|
|
session['lang'] = lang
|
|
session.permanent = True
|
|
return redirect(request.referrer or url_for('index'))
|
|
|
|
@app.route('/set-theme/<theme>')
|
|
def set_theme(theme):
|
|
resp = make_response('', 204)
|
|
resp.set_cookie('theme', theme, max_age=60*60*24*365)
|
|
return resp
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
if current_user.is_authenticated: # Prevent already logged-in users from accessing login page
|
|
return redirect(url_for('index'))
|
|
|
|
if request.method == 'POST':
|
|
username = request.form.get('username')
|
|
password = request.form.get('password')
|
|
remember = request.form.get('remember_me') == 'true'
|
|
|
|
user = User.query.filter_by(username=username).first()
|
|
|
|
if user and check_password_hash(user.password, password):
|
|
# Pass remember=True to login_user and set duration
|
|
# The duration will be taken from app.config['REMEMBER_COOKIE_DURATION']
|
|
login_user(user, remember=remember)
|
|
|
|
# Log activity
|
|
log_activity(user.id, 'user_login', f"User '{user.username}' logged in.")
|
|
|
|
next_page = request.args.get('next')
|
|
# Add security check for next_page to prevent open redirect
|
|
if not next_page or urlparse(next_page).netloc != '':
|
|
next_page = url_for('index')
|
|
flash(translate('Logged in successfully.'), 'success')
|
|
return redirect(next_page)
|
|
else:
|
|
flash(translate('Invalid username or password.'), 'danger')
|
|
return render_template('login.html')
|
|
|
|
@app.route('/register', methods=['GET', 'POST'])
|
|
def register():
|
|
if not app.config['REGISTRATION_ENABLED']:
|
|
abort(403)
|
|
|
|
if request.method == 'POST':
|
|
username = request.form['username']
|
|
password = request.form['password']
|
|
|
|
existing_user = User.query.filter_by(username=username).first()
|
|
if existing_user:
|
|
flash(translate('Username already exists'), 'error')
|
|
return redirect(url_for('register'))
|
|
|
|
# make the first user admin
|
|
is_admin = User.query.count() == 0
|
|
|
|
new_user = User(
|
|
username=username,
|
|
password=generate_password_hash(password),
|
|
is_admin=is_admin
|
|
)
|
|
|
|
db.session.add(new_user)
|
|
db.session.commit()
|
|
login_user(new_user)
|
|
flash(translate('Registration successful'), 'success')
|
|
return redirect(url_for('index'))
|
|
|
|
return render_template('register.html')
|
|
|
|
@app.route('/logout')
|
|
@login_required
|
|
def logout():
|
|
logout_user()
|
|
return redirect(url_for('login'))
|
|
|
|
@app.route('/change-password', methods=['GET', 'POST'])
|
|
@login_required
|
|
def change_password():
|
|
if request.method == 'POST':
|
|
current_password = request.form['current_password']
|
|
new_password = request.form['new_password']
|
|
confirm_password = request.form['confirm_password']
|
|
|
|
if not check_password_hash(current_user.password, current_password):
|
|
flash(translate('Current passwort is wrong'), 'danger')
|
|
return redirect(url_for('change_password'))
|
|
|
|
if new_password != confirm_password:
|
|
flash(translate('New Passwords are not matching'), 'danger')
|
|
return redirect(url_for('change_password'))
|
|
|
|
current_user.password = generate_password_hash(new_password)
|
|
db.session.commit()
|
|
flash(translate('Password changed successfully', session.get('lang', 'en')), 'success')
|
|
return redirect(url_for('index'))
|
|
|
|
return render_template('change_password.html')
|
|
|
|
@app.route('/add', methods=['GET', 'POST'])
|
|
@login_required
|
|
def add_game():
|
|
if request.method == 'POST':
|
|
try:
|
|
url = request.form.get('url', '')
|
|
steam_appid = request.form.get('steam_appid', '').strip()
|
|
|
|
if not steam_appid:
|
|
steam_appid = extract_steam_appid(url)
|
|
|
|
steam_key = request.form['steam_key']
|
|
if Game.query.filter_by(steam_key=steam_key).first():
|
|
flash(translate('Steam Key already exists!'), 'error')
|
|
return redirect(url_for('add_game'))
|
|
|
|
new_game = Game(
|
|
name=request.form['name'],
|
|
steam_key=steam_key,
|
|
status=request.form['status'],
|
|
recipient=request.form.get('recipient', ''),
|
|
notes=request.form.get('notes', ''),
|
|
url=url,
|
|
steam_appid=steam_appid,
|
|
redeem_date=datetime.strptime(request.form['redeem_date'], '%Y-%m-%d') if request.form['redeem_date'] else None,
|
|
user_id=current_user.id
|
|
)
|
|
|
|
db.session.add(new_game)
|
|
db.session.commit()
|
|
flash(translate('Game added successfully!'), 'success')
|
|
return redirect(url_for('index'))
|
|
|
|
except IntegrityError as e:
|
|
db.session.rollback()
|
|
if "UNIQUE constraint failed: game.steam_key" in str(e):
|
|
flash(translate('Steam Key already exists!'), 'error')
|
|
else:
|
|
flash(translate('Database error: %(error)s', error=str(e)), 'error')
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
flash(translate('Error: %(error)s', error=str(e)), 'error')
|
|
|
|
return render_template(
|
|
'add_game.html',
|
|
platforms=PLATFORM_CHOICES,
|
|
statuses=STATUS_CHOICES
|
|
)
|
|
|
|
|
|
@app.route('/edit/<int:game_id>', methods=['GET', 'POST'])
|
|
@login_required
|
|
def edit_game(game_id):
|
|
# Eager Loading für Tokens
|
|
game = Game.query.options(joinedload(Game.redeem_tokens)).get_or_404(game_id)
|
|
|
|
def safe_parse_date(date_str):
|
|
try:
|
|
naive = datetime.strptime(date_str, '%Y-%m-%d') if date_str else None
|
|
return local_tz.localize(naive) if naive else None
|
|
except ValueError:
|
|
return None
|
|
|
|
if request.method == 'POST':
|
|
try:
|
|
# Validation
|
|
if not request.form.get('name') or not request.form.get('steam_key'):
|
|
flash(translate('Name and Steam Key are required'), 'error')
|
|
return redirect(url_for('edit_game', game_id=game_id))
|
|
|
|
# Duplicate check
|
|
existing = Game.query.filter(
|
|
Game.steam_key == request.form['steam_key'],
|
|
Game.id != game.id,
|
|
Game.user_id == current_user.id
|
|
).first()
|
|
if existing:
|
|
flash(translate('Steam Key already exists'), 'error')
|
|
return redirect(url_for('edit_game', game_id=game_id))
|
|
|
|
# Update fields
|
|
game.name = request.form['name']
|
|
game.steam_key = request.form['steam_key']
|
|
game.status = request.form['status']
|
|
game.platform = request.form.get('platform', 'pc')
|
|
game.recipient = request.form.get('recipient', '')
|
|
game.notes = request.form.get('notes', '')
|
|
game.url = request.form.get('url', '')
|
|
game.steam_appid = request.form.get('steam_appid', '')
|
|
game.redeem_date = safe_parse_date(request.form.get('redeem_date', ''))
|
|
|
|
# Token-Logic
|
|
if game.status == 'geschenkt':
|
|
# Vorhandene Tokens löschen
|
|
RedeemToken.query.filter_by(game_id=game.id).delete()
|
|
|
|
# Generate new Token
|
|
token = secrets.token_urlsafe(12)[:17]
|
|
expires = datetime.now(local_tz) + timedelta(hours=24)
|
|
new_token = RedeemToken(
|
|
token=token,
|
|
game_id=game.id,
|
|
expires=expires,
|
|
total_hours=24
|
|
)
|
|
db.session.add(new_token)
|
|
|
|
db.session.commit()
|
|
flash(translate('Changes saved successfully'), 'success')
|
|
return redirect(url_for('index'))
|
|
|
|
except IntegrityError as e:
|
|
db.session.rollback()
|
|
app.logger.error(f"IntegrityError: {traceback.format_exc()}")
|
|
flash(translate('Database error: {error}', error=str(e.orig)), 'error')
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
app.logger.error(f"Unexpected error: {traceback.format_exc()}")
|
|
flash(translate('Unexpected error: {error}', error=str(e)), 'error')
|
|
|
|
return render_template(
|
|
'edit_game.html',
|
|
game=game,
|
|
platforms=PLATFORM_CHOICES,
|
|
statuses=STATUS_CHOICES,
|
|
redeem_date=game.redeem_date.strftime('%Y-%m-%d') if game.redeem_date else ''
|
|
)
|
|
|
|
|
|
@app.route('/delete/<int:game_id>', methods=['POST'])
|
|
@login_required
|
|
def delete_game(game_id):
|
|
game = Game.query.get_or_404(game_id)
|
|
db.session.delete(game)
|
|
db.session.commit()
|
|
flash(translate('Game deleted successfully'), 'success')
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
@app.route('/export', methods=['GET'])
|
|
@login_required
|
|
def export_games():
|
|
games = Game.query.filter_by(user_id=current_user.id).all()
|
|
output = io.StringIO()
|
|
writer = csv.writer(output)
|
|
|
|
writer.writerow(['Name', 'Steam Key', 'Status', 'Recipient', 'Notes', 'URL', 'Created', 'Redeem by', 'Steam AppID'])
|
|
|
|
for game in games:
|
|
writer.writerow([
|
|
game.name,
|
|
game.steam_key,
|
|
game.status,
|
|
game.recipient,
|
|
game.notes,
|
|
game.url,
|
|
game.created_at.strftime('%Y-%m-%d %H:%M:%S') if game.created_at else '',
|
|
game.redeem_date.strftime('%Y-%m-%d') if game.redeem_date else '',
|
|
game.steam_appid
|
|
])
|
|
|
|
output.seek(0)
|
|
return send_file(
|
|
io.BytesIO(output.getvalue().encode('utf-8')),
|
|
mimetype='text/csv',
|
|
as_attachment=True,
|
|
download_name='games_export.csv'
|
|
)
|
|
|
|
|
|
@app.route('/export_pdf')
|
|
@login_required
|
|
def export_pdf():
|
|
excluded_statuses = ['eingelöst', 'verschenkt']
|
|
|
|
games = Game.query.filter(
|
|
Game.user_id == current_user.id,
|
|
Game.status.notin_(excluded_statuses)
|
|
).order_by(Game.created_at.desc()).all()
|
|
|
|
buffer = io.BytesIO()
|
|
doc = SimpleDocTemplate(buffer,
|
|
pagesize=landscape(A4),
|
|
leftMargin=40,
|
|
rightMargin=40,
|
|
topMargin=40,
|
|
bottomMargin=40
|
|
)
|
|
|
|
styles = getSampleStyleSheet()
|
|
elements = []
|
|
img_height = 2*cm
|
|
|
|
# Title
|
|
elements.append(Paragraph(
|
|
translate("Game List (without Keys)", lang=session.get('lang', 'en')),
|
|
styles['Title']
|
|
))
|
|
elements.append(Spacer(1, 12))
|
|
|
|
# Table header
|
|
col_widths = [
|
|
5*cm, 10*cm, 6*cm, 3*cm
|
|
]
|
|
data = [[
|
|
Paragraph('<b>Cover</b>', styles['Normal']),
|
|
Paragraph('<b>Name</b>', styles['Normal']),
|
|
Paragraph('<b>Shop-Link</b>', styles['Normal']),
|
|
Paragraph('<b>Einlösen bis</b>', styles['Normal'])
|
|
]]
|
|
|
|
for game in games:
|
|
img = None
|
|
if game.steam_appid:
|
|
try:
|
|
img_url = f"https://cdn.cloudflare.steamstatic.com/steam/apps/{game.steam_appid}/header.jpg"
|
|
img_data = io.BytesIO(requests.get(img_url, timeout=5).content)
|
|
img = Image(img_data, width=3*cm, height=img_height)
|
|
except Exception:
|
|
img = Paragraph('', styles['Normal'])
|
|
elif game.url and 'gog.com' in game.url:
|
|
try:
|
|
img_path = os.path.join(app.root_path, 'static', 'gog_logo.webp')
|
|
img = Image(img_path, width=3*cm, height=img_height)
|
|
except Exception:
|
|
img = Paragraph('', styles['Normal'])
|
|
|
|
data.append([
|
|
img or '',
|
|
Paragraph(game.name, styles['Normal']),
|
|
Paragraph(game.url or '', styles['Normal']),
|
|
game.redeem_date.strftime('%d.%m.%y') if game.redeem_date else ''
|
|
])
|
|
|
|
# Table format
|
|
table = Table(data, colWidths=col_widths, repeatRows=1)
|
|
table.setStyle(TableStyle([
|
|
('FONTNAME', (0,0), (-1,0), 'Helvetica-Bold'),
|
|
('FONTSIZE', (0,0), (-1,0), 8),
|
|
('FONTSIZE', (0,1), (-1,-1), 8),
|
|
('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
|
|
('ALIGN', (0,0), (-1,-1), 'LEFT'),
|
|
('GRID', (0,0), (-1,-1), 0.5, colors.lightgrey),
|
|
('WORDWRAP', (1,1), (1,-1), 'CJK'),
|
|
]))
|
|
|
|
elements.append(table)
|
|
doc.build(elements)
|
|
|
|
buffer.seek(0)
|
|
return send_file(
|
|
buffer,
|
|
mimetype='application/pdf',
|
|
as_attachment=True,
|
|
download_name=f'game_export_{datetime.now().strftime("%Y%m%d")}.pdf'
|
|
)
|
|
|
|
|
|
@app.route('/import', methods=['GET', 'POST'])
|
|
@login_required
|
|
def import_games():
|
|
if request.method == 'POST':
|
|
file = request.files.get('file')
|
|
|
|
if file and file.filename.endswith('.csv'):
|
|
stream = io.StringIO(file.stream.read().decode("UTF8"))
|
|
reader = csv.DictReader(stream)
|
|
new_games = 0
|
|
duplicates = 0
|
|
|
|
try:
|
|
with db.session.begin_nested():
|
|
for row in reader:
|
|
steam_key = row['Steam Key'].strip()
|
|
|
|
if Game.query.filter_by(steam_key=steam_key).first():
|
|
duplicates += 1
|
|
continue
|
|
|
|
game = Game(
|
|
name=row['Name'],
|
|
steam_key=steam_key,
|
|
status=row['Status'],
|
|
recipient=row.get('Recipient', ''),
|
|
notes=row.get('Notes', ''),
|
|
url=row.get('URL', ''),
|
|
created_at=datetime.strptime(row['Created'], '%Y-%m-%d %H:%M:%S') if row.get('Created') else datetime.utcnow(),
|
|
redeem_date=datetime.strptime(row['Redeem by'], '%Y-%m-%d') if row.get('Redeem by') else None,
|
|
steam_appid=row.get('Steam AppID', ''),
|
|
user_id=current_user.id
|
|
)
|
|
|
|
db.session.add(game)
|
|
new_games += 1
|
|
|
|
db.session.commit()
|
|
|
|
flash(translate("new_games_imported", new=new_games, dup=duplicates), 'success')
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
flash(translate('Import error: {error}', error=str(e)), 'danger')
|
|
|
|
return redirect(url_for('index'))
|
|
|
|
flash(translate('Please upload a valid CSV file.'), 'danger')
|
|
|
|
return render_template('import.html')
|
|
|
|
@app.route('/generate_redeem/<int:game_id>', methods=['POST'])
|
|
@login_required
|
|
def generate_redeem(game_id):
|
|
game = Game.query.get_or_404(game_id)
|
|
if game.user_id != current_user.id or game.status != 'geschenkt':
|
|
return jsonify({'error': translate('Forbidden')}), 403
|
|
|
|
try:
|
|
RedeemToken.query.filter_by(game_id=game_id).delete()
|
|
token = secrets.token_urlsafe(12)[:17]
|
|
expires = datetime.now(local_tz) + timedelta(hours=24)
|
|
new_token = RedeemToken(
|
|
token=token,
|
|
game_id=game_id,
|
|
expires=expires,
|
|
total_hours=24
|
|
)
|
|
db.session.add(new_token)
|
|
db.session.commit()
|
|
redeem_url = url_for('redeem', token=token, _external=True)
|
|
message = translate(
|
|
'Redeem link generated: <a href="{url}" target="_blank">{url}</a>',
|
|
url=redeem_url
|
|
)
|
|
return jsonify({'url': redeem_url, 'message': message})
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/redeem/<token>', endpoint='redeem')
|
|
def redeem_page(token):
|
|
redeem_token = RedeemToken.query.filter_by(token=token).first()
|
|
if not redeem_token:
|
|
abort(404)
|
|
expires_utc = redeem_token.expires.astimezone(pytz.UTC)
|
|
if datetime.now(pytz.UTC) > expires_utc:
|
|
db.session.delete(redeem_token)
|
|
db.session.commit()
|
|
abort(404)
|
|
game = Game.query.get(redeem_token.game_id)
|
|
redeem_token.used = True
|
|
db.session.commit()
|
|
|
|
# which Plattform
|
|
if game.platform == "steam" or game.steam_appid:
|
|
platform_link = 'https://store.steampowered.com/account/registerkey?key='
|
|
platform_label = "Steam"
|
|
elif game.platform == "gog":
|
|
platform_link = 'https://www.gog.com/redeem/'
|
|
platform_label = "GOG"
|
|
elif game.platform == "xbox":
|
|
platform_link = 'https://redeem.microsoft.com/'
|
|
platform_label = "XBOX"
|
|
elif game.platform == "playstation":
|
|
platform_link = 'https://store.playstation.com/redeem'
|
|
platform_label = "PlayStation"
|
|
else:
|
|
platform_link = '#'
|
|
platform_label = game.platform.capitalize() if game.platform else "Unknown"
|
|
|
|
return render_template(
|
|
'redeem.html',
|
|
game=game,
|
|
redeem_token=redeem_token,
|
|
expires_timestamp=int(expires_utc.timestamp() * 1000),
|
|
platform_link=platform_link,
|
|
platform_label=platform_label
|
|
)
|
|
|
|
@app.route('/admin/users')
|
|
@login_required
|
|
@admin_required
|
|
def admin_users():
|
|
users = User.query.all()
|
|
return render_template('admin_users.html', users=users)
|
|
|
|
@app.route('/admin/users/delete/<int:user_id>', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def admin_delete_user(user_id):
|
|
if current_user.id == user_id:
|
|
flash(translate('You cannot delete yourself'), 'error')
|
|
return redirect(url_for('admin_users'))
|
|
|
|
user = User.query.get_or_404(user_id)
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
|
|
log_activity(
|
|
current_user.id,
|
|
'user_deleted',
|
|
f"Deleted user: {user.username} (ID: {user.id})"
|
|
)
|
|
|
|
flash(translate('User deleted successfully'), 'success')
|
|
return redirect(url_for('admin_users'))
|
|
|
|
@app.route('/admin/users/reset_password/<int:user_id>', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def admin_reset_password(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
new_password = secrets.token_urlsafe(8)
|
|
user.password = generate_password_hash(new_password)
|
|
db.session.commit()
|
|
|
|
log_activity(
|
|
current_user.id,
|
|
'user_newpassword',
|
|
f"New password for user: {user.username} (ID: {user.id})"
|
|
)
|
|
|
|
|
|
flash(
|
|
translate('New password for {username}: {password}',
|
|
username=user.username,
|
|
password=new_password),
|
|
'info'
|
|
)
|
|
return redirect(url_for('admin_users'))
|
|
|
|
@app.route('/admin/audit-logs')
|
|
@login_required
|
|
@admin_required
|
|
def admin_audit_logs():
|
|
page = request.args.get('page', 1, type=int)
|
|
logs = ActivityLog.query.order_by(ActivityLog.timestamp.desc()).paginate(page=page, per_page=20)
|
|
return render_template('admin_audit_logs.html', logs=logs)
|
|
|
|
@app.route('/game/<int:game_id>/update', methods=['POST'])
|
|
@login_required
|
|
def update_game_data(game_id):
|
|
game = Game.query.get_or_404(game_id)
|
|
|
|
# 1. Getting Steam AppID
|
|
steam_appid = request.form.get('steam_appid', '').strip()
|
|
app.logger.info(f"🚀 Update gestartet für Game {game_id} mit AppID: {steam_appid}")
|
|
|
|
# 2. Steam-Data (Multilingual)
|
|
if steam_appid:
|
|
try:
|
|
app.logger.debug(f"🔍 Fetching Steam data for AppID: {steam_appid}")
|
|
for lang in ['en', 'de']:
|
|
steam_data = fetch_steam_data(steam_appid, lang=lang)
|
|
if steam_data:
|
|
if lang == 'en' and steam_data.get("name"):
|
|
game.name = steam_data.get("name", game.name)
|
|
setattr(game, f'steam_description_{lang}', steam_data.get("detailed_description") or "No Infos available")
|
|
if lang == 'en':
|
|
date_str = steam_data.get("release_date", {})
|
|
if date_str:
|
|
parsed_date = parse_steam_release_date(date_str)
|
|
if parsed_date:
|
|
game.release_date = local_tz.localize(parsed_date)
|
|
else:
|
|
app.logger.warning(f"Could not parse Steam release date: {date_str}")
|
|
app.logger.info("✅ Steam data successfully updated")
|
|
except Exception as e:
|
|
app.logger.error(f"💥 Kritischer Steam-Fehler: {str(e)}", exc_info=True)
|
|
flash(translate('Error during Steam query'), 'danger')
|
|
else:
|
|
app.logger.warning("⚠️ Keine Steam-AppID vorhanden, Steam-Daten werden nicht aktualisiert")
|
|
flash(translate('Steam-AppID missing, no Steam Data transferred'), 'warning')
|
|
|
|
|
|
# ITAD-Slug doings and such
|
|
itad_slug = fetch_itad_slug(steam_appid)
|
|
if itad_slug:
|
|
game.itad_slug = itad_slug
|
|
|
|
# 4. ITAD-Prices
|
|
price_data = None
|
|
if steam_appid:
|
|
try:
|
|
app.logger.debug("🔄 Starte ITAD-Abfrage...")
|
|
game.itad_game_id = fetch_itad_game_id(steam_appid)
|
|
|
|
if game.itad_game_id:
|
|
app.logger.info(f"🔑 ITAD Game ID: {game.itad_game_id}")
|
|
price_data = fetch_itad_prices(game.itad_game_id)
|
|
|
|
if price_data:
|
|
# Best price right now
|
|
all_deals = price_data.get("deals", [])
|
|
if all_deals:
|
|
best_deal = min(
|
|
all_deals,
|
|
key=lambda deal: deal.get("price", {}).get("amount", float('inf'))
|
|
)
|
|
game.current_price = best_deal.get("price", {}).get("amount")
|
|
game.current_price_shop = best_deal.get("shop", {}).get("name")
|
|
app.logger.info(f"💶 Current Best: {game.current_price}€ at {game.current_price_shop}")
|
|
else:
|
|
game.current_price = None
|
|
game.current_price_shop = None
|
|
|
|
app.logger.info(f"💶 Current Best: {game.current_price}€")
|
|
|
|
game.historical_low = price_data.get("historyLow", {}).get("all", {}).get("amount")
|
|
app.logger.info(f"📉 Historical Low: {game.historical_low}€")
|
|
else:
|
|
app.logger.warning("⚠️ Keine ITAD-Preisdaten erhalten")
|
|
else:
|
|
app.logger.warning("⚠️ Keine ITAD Game ID erhalten")
|
|
|
|
except Exception as e:
|
|
app.logger.error(f"💥 ITAD-API-Fehler: {str(e)}", exc_info=True)
|
|
flash(translate('Fehler bei Preisabfrage'), 'danger')
|
|
|
|
try:
|
|
db.session.commit()
|
|
flash(translate('Externe Daten erfolgreich aktualisiert!'), 'success')
|
|
app.logger.info("💾 Datenbank-Update erfolgreich")
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
app.logger.error(f"💥 Datenbank-Fehler: {str(e)}", exc_info=True)
|
|
flash(translate('Fehler beim Speichern der Daten'), 'danger')
|
|
|
|
return redirect(url_for('edit_game', game_id=game_id))
|
|
|
|
|
|
@app.route('/game/<int:game_id>')
|
|
@login_required
|
|
def game_details(game_id):
|
|
game = Game.query.get_or_404(game_id)
|
|
return render_template('game_details.html', game=game)
|
|
|
|
|
|
@app.route('/debug-session')
|
|
def debug_session():
|
|
return jsonify(dict(session))
|
|
|
|
# Apprise Notifications
|
|
import apprise
|
|
|
|
def send_apprise_notification(user, game):
|
|
apprise_urls = os.getenv('APPRISE_URLS', '').strip()
|
|
if not apprise_urls:
|
|
app.logger.error("No APPRISE_URLS configured")
|
|
return False
|
|
|
|
apobj = apprise.Apprise()
|
|
for url in apprise_urls.replace(',', '\n').splitlines():
|
|
if url.strip():
|
|
apobj.add(url.strip())
|
|
|
|
edit_url = url_for('edit_game', game_id=game.id, _external=True)
|
|
result = apobj.notify(
|
|
title="Steam-Key läuft ab!",
|
|
body=f"Dein Key für '{game.name}' läuft in weniger als 48 Stunden ab!\n\nLink: {edit_url}",
|
|
)
|
|
return result
|
|
|
|
def send_notification(user, game):
|
|
return send_apprise_notification(user, game)
|
|
|
|
def check_expiring_keys():
|
|
now = datetime.now(local_tz)
|
|
expiry_threshold = now + timedelta(hours=48)
|
|
|
|
stmt = select(Game).where(
|
|
Game.status != 'eingelöst',
|
|
Game.redeem_date <= expiry_threshold,
|
|
Game.redeem_date > now
|
|
)
|
|
|
|
expiring_games = db.session.execute(stmt).scalars().all()
|
|
|
|
for game in expiring_games:
|
|
user = User.query.get(game.user_id)
|
|
if user.notification_service and user.notification_service != 'none':
|
|
send_notification(user, game)
|
|
|
|
|
|
# Optional: cleaning up old tokens
|
|
def cleanup_expired_tokens():
|
|
with app.app_context():
|
|
try:
|
|
now = datetime.now(local_tz)
|
|
expired = RedeemToken.query.filter(RedeemToken.expires < now).all()
|
|
for token in expired:
|
|
db.session.delete(token)
|
|
db.session.commit()
|
|
app.logger.info(f"Cleaned up {len(expired)} expired tokens.")
|
|
except Exception as e:
|
|
app.logger.error(f"Error during cleanup_expired_tokens: {e}")
|
|
db.session.rollback()
|
|
|
|
|
|
# Scheduler start
|
|
scheduler = BackgroundScheduler(timezone=str(local_tz))
|
|
|
|
def check_expiring_keys_job():
|
|
with app.app_context():
|
|
check_expiring_keys()
|
|
|
|
def cleanup_expired_tokens_job():
|
|
with app.app_context():
|
|
cleanup_expired_tokens()
|
|
|
|
# Add Jobs
|
|
scheduler.add_job(
|
|
check_expiring_keys_job,
|
|
'interval',
|
|
hours=int(os.getenv('CHECK_EXPIRING_KEYS_INTERVAL_HOURS', 12)),
|
|
id='check_expiring_keys'
|
|
)
|
|
scheduler.add_job(
|
|
cleanup_expired_tokens_job,
|
|
'interval',
|
|
hours=1,
|
|
id='cleanup_expired_tokens'
|
|
)
|
|
# price updates
|
|
def update_prices_job():
|
|
with app.app_context():
|
|
games = Game.query.filter(Game.steam_appid.isnot(None)).all()
|
|
for game in games:
|
|
# just update prices
|
|
itad_data = fetch_itad_data(f"app/{game.steam_appid}")
|
|
if itad_data:
|
|
game.current_price = itad_data.get('price_new')
|
|
game.historical_low = itad_data.get('price_low', {}).get('amount')
|
|
db.session.commit()
|
|
|
|
scheduler.add_job(
|
|
update_prices_job,
|
|
'interval',
|
|
hours=12,
|
|
id='update_prices'
|
|
)
|
|
|
|
|
|
def update_missing_steam_descriptions_job():
|
|
with app.app_context():
|
|
games = Game.query.filter(
|
|
(Game.steam_description_en == None) | (Game.steam_description_en == '') |
|
|
(Game.steam_description_de == None) | (Game.steam_description_de == '')
|
|
).all()
|
|
for game in games:
|
|
for lang in ['en', 'de']:
|
|
if not getattr(game, f'steam_description_{lang}', None):
|
|
steam_data = fetch_steam_data(game.steam_appid, lang=lang)
|
|
if steam_data:
|
|
setattr(game, f'steam_description_{lang}', steam_data.get('detailed_description'))
|
|
db.session.commit()
|
|
|
|
scheduler.add_job(
|
|
update_missing_steam_descriptions_job,
|
|
'interval',
|
|
hours=24,
|
|
id='update_missing_steam_descriptions'
|
|
)
|
|
|
|
# start Scheduler
|
|
scheduler.start()
|
|
atexit.register(lambda: scheduler.shutdown(wait=False))
|
|
|
|
if __name__ == '__main__':
|
|
with app.app_context():
|
|
db.create_all()
|
|
app.run(debug=True, host='0.0.0.0', port=5000)
|
|
|