-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
0ac18a6
commit 7c2a144
Showing
3 changed files
with
121 additions
and
64 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,73 +1,88 @@ | ||
import asyncio | ||
import random | ||
import requests | ||
import logging | ||
import hikari | ||
import random | ||
from enum import Enum | ||
|
||
import hikari | ||
import requests | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
async def process_users(users, draw_common_movie=False) -> hikari.Embed: | ||
mentioned_users = [user.mention for user in users if user is not None] | ||
response = f"Oznaczono {len(mentioned_users)} użytkowników:\n" + "\n".join(mentioned_users) | ||
class MediaType(Enum): | ||
FILM = "film" | ||
SERIAL = "serial" | ||
|
||
|
||
async def fetch_filmweb_id(user: any) -> str | None: | ||
logger.debug(f"Fetching Filmweb ID for user: {user.id}") | ||
response = requests.get(f"http://filman_server:8000/filmweb/user/mapping/get", params={"discord_id": user.id}) | ||
if response.status_code == 200: | ||
filmweb_id = response.json().get("filmweb_id") | ||
if filmweb_id: | ||
logger.debug(f"Found Filmweb ID: {filmweb_id} for user: {user.id}") | ||
return filmweb_id | ||
else: | ||
logger.warning(f"No Filmweb ID found for user: {user.id}") | ||
else: | ||
logger.error(f"Error fetching Filmweb ID for user: {user.id}, status: {response.status_code}") | ||
return None | ||
|
||
|
||
async def fetch_media_to_watch(filmweb_id: str, media_type: MediaType) -> dict[str, any] | None: | ||
logger.debug(f"Fetching list of media to watch for Filmweb ID: {filmweb_id}") | ||
response = requests.get(f"https://www.filmweb.pl/api/v1/user/{filmweb_id}/want2see/{media_type.value}") | ||
if response.status_code == 200: | ||
return response.json() | ||
else: | ||
logger.error(f"Error fetching list of media for Filmweb ID: {filmweb_id}, status: {response.status_code}") | ||
return None | ||
|
||
|
||
async def process_media(users: any, draw_common_media: bool = False, media_type: MediaType = MediaType.FILM) -> str: | ||
mentioned_users = [user.mention for user in users if user is not None] | ||
logger.debug(f"Users: {mentioned_users}") | ||
|
||
filmweb_ids = {} | ||
for user in users: | ||
if user is not None: | ||
logger.debug(f"Fetching Filmweb ID for user: {user.id}") | ||
r = requests.get(f"http://filman_server:8000/filmweb/user/mapping/get", params={"discord_id": user.id}) | ||
if r.status_code == 200: | ||
filmweb_id = r.json().get("filmweb_id") | ||
if filmweb_id: | ||
filmweb_ids[filmweb_id] = user | ||
logger.debug(f"Found Filmweb ID: {filmweb_id} for user: {user.id}") | ||
else: | ||
logger.warning(f"No Filmweb ID found for user: {user.id}") | ||
else: | ||
logger.error(f"Error fetching Filmweb ID for user: {user.id}, status: {r.status_code}") | ||
|
||
movies_to_watch = {} | ||
filmweb_ids = {await fetch_filmweb_id(user): user for user in users if user is not None} | ||
filmweb_ids = {k: v for k, v in filmweb_ids.items() if k is not None} | ||
|
||
media_to_watch: dict[str, dict[str, any]] = {} | ||
for filmweb_id, user in filmweb_ids.items(): | ||
logger.debug(f"Fetching list of movies to watch for Filmweb ID: {filmweb_id}") | ||
r = requests.get(f"https://www.filmweb.pl/api/v1/user/{filmweb_id}/want2see/film") | ||
if r.status_code == 200: | ||
movie_entities = r.json() | ||
for movie in movie_entities: | ||
movie_id = movie.get("entity") | ||
movie_url = f"https://www.filmweb.pl/film/x-1-{movie_id}" | ||
if movie_id not in movies_to_watch: | ||
movies_to_watch[movie_id] = {"url": movie_url, "users": set()} | ||
movies_to_watch[movie_id]["users"].add(user.id) | ||
logger.debug(f"Added movie to list: {movie_url} from user: {user.id}") | ||
else: | ||
logger.error(f"Error fetching list of movies for Filmweb ID: {filmweb_id}, status: {r.status_code}") | ||
media_entities = await fetch_media_to_watch(filmweb_id, media_type) | ||
if media_entities: | ||
for media in media_entities: | ||
media_id = media.get("entity") | ||
media_url = f"https://www.filmweb.pl/{media_type.value}/x-1-{media_id}" | ||
if media_id not in media_to_watch: | ||
media_to_watch[media_id] = {"url": media_url, "users": set()} | ||
media_to_watch[media_id]["users"].add(user.id) | ||
logger.debug(f"Added {media_type.value} to list: {media_url} from user: {user.id}") | ||
|
||
common_movies = [movie for movie in movies_to_watch.values() if len(movie["users"]) > 1] | ||
common_media = [media for media in media_to_watch.values() if len(media["users"]) > 1] | ||
|
||
if draw_common_movie: | ||
if common_movies: | ||
common_movie = random.choice(common_movies) | ||
user_mentions = ", ".join([f"<@{user_id}>" for user_id in common_movie["users"]]) | ||
response += f"\nCommon movie to watch: {common_movie['url']} (added by {user_mentions})" | ||
response_url = "" | ||
response_mentions = "" | ||
|
||
if draw_common_media: | ||
if common_media: | ||
common_media_item = random.choice(common_media) | ||
user_mentions = ", ".join([f"<@{user_id}>" for user_id in common_media_item["users"]]) | ||
response_url = common_media_item["url"] | ||
response_mentions = f"z listy {user_mentions}" | ||
else: | ||
response += "\nNo common movies found." | ||
response_url = "Brak wspólnych filmów." | ||
response_mentions = "" | ||
else: | ||
uncommon_movies = [movie for movie in movies_to_watch.values() if len(movie["users"]) == 1] | ||
if uncommon_movies: | ||
uncommon_movie = random.choice(uncommon_movies) | ||
user_mentions = ", ".join([f"<@{user_id}>" for user_id in uncommon_movie["users"]]) | ||
response += f"\nUncommon movie to watch: {uncommon_movie['url']} (added by {user_mentions})" | ||
uncommon_media = [media for media in media_to_watch.values() if len(media["users"]) == 1] | ||
if uncommon_media: | ||
uncommon_media_item = random.choice(uncommon_media) | ||
user_mentions = ", ".join([f"<@{user_id}>" for user_id in uncommon_media_item["users"]]) | ||
response_url = uncommon_media_item["url"] | ||
response_mentions = f"z listy {user_mentions}" | ||
else: | ||
response += "\nNo uncommon movies found." | ||
|
||
#todo: fix it | ||
response_embed = hikari.Embed( | ||
title="Movies to watch", | ||
description=response, | ||
color=0xFFC200, | ||
) | ||
response_url = "Brak filmów do obejrzenia." | ||
response_mentions = "" | ||
|
||
response = f"{response_url} {response_mentions}" | ||
|
||
return response |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters