#!/usr/bin/env python3
"""Flask service that polls the RackNerd client API and re-serves it as JSON.

The upstream endpoint answers with XML; this module converts that XML into a
dictionary, enriches it with parsed bandwidth figures and metadata, caches the
result, and exposes it through a handful of HTTP endpoints.
"""
import json
import logging
import os
import re
import threading
import time
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from urllib.parse import quote_plus

import requests
from flask import Flask, jsonify, request

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Matches a leading XML declaration so it can be dropped before the payload
# is wrapped in a synthetic root element (a declaration is only legal at the
# very start of a document).
_XML_DECLARATION = re.compile(r'^\s*<\?xml[^>]*\?>')


def _utcnow() -> datetime:
    """Return the current UTC time as a naive datetime.

    Equivalent to the deprecated ``datetime.utcnow()``; the tzinfo is dropped
    so ``isoformat() + 'Z'`` keeps producing the same string shape as before.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _utc_timestamp() -> str:
    """Return the current UTC time formatted as ``<isoformat>Z``."""
    return _utcnow().isoformat() + 'Z'


class RackNerdAPIConverter:
    """Fetch RackNerd API data, convert it from XML to JSON, and cache it."""

    def __init__(self):
        # Load configuration from environment variables.
        # NOTE(security): the fallback key/hash below are credentials embedded
        # in source. They are kept only so existing deployments keep working;
        # they should be rotated and supplied via the environment instead.
        self.api_key = os.getenv('RACKNERD_API_KEY', 'A0ZJA-FSJXW-QXV7R')
        self.api_hash = os.getenv('RACKNERD_API_HASH', 'fce545debdab0edf2565788277d3670e1afd8823')
        self.vserver_id = os.getenv('RACKNERD_VSERVER_ID', '476515')
        self.base_url = os.getenv('RACKNERD_BASE_URL', 'https://nerdvm.racknerd.com/api/client/command.php')
        self.update_interval = int(os.getenv('UPDATE_INTERVAL', '1800'))  # 30 minutes default
        self.cached_data = {}
        self.last_update = None
        self.lock = threading.Lock()

        if 'RACKNERD_API_KEY' not in os.environ or 'RACKNERD_API_HASH' not in os.environ:
            logger.warning(
                "RACKNERD_API_KEY / RACKNERD_API_HASH not set; "
                "falling back to credentials embedded in source"
            )

    def request_params(self) -> Dict[str, str]:
        """Return the query parameters sent to the RackNerd API."""
        return {
            'key': self.api_key,
            'hash': self.api_hash,
            'action': 'info',
            'vserverid': self.vserver_id,
            'bw': 'true'
        }

    def redact(self, text: str) -> str:
        """Return ``text`` with the API key and hash replaced by ``***``.

        Both the literal and the URL-encoded form of each secret are masked,
        because exception messages and ``response.url`` carry the encoded
        query string.
        """
        for secret in (self.api_key, self.api_hash):
            if not secret:
                continue
            text = text.replace(secret, '***')
            encoded = quote_plus(secret)
            if encoded != secret:
                text = text.replace(encoded, '***')
        return text

    def parse_xml(self, xml_text: str) -> ET.Element:
        """Parse ``xml_text``, tolerating a payload made of sibling fragments.

        The text is first parsed as-is, so a well-formed document yields
        exactly the tree it always did. If that fails, the text is retried
        inside a synthetic ``<root>`` element, which accepts a response
        consisting of several top-level elements.

        Raises:
            ET.ParseError: the original parse error, when the wrapped retry
                also fails or produces no child elements (empty or plain-text
                responses are still reported as parse errors).
        """
        try:
            return ET.fromstring(xml_text)
        except ET.ParseError as first_error:
            # Rebind: the ``as`` name is deleted when the except block ends.
            parse_error = first_error

        body = _XML_DECLARATION.sub('', xml_text, count=1)
        try:
            wrapped = ET.fromstring(f"<root>{body}</root>")
        except ET.ParseError:
            raise parse_error from None
        if len(wrapped) == 0:
            raise parse_error

        logger.info("Wrapped XML fragments in root element")
        return wrapped

    def parse_bandwidth(self, raw: str) -> Optional[Dict[str, Any]]:
        """Turn the comma-separated ``bw`` value into a bandwidth summary.

        Only the first two fields (total, used) are interpreted. A field that
        is not purely digits is passed through unchanged as a string.

        Returns:
            The summary dict, or ``None`` when ``raw`` has fewer than four
            comma-separated fields.
        """
        parts = raw.split(',')
        if len(parts) < 4:
            return None

        total_raw, used_raw = parts[0], parts[1]
        total = int(total_raw) if total_raw.isdigit() else None
        used = int(used_raw) if used_raw.isdigit() else None
        both_numeric = total is not None and used is not None
        remaining = total - used if both_numeric else 0

        bandwidth = {
            'total_bytes': total if total is not None else total_raw,
            'used_bytes': used if used is not None else used_raw,
            'total_formatted': self.format_bytes(total) if total is not None else total_raw,
            'used_formatted': self.format_bytes(used) if used is not None else used_raw,
            'remaining_bytes': remaining,
            # Guard against division by zero when the total is reported as 0.
            'usage_percent': round((used / total) * 100, 2) if both_numeric and total > 0 else 0
        }
        if remaining > 0:
            bandwidth['remaining_formatted'] = self.format_bytes(remaining)
        return bandwidth

    def fetch_kvm_data(self) -> Optional[Dict[str, Any]]:
        """Fetch data from RackNerd API and convert XML to JSON.

        Returns:
            On success, the converted payload with an added ``_metadata``
            entry (and ``bandwidth`` when the ``bw`` value is parseable).
            On failure, a dict with ``error``, ``message`` and ``timestamp``
            keys and no ``_metadata`` entry. Exceptions are not propagated.
        """
        try:
            logger.info(f"Fetching data from RackNerd API for vserver {self.vserver_id}")

            # Make the API request
            response = requests.get(self.base_url, params=self.request_params(), timeout=30)
            response.raise_for_status()

            # Clean up the response text and parse it
            xml_text = response.text.strip()
            root = self.parse_xml(xml_text)

            # Convert XML to JSON
            json_data = self.xml_to_dict(root)
            if not isinstance(json_data, dict):
                # A root holding only text converts to a bare scalar; box it
                # so the metadata below can still be attached.
                json_data = {'_text': json_data}

            # Parse bandwidth data if present
            raw_bw = json_data.get('bw')
            if isinstance(raw_bw, str):
                bandwidth = self.parse_bandwidth(raw_bw)
                if bandwidth is not None:
                    json_data['bandwidth'] = bandwidth

            # Add metadata
            json_data['_metadata'] = {
                'last_updated': _utc_timestamp(),
                'source': 'racknerd_api',
                'vserver_id': self.vserver_id,
                'raw_response_length': len(response.text)
            }

            logger.info("Successfully converted XML to JSON")
            return json_data

        except requests.exceptions.RequestException as e:
            # requests embeds the full URL (including key and hash) in many
            # exception messages, so redact before logging or returning it.
            message = self.redact(str(e))
            logger.error(f"API request failed: {message}")
            return {
                'error': 'API Request Failed',
                'message': message,
                'timestamp': _utc_timestamp()
            }
        except ET.ParseError as e:
            logger.error(f"XML parsing failed: {e}")
            logger.error(f"Raw response that failed to parse: {response.text}")
            return {
                'error': 'XML Parse Error',
                'message': str(e),
                'raw_response': response.text[:1000],
                'timestamp': _utc_timestamp()
            }
        except Exception as e:
            message = self.redact(str(e))
            logger.error(f"Unexpected error: {message}")
            return {
                'error': 'Unexpected Error',
                'message': message,
                'timestamp': _utc_timestamp()
            }

    def xml_to_dict(self, element) -> Dict[str, Any]:
        """Recursively convert XML element to dictionary.

        Text is stored under ``_text`` (coerced to bool, int or float when it
        looks like one), attributes under ``_attributes``, and each child
        under its tag name; repeated tags are collected into a list. An
        element that has only text is returned as that bare value instead of
        a dict.
        """
        result = {}

        # Handle element text
        if element.text and element.text.strip():
            # Try to convert to appropriate type
            text = element.text.strip()
            if text.lower() in ['true', 'false']:
                result['_text'] = text.lower() == 'true'
            elif text.isdigit():
                result['_text'] = int(text)
            elif self.is_float(text):
                result['_text'] = float(text)
            else:
                result['_text'] = text

        # Handle attributes
        if element.attrib:
            result['_attributes'] = dict(element.attrib)

        # Handle child elements
        children = {}
        for child in element:
            child_data = self.xml_to_dict(child)
            if child.tag in children:
                # Handle multiple children with same tag
                if not isinstance(children[child.tag], list):
                    children[child.tag] = [children[child.tag]]
                children[child.tag].append(child_data)
            else:
                children[child.tag] = child_data

        if children:
            result.update(children)

        # If only text content, return it directly
        if len(result) == 1 and '_text' in result:
            return result['_text']

        return result

    def format_bytes(self, bytes_value: int) -> str:
        """Convert bytes to human readable format (1024-based, one decimal)."""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if bytes_value < 1024.0:
                return f"{bytes_value:.1f} {unit}"
            bytes_value /= 1024.0
        return f"{bytes_value:.1f} PB"

    def is_float(self, value: str) -> bool:
        """Check if string can be converted to float."""
        try:
            float(value)
            return True
        except ValueError:
            return False

    def update_cache(self):
        """Refresh the cached data from the API.

        The network request runs outside the lock so readers are never blocked
        for the duration of the HTTP timeout. A failed fetch does not replace
        a previously cached good result; the error payload is stored only when
        there is nothing better to serve.
        """
        data = self.fetch_kvm_data()
        # Successful payloads always carry '_metadata'; error payloads never do.
        succeeded = bool(data) and '_metadata' in data

        with self.lock:
            if succeeded:
                self.cached_data = data
                self.last_update = _utcnow()
                logger.info("Cache updated successfully")
                return

            logger.error("Failed to update cache")
            if '_metadata' not in self.cached_data and data:
                # No good snapshot yet: expose the failure reason to clients.
                self.cached_data = data

    def get_cached_data(self) -> Dict[str, Any]:
        """Get cached data with thread safety (returns a shallow copy)."""
        with self.lock:
            if not self.cached_data:
                return {
                    'error': 'No data available',
                    'message': 'Initial data fetch in progress'
                }
            return self.cached_data.copy()

    def start_background_updates(self):
        """Start background thread for periodic updates."""
        def update_loop():
            while True:
                try:
                    self.update_cache()
                except Exception:
                    # Keep the daemon thread alive whatever happens.
                    logger.exception("Background cache update crashed")
                time.sleep(self.update_interval)

        thread = threading.Thread(target=update_loop, daemon=True)
        thread.start()
        logger.info(f"Background updates started (interval: {self.update_interval} seconds)")


# Initialize converter
converter = RackNerdAPIConverter()


@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint"""
    last_update = converter.last_update
    return jsonify({
        'status': 'healthy',
        'timestamp': _utc_timestamp(),
        'last_update': last_update.isoformat() + 'Z' if last_update else None
    })


@app.route('/api/kvm', methods=['GET'])
def get_kvm_data():
    """Main endpoint to get KVM data in JSON format"""
    return jsonify(converter.get_cached_data())


@app.route('/api/kvm/raw', methods=['GET'])
def get_raw_kvm_data():
    """Endpoint to get fresh data (bypass cache).

    Responds 200 with the converted payload, 502 with the error payload when
    the upstream fetch or parse failed, and 500 when nothing was returned.
    """
    data = converter.fetch_kvm_data()
    if not data:
        return jsonify({
            'error': 'Failed to fetch data',
            'message': 'Unable to retrieve data from RackNerd API'
        }), 500
    if '_metadata' not in data:
        # Error payloads carry no '_metadata'; do not report them as 200 OK.
        return jsonify(data), 502
    return jsonify(data)


@app.route('/api/kvm/debug', methods=['GET'])
def debug_api():
    """Debug endpoint to see raw API response (credentials redacted)."""
    try:
        response = requests.get(converter.base_url, params=converter.request_params(), timeout=30)

        return jsonify({
            'status_code': response.status_code,
            'headers': dict(response.headers),
            'raw_content': response.text,
            'content_length': len(response.text),
            # The request URL carries key and hash in its query string.
            'url': converter.redact(response.url)
        })
    except Exception as e:
        return jsonify({
            'error': 'Debug request failed',
            'message': converter.redact(str(e))
        }), 500


@app.route('/api/kvm/status', methods=['GET'])
def get_status():
    """Get service status and metadata"""
    with converter.lock:
        return jsonify({
            'service': 'racknerd-api-converter',
            'status': 'running',
            'vserver_id': converter.vserver_id,
            'update_interval': converter.update_interval,
            'last_update': converter.last_update.isoformat() + 'Z' if converter.last_update else None,
            'has_cached_data': bool(converter.cached_data),
            'timestamp': _utc_timestamp()
        })


@app.errorhandler(404)
def not_found(error):
    """Return a JSON body for unknown routes."""
    return jsonify({
        'error': 'Not found',
        'message': 'The requested endpoint does not exist'
    }), 404


@app.errorhandler(500)
def internal_error(error):
    """Return a JSON body for unhandled server errors."""
    return jsonify({
        'error': 'Internal server error',
        'message': 'An unexpected error occurred'
    }), 500


if __name__ == '__main__':
    # Initial data fetch
    logger.info("Starting RackNerd API Converter...")
    converter.update_cache()

    # Start background updates
    converter.start_background_updates()

    # Start Flask app
    port = int(os.getenv('PORT', '5000'))
    host = os.getenv('HOST', '0.0.0.0')

    logger.info(f"Starting server on {host}:{port}")
    app.run(host=host, port=port, debug=False)