Add service category tagging script for Uptime Kuma
Creates and assigns service category tags to all monitors: - Hardware/monitoring: Server fan monitors (4 monitors) - Application: Web apps, database, media services (3 monitors) - Infrastructure: Storage, virtualization, VMs (5 monitors) Tags with color coding: - hardware (red), monitoring (orange) - application (blue), web (cyan), database (purple), media (pink) - infrastructure (green), storage (light green), virtualization (teal), vm (blue grey)
This commit is contained in:
Executable
+135
@@ -0,0 +1,135 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Add service category tags to Uptime Kuma monitors
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from uptime_kuma_api import UptimeKumaApi
|
||||
|
||||
# Configuration
|
||||
UPTIME_KUMA_URL = "https://uptime.kolpacksoftware.com"
|
||||
USERNAME = os.getenv('UPTIME_KUMA_USERNAME')
|
||||
PASSWORD = os.getenv('UPTIME_KUMA_PASSWORD')
|
||||
|
||||
# Service category mappings
|
||||
MONITOR_TAGS = {
|
||||
# Hardware/Monitoring
|
||||
1: ['hardware', 'monitoring'], # Server Fan Online
|
||||
2: ['hardware', 'monitoring'], # Server Front Fan Stalled
|
||||
3: ['hardware', 'monitoring'], # Server Back Fan Stalled
|
||||
4: ['hardware', 'monitoring'], # Server Fan (group)
|
||||
|
||||
# Applications
|
||||
5: ['application', 'web'], # TSA Chapter Organizer RMS
|
||||
6: ['database', 'application'], # couchdb
|
||||
7: ['media', 'application'], # Plex
|
||||
|
||||
# Infrastructure
|
||||
8: ['infrastructure', 'storage'], # unraid
|
||||
13: ['infrastructure', 'virtualization'], # Proxmox
|
||||
|
||||
# Network/Hosts
|
||||
10: ['infrastructure', 'vm'], # pallas VM
|
||||
11: ['infrastructure', 'storage'], # NAS
|
||||
14: ['infrastructure', 'vm'], # dev-win10 VM
|
||||
}
|
||||
|
||||
def connect():
|
||||
"""Connect to Uptime Kuma API"""
|
||||
api = UptimeKumaApi(UPTIME_KUMA_URL, ssl_verify=False)
|
||||
|
||||
if USERNAME and PASSWORD:
|
||||
api.login(USERNAME, PASSWORD)
|
||||
else:
|
||||
print("Error: Set UPTIME_KUMA_USERNAME and UPTIME_KUMA_PASSWORD")
|
||||
sys.exit(1)
|
||||
|
||||
return api
|
||||
|
||||
def ensure_tags_exist(api):
|
||||
"""Create all needed tags upfront"""
|
||||
tag_colors = {
|
||||
'hardware': '#F44336', # Red
|
||||
'monitoring': '#FF9800', # Orange
|
||||
'application': '#2196F3', # Blue
|
||||
'web': '#00BCD4', # Cyan
|
||||
'database': '#9C27B0', # Purple
|
||||
'media': '#E91E63', # Pink
|
||||
'infrastructure': '#4CAF50', # Green
|
||||
'storage': '#8BC34A', # Light Green
|
||||
'virtualization': '#009688', # Teal
|
||||
'vm': '#607D8B', # Blue Grey
|
||||
}
|
||||
|
||||
existing_tags = api.get_tags()
|
||||
# Tags might have 'id' or 'tag_id'
|
||||
existing_tag_names = {tag['name']: (tag.get('tag_id') or tag.get('id')) for tag in existing_tags}
|
||||
tag_map = {}
|
||||
|
||||
print("Ensuring all tags exist...")
|
||||
for tag_name, color in tag_colors.items():
|
||||
if tag_name in existing_tag_names:
|
||||
tag_map[tag_name] = existing_tag_names[tag_name]
|
||||
print(f" ✓ Tag '{tag_name}' already exists (ID: {tag_map[tag_name]})")
|
||||
else:
|
||||
result = api.add_tag(name=tag_name, color=color)
|
||||
# The result might be the tag dict itself or contain it
|
||||
if isinstance(result, dict):
|
||||
tag_id = result.get('tag_id') or result.get('id')
|
||||
else:
|
||||
tag_id = result
|
||||
tag_map[tag_name] = tag_id
|
||||
print(f" + Created tag '{tag_name}' (ID: {tag_map[tag_name]})")
|
||||
|
||||
return tag_map
|
||||
|
||||
def add_tags_to_monitor(api, monitor_id, tag_names, tag_map):
|
||||
"""Add tags to a monitor"""
|
||||
# Get monitor details
|
||||
monitor = api.get_monitor(monitor_id)
|
||||
|
||||
# Build tag list
|
||||
tag_ids = []
|
||||
for tag_name in tag_names:
|
||||
if tag_name in tag_map:
|
||||
tag_ids.append({'tag_id': tag_map[tag_name], 'value': ''})
|
||||
|
||||
# Preserve existing tags and add new ones
|
||||
existing_tag_ids = monitor.get('tags', [])
|
||||
existing_tag_id_set = {tag['tag_id'] for tag in existing_tag_ids}
|
||||
|
||||
for tag_info in tag_ids:
|
||||
if tag_info['tag_id'] not in existing_tag_id_set:
|
||||
existing_tag_ids.append(tag_info)
|
||||
|
||||
# Update monitor with tags
|
||||
monitor['tags'] = existing_tag_ids
|
||||
api.edit_monitor(monitor_id, **monitor)
|
||||
|
||||
print(f" ✓ Monitor {monitor_id} ({monitor['name']}): Added tags {tag_names}")
|
||||
|
||||
def main():
|
||||
api = connect()
|
||||
|
||||
try:
|
||||
# First, ensure all tags exist
|
||||
tag_map = ensure_tags_exist(api)
|
||||
|
||||
print("\nAdding tags to monitors...\n")
|
||||
|
||||
for monitor_id, tags in MONITOR_TAGS.items():
|
||||
try:
|
||||
add_tags_to_monitor(api, monitor_id, tags, tag_map)
|
||||
except Exception as e:
|
||||
print(f" ✗ Failed to tag monitor {monitor_id}: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
print("\n✅ Tag assignment complete!")
|
||||
|
||||
finally:
|
||||
api.disconnect()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user