| from typing import Dict |
| from uuid import UUID |
| import asyncio |
| from fastapi import WebSocket |
| from types import SimpleNamespace |
| from typing import Dict |
| from typing import Union |
|
|
| UserDataContent = Dict[UUID, Dict[str, Union[WebSocket, asyncio.Queue]]] |
|
|
|
|
| class UserData: |
| def __init__(self): |
| self.data_content: Dict[UUID, UserDataContent] = {} |
|
|
| async def create_user(self, user_id: UUID, websocket: WebSocket): |
| self.data_content[user_id] = { |
| "websocket": websocket, |
| "queue": asyncio.Queue(), |
| } |
| await asyncio.sleep(1) |
|
|
| def check_user(self, user_id: UUID) -> bool: |
| return user_id in self.data_content |
|
|
| async def update_data(self, user_id: UUID, new_data: SimpleNamespace): |
| user_session = self.data_content[user_id] |
| queue = user_session["queue"] |
| while not queue.empty(): |
| try: |
| queue.get_nowait() |
| except asyncio.QueueEmpty: |
| continue |
| await queue.put(new_data) |
|
|
| async def get_latest_data(self, user_id: UUID) -> SimpleNamespace: |
| user_session = self.data_content[user_id] |
| queue = user_session["queue"] |
|
|
| try: |
| return await queue.get() |
| except asyncio.QueueEmpty: |
| return None |
|
|
| def delete_user(self, user_id: UUID): |
| user_session = self.data_content[user_id] |
| queue = user_session["queue"] |
| while not queue.empty(): |
| try: |
| queue.get_nowait() |
| except asyncio.QueueEmpty: |
| continue |
| if user_id in self.data_content: |
| del self.data_content[user_id] |
|
|
| def get_user_count(self) -> int: |
| return len(self.data_content) |
|
|
| def get_websocket(self, user_id: UUID) -> WebSocket: |
| return self.data_content[user_id]["websocket"] |
|
|
|
|
| user_data = UserData() |
|
|