"""Storage layer: async SQLite (aiosqlite) for users/groups/items, Redis for sessions.

The SQLite part replaces an earlier MySQL/MariaDB schema (``recording_spark.*``)
with ``rs_*`` tables.  The Redis part is synchronous and keeps the key layout
``recording_spark:...`` used for live/short session tokens and pending
registrations.
"""

import os
import json
import random
import secrets
from time import strftime  # wall-clock parts for registration ids
from datetime import timedelta
import asyncio

import aiosqlite

# BEGIN redis (kept synchronous, as in the original implementation)
import redis

put = os.path.dirname(os.path.realpath(__file__)) + "/"
with open(f"{put}settings.json", "r", encoding="utf-8") as read_file:
    settings = json.load(read_file)

r = redis.StrictRedis(
    host=settings["Redis"]["host"],
    port=settings["Redis"]["port"],
    password=settings["Redis"]["password"],
    encoding="utf-8",
    decode_responses=True,
    db=settings["Redis"]["db"],
)
# END redis

# -------------------------
# SQLite (async) backend
# -------------------------
_SQLITE_PATH = (
    settings.get("Sqlite", {}).get("path")
    or os.path.join(put, "recording_spark.sqlite3")
)

# Counterpart of the former ``Mariadb.select_commit`` option in settings.json:
# when true, every SELECT helper commits before reading.
_SQLITE_SELECT_COMMIT = bool(settings.get("Sqlite", {}).get("select_commit", False))

_db: aiosqlite.Connection | None = None
_db_init_lock = asyncio.Lock()
_db_initialized = False


async def _connect_db() -> aiosqlite.Connection:
    """Return the module-wide aiosqlite connection, creating it on first use.

    The first call opens the database at ``_SQLITE_PATH``, applies the
    connection PRAGMAs and creates the schema.  All of that happens under
    ``_db_init_lock`` so concurrent first callers do not open two connections.
    """
    global _db, _db_initialized
    async with _db_init_lock:
        if _db is None:
            _db = await aiosqlite.connect(_SQLITE_PATH)
            _db.row_factory = aiosqlite.Row
            await _db.execute("PRAGMA foreign_keys = ON;")
            await _db.execute("PRAGMA journal_mode = WAL;")
            await _db.execute("PRAGMA synchronous = NORMAL;")
            await _db.execute("PRAGMA busy_timeout = 5000;")
        if not _db_initialized:
            await _init_schema(_db)
            _db_initialized = True
        return _db


async def _init_schema(db: aiosqlite.Connection) -> None:
    """Create the ``rs_*`` tables and their indexes if they do not exist.

    Replaces the MySQL/MariaDB schema ``recording_spark.*`` with ``rs_*``
    tables (SQLite has no schema prefix here).  Commits when done.
    """
    await db.executescript(
        """
        CREATE TABLE IF NOT EXISTS rs_group (
            group_id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT,
            rights INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS rs_user (
            user_id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_name TEXT NOT NULL,
            email TEXT,
            password TEXT,
            avatar INTEGER NOT NULL DEFAULT 0,
            active INTEGER NOT NULL DEFAULT 0,
            group_id INTEGER NOT NULL,
            FOREIGN KEY (group_id) REFERENCES rs_group (group_id)
        );

        CREATE TABLE IF NOT EXISTS rs_item (
            item_id INTEGER PRIMARY KEY AUTOINCREMENT,
            item_name TEXT NOT NULL,
            status TEXT,
            user_id_1 INTEGER NOT NULL,
            user_id_2 INTEGER NOT NULL,
            icon TEXT DEFAULT NULL,
            comments TEXT,
            inventory_id INTEGER,
            FOREIGN KEY (user_id_1) REFERENCES rs_user (user_id),
            FOREIGN KEY (user_id_2) REFERENCES rs_user (user_id)
        );

        CREATE TABLE IF NOT EXISTS rs_ownership_over_group (
            group_id INTEGER NOT NULL,
            user_id INTEGER NOT NULL,
            FOREIGN KEY (group_id) REFERENCES rs_group (group_id),
            FOREIGN KEY (user_id) REFERENCES rs_user (user_id)
        );

        CREATE INDEX IF NOT EXISTS idx_rs_user_group_id ON rs_user(group_id);
        CREATE INDEX IF NOT EXISTS idx_rs_item_user1 ON rs_item(user_id_1);
        CREATE INDEX IF NOT EXISTS idx_rs_item_user2 ON rs_item(user_id_2);
        CREATE INDEX IF NOT EXISTS idx_rs_oog_group_id ON rs_ownership_over_group(group_id);
        CREATE INDEX IF NOT EXISTS idx_rs_oog_user_id ON rs_ownership_over_group(user_id);
        """
    )
    await db.commit()


async def close_db() -> None:
    """Close the shared connection (if any) and reset the module state."""
    global _db, _db_initialized
    if _db is not None:
        await _db.close()
    _db = None
    _db_initialized = False


async def ping_db() -> None:
    """Make sure the shared connection is open.

    Under MariaDB this was a ping/reconnect; with aiosqlite it only
    guarantees that the connection has been created.
    """
    await _connect_db()


async def _fetchone(q: str, params: tuple = ()) -> aiosqlite.Row | None:
    """Run query ``q`` with ``params`` and return the first row, or ``None``."""
    db = await _connect_db()
    if _SQLITE_SELECT_COMMIT:
        await db.commit()
    async with db.execute(q, params) as cur:
        return await cur.fetchone()


async def _fetchall(q: str, params: tuple = ()) -> list[aiosqlite.Row]:
    """Run query ``q`` with ``params`` and return every row."""
    db = await _connect_db()
    if _SQLITE_SELECT_COMMIT:
        await db.commit()
    async with db.execute(q, params) as cur:
        return await cur.fetchall()


async def _execute(q: str, params: tuple = (), *, commit: bool = False) -> aiosqlite.Cursor:
    """Execute statement ``q`` and return its cursor; commit first if ``commit``."""
    db = await _connect_db()
    cur = await db.execute(q, params)
    if commit:
        await db.commit()
    return cur


# -------------------------
# Rights / permissions
# -------------------------
# Permission bits, same layout as in the original (MariaDB) implementation.
# Each kind owns three bits, reported in the order (self, group, all):
#   item_r: bits 0,2,4
#   item_w: bits 1,3,5
#   user_r: bits 6,8,10
#   user_w: bits 7,9,11
_BITS = {
    "item_r": (0, 2, 4),
    "item_w": (1, 3, 5),
    "user_r": (6, 8, 10),
    "user_w": (7, 9, 11),
}

# Numeric ``kind`` accepted by permissions() -> key into _BITS.
_KIND_NAMES = {0: "item_r", 1: "item_w", 2: "user_r", 3: "user_w"}


async def _get_user_rights(user_id: int) -> int | None:
    """Return the rights bitmask of the user's group, or ``None`` if no such user."""
    row = await _fetchone(
        """
        SELECT g.rights
        FROM rs_user u
        JOIN rs_group g ON u.group_id = g.group_id
        WHERE u.user_id = ?
        """,
        (int(user_id),),
    )
    return None if row is None else int(row["rights"])


def _rights_tuple(rights: int, kind: str) -> tuple[int, int, int]:
    """Extract the three 0/1 flags of ``kind`` (a key of ``_BITS``) from ``rights``."""
    b0, b1, b2 = _BITS[kind]
    return ((rights >> b0) & 1, (rights >> b1) & 1, (rights >> b2) & 1)


async def permissions(user_id: int, kind: int) -> tuple[int, int, int] | None:
    """Return the (self, group, all) flags of one permission kind for a user.

    ``kind``: 0=item_r, 1=item_w, 2=user_r, 3=user_w.

    Returns ``None`` when the user does not exist.  Raises ``ValueError`` for
    an unknown ``kind`` (only after the user lookup succeeded, as before).
    """
    rights = await _get_user_rights(int(user_id))
    if rights is None:
        return None
    try:
        name = _KIND_NAMES[kind]
    except (KeyError, TypeError):
        raise ValueError("Unknown kind") from None
    return _rights_tuple(rights, name)


async def permissions_item_r(u_id: int):
    """Item-read flags (self, group, all) of user ``u_id``."""
    return await permissions(u_id, 0)


async def permissions_item_w(u_id: int):
    """Item-write flags (self, group, all) of user ``u_id``."""
    return await permissions(u_id, 1)


async def permissions_user_r(u_id: int):
    """User-read flags (self, group, all) of user ``u_id``."""
    return await permissions(u_id, 2)


async def permissions_user_w(u_id: int):
    """User-write flags (self, group, all) of user ``u_id``."""
    return await permissions(u_id, 3)


async def _owned_group_ids(owner_user_id: int) -> list[int]:
    """Return ids of the groups listed for the user in ``rs_ownership_over_group``."""
    rows = await _fetchall(
        "SELECT group_id FROM rs_ownership_over_group WHERE user_id = ?",
        (int(owner_user_id),),
    )
    return [int(row["group_id"]) for row in rows]


async def _owned_user_ids(owner_user_id: int) -> list[int]:
    """Return ids of all users that belong to a group owned by ``owner_user_id``."""
    # One query with a sub-select instead of fetching the group ids first and
    # building an IN (...) list by hand.
    rows = await _fetchall(
        """
        SELECT user_id FROM rs_user
        WHERE group_id IN (
            SELECT group_id FROM rs_ownership_over_group WHERE user_id = ?
        )
        """,
        (int(owner_user_id),),
    )
    return [int(row["user_id"]) for row in rows]


# -------------------------
# Token / sessions (Redis)
# -------------------------
# Key layout:
#   recording_spark:{user_id}:{live_token}:PC-INFO      list [name, ip, date]
#   recording_spark:{user_id}:{live_token}:short_token  current short token
#   recording_spark:short_token:{short_token}           user_id


def update_short_token(user_id, live_token):
    """Rotate a session: replace its live token and short token with new ones.

    The rotation only happens while the session's ``PC-INFO`` key has more
    than 31 minutes left to live.  The new live token inherits the remaining
    TTL (whole seconds).

    Returns ``(short_token, live_token)`` on success, ``(None, None)`` otherwise.
    """
    info_key = f"recording_spark:{user_id}:{live_token}:PC-INFO"
    ttl_seconds = r.pttl(info_key) / 1000
    if ttl_seconds <= 31 * 60:
        return None, None

    rm_short_token(user_id, live_token)
    pc_info = r.lrange(info_key, 0, -1)
    for key in r.scan_iter(f"recording_spark:{user_id}:{live_token}:*"):
        r.delete(key)
    return add_live_token(user_id, pc_info[0], pc_info[1], pc_info[2], int(ttl_seconds))


def rm_live_token(user_id, live_token):
    """Delete a session and its short token.

    Returns ``True`` if at least one session key existed, else ``False``.
    """
    short_token = r.get(f"recording_spark:{user_id}:{live_token}:short_token")
    if short_token is not None:
        r.delete(f"recording_spark:short_token:{short_token}")

    removed = False
    for key in r.scan_iter(f"recording_spark:{user_id}:{live_token}:*"):
        removed = True
        r.delete(key)
    return removed


def rm_short_token(user_id, live_token):
    """Delete the short token currently attached to a session (if any)."""
    link_key = f"recording_spark:{user_id}:{live_token}:short_token"
    short_token = r.get(link_key)
    if short_token is not None:
        r.delete(f"recording_spark:short_token:{short_token}")
    r.delete(link_key)


def add_short_token(user_id, live_token):
    """Create a 30-minute short token for a session and return it."""
    short_token = secrets.token_urlsafe(32)
    lifetime = 30 * 60
    r.set(f"recording_spark:short_token:{short_token}", user_id, ex=lifetime)
    r.set(f"recording_spark:{user_id}:{live_token}:short_token", short_token, ex=lifetime)
    return short_token


def add_live_token(user_id, ps_info_name, ps_info_ip, ps_info_date, TTL):
    """Create a session that expires after ``TTL`` seconds.

    Stores ``[ps_info_name, ps_info_ip, str(ps_info_date)]`` under the
    session's ``PC-INFO`` key and attaches a fresh short token.

    Returns ``(short_token, live_token)``.
    """
    live_token = secrets.token_urlsafe(128)
    info_key = f"recording_spark:{user_id}:{live_token}:PC-INFO"
    r.rpush(info_key, ps_info_name, ps_info_ip, str(ps_info_date))
    r.expire(info_key, timedelta(seconds=TTL))
    short_token = add_short_token(user_id, live_token)
    return short_token, live_token


def check_user_live_token(user_id, live_token):
    """Validate a live token and rotate the session.

    Returns ``None`` when the session does not exist, otherwise the result of
    :func:`update_short_token` -- a ``(short_token, live_token)`` pair, which
    is ``(None, None)`` when the session is too close to expiry to rotate.
    """
    # PC-INFO is a Redis list, so GET on it fails with WRONGTYPE; test for
    # existence instead and keep the caller's user_id for the rotation.
    if not r.exists(f"recording_spark:{user_id}:{live_token}:PC-INFO"):
        return None
    return update_short_token(user_id, live_token)


def check_user_short_token(short_token):
    """Return the user id stored for ``short_token``, or ``None`` if unknown/expired."""
    return r.get(f"recording_spark:short_token:{short_token}")


def ls_sessions(user_id):
    """List the user's sessions.

    Each entry is the session's ``PC-INFO`` list with its position appended;
    that position is what :func:`rm_live_token_position` accepts.
    """
    sessions = []
    for position, key in enumerate(r.scan_iter(f"recording_spark:{user_id}:*:PC-INFO")):
        entry = r.lrange(key, 0, -1)
        entry.append(position)
        sessions.append(entry)
    return sessions


def rm_live_token_position(user_id, position):
    """Delete the session shown at ``position`` by :func:`ls_sessions`.

    Does nothing when no session has that position.
    """
    # NOTE(review): positions come from SCAN iteration order, which Redis does
    # not guarantee to be identical between two scans -- confirm this is
    # acceptable for callers, or switch to addressing sessions by token.
    prefix = f"recording_spark:{user_id}:"
    suffix = ":PC-INFO"
    # Walk exactly the keys ls_sessions() enumerates so positions line up.
    for index, key in enumerate(r.scan_iter(f"recording_spark:{user_id}:*:PC-INFO")):
        if index == position:
            live_token = key[len(prefix):-len(suffix)]
            rm_live_token(user_id, live_token)
            return


def full_sessions_kill(user_id):
    """Delete every session and short token of ``user_id``."""
    for link_key in r.scan_iter(f"recording_spark:{user_id}:*:short_token"):
        short_token = r.get(link_key)
        if short_token is not None:
            r.delete(f"recording_spark:short_token:{short_token}")
        r.delete(link_key)
    for key in r.scan_iter(f"recording_spark:{user_id}:*"):
        r.delete(key)


def add_user_registration(email, password):
    """Store a pending registration in Redis for 7 days and return its id.

    The id is the decimal string ``{day}{hour}{minute}{second}{0..100000}``
    (no zero padding), the same shape as before.
    """
    # NOTE(review): ``password`` is stored exactly as received; presumably the
    # caller hashes it beforehand -- confirm.
    # Read the clock once so the parts cannot straddle a second/day boundary.
    year, month, day, hour, minute, second = (
        int(part) for part in strftime("%Y %m %d %H %M %S").split()
    )
    # secrets instead of random: the id may be used to confirm a registration.
    reg_id = f"{day}{hour}{minute}{second}{secrets.randbelow(100001)}"
    date = f"{year}-{month}-{day}"

    lifetime = 7 * 24 * 60 * 60
    r.set(f"recording_spark:registration:{reg_id}:email", email, ex=lifetime)
    r.set(f"recording_spark:registration:{reg_id}:password", password, ex=lifetime)
    r.set(f"recording_spark:registration:{reg_id}:date", date, ex=lifetime)
    return reg_id


# -------------------------
# DB functions (async) -- counterparts of the former MariaDB ones, on SQLite
# -------------------------
async def user_name(user_id: int):
    """Return the user's name, or ``None`` if the user does not exist."""
    row = await _fetchone("SELECT user_name FROM rs_user WHERE user_id = ?", (int(user_id),))
    return None if row is None else row["user_name"]


async def avatar_png(user_id: int):
    """Return the user's ``avatar`` column, or ``None`` if the user does not exist."""
    row = await _fetchone("SELECT avatar FROM rs_user WHERE user_id = ?", (int(user_id),))
    return None if row is None else row["avatar"]


async def check_user(email: str, password: str):
    """Look a user up by e-mail and password.

    Returns ``(user_id, user_name, active)``, or ``(None, None, None)`` when
    no row matches.
    """
    # NOTE(review): the password is compared verbatim with the stored column;
    # presumably callers pass an already-hashed value -- confirm.
    row = await _fetchone(
        "SELECT user_id, user_name, active FROM rs_user WHERE email = ? AND password = ?",
        (email, password),
    )
    if row is None:
        return None, None, None
    return int(row["user_id"]), row["user_name"], int(row["active"])


async def ls_user(user_id: int):
    """List the users that ``user_id`` may read.

    Returns ``None`` when ``user_id`` does not exist; otherwise a list of
    ``[user_id, user_name, email, avatar, active, group_id, name, rights]``
    rows (empty when the user has no user-read permission at all).
    """
    flags = await permissions_user_r(user_id)
    if flags is None:
        return None

    q = """
        SELECT
            u.user_id, u.user_name, u.email, u.avatar, u.active,
            g.group_id, g.name, g.rights
        FROM rs_user u
        JOIN rs_group g ON u.group_id = g.group_id
    """
    if flags[2] == 1:
        return [list(row) for row in await _fetchall(q, ())]

    conds = []
    params: list = []
    if flags[0] == 1:
        conds.append("u.user_id = ?")
        params.append(int(user_id))
    if flags[1] == 1:
        conds.append(
            "u.user_id IN (SELECT user_id FROM rs_user WHERE group_id IN "
            "(SELECT group_id FROM rs_ownership_over_group WHERE user_id = ?))"
        )
        params.append(int(user_id))
    if not conds:
        return []

    q += " WHERE " + " OR ".join(conds)
    return [list(row) for row in await _fetchall(q, tuple(params))]


async def ls_item(user_id: int):
    """List the items that ``user_id`` may read.

    Returns ``None`` when ``user_id`` does not exist; otherwise a list of
    ``[item_id, item_name, status, user_id_1, user_name_1, user_id_2,
    user_name_2, icon, comments, inventory_id]`` rows (empty when the user has
    no item-read permission at all).
    """
    flags = await permissions_item_r(user_id)
    if flags is None:
        return None

    q = """
        SELECT
            i.item_id, i.item_name, i.status,
            i.user_id_1, u1.user_name,
            i.user_id_2, u2.user_name,
            i.icon, i.comments, i.inventory_id
        FROM rs_item i
        JOIN rs_user u1 ON i.user_id_1 = u1.user_id
        JOIN rs_user u2 ON i.user_id_2 = u2.user_id
    """
    if flags[2] == 1:
        return [list(row) for row in await _fetchall(q, ())]

    conds = []
    params: list = []
    if flags[0] == 1:
        conds.append("(i.user_id_1 = ? OR i.user_id_2 = ?)")
        params.extend([int(user_id), int(user_id)])
    if flags[1] == 1:
        conds.append(
            "i.user_id_2 IN (SELECT user_id FROM rs_user WHERE group_id IN "
            "(SELECT group_id FROM rs_ownership_over_group WHERE user_id = ?))"
        )
        params.append(int(user_id))
    if not conds:
        return []

    q += " WHERE " + " OR ".join(conds)
    return [list(row) for row in await _fetchall(q, tuple(params))]


async def user_add(user_id: int, user_name: str, email: str, password: str, avatar: int, active: int, group_id: int):
    """Create a user on behalf of ``user_id``.

    Allowed with the "all" user-write flag, or with the "group" flag when
    ``group_id`` is a group owned by ``user_id``.

    Returns the new user's id, or ``None`` when the actor does not exist or
    is not allowed.
    """
    flags = await permissions_user_w(user_id)
    if flags is None:
        return None
    if flags[2] != 1:
        if flags[1] != 1:
            return None
        if int(group_id) not in await _owned_group_ids(user_id):
            return None

    cur = await _execute(
        """
        INSERT INTO rs_user (user_name, email, password, avatar, active, group_id)
        VALUES (?, ?, ?, ?, ?, ?)
        """,
        (user_name, email, password, int(avatar), int(active), int(group_id)),
        commit=True,
    )
    return int(cur.lastrowid)


async def _user_scope_allows(flags, actor_id: int, target_user_id: int) -> bool:
    """Apply (self, group, all) ``flags`` of ``actor_id`` to ``target_user_id``."""
    if flags is None:
        return False
    self_flag, group_flag, all_flag = flags
    if all_flag == 1:
        return True
    if self_flag == 1 and int(actor_id) == int(target_user_id):
        return True
    if group_flag == 1:
        return int(target_user_id) in await _owned_user_ids(actor_id)
    return False


async def _can_read_user(viewer_id: int, target_user_id: int) -> bool:
    """Whether ``viewer_id`` may read user ``target_user_id``."""
    return await _user_scope_allows(
        await permissions_user_r(viewer_id), viewer_id, target_user_id
    )


async def _can_write_user(actor_id: int, target_user_id: int) -> bool:
    """Whether ``actor_id`` may modify user ``target_user_id``."""
    return await _user_scope_allows(
        await permissions_user_w(actor_id), actor_id, target_user_id
    )


async def user_edit(user_id: int, user_id_rec: int, user_name, email, password, avatar, active, group_id):
    """Update user ``user_id_rec`` on behalf of ``user_id``.

    Fields passed as ``None`` keep their stored value.  Changing ``group_id``
    additionally requires the "all" user-write flag or ownership of the
    target group.  When the user is deactivated (``active == 0``) or the
    password changes, all of that user's sessions are removed.

    Returns the number of updated rows (0 when not allowed).
    """
    if not await _can_write_user(user_id, user_id_rec):
        return 0

    if group_id is not None:
        flags = await permissions_user_w(user_id)
        if flags is None:
            return 0
        if flags[2] != 1 and int(group_id) not in await _owned_group_ids(user_id):
            return 0

    cur = await _execute(
        """
        UPDATE rs_user
        SET
            user_name = COALESCE(?, user_name),
            email = COALESCE(?, email),
            password = COALESCE(?, password),
            avatar = COALESCE(?, avatar),
            active = COALESCE(?, active),
            group_id = COALESCE(?, group_id)
        WHERE user_id = ?
        """,
        (user_name, email, password, avatar, active, group_id, int(user_id_rec)),
        commit=True,
    )
    changed = int(cur.rowcount)
    if changed == 1 and (active == 0 or password is not None):
        full_sessions_kill(user_id_rec)
    return changed


async def user_info(user_id: int, user_id_rec: int):
    """Return ``(user_name, email, avatar, active, group_id, rights)`` of a user.

    Returns ``None`` when ``user_id`` may not read ``user_id_rec`` or the
    target does not exist.
    """
    if not await _can_read_user(user_id, user_id_rec):
        return None
    row = await _fetchone(
        """
        SELECT u.user_name, u.email, u.avatar, u.active, g.group_id, g.rights
        FROM rs_user u
        JOIN rs_group g ON u.group_id = g.group_id
        WHERE u.user_id = ?
        """,
        (int(user_id_rec),),
    )
    return None if row is None else tuple(row)


async def ls_group(user_id: int):
    """List groups as ``(group_id, name, rights)`` tuples.

    With the "all" user-read flag every group is returned, otherwise only the
    groups owned by ``user_id``.  Returns ``None`` when ``user_id`` does not
    exist.
    """
    flags = await permissions_user_r(user_id)
    if flags is None:
        return None

    if flags[2] == 1:
        rows = await _fetchall("SELECT group_id, name, rights FROM rs_group", ())
    else:
        rows = await _fetchall(
            """
            SELECT group_id, name, rights
            FROM rs_group
            WHERE group_id IN (
                SELECT group_id FROM rs_ownership_over_group WHERE user_id = ?
            )
            """,
            (int(user_id),),
        )
    return [tuple(row) for row in rows]


async def _fetch_item_row(item_id: int) -> aiosqlite.Row | None:
    """Return the full ``rs_item`` row for ``item_id``, or ``None``."""
    return await _fetchone("SELECT * FROM rs_item WHERE item_id = ?", (int(item_id),))


async def _can_read_item(viewer_id: int, item_row: aiosqlite.Row) -> bool:
    """Whether ``viewer_id`` may read the item in ``item_row``.

    "self" matches either ``user_id_1`` or ``user_id_2``; "group" matches
    when ``user_id_2`` belongs to a group owned by the viewer.
    """
    flags = await permissions_item_r(viewer_id)
    if flags is None:
        return False
    if flags[2] == 1:
        return True
    viewer = int(viewer_id)
    if flags[0] == 1 and viewer in (int(item_row["user_id_1"]), int(item_row["user_id_2"])):
        return True
    if flags[1] == 1:
        return int(item_row["user_id_2"]) in await _owned_user_ids(viewer_id)
    return False


async def _can_write_item(actor_id: int, item_row: aiosqlite.Row) -> bool:
    """Whether ``actor_id`` may modify the item in ``item_row``.

    "self" matches ``user_id_1``; "group" matches when ``user_id_1`` belongs
    to a group owned by the actor.
    """
    flags = await permissions_item_w(actor_id)
    if flags is None:
        return False
    if flags[2] == 1:
        return True
    if flags[0] == 1 and int(item_row["user_id_1"]) == int(actor_id):
        return True
    if flags[1] == 1:
        return int(item_row["user_id_1"]) in await _owned_user_ids(actor_id)
    return False


async def item_info(user_id: int, item_id: int):
    """Return the ``rs_item`` row as a tuple, or ``None`` if missing/not readable."""
    row = await _fetch_item_row(item_id)
    if row is None or not await _can_read_item(user_id, row):
        return None
    return tuple(row)


async def item_w_test(user_id: int, item_id: int):
    """Return 1 if ``user_id`` may modify item ``item_id``, else 0."""
    row = await _fetch_item_row(item_id)
    if row is None:
        return 0
    return 1 if await _can_write_item(user_id, row) else 0


async def item_edit(user_id: int, item_id: int, user_id_1, user_id_2, name, status, icon, comments, inventory_id):
    """Update item ``item_id`` on behalf of ``user_id``.

    Fields passed as ``None`` keep their stored value.  The ``-1`` sentinel
    (``"-1"`` for ``icon``) is treated the same way for ``user_id_1``,
    ``user_id_2``, ``icon`` and ``inventory_id``.  A new ``user_id_1`` /
    ``user_id_2`` must be the actor or a user the actor may read.

    Returns the number of updated rows (0 when not allowed or not found).
    """
    row = await _fetch_item_row(item_id)
    if row is None or not await _can_write_item(user_id, row):
        return 0

    new_user_ids = []
    for candidate in (user_id_1, user_id_2):
        if candidate is None or int(candidate) == -1:
            # -1 cannot be stored (NOT NULL foreign key); keep the old value.
            new_user_ids.append(None)
            continue
        candidate = int(candidate)
        if candidate != int(user_id) and not await _can_read_user(user_id, candidate):
            return 0
        new_user_ids.append(candidate)

    icon_val = None if icon is not None and str(icon) == "-1" else icon
    inv_val = None if inventory_id is not None and int(inventory_id) == -1 else inventory_id

    cur = await _execute(
        """
        UPDATE rs_item
        SET
            item_name = COALESCE(?, item_name),
            status = COALESCE(?, status),
            user_id_1 = COALESCE(?, user_id_1),
            user_id_2 = COALESCE(?, user_id_2),
            icon = COALESCE(?, icon),
            comments = COALESCE(?, comments),
            inventory_id = COALESCE(?, inventory_id)
        WHERE item_id = ?
        """,
        (name, status, new_user_ids[0], new_user_ids[1], icon_val, comments, inv_val, int(item_id)),
        commit=True,
    )
    return int(cur.rowcount)


async def item_rm(user_id: int, item_id: int):
    """Delete item ``item_id``; return the number of deleted rows (0 if denied)."""
    row = await _fetch_item_row(item_id)
    if row is None or not await _can_write_item(user_id, row):
        return 0
    cur = await _execute("DELETE FROM rs_item WHERE item_id = ?", (int(item_id),), commit=True)
    return int(cur.rowcount)


async def add_item(user_id: int, user_id_1: int, user_id_2: int, name: str, status, icon, comments, inventory_id):
    """Create an item on behalf of ``user_id``.

    Without the "all" item-write flag the actor must either be one of the
    item's two users (with the "self" or "group" flag), or hold the "group"
    flag while one of the two users belongs to a group the actor owns.

    Returns the new item's id, or 0 when not allowed.
    """
    flags = await permissions_item_w(user_id)
    if flags is None:
        return 0

    if flags[2] != 1:
        involved = (int(user_id_1), int(user_id_2))
        allowed = False
        if int(user_id) in involved and (flags[0] == 1 or flags[1] == 1):
            allowed = True
        elif flags[1] == 1:
            owned_users = await _owned_user_ids(user_id)
            allowed = any(uid in owned_users for uid in involved)
        if not allowed:
            return 0

    cur = await _execute(
        """
        INSERT INTO rs_item (item_name, status, user_id_1, user_id_2, icon, comments, inventory_id)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """,
        (name, status, int(user_id_1), int(user_id_2), icon, comments, inventory_id),
        commit=True,
    )
    return int(cur.lastrowid)


async def user_rm(user_id: int, user_id_rm: int):
    """Delete user ``user_id_rm`` on behalf of ``user_id``.

    A user cannot delete themselves.  The user's group-ownership rows are
    removed in the same transaction, and their Redis sessions are removed
    once the deletion is committed.  Database errors are re-raised after a
    rollback.

    Returns the number of deleted users (0 when not allowed).
    """
    if int(user_id) == int(user_id_rm):
        return 0
    if not await _can_write_user(user_id, user_id_rm):
        return 0

    db = await _connect_db()
    try:
        # No explicit BEGIN: sqlite3 opens a transaction implicitly before the
        # first DELETE, and an explicit one fails if a transaction is already
        # open on this shared connection.
        await db.execute(
            "DELETE FROM rs_ownership_over_group WHERE user_id = ?",
            (int(user_id_rm),),
        )
        cur = await db.execute(
            "DELETE FROM rs_user WHERE user_id = ?",
            (int(user_id_rm),),
        )
        await db.commit()
    except Exception:
        await db.rollback()
        raise

    removed = int(cur.rowcount)
    if removed:
        # A deleted account must not keep valid tokens (same as user_edit on
        # deactivation).
        full_sessions_kill(user_id_rm)
    return removed


# Constants (kept in case they are used from outside this module).
# NOTE(review): presumably rights bitmasks of predefined groups -- confirm.
admin = 0
meneger = 34815
sender = 41935
user = 409665