# This file is part of python-cinema-club-bot
# contributed in 2024 by Mikhail Kirillov (~w96k)

# To the extent possible under law, the author(s) have dedicated all copyright
# and related and neighboring rights to this software to the public domain
# worldwide. This software is distributed without any warranty.

# You should have received a copy of the CC0 Public Domain Dedication along
# with this software. If not, see:
#

"""Async Telegram callbacks managing the per-chat list in chat_data["users"].

The first element of that list is treated as the user whose turn it is:
`who_is_next` and `chooser_user` both report `users[0]`.
"""

from collections import deque

from telegram import Update, error
from telegram.ext import ContextTypes

from strings import USER_NOT_PROVIDED, USERS_ADDED, USERS_REMOVED, \
    EXPECTED_ONE_USER, USER_SET, USER_REMOVE, ADD_MORE_USERS, \
    NEXT_MOVIE_USER, USER_NOT_FOUND, USER_CHOOSE, NO_USERS, \
    NEXT_MOVIE
from utils import context_init, create_users_string, choose_next_user, \
    normalize_username
from predicates import has_finished_movie, has_event_without_movie


async def set_users(
        update: Update,
        context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Replace the chat's user list with the command arguments.

    Replies with USER_SET on success.

    Raises:
        error.TelegramError: USER_NOT_PROVIDED when no arguments were given.
    """
    # NOTE(review): presumably prepares the chat_data keys used below —
    # confirm in utils.context_init.
    context_init(context)

    # Truthiness check so that missing args (None) are rejected the same
    # way as an empty argument list.
    if not context.args:
        raise error.TelegramError(USER_NOT_PROVIDED)

    # Store a copy: add_users/remove_users mutate this list in place and
    # should not be touching the list object owned by the callback context.
    context.chat_data["users"] = list(context.args)

    await update.message.reply_text(USER_SET)


async def add_users(
        update: Update,
        context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Append every command argument to the chat's user list.

    Replies with USERS_ADDED on success.

    Raises:
        error.TelegramError: USER_NOT_PROVIDED when no arguments were given.
    """
    context_init(context)

    if not context.args:
        raise error.TelegramError(USER_NOT_PROVIDED)

    context.chat_data["users"].extend(context.args)

    await update.message.reply_text(USERS_ADDED)


async def list_users(
        update: Update,
        context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Reply with the formatted user list, or NO_USERS when it is empty.

    When has_finished_movie(context) is true, the stored list is first
    replaced by the result of choose_next_user(context).
    """
    context_init(context)

    users = context.chat_data["users"]

    if not users:
        await update.message.reply_text(NO_USERS)
        return

    if has_finished_movie(context):
        users = context.chat_data["users"] = choose_next_user(context)

    await update.message.reply_markdown(create_users_string(users))


async def who_is_next(
        update: Update,
        context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Set the next chooser if needed and reply with the current one.

    The reply names `users[0]`. If the last entry of chat_data["movies"]
    belongs to that user, the movie title is appended to the reply.

    Raises:
        error.TelegramError: ADD_MORE_USERS when the user list is empty.
    """
    context_init(context)

    users = context.chat_data["users"]

    if not users:
        raise error.TelegramError(ADD_MORE_USERS)

    if has_finished_movie(context):
        users = context.chat_data["users"] = choose_next_user(context)

    next_user_string = NEXT_MOVIE_USER.format(user=users[0])

    movies = context.chat_data["movies"]
    last_movie = movies[-1] if movies else None

    # Movie entries store the user in normalized form, so normalize the
    # list entry before comparing.
    if last_movie and last_movie["user"] == normalize_username(users[0]):
        next_user_string += NEXT_MOVIE.format(movie=last_movie["title"])

    await update.message.reply_text(next_user_string)


async def remove_users(
        update: Update,
        context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Remove the given users from the chat's user list.

    Arguments are processed in order. Each removed user is acknowledged
    with a USER_REMOVE reply. The argument "*" clears the whole list and
    stops processing further arguments. USERS_REMOVED is sent at the end.

    Raises:
        error.TelegramError: USER_NOT_PROVIDED when no arguments were
            given, or USER_NOT_FOUND when an argument is not in the list
            (users removed before the failing argument stay removed).
    """
    context_init(context)

    if not context.args:
        raise error.TelegramError(USER_NOT_PROVIDED)

    for user in context.args:
        if user == "*":
            context.chat_data["users"] = []
            break

        # list.remove raises ValueError for a missing item; report it the
        # same way chooser_user reports an unknown user.
        try:
            context.chat_data["users"].remove(user)
        except ValueError as err:
            raise error.TelegramError(
                USER_NOT_FOUND.format(user=user)
            ) from err

        await update.message.reply_text(USER_REMOVE.format(user=user))

    await update.message.reply_text(USERS_REMOVED)


async def chooser_user(
        update: Update,
        context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Rotate the user list so the given user comes first.

    The relative order of the other users is preserved. After rotating:
    when has_finished_movie(context) is true the list is replaced by the
    result of choose_next_user(context); when has_event_without_movie(context)
    is true the last event's "user" is set to the first user. Replies with
    USER_CHOOSE followed by the formatted user list.

    Raises:
        error.TelegramError: USER_NOT_PROVIDED when no argument was given,
            EXPECTED_ONE_USER when more than one was given, or
            USER_NOT_FOUND when the user is not in the list.
    """
    context_init(context)

    if not context.args:
        raise error.TelegramError(USER_NOT_PROVIDED)

    if len(context.args) > 1:
        raise error.TelegramError(EXPECTED_ONE_USER)

    chooser = context.args[0]

    users = deque(context.chat_data["users"])

    try:
        chooser_index = users.index(chooser)
    except ValueError as err:
        raise error.TelegramError(
            USER_NOT_FOUND.format(user=chooser)
        ) from err

    # A negative rotation shifts left, moving the chooser to index 0.
    users.rotate(-chooser_index)
    users = list(users)

    context.chat_data["users"] = users

    if has_finished_movie(context):
        users = context.chat_data["users"] = choose_next_user(context)

    if has_event_without_movie(context):
        context.chat_data["events"][-1]["user"] = users[0]

    await update.message.reply_text(USER_CHOOSE.format(user=users[0]))
    await update.message.reply_markdown(create_users_string(users))