Ловите мою бездарность
import logging
import json
import sqlite3
from telebot import TeleBot
import g4f
class ChatBot:
    """Telegram bot that forwards text messages to a g4f chat model.

    The message history of each chat is serialized to JSON and stored in the
    SQLite table ``contexts`` keyed by ``chat_id``; the stored history is sent
    to the model together with every new user message.
    """

    # Telegram rejects a message whose text exceeds this many characters.
    _MAX_MESSAGE_LENGTH = 4096

    # Reply sent to the user when the model call fails or yields no text.
    _ERROR_REPLY = "Произошла ошибка. Попробуйте снова!"

    # Non-text content types answered with the "unsupported" notice.  They
    # must be listed explicitly: a telebot handler registered without
    # ``content_types`` only ever receives text messages.
    _UNSUPPORTED_CONTENT_TYPES = (
        "animation",
        "audio",
        "contact",
        "dice",
        "document",
        "location",
        "photo",
        "poll",
        "sticker",
        "venue",
        "video",
        "video_note",
        "voice",
    )

    def __init__(self, token, db_name="chat_contexts.db"):
        """Create the Telegram client and ensure the context table exists.

        Args:
            token: Telegram bot API token passed to ``TeleBot``.
            db_name: Path of the SQLite database file used for chat contexts.
        """
        self.bot = TeleBot(token)
        self.db_name = db_name
        self._initialize_database()

    def _initialize_database(self):
        """Create the ``contexts`` table if it does not exist yet."""
        self._execute_query("""
            CREATE TABLE IF NOT EXISTS contexts (
                chat_id INTEGER PRIMARY KEY,
                context_data TEXT
            )
        """)

    def _execute_query(self, query, params=(), fetchone=False):
        """Run one SQL statement on a fresh connection.

        Args:
            query: SQL text, with ``?`` placeholders for ``params``.
            params: Sequence of values bound to the placeholders.
            fetchone: When true, return the first result row.

        Returns:
            The first row (a tuple) when ``fetchone`` is true and a row
            exists; otherwise ``None``.  ``None`` is also returned when a
            ``sqlite3.Error`` occurs (the error is logged, not raised).
        """
        try:
            conn = sqlite3.connect(self.db_name)
            try:
                # ``with conn`` commits on success and rolls back on error,
                # but it does not close the connection; ``finally`` does.
                with conn:
                    cursor = conn.execute(query, params)
                    return cursor.fetchone() if fetchone else None
            finally:
                conn.close()
        except sqlite3.Error as err:
            logging.error("Database error: %s", err)
            return None

    def _save_context(self, chat_id, context):
        """Store ``context`` (a JSON-serializable list) for ``chat_id``.

        An existing row for the same chat is replaced.
        """
        self._execute_query(
            """
            INSERT OR REPLACE INTO contexts (chat_id, context_data)
            VALUES (?, ?)
            """,
            (chat_id, json.dumps(context)),
        )

    def _load_context(self, chat_id):
        """Return the stored message list for ``chat_id``.

        An empty list is returned when the chat has no stored row, when the
        query fails, or when the stored value is not a valid JSON list.
        """
        row = self._execute_query(
            "SELECT context_data FROM contexts WHERE chat_id = ?",
            (chat_id,),
            fetchone=True,
        )
        if not row:
            return []
        try:
            context = json.loads(row[0])
        except (ValueError, TypeError) as err:
            # A damaged row must not break the chat; start with no history.
            logging.error("Corrupted context for chat %s: %s", chat_id, err)
            return []
        # The caller appends to the result, so anything but a list is unusable.
        return context if isinstance(context, list) else []

    @staticmethod
    def _split_reply(text, limit=_MAX_MESSAGE_LENGTH):
        """Split ``text`` into consecutive chunks of at most ``limit`` chars.

        Returns an empty list for empty ``text``.
        """
        return [text[start:start + limit] for start in range(0, len(text), limit)]

    def _process_user_message(self, message):
        """Send the chat history plus the new message to the model.

        On success the user message and the assistant reply are appended to
        the stored context and the reply text is returned.  If the model call
        raises or produces only whitespace, the context is left unchanged and
        a fixed error text is returned instead.
        """
        chat_id = message.chat.id
        context = self._load_context(chat_id)
        context.append({"role": "user", "content": message.text})

        try:
            response = g4f.ChatCompletion.create(
                model=g4f.models.gpt_4,
                messages=context,
            )
            if isinstance(response, list):
                assistant_reply = "".join(response)
            else:
                assistant_reply = str(response)
        except Exception as err:
            # Boundary with a third-party provider: any failure is reported
            # to the user as a generic error rather than crashing the handler.
            logging.exception("Processing error: %s", err)
            return self._ERROR_REPLY

        if not assistant_reply.strip():
            # Telegram refuses to send an empty message, so treat it as a failure.
            logging.error("Processing error: empty response from the model")
            return self._ERROR_REPLY

        context.append({"role": "assistant", "content": assistant_reply})
        self._save_context(chat_id, context)
        return assistant_reply

    def _add_handlers(self):
        """Register the command, text and fallback message handlers."""

        @self.bot.message_handler(commands=["start", "help"])
        def handle_start(message):
            self.bot.reply_to(message, "Добро пожаловать! Я умный помощник, задавайте вопросы.")

        @self.bot.message_handler(content_types=["text"])
        def handle_text(message):
            reply = self._process_user_message(message)
            # Long replies are sent in several messages; only the first one
            # is attached to the user's message as a reply.
            for index, chunk in enumerate(self._split_reply(reply)):
                if index == 0:
                    self.bot.reply_to(message, chunk)
                else:
                    self.bot.send_message(message.chat.id, chunk)

        @self.bot.message_handler(content_types=list(self._UNSUPPORTED_CONTENT_TYPES))
        def handle_other(message):
            self.bot.reply_to(message, "Извините, я пока не умею обрабатывать этот тип сообщений.")

    def run(self):
        """Register the handlers and poll Telegram until interrupted."""
        self._add_handlers()
        try:
            logging.info("Bot is running...")
            self.bot.polling()
        except KeyboardInterrupt:
            logging.info("Bot stopped manually.")
if __name__ == "__main__":
    # Imported here because ``os`` is only needed by the script entry point.
    import os

    logging.basicConfig(level=logging.INFO)
    # Prefer a token supplied through the environment so the secret does not
    # have to be written into the source; the literal is the fallback value.
    token = os.environ.get("TELEGRAM_BOT_TOKEN", " ТОКЕН_ТГ_БОТА")
    chatbot = ChatBot(token)
    chatbot.run()