wg_index/bd_module.py
2025-12-27 05:02:10 +03:00

748 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import random
import secrets
from time import strftime # Для (Time)
from datetime import timedelta
import asyncio
import aiosqlite
# BEGIN redis (оставлено синхронным как в исходнике)
import redis
put = os.path.dirname(os.path.realpath(__file__)) + "/"
with open(f"{put}settings.json", "r", encoding="utf-8") as read_file:
settings = json.load(read_file)
r = redis.StrictRedis(
host=settings["Redis"]["host"],
port=settings["Redis"]["port"],
password=settings["Redis"]["password"],
encoding="utf-8",
decode_responses=True,
db=settings["Redis"]["db"],
)
# END redis
# -------------------------
# SQLite (async) backend
# -------------------------
_SQLITE_PATH = (
settings.get("Sqlite", {}).get("path")
or os.path.join(put, "recording_spark.sqlite3")
)
# Если раньше в settings.json было Mariadb.select_commit — оставим аналог
_SQLITE_SELECT_COMMIT = bool(settings.get("Sqlite", {}).get("select_commit", False))
_db: aiosqlite.Connection | None = None
_db_init_lock = asyncio.Lock()
_db_initialized = False
async def _connect_db() -> aiosqlite.Connection:
"""
Возвращает глобальное соединение aiosqlite, инициализирует схему при первом обращении.
"""
global _db, _db_initialized
async with _db_init_lock:
if _db is None:
_db = await aiosqlite.connect(_SQLITE_PATH)
_db.row_factory = aiosqlite.Row
await _db.execute("PRAGMA foreign_keys = ON;")
await _db.execute("PRAGMA journal_mode = WAL;")
await _db.execute("PRAGMA synchronous = NORMAL;")
await _db.execute("PRAGMA busy_timeout = 5000;")
if not _db_initialized:
await _init_schema(_db)
_db_initialized = True
return _db
async def _init_schema(db: aiosqlite.Connection) -> None:
"""
Создаёт таблицы и индексы в SQLite.
Заменяет MySQL/MariaDB схему recording_spark.* на rs_* (без schema prefix).
"""
await db.executescript(
"""
CREATE TABLE IF NOT EXISTS rs_group (
group_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
rights INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS rs_user (
user_id INTEGER PRIMARY KEY AUTOINCREMENT,
user_name TEXT NOT NULL,
email TEXT,
password TEXT,
avatar INTEGER NOT NULL DEFAULT 0,
active INTEGER NOT NULL DEFAULT 0,
group_id INTEGER NOT NULL,
FOREIGN KEY (group_id) REFERENCES rs_group (group_id)
);
CREATE TABLE IF NOT EXISTS rs_item (
item_id INTEGER PRIMARY KEY AUTOINCREMENT,
item_name TEXT NOT NULL,
status TEXT,
user_id_1 INTEGER NOT NULL,
user_id_2 INTEGER NOT NULL,
icon TEXT DEFAULT NULL,
comments TEXT,
inventory_id INTEGER,
FOREIGN KEY (user_id_1) REFERENCES rs_user (user_id),
FOREIGN KEY (user_id_2) REFERENCES rs_user (user_id)
);
CREATE TABLE IF NOT EXISTS rs_ownership_over_group (
group_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
FOREIGN KEY (group_id) REFERENCES rs_group (group_id),
FOREIGN KEY (user_id) REFERENCES rs_user (user_id)
);
CREATE INDEX IF NOT EXISTS idx_rs_user_group_id ON rs_user(group_id);
CREATE INDEX IF NOT EXISTS idx_rs_item_user1 ON rs_item(user_id_1);
CREATE INDEX IF NOT EXISTS idx_rs_item_user2 ON rs_item(user_id_2);
CREATE INDEX IF NOT EXISTS idx_rs_oog_group_id ON rs_ownership_over_group(group_id);
CREATE INDEX IF NOT EXISTS idx_rs_oog_user_id ON rs_ownership_over_group(user_id);
"""
)
await db.commit()
async def close_db() -> None:
global _db, _db_initialized
if _db is not None:
await _db.close()
_db = None
_db_initialized = False
async def ping_db() -> None:
"""
В MariaDB это был ping/reconnect. В SQLite (aiosqlite) просто гарантируем, что соединение поднято.
"""
await _connect_db()
async def _fetchone(q: str, params: tuple = ()) -> aiosqlite.Row | None:
db = await _connect_db()
if _SQLITE_SELECT_COMMIT:
await db.commit()
async with db.execute(q, params) as cur:
return await cur.fetchone()
async def _fetchall(q: str, params: tuple = ()) -> list[aiosqlite.Row]:
db = await _connect_db()
if _SQLITE_SELECT_COMMIT:
await db.commit()
async with db.execute(q, params) as cur:
return await cur.fetchall()
async def _execute(q: str, params: tuple = (), *, commit: bool = False) -> aiosqlite.Cursor:
db = await _connect_db()
cur = await db.execute(q, params)
if commit:
await db.commit()
return cur
# -------------------------
# Rights / permissions
# -------------------------
# Биты прав как в исходнике (mariadb):
# item_r: bits 0,2,4
# item_w: bits 1,3,5
# user_r: bits 6,8,10
# user_w: bits 7,9,11
_BITS = {
"item_r": (0, 2, 4),
"item_w": (1, 3, 5),
"user_r": (6, 8, 10),
"user_w": (7, 9, 11),
}
async def _get_user_rights(user_id: int) -> int | None:
row = await _fetchone(
"""
SELECT g.rights
FROM rs_user u
JOIN rs_group g ON u.group_id = g.group_id
WHERE u.user_id = ?
""",
(int(user_id),),
)
return None if row is None else int(row["rights"])
def _rights_tuple(rights: int, kind: str) -> tuple[int, int, int]:
b0, b1, b2 = _BITS[kind]
return ((rights >> b0) & 1, (rights >> b1) & 1, (rights >> b2) & 1)
async def permissions(user_id: int, kind: int) -> tuple[int, int, int] | None:
"""
kind: 0=item_r, 1=item_w, 2=user_r, 3=user_w
Возвращает 3 флага (self, group, all) как в исходнике.
"""
rights = await _get_user_rights(int(user_id))
if rights is None:
return None
if kind == 0:
return _rights_tuple(rights, "item_r")
if kind == 1:
return _rights_tuple(rights, "item_w")
if kind == 2:
return _rights_tuple(rights, "user_r")
if kind == 3:
return _rights_tuple(rights, "user_w")
raise ValueError("Unknown kind")
async def permissions_item_r(u_id: int):
return await permissions(u_id, 0)
async def permissions_item_w(u_id: int):
return await permissions(u_id, 1)
async def permissions_user_r(u_id: int):
return await permissions(u_id, 2)
async def permissions_user_w(u_id: int):
return await permissions(u_id, 3)
async def _owned_group_ids(owner_user_id: int) -> list[int]:
rows = await _fetchall(
"SELECT group_id FROM rs_ownership_over_group WHERE user_id = ?",
(int(owner_user_id),),
)
return [int(r["group_id"]) for r in rows]
async def _owned_user_ids(owner_user_id: int) -> list[int]:
gids = await _owned_group_ids(owner_user_id)
if not gids:
return []
placeholders = ",".join(["?"] * len(gids))
rows = await _fetchall(
f"SELECT user_id FROM rs_user WHERE group_id IN ({placeholders})",
tuple(gids),
)
return [int(r["user_id"]) for r in rows]
# -------------------------
# Token / sessions (Redis) — оставлено как было
# -------------------------
def update_short_token(user_id, live_token):
TTL = r.pttl(f"recording_spark:{user_id}:{live_token}:PC-INFO") / 1000
if TTL > 31 * 60:
rm_short_token(user_id, live_token)
list_A = r.lrange(f"recording_spark:{user_id}:{live_token}:PC-INFO", 0, -1)
for key in r.scan_iter(f"recording_spark:{user_id}:{live_token}:*"):
r.delete(key)
short_token, live_token = add_live_token(
user_id, list_A[0], list_A[1], list_A[2], int(TTL)
)
return short_token, live_token
return None, None
def rm_live_token(user_id, live_token):
short_token = r.get(f"recording_spark:{user_id}:{live_token}:short_token")
r.delete(f"recording_spark:short_token:{short_token}")
A = False
for key in r.scan_iter(f"recording_spark:{user_id}:{live_token}:*"):
A = True
r.delete(key)
return A
def rm_short_token(user_id, live_token):
short_token = r.get(f"recording_spark:{user_id}:{live_token}:short_token")
r.delete(f"recording_spark:short_token:{short_token}")
r.delete(f"recording_spark:{user_id}:{live_token}:short_token")
def add_short_token(user_id, live_token):
short_token = secrets.token_urlsafe(32)
r.set(f"recording_spark:short_token:{short_token}", user_id, 30 * 60)
r.set(f"recording_spark:{user_id}:{live_token}:short_token", short_token, 30 * 60)
return short_token
def add_live_token(user_id, ps_info_name, ps_info_ip, ps_info_date, TTL):
live_token = secrets.token_urlsafe(128)
r.rpush(
f"recording_spark:{str(user_id)}:{live_token}:PC-INFO",
ps_info_name,
ps_info_ip,
str(ps_info_date),
)
r.expire(f"recording_spark:{user_id}:{live_token}:PC-INFO", timedelta(seconds=TTL))
short_token = add_short_token(user_id, live_token)
return short_token, live_token
def check_user_live_token(user_id, live_token):
user_id = r.get(f"recording_spark:{user_id}:{live_token}:PC-INFO")
if user_id is None:
return None
short_token = update_short_token(user_id, live_token)
return short_token
def check_user_short_token(short_token):
user_id = r.get(f"recording_spark:short_token:{short_token}")
return None if user_id is None else user_id
def ls_sessions(user_id):
A = 0
Q = []
for key in r.scan_iter(f"recording_spark:{user_id}:*:PC-INFO"):
short_token = r.lrange(key, 0, -1)
short_token.append(A)
Q.append(short_token)
A = A + 1
return Q
def rm_live_token_position(user_id, position):
A = 0
for key in r.scan_iter(f"recording_spark:{user_id}:*"):
if position == A:
short_token = r.get(f"recording_spark:{user_id}:{key}:short_token")
r.delete(f"recording_spark:short_token:{short_token}")
for key in r.scan_iter(f"recording_spark:{user_id}:{key}:*"):
r.delete(key)
A = A + 1
def full_sessions_kill(user_id):
for key in r.scan_iter(f"recording_spark:{user_id}:*:short_token"):
short_token = r.get(key)
r.delete(f"recording_spark:short_token:{short_token}")
r.delete(key)
for key in r.scan_iter(f"recording_spark:{user_id}:*"):
r.delete(key)
def add_user_registration(email, password):
G = int(strftime("%Y"))
M = int(strftime("%m"))
D = int(strftime("%d"))
ch = int(strftime("%H"))
m = int(strftime("%M"))
s = int(strftime("%S"))
id = f"{D}{ch}{m}{s}{random.randint(0, 100000)}"
date = f"{G}-{M}-{D}"
r.set(f"recording_spark:registration:{id}:email", email, 7 * 24 * 60 * 60)
r.set(f"recording_spark:registration:{id}:password", password, 7 * 24 * 60 * 60)
r.set(f"recording_spark:registration:{id}:date", date, 7 * 24 * 60 * 60)
return id
# -------------------------
# DB functions (async) — аналог прежних, но на SQLite
# -------------------------
async def user_name(user_id: int):
await ping_db()
row = await _fetchone("SELECT user_name FROM rs_user WHERE user_id = ?", (int(user_id),))
return None if row is None else row["user_name"]
async def avatar_png(user_id: int):
await ping_db()
row = await _fetchone("SELECT avatar FROM rs_user WHERE user_id = ?", (int(user_id),))
return None if row is None else row["avatar"]
async def check_user(email: str, password: str):
await ping_db()
row = await _fetchone(
"SELECT user_id, user_name, active FROM rs_user WHERE email = ? AND password = ?",
(email, password),
)
if row is None:
return None, None, None
return int(row["user_id"]), row["user_name"], int(row["active"])
async def ls_user(user_id: int):
await ping_db()
A = await permissions_user_r(user_id)
if A is None:
return None
q = """
SELECT u.user_id, u.user_name, u.email, u.avatar, u.active, g.group_id, g.name, g.rights
FROM rs_user u
JOIN rs_group g ON u.group_id = g.group_id
"""
params: list = []
if A[2] == 1:
rows = await _fetchall(q, ())
return [list(r) for r in rows]
conds = []
if A[0] == 1:
conds.append("u.user_id = ?")
params.append(int(user_id))
if A[1] == 1:
conds.append(
"u.user_id IN (SELECT user_id FROM rs_user WHERE group_id IN (SELECT group_id FROM rs_ownership_over_group WHERE user_id = ?))"
)
params.append(int(user_id))
if not conds:
return []
q += " WHERE " + " OR ".join(conds)
rows = await _fetchall(q, tuple(params))
return [list(r) for r in rows]
async def ls_item(user_id: int):
await ping_db()
A = await permissions_item_r(user_id)
if A is None:
return None
q = """
SELECT i.item_id, i.item_name, i.status,
i.user_id_1, u1.user_name,
i.user_id_2, u2.user_name,
i.icon, i.comments, i.inventory_id
FROM rs_item i
JOIN rs_user u1 ON i.user_id_1 = u1.user_id
JOIN rs_user u2 ON i.user_id_2 = u2.user_id
"""
params: list = []
if A[2] == 1:
rows = await _fetchall(q, ())
return [list(r) for r in rows]
conds = []
if A[0] == 1:
conds.append("(i.user_id_1 = ? OR i.user_id_2 = ?)")
params.extend([int(user_id), int(user_id)])
if A[1] == 1:
conds.append(
"i.user_id_2 IN (SELECT user_id FROM rs_user WHERE group_id IN (SELECT group_id FROM rs_ownership_over_group WHERE user_id = ?))"
)
params.append(int(user_id))
if not conds:
return []
q += " WHERE " + " OR ".join(conds)
rows = await _fetchall(q, tuple(params))
return [list(r) for r in rows]
async def user_add(user_id: int, user_name: str, email: str, password: str, avatar: int, active: int, group_id: int):
await ping_db()
A = await permissions_user_w(user_id)
if A is None:
return None
if A[2] != 1:
if A[1] == 1:
owned = await _owned_group_ids(user_id)
if int(group_id) not in owned:
return None
else:
return None
cur = await _execute(
"""
INSERT INTO rs_user (user_name, email, password, avatar, active, group_id)
VALUES (?, ?, ?, ?, ?, ?)
""",
(user_name, email, password, int(avatar), int(active), int(group_id)),
commit=True,
)
return int(cur.lastrowid)
async def _can_read_user(viewer_id: int, target_user_id: int) -> bool:
A = await permissions_user_r(viewer_id)
if A is None:
return False
if A[2] == 1:
return True
if A[0] == 1 and int(viewer_id) == int(target_user_id):
return True
if A[1] == 1:
owned_users = await _owned_user_ids(viewer_id)
return int(target_user_id) in owned_users
return False
async def _can_write_user(actor_id: int, target_user_id: int) -> bool:
A = await permissions_user_w(actor_id)
if A is None:
return False
if A[2] == 1:
return True
if A[0] == 1 and int(actor_id) == int(target_user_id):
return True
if A[1] == 1:
owned_users = await _owned_user_ids(actor_id)
return int(target_user_id) in owned_users
return False
async def user_edit(user_id: int, user_id_rec: int, user_name, email, password, avatar, active, group_id):
await ping_db()
if not await _can_write_user(user_id, user_id_rec):
return 0
if group_id is not None:
A = await permissions_user_w(user_id)
if A is None:
return 0
if A[2] != 1:
owned = await _owned_group_ids(user_id)
if int(group_id) not in owned:
return 0
cur = await _execute(
"""
UPDATE rs_user
SET
user_name = COALESCE(?, user_name),
email = COALESCE(?, email),
password = COALESCE(?, password),
avatar = COALESCE(?, avatar),
active = COALESCE(?, active),
group_id = COALESCE(?, group_id)
WHERE user_id = ?
""",
(user_name, email, password, avatar, active, group_id, int(user_id_rec)),
commit=True,
)
changed = int(cur.rowcount)
if changed == 1 and (active == 0 or password is not None):
full_sessions_kill(user_id_rec)
return changed
async def user_info(user_id: int, user_id_rec: int):
await ping_db()
if not await _can_read_user(user_id, user_id_rec):
return None
row = await _fetchone(
"""
SELECT u.user_name, u.email, u.avatar, u.active, g.group_id, g.rights
FROM rs_user u
JOIN rs_group g ON u.group_id = g.group_id
WHERE u.user_id = ?
""",
(int(user_id_rec),),
)
return None if row is None else tuple(row)
async def ls_group(user_id: int):
await ping_db()
A = await permissions_user_r(user_id)
if A is None:
return None
if A[2] == 1:
rows = await _fetchall("SELECT group_id, name, rights FROM rs_group", ())
return [tuple(r) for r in rows]
rows = await _fetchall(
"""
SELECT group_id, name, rights
FROM rs_group
WHERE group_id IN (SELECT group_id FROM rs_ownership_over_group WHERE user_id = ?)
""",
(int(user_id),),
)
return [tuple(r) for r in rows]
async def _can_read_item(viewer_id: int, item_row: aiosqlite.Row) -> bool:
A = await permissions_item_r(viewer_id)
if A is None:
return False
if A[2] == 1:
return True
if A[0] == 1 and (int(item_row["user_id_1"]) == int(viewer_id) or int(item_row["user_id_2"]) == int(viewer_id)):
return True
if A[1] == 1:
owned_users = await _owned_user_ids(viewer_id)
return int(item_row["user_id_2"]) in owned_users
return False
async def _can_write_item(actor_id: int, item_row: aiosqlite.Row) -> bool:
A = await permissions_item_w(actor_id)
if A is None:
return False
if A[2] == 1:
return True
if A[0] == 1 and int(item_row["user_id_1"]) == int(actor_id):
return True
if A[1] == 1:
owned_users = await _owned_user_ids(actor_id)
return int(item_row["user_id_1"]) in owned_users
return False
async def item_info(user_id: int, item_id: int):
await ping_db()
row = await _fetchone("SELECT * FROM rs_item WHERE item_id = ?", (int(item_id),))
if row is None:
return None
if not await _can_read_item(user_id, row):
return None
return tuple(row)
async def item_w_test(user_id: int, item_id: int):
await ping_db()
row = await _fetchone("SELECT * FROM rs_item WHERE item_id = ?", (int(item_id),))
if row is None:
return 0
return 1 if await _can_write_item(user_id, row) else 0
async def item_edit(user_id: int, item_id: int, user_id_1, user_id_2, name, status, icon, comments, inventory_id):
await ping_db()
row = await _fetchone("SELECT * FROM rs_item WHERE item_id = ?", (int(item_id),))
if row is None:
return 0
if not await _can_write_item(user_id, row):
return 0
if user_id_1 is not None and int(user_id_1) != -1:
if not await _can_read_user(user_id, int(user_id_1)) and int(user_id) != int(user_id_1):
return 0
if user_id_2 is not None and int(user_id_2) != -1:
if not await _can_read_user(user_id, int(user_id_2)) and int(user_id) != int(user_id_2):
return 0
icon_val = icon
if icon is not None and str(icon) == "-1":
icon_val = None
inv_val = inventory_id
if inventory_id is not None and int(inventory_id) == -1:
inv_val = None
cur = await _execute(
"""
UPDATE rs_item
SET
item_name = COALESCE(?, item_name),
status = COALESCE(?, status),
user_id_1 = COALESCE(?, user_id_1),
user_id_2 = COALESCE(?, user_id_2),
icon = COALESCE(?, icon),
comments = COALESCE(?, comments),
inventory_id = COALESCE(?, inventory_id)
WHERE item_id = ?
""",
(name, status, user_id_1, user_id_2, icon_val, comments, inv_val, int(item_id)),
commit=True,
)
return int(cur.rowcount)
async def item_rm(user_id: int, item_id: int):
await ping_db()
row = await _fetchone("SELECT * FROM rs_item WHERE item_id = ?", (int(item_id),))
if row is None:
return 0
if not await _can_write_item(user_id, row):
return 0
cur = await _execute("DELETE FROM rs_item WHERE item_id = ?", (int(item_id),), commit=True)
return int(cur.rowcount)
async def add_item(user_id: int, user_id_1: int, user_id_2: int, name: str, status, icon, comments, inventory_id):
await ping_db()
A = await permissions_item_w(user_id)
if A is None:
return 0
if A[2] != 1:
ok = False
if A[0] == 1 and (int(user_id_1) == int(user_id) or int(user_id_2) == int(user_id)):
ok = True
if not ok and A[1] == 1:
owned_users = await _owned_user_ids(user_id)
if int(user_id_1) == int(user_id) or int(user_id_2) == int(user_id):
ok = True
if int(user_id_1) in owned_users or int(user_id_2) in owned_users:
ok = True
if not ok:
return 0
cur = await _execute(
"""
INSERT INTO rs_item (item_name, status, user_id_1, user_id_2, icon, comments, inventory_id)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(name, status, int(user_id_1), int(user_id_2), icon, comments, inventory_id),
commit=True,
)
return int(cur.lastrowid)
async def user_rm(user_id: int, user_id_rm: int):
await ping_db()
if int(user_id) == int(user_id_rm):
return 0
if not await _can_write_user(user_id, user_id_rm):
return 0
db = await _connect_db()
try:
await db.execute("BEGIN;")
await db.execute(
"DELETE FROM rs_ownership_over_group WHERE user_id = ?",
(int(user_id_rm),),
)
cur2 = await db.execute(
"DELETE FROM rs_user WHERE user_id = ?",
(int(user_id_rm),),
)
await db.commit()
return int(cur2.rowcount)
except Exception:
await db.rollback()
raise
# Константы (оставлены, если где-то используются извне)
admin = 0
meneger = 34815
sender = 41935
user = 409665