EU-Utility/plugins/nexus_search/plugin.py

443 lines
15 KiB
Python

"""
EU-Utility - Nexus Search Plugin
Built-in plugin for searching EntropiaNexus via API.
Uses official Nexus API endpoints.
"""
import json
import webbrowser
from PyQt6.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout,
QLineEdit, QPushButton, QLabel, QComboBox,
QListWidget, QListWidgetItem, QTabWidget,
QTableWidget, QTableWidgetItem, QHeaderView
)
from PyQt6.QtCore import Qt, QThread, pyqtSignal
from plugins.base_plugin import BasePlugin
class NexusAPIClient:
    """Client for the EntropiaNexus HTTP API.

    Every public method accepts an optional ``http_get_func``. When supplied it
    is called as ``http_get_func(url, cache_ttl=..., headers=...)`` and the
    decoded payload is read from the ``'json'`` key of the mapping it returns.
    When omitted, the request is made directly with ``urllib`` (10 second
    timeout).

    No method raises: any exception is printed and a fallback value is
    returned instead (documented per method).
    """

    BASE_URL = "https://www.entropianexus.com"

    @staticmethod
    def _urlopen_json(url, headers):
        """GET *url* with ``urllib`` and return the decoded JSON body.

        ``urllib`` does not transparently decompress responses, so a body that
        is gzip-encoded (announced via ``Content-Encoding`` or recognisable by
        the gzip magic bytes) is inflated before being parsed.

        Raises whatever ``urllib``, ``gzip`` or ``json`` raise; callers handle it.
        """
        import gzip
        import urllib.request

        request = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(request, timeout=10) as resp:
            raw = resp.read()
            content_encoding = resp.headers.get('Content-Encoding') or ''
        if 'gzip' in content_encoding.lower() or raw[:2] == b'\x1f\x8b':
            raw = gzip.decompress(raw)
        return json.loads(raw.decode('utf-8'))

    @classmethod
    def fetch_exchange_items(cls, search_query=None, http_get_func=None):
        """Fetch exchange items from the Nexus API.

        Args:
            search_query: Optional case-insensitive substring. When given (and
                the API returned data) the category structure is flattened
                into a list of the items whose ``name`` contains it.
            http_get_func: Optional HTTP helper (see class docstring).

        Returns:
            The flat list of matching items when ``search_query`` is given and
            data was returned; otherwise the payload exactly as received.
            ``None`` on any error.
        """
        try:
            url = f"{cls.BASE_URL}/api/market/exchange"
            headers = {'Accept': 'application/json', 'Accept-Encoding': 'gzip'}
            if http_get_func:
                # 1 minute cache for market data
                response = http_get_func(url, cache_ttl=60, headers=headers)
                data = response.get('json') if response else None
            else:
                # Fallback for standalone usage
                data = cls._urlopen_json(url, headers)

            if search_query and data:
                needle = search_query.lower()
                # ``or ''`` keeps one item with a null name from aborting the
                # whole search with an AttributeError.
                return [
                    item
                    for category in data
                    if 'items' in category
                    for item in category['items']
                    if needle in (item.get('name') or '').lower()
                ]
            return data
        except Exception as e:
            print(f"API Error: {e}")
            return None

    @classmethod
    def fetch_item_prices(cls, item_ids, http_get_func=None):
        """Fetch latest prices for items.

        Args:
            item_ids: Iterable of item identifiers (list, tuple, set,
                generator, ...). Only the first 100 are sent.
            http_get_func: Optional HTTP helper (see class docstring).

        Returns:
            The decoded JSON payload, or ``{}`` when ``item_ids`` is empty, the
            helper returned nothing, or an error occurred.
        """
        try:
            if not item_ids:
                return {}
            from itertools import islice
            from urllib.parse import quote

            # islice instead of slicing so non-sequence iterables work too.
            ids_str = ','.join(str(item_id) for item_id in islice(item_ids, 100))  # Max 100
            if not ids_str:
                return {}
            # Commas stay literal; anything unusual in an id is escaped.
            url = f"{cls.BASE_URL}/api/market/prices/latest?items={quote(ids_str, safe=',')}"
            headers = {'Accept': 'application/json'}
            if http_get_func:
                # 1 minute cache
                response = http_get_func(url, cache_ttl=60, headers=headers)
                return response.get('json') if response else {}
            # Fallback for standalone usage
            return cls._urlopen_json(url, headers)
        except Exception as e:
            print(f"Price API Error: {e}")
            return {}

    @classmethod
    def search_users(cls, query, http_get_func=None):
        """Search for verified users.

        Args:
            query: Free-text search term; it is percent-encoded before being
                placed in the URL.
            http_get_func: Optional HTTP helper (see class docstring).

        Returns:
            The decoded JSON payload, or ``None`` when the helper returned
            nothing or an error occurred.
        """
        try:
            from urllib.parse import quote, urlencode

            # urlencode escapes spaces, '&', '#', non-ASCII text, etc. so the
            # user's input cannot break or truncate the query string.
            query_string = urlencode({'q': query, 'limit': 10}, quote_via=quote)
            url = f"{cls.BASE_URL}/api/users/search?{query_string}"
            headers = {'Accept': 'application/json'}
            if http_get_func:
                # 5 minute cache for user search
                response = http_get_func(url, cache_ttl=300, headers=headers)
                return response.get('json') if response else None
            # Fallback for standalone usage
            return cls._urlopen_json(url, headers)
        except Exception as e:
            print(f"User Search Error: {e}")
            return None
class NexusSearchThread(QThread):
    """Worker thread that performs a single Nexus API lookup.

    Signals:
        results_ready(list, str): the results and the search type they
            belong to, emitted when the lookup finishes.
        error_occurred(str): the stringified exception, emitted when the
            lookup raises.
    """

    results_ready = pyqtSignal(list, str)  # results, search_type
    error_occurred = pyqtSignal(str)

    def __init__(self, query, search_type, http_get_func=None):
        """Remember what to search for; the work itself happens in ``run``."""
        super().__init__()
        self.query = query
        self.search_type = search_type
        self.http_get_func = http_get_func

    def run(self):
        """Dispatch on the search type and emit the outcome."""
        try:
            if self.search_type == "Items":
                found = self._lookup_items()
            elif self.search_type == "Users":
                found = self._lookup_users()
            else:
                # Unrecognised search types yield an empty result set.
                found = []
            self.results_ready.emit(found, self.search_type)
        except Exception as exc:
            self.error_occurred.emit(str(exc))

    def _lookup_items(self):
        """Return at most 20 exchange items matching the query."""
        payload = NexusAPIClient.fetch_exchange_items(
            self.query, http_get_func=self.http_get_func
        )
        if not payload:
            return []

        # A list whose first element carries a 'name' is an already-filtered
        # item list; just cap it.
        if isinstance(payload, list) and 'name' in payload[0]:
            return payload[:20]

        # Otherwise treat the payload as the full category structure and
        # filter it here, stopping as soon as 20 matches are collected.
        matches = []
        for group in payload:
            if 'items' not in group:
                continue
            for entry in group['items']:
                if self.query.lower() not in entry.get('name', '').lower():
                    continue
                matches.append(entry)
                if len(matches) >= 20:
                    return matches
        return matches

    def _lookup_users(self):
        """Return at most 10 users matching the query."""
        payload = NexusAPIClient.search_users(
            self.query, http_get_func=self.http_get_func
        )
        return payload[:10] if payload else []
class NexusSearchPlugin(BasePlugin):
    """Search EntropiaNexus via API.

    Builds a small search panel (type selector, text box, results table and
    quick links), runs each lookup on a ``NexusSearchThread`` and opens the
    clicked result on the Nexus website.
    """

    name = "EntropiaNexus"
    version = "1.1.0"
    author = "ImpulsiveFPS"
    description = "Search items, users, and market data via Nexus API"
    hotkey = "ctrl+shift+n"

    # Website path segment used to build the detail URL for each search type.
    _DETAIL_PATHS = {"Items": "items", "Users": "users"}

    def initialize(self):
        """Setup the plugin."""
        self.base_url = "https://www.entropianexus.com"
        self.search_thread = None
        self.current_results = []
        # Search type the rows in ``current_results`` were fetched with.
        self.current_search_type = None

    def get_ui(self):
        """Create plugin UI and return its root widget."""
        widget = QWidget()
        layout = QVBoxLayout(widget)

        # Title
        title = QLabel("EntropiaNexus")
        title.setStyleSheet("color: #4a9eff; font-size: 18px; font-weight: bold;")
        layout.addWidget(title)

        # Search type
        type_layout = QHBoxLayout()
        type_layout.addWidget(QLabel("Search:"))
        self.search_type = QComboBox()
        self.search_type.addItems([
            "Items",
            "Users",
        ])
        self.search_type.setStyleSheet("""
            QComboBox {
                background-color: #444;
                color: white;
                padding: 5px;
                border-radius: 4px;
                min-width: 100px;
            }
        """)
        type_layout.addWidget(self.search_type)

        # Search input
        self.search_input = QLineEdit()
        self.search_input.setPlaceholderText("Enter search term...")
        self.search_input.setStyleSheet("""
            QLineEdit {
                background-color: #333;
                color: white;
                padding: 8px;
                border: 2px solid #555;
                border-radius: 4px;
                font-size: 13px;
            }
            QLineEdit:focus {
                border-color: #4a9eff;
            }
        """)
        self.search_input.returnPressed.connect(self._do_search)
        type_layout.addWidget(self.search_input, 1)

        # Search button
        search_btn = QPushButton("🔍")
        search_btn.setFixedWidth(40)
        search_btn.setStyleSheet("""
            QPushButton {
                background-color: #4a9eff;
                color: white;
                border: none;
                border-radius: 4px;
                font-size: 14px;
            }
            QPushButton:hover {
                background-color: #5aafff;
            }
        """)
        search_btn.clicked.connect(self._do_search)
        type_layout.addWidget(search_btn)
        layout.addLayout(type_layout)

        # Status
        self.status_label = QLabel("Ready")
        self.status_label.setStyleSheet("color: #666; font-size: 11px;")
        layout.addWidget(self.status_label)

        # Results table
        self.results_table = QTableWidget()
        self.results_table.setColumnCount(3)
        self.results_table.setHorizontalHeaderLabels(["Name", "Type", "Price"])
        self.results_table.horizontalHeader().setStretchLastSection(True)
        self.results_table.horizontalHeader().setSectionResizeMode(0, QHeaderView.ResizeMode.Stretch)
        self.results_table.setStyleSheet("""
            QTableWidget {
                background-color: #2a2a2a;
                color: white;
                border: 1px solid #444;
                border-radius: 4px;
                gridline-color: #444;
            }
            QTableWidget::item {
                padding: 8px;
                border-bottom: 1px solid #333;
            }
            QTableWidget::item:selected {
                background-color: #4a9eff;
            }
            QHeaderView::section {
                background-color: #333;
                color: #aaa;
                padding: 8px;
                border: none;
                font-weight: bold;
            }
        """)
        self.results_table.cellClicked.connect(self._on_item_clicked)
        self.results_table.setMaximumHeight(300)
        layout.addWidget(self.results_table)

        # Action buttons
        btn_layout = QHBoxLayout()
        open_btn = QPushButton("Open on Nexus")
        open_btn.setStyleSheet("""
            QPushButton {
                background-color: #333;
                color: white;
                padding: 8px 16px;
                border: none;
                border-radius: 4px;
            }
            QPushButton:hover {
                background-color: #444;
            }
        """)
        open_btn.clicked.connect(self._open_selected)
        btn_layout.addWidget(open_btn)
        btn_layout.addStretch()

        # Quick links
        links_label = QLabel("Quick:")
        links_label.setStyleSheet("color: #666;")
        btn_layout.addWidget(links_label)
        for name, url in [
            ("Market", "/market/exchange"),
            ("Items", "/items"),
            ("Mobs", "/mobs"),
        ]:
            btn = QPushButton(name)
            btn.setStyleSheet("""
                QPushButton {
                    background-color: transparent;
                    color: #4a9eff;
                    border: none;
                    padding: 5px;
                }
                QPushButton:hover {
                    color: #5aafff;
                    text-decoration: underline;
                }
            """)
            # Bind the URL as a default argument so each button keeps its own
            # target instead of the loop's last value.
            btn.clicked.connect(lambda checked, u=self.base_url + url: webbrowser.open(u))
            btn_layout.addWidget(btn)
        layout.addLayout(btn_layout)
        layout.addStretch()
        return widget

    def _do_search(self):
        """Validate the query and start a background API search."""
        query = self.search_input.text().strip()
        if len(query) < 2:
            self.status_label.setText("Enter at least 2 characters")
            return

        # Replacing ``self.search_thread`` while the previous thread is still
        # running would drop the only reference to a live QThread (which Qt
        # does not tolerate) and let its late results overwrite newer ones.
        previous = self.search_thread
        if previous is not None and previous.isRunning():
            self.status_label.setText("Search already in progress...")
            return

        search_type = self.search_type.currentText()

        # Clear previous results
        self.results_table.setRowCount(0)
        self.current_results = []
        self.status_label.setText("Searching...")

        # Start search thread with http_get function
        self.search_thread = NexusSearchThread(query, search_type, http_get_func=self.http_get)
        self.search_thread.results_ready.connect(self._on_results)
        self.search_thread.error_occurred.connect(self._on_error)
        self.search_thread.start()

    @staticmethod
    def _cell_text(value):
        """Return *value* as table text; ``None`` becomes an empty string."""
        return '' if value is None else str(value)

    @staticmethod
    def _price_text(item):
        """Summarise an item's first buy order, else its first sell order."""
        for label, key in (("Buy", 'buy'), ("Sell", 'sell')):
            orders = item.get(key) or []
            if orders:
                return f"{label}: {orders[0].get('price', 'N/A')}"
        return "No orders"

    def _on_results(self, results, search_type):
        """Handle search results by filling the table.

        Args:
            results: List of result dicts emitted by the search thread.
            search_type: ``"Items"`` or ``"Users"``; remembered so later
                clicks resolve against the type the rows were fetched with.
        """
        self.current_results = results
        self.current_search_type = search_type
        if not results:
            self.status_label.setText("No results found")
            return

        # Populate table
        self.results_table.setRowCount(len(results))
        for row, item in enumerate(results):
            if search_type == "Items":
                cells = (
                    item.get('name', 'Unknown'),
                    item.get('type', 'Item'),
                    self._price_text(item),
                )
            elif search_type == "Users":
                cells = (
                    item.get('name', 'Unknown'),
                    "User",
                    item.get('euName', ''),
                )
            else:
                continue
            # JSON values may be numbers or null; QTableWidgetItem needs text.
            for column, value in enumerate(cells):
                self.results_table.setItem(row, column, QTableWidgetItem(self._cell_text(value)))
        self.status_label.setText(f"Found {len(results)} results")

    def _on_error(self, error):
        """Handle search error."""
        self.status_label.setText(f"Error: {error}")

    def _on_item_clicked(self, row, column):
        """Open the clicked result's page on the Nexus website.

        The URL path is chosen from the search type the displayed results
        were fetched with, not from the combo box, which the user may have
        changed since the search ran.
        """
        if not 0 <= row < len(self.current_results):
            return
        item = self.current_results[row]
        search_type = getattr(self, 'current_search_type', None) or self.search_type.currentText()
        path = self._DETAIL_PATHS.get(search_type)
        record_id = item.get('id')
        if path and record_id:
            webbrowser.open(f"{self.base_url}/{path}/{record_id}")

    def _open_selected(self):
        """Open selected item in browser."""
        selected = self.results_table.selectedItems()
        if selected:
            self._on_item_clicked(selected[0].row(), 0)

    def on_hotkey(self):
        """Focus search when hotkey pressed."""
        # The input only exists once get_ui() has been called.
        if hasattr(self, 'search_input'):
            self.search_input.setFocus()
            self.search_input.selectAll()