◐ Shell
clean mode source ↗

chore: switch back to sqlite3 by alissonlauffer · Pull Request #56 · hydrogram/hydrogram

Expand Up @@ -21,13 +21,12 @@
import base64 import logging import sqlite3 import struct import time from pathlib import Path from typing import Any
import aiosqlite
from hydrogram import raw, utils
from .base import BaseStorage, InputPeer Expand Down Expand Up @@ -105,7 +104,7 @@ def __init__( else (workdir / (self.name + self.FILE_EXTENSION) if workdir else ":memory:") ) self.session_string: str | None = session_string self.conn: aiosqlite.Connection | None = None self.conn: sqlite3.Connection | None = None
async def update(self) -> None: if not self.conn: Expand All @@ -115,47 +114,47 @@ async def update(self) -> None: version: int | None = await self.version()
if version == 1: await self.conn.execute("DELETE FROM peers") self.conn.execute("DELETE FROM peers") version += 1
if version == 2: await self.conn.execute("ALTER TABLE sessions ADD api_id INTEGER") self.conn.execute("ALTER TABLE sessions ADD api_id INTEGER") version += 1
await self.version(version) await self.conn.commit() self.conn.commit()
async def create(self) -> None: if not self.conn: logging.warning("Database connection is not available.") return
await self.conn.executescript(SCHEMA) await self.conn.execute( self.conn.executescript(SCHEMA) self.conn.execute( "INSERT INTO version VALUES (?)", (self.VERSION,), ) await self.conn.execute( self.conn.execute( "INSERT INTO sessions VALUES (?, ?, ?, ?, ?, ?, ?)", (2, None, None, None, 0, None, None), ) await self.conn.commit() self.conn.commit()
async def open(self) -> None: path = self.database file_exists = isinstance(path, Path) and path.is_file()
self.conn = await aiosqlite.connect(self.database) self.conn = sqlite3.connect(self.database)
await self.conn.execute("PRAGMA journal_mode=WAL") self.conn.execute("PRAGMA journal_mode=WAL")
if file_exists: await self.update() await self.conn.execute("VACUUM") self.conn.execute("VACUUM") else: await self.create()
await self.conn.commit() self.conn.commit()
if self.session_string: await self._load_session_string() Expand Down Expand Up @@ -187,11 +186,11 @@ async def save(self) -> None: return
await self.date(int(time.time())) await self.conn.commit() self.conn.commit()
async def close(self) -> None: if self.conn: await self.conn.close() self.conn.close()
async def delete(self) -> None: if self.database != ":memory:": Expand All @@ -204,22 +203,22 @@ async def update_peers( logging.warning("Database connection is not available.") return
await self.conn.executemany( self.conn.executemany( "REPLACE INTO peers (id, access_hash, type, username, phone_number) " "VALUES (?, ?, ?, ?, ?)", peers, ) await self.conn.commit() self.conn.commit()
async def get_peer_by_id(self, peer_id: int) -> InputPeer | None: if not self.conn: logging.warning("Database connection is not available.") return None
q = await self.conn.execute( q = self.conn.execute( "SELECT id, access_hash, type FROM peers WHERE id = ?", (peer_id,) ) r = await q.fetchone() r = q.fetchone() if not r: raise KeyError(f"ID not found: {peer_id}")
Expand All @@ -230,14 +229,14 @@ async def get_peer_by_username(self, username: str) -> InputPeer | None: logging.warning("Database connection is not available.") return None
q = await self.conn.execute( q = self.conn.execute( "SELECT id, access_hash, type, last_update_on " "FROM peers " "WHERE username = ? " "ORDER BY last_update_on DESC", (username,), ) r = await q.fetchone() r = q.fetchone() if not r: raise KeyError(f"Username not found: {username}")
Expand All @@ -251,10 +250,10 @@ async def get_peer_by_phone_number(self, phone_number: str) -> InputPeer | None: logging.warning("Database connection is not available.") return None
q = await self.conn.execute( q = self.conn.execute( "SELECT id, access_hash, type FROM peers WHERE phone_number = ?", (phone_number,) ) r = await q.fetchone() r = q.fetchone() if not r: raise KeyError(f"Phone number not found: {phone_number}")
Expand All @@ -265,17 +264,17 @@ async def _get(self, attr: str) -> Any: logging.warning("Database connection is not available.") return None
q = await self.conn.execute(f"SELECT {attr} FROM sessions") row = await q.fetchone() q = self.conn.execute(f"SELECT {attr} FROM sessions") row = q.fetchone() return row[0] if row else None
async def _set(self, attr: str, value: Any) -> None: if not self.conn: logging.warning("Database connection is not available.") return
await self.conn.execute(f"UPDATE sessions SET {attr} = ?", (value,)) await self.conn.commit() self.conn.execute(f"UPDATE sessions SET {attr} = ?", (value,)) self.conn.commit()
async def _accessor(self, attr: str, value: Any = object) -> Any | None: if not self.conn: Expand Down Expand Up @@ -315,10 +314,10 @@ async def version(self, value: int | object = object) -> int | None: return None
if value is object: q = await self.conn.execute("SELECT number FROM version") row = await q.fetchone() q = self.conn.execute("SELECT number FROM version") row = q.fetchone() return row[0] if row else None
await self.conn.execute("UPDATE version SET number = ?", (value,)) await self.conn.commit() self.conn.execute("UPDATE version SET number = ?", (value,)) self.conn.commit() return None