"""Discord Agent Integration Cog Handles all service integrations and provides commands for interacting with them. """ import discord from discord.ext import commands import logging import aiohttp from typing import Optional, Dict, Any from datetime import datetime logger = logging.getLogger(__name__) class IntegrationCog(commands.Cog): """Manages integrations with external services.""" def __init__(self, bot): self.bot = bot self.config = bot.config self.session = getattr(bot, 'session', None) self.db_pool = getattr(bot, 'db_pool', None) self.redis_client = getattr(bot, 'redis_client', None) # Integration configurations self.integrations = self.config.get('integrations', {}).get('enabled', []) self.agent_endpoint = self.config.get('agent', {}).get('endpoint', 'http://192.168.0.10:8080') self.agent_api_key = self.config.get('agent', {}).get('api_key', '') # Integration metadata self.integration_info = { 'jellyfin': {'name': 'Jellyfin', 'description': 'Access Jellyfin media server'}, 'paperless': {'name': 'Paperless', 'description': 'Manage documents in Paperless'}, 'gitea': {'name': 'Gitea', 'description': 'Interact with Gitea repositories'}, 'wygiwyh': {'name': 'WYGIWYH', 'description': 'Check financial tracking'}, 'syncthing': {'name': 'Syncthing', 'description': 'Sync files with Syncthing'}, 'immich': {'name': 'Immich', 'description': 'Manage photos in Immich'}, 'speedtest-tracker': {'name': 'Speedtest', 'description': 'Check network speed'}, 'maloja': {'name': 'Maloja', 'description': 'Access music scrobbling'}, 'npm': {'name': 'Nginx Proxy Manager', 'description': 'Manage reverse proxy'}, } def _create_help_embed(self) -> discord.Embed: """Create a formatted help embed with all available commands.""" embed = discord.Embed( title="🤖 Discord Agent Commands", description="Available commands and integrations", color=discord.Color.blue() ) # Integration commands if self.integrations: integration_text = "" for integration_id in self.integrations: if integration_id in self.integration_info: info = self.integration_info[integration_id] cmd_name = 'speedtest' if integration_id == 'speedtest-tracker' else integration_id integration_text += f"**!{cmd_name}** - {info['description']}\n" if integration_text: embed.add_field( name="📦 Integration Commands", value=integration_text, inline=False ) # General commands general_text = ( "**!agent ** - Chat with the AI agent\n" "**!status** - Check system status\n" "**!help** - Show this help message" ) embed.add_field(name="⚙️ General Commands", value=general_text, inline=False) return embed @commands.command(name='help') async def help_command(self, ctx): """Show help information for integrations.""" try: embed = self._create_help_embed() await ctx.send(embed=embed) except Exception as e: logger.error(f"Error in help command: {e}") await ctx.send("❌ Sorry, I'm having trouble showing help right now.") @commands.command() async def agent(self, ctx, *, query: str): """Chat with the AI agent.""" try: async with ctx.typing(): response = await self._query_agent(query) # Split long responses if len(response) > 2000: chunks = [response[i:i+2000] for i in range(0, len(response), 2000)] for chunk in chunks: await ctx.send(chunk) else: await ctx.send(response) except Exception as e: logger.error(f"Error in agent command: {e}") await ctx.send("❌ Sorry, I'm having trouble connecting to the agent service.") async def _query_agent(self, query: str) -> str: """Query the agent API.""" if not self.session: return "❌ HTTP session not initialized" try: async with self.session.post( f"{self.agent_endpoint}/api/chat", json={"query": query, "api_key": self.agent_api_key}, timeout=aiohttp.ClientTimeout(total=30) ) as resp: if resp.status == 200: data = await resp.json() return data.get('response', '❌ No response from agent') else: return f"❌ Agent error: HTTP {resp.status}" except asyncio.TimeoutError: return "❌ Agent request timed out" except Exception as e: logger.error(f"Agent query error: {e}") return "❌ Failed to connect to agent" @commands.command() async def status(self, ctx): """Check system status.""" try: embed = discord.Embed( title="📊 System Status", color=discord.Color.green() ) # Bot status embed.add_field( name="🤖 Bot", value=f"Connected to {len(self.bot.guilds)} server(s)", inline=True ) # Agent status embed.add_field( name="🧠 Agent", value="Online ✅", inline=True ) # Database status db_status = "Connected ✅" if self.db_pool else "Disconnected ❌" embed.add_field(name="💾 Database", value=db_status, inline=True) # Redis status redis_status = "Connected ✅" if self.redis_client else "Disconnected ❌" embed.add_field(name="🔴 Redis", value=redis_status, inline=True) # Integrations embed.add_field( name="📦 Integrations", value=f"{len(self.integrations)} enabled", inline=True ) # Latency latency = round(self.bot.latency * 1000) embed.add_field(name="⚡ Latency", value=f"{latency}ms", inline=True) await ctx.send(embed=embed) except Exception as e: logger.error(f"Error in status command: {e}") await ctx.send("❌ Sorry, I'm having trouble checking system status.") async def _handle_integration_command(self, ctx, integration_id: str, query: str = ""): """Generic handler for integration commands.""" if integration_id not in self.integrations: info = self.integration_info.get(integration_id, {}) service_name = info.get('name', integration_id.title()) await ctx.send(f"❌ {service_name} integration is not enabled.") return try: # Placeholder for actual integration logic info = self.integration_info[integration_id] embed = discord.Embed( title=f"📦 {info['name']}", description=f"Integration is active. Query: `{query or 'None'}`", color=discord.Color.green() ) embed.add_field(name="Status", value="✅ Working", inline=True) embed.set_footer(text="This is a placeholder - implement actual integration logic") await ctx.send(embed=embed) except Exception as e: logger.error(f"Error in {integration_id} command: {e}") await ctx.send(f"❌ Sorry, I'm having trouble accessing {integration_id.title()}.") # Integration commands using the generic handler @commands.command() async def jellyfin(self, ctx, *, query: str = ""): """Access Jellyfin media server.""" await self._handle_integration_command(ctx, 'jellyfin', query) @commands.command() async def paperless(self, ctx, *, query: str = ""): """Manage documents in Paperless.""" await self._handle_integration_command(ctx, 'paperless', query) @commands.command() async def gitea(self, ctx, *, query: str = ""): """Interact with Gitea repositories.""" await self._handle_integration_command(ctx, 'gitea', query) @commands.command() async def wygiwyh(self, ctx, *, query: str = ""): """Check financial tracking.""" await self._handle_integration_command(ctx, 'wygiwyh', query) @commands.command() async def syncthing(self, ctx, *, query: str = ""): """Sync files with Syncthing.""" await self._handle_integration_command(ctx, 'syncthing', query) @commands.command() async def immich(self, ctx, *, query: str = ""): """Manage photos in Immich.""" await self._handle_integration_command(ctx, 'immich', query) @commands.command() async def speedtest(self, ctx, *, query: str = ""): """Check network speed.""" await self._handle_integration_command(ctx, 'speedtest-tracker', query) @commands.command() async def maloja(self, ctx, *, query: str = ""): """Access music scrobbling.""" await self._handle_integration_command(ctx, 'maloja', query) @commands.command() async def npm(self, ctx, *, query: str = ""): """Access Nginx Proxy Manager.""" await self._handle_integration_command(ctx, 'npm', query) async def setup(bot): """Setup function for loading the cog.""" await bot.add_cog(IntegrationCog(bot))