chore: switch back to sqlite3 by alissonlauffer · Pull Request #56 · hydrogram/hydrogram
import base64 import logging import sqlite3 import struct import time from pathlib import Path from typing import Any
import aiosqlite
from hydrogram import raw, utils
from .base import BaseStorage, InputPeer
async def update(self) -> None: if not self.conn:
if version == 1: await self.conn.execute("DELETE FROM peers") self.conn.execute("DELETE FROM peers") version += 1
if version == 2: await self.conn.execute("ALTER TABLE sessions ADD api_id INTEGER") self.conn.execute("ALTER TABLE sessions ADD api_id INTEGER") version += 1
await self.version(version) await self.conn.commit() self.conn.commit()
async def create(self) -> None: if not self.conn: logging.warning("Database connection is not available.") return
await self.conn.executescript(SCHEMA) await self.conn.execute( self.conn.executescript(SCHEMA) self.conn.execute( "INSERT INTO version VALUES (?)", (self.VERSION,), ) await self.conn.execute( self.conn.execute( "INSERT INTO sessions VALUES (?, ?, ?, ?, ?, ?, ?)", (2, None, None, None, 0, None, None), ) await self.conn.commit() self.conn.commit()
async def open(self) -> None: path = self.database file_exists = isinstance(path, Path) and path.is_file()
self.conn = await aiosqlite.connect(self.database) self.conn = sqlite3.connect(self.database)
await self.conn.execute("PRAGMA journal_mode=WAL") self.conn.execute("PRAGMA journal_mode=WAL")
if file_exists: await self.update() await self.conn.execute("VACUUM") self.conn.execute("VACUUM") else: await self.create()
await self.conn.commit() self.conn.commit()
if self.session_string: await self._load_session_string()
await self.date(int(time.time())) await self.conn.commit() self.conn.commit()
async def close(self) -> None: if self.conn: await self.conn.close() self.conn.close()
async def delete(self) -> None: if self.database != ":memory:":
await self.conn.executemany( self.conn.executemany( "REPLACE INTO peers (id, access_hash, type, username, phone_number) " "VALUES (?, ?, ?, ?, ?)", peers, ) await self.conn.commit() self.conn.commit()
async def get_peer_by_id(self, peer_id: int) -> InputPeer | None: if not self.conn: logging.warning("Database connection is not available.") return None
q = await self.conn.execute( q = self.conn.execute( "SELECT id, access_hash, type FROM peers WHERE id = ?", (peer_id,) ) r = await q.fetchone() r = q.fetchone() if not r: raise KeyError(f"ID not found: {peer_id}")
q = await self.conn.execute( q = self.conn.execute( "SELECT id, access_hash, type, last_update_on " "FROM peers " "WHERE username = ? " "ORDER BY last_update_on DESC", (username,), ) r = await q.fetchone() r = q.fetchone() if not r: raise KeyError(f"Username not found: {username}")
q = await self.conn.execute( q = self.conn.execute( "SELECT id, access_hash, type FROM peers WHERE phone_number = ?", (phone_number,) ) r = await q.fetchone() r = q.fetchone() if not r: raise KeyError(f"Phone number not found: {phone_number}")
q = await self.conn.execute(f"SELECT {attr} FROM sessions") row = await q.fetchone() q = self.conn.execute(f"SELECT {attr} FROM sessions") row = q.fetchone() return row[0] if row else None
async def _set(self, attr: str, value: Any) -> None: if not self.conn: logging.warning("Database connection is not available.") return
await self.conn.execute(f"UPDATE sessions SET {attr} = ?", (value,)) await self.conn.commit() self.conn.execute(f"UPDATE sessions SET {attr} = ?", (value,)) self.conn.commit()
async def _accessor(self, attr: str, value: Any = object) -> Any | None: if not self.conn:
if value is object: q = await self.conn.execute("SELECT number FROM version") row = await q.fetchone() q = self.conn.execute("SELECT number FROM version") row = q.fetchone() return row[0] if row else None
await self.conn.execute("UPDATE version SET number = ?", (value,)) await self.conn.commit() self.conn.execute("UPDATE version SET number = ?", (value,)) self.conn.commit() return None