RMQ-cleanup/main.py
2025-09-17 23:14:37 +06:00

191 lines
6.5 KiB
Python

import pika
import json
import sys
import argparse
from typing import Optional, Tuple, Dict, Any
class RMQCleaner:
    """Remove selected job messages from a RabbitMQ queue via a JSON spool file.

    Workflow: ``connect()``, then ``read_unacked_messages()`` drains the queue
    and writes every message that must survive to the spool file, then
    ``publish_messages()`` publishes the spool file's contents back to the
    same queue, and finally ``disconnect()``.
    """

    # Spool file shared by read_unacked_messages() and publish_messages().
    SPOOL_FILE = "unack.json"

    def __init__(
        self,
        host: str = "localhost",
        user: str = "admin",
        password: str = "admin",
        queue_name: str = "my_queue"
    ):
        """Store connection settings; no network I/O happens until connect().

        Args:
            host: RabbitMQ server host name.
            user: User name for plain authentication.
            password: Password for plain authentication.
            queue_name: Queue that is drained and republished to.
        """
        self.host = host
        self.user = user
        self.password = password
        self.queue_name = queue_name
        self.connection: Optional[pika.BlockingConnection] = None
        self.channel: Optional[pika.channel.Channel] = None

    def connect(self) -> None:
        """Open a blocking connection and a channel to the RabbitMQ server.

        Raises:
            Exception: Whatever pika raised while connecting; it is printed
                and then re-raised unchanged.
        """
        try:
            credentials = pika.PlainCredentials(self.user, self.password)
            self.connection = pika.BlockingConnection(
                pika.ConnectionParameters(host=self.host, credentials=credentials)
            )
            self.channel = self.connection.channel()
            print(f"✅ Connected to RabbitMQ at {self.host}")
        except Exception as e:
            print(f"❌ Failed to connect to RabbitMQ: {e}")
            raise

    def disconnect(self) -> None:
        """Close the connection if it is open and forget stale references."""
        if self.connection and not self.connection.is_closed:
            self.connection.close()
            print("✅ Disconnected from RabbitMQ")
        # Drop the references so later calls hit the "Not connected" guard
        # instead of operating on a closed channel.
        self.channel = None
        self.connection = None

    def _write_spool(self, messages) -> None:
        """Overwrite the spool file with ``messages`` as indented JSON."""
        with open(self.SPOOL_FILE, "w", encoding="utf-8") as f:
            json.dump(messages, f, indent=2)

    def read_unacked_messages(self, job_id: str, type: str) -> None:
        """Drain the queue and spool every message that should survive.

        A message is dropped only when its ``"type"`` equals ``type`` AND its
        ``"job_id"`` (compared as a string) is one of the given job IDs. All
        other JSON-object messages are written to the spool file. Bodies that
        are not decodable JSON objects cannot be filtered, so they are handed
        back to the queue untouched instead of being discarded.

        Messages are fetched with manual acknowledgement and are only
        acknowledged after the spool file has been written. If anything fails
        earlier, the deliveries stay unacknowledged and the broker requeues
        them once the channel/connection closes.

        The spool file is always (re)written, even for an empty queue, so a
        following ``publish_messages()`` never republishes a stale file left
        over from a previous run.

        Args:
            job_id: Comma-separated job IDs to remove; surrounding whitespace
                around each ID is ignored.
            type: Value of the message ``"type"`` field the job IDs apply to.
                (The name shadows the builtin; it is kept for compatibility
                with existing callers.)

        Raises:
            RuntimeError: If ``connect()`` has not been called.
            Exception: Any broker or file error; printed, then re-raised.
        """
        if not self.channel:
            raise RuntimeError("Not connected to RabbitMQ. Call connect() first.")
        try:
            # A set gives O(1) membership tests; stripping makes "1, 2" work.
            excluded_ids = {part.strip() for part in job_id.split(",") if part.strip()}
            shown_ids = sorted(excluded_ids)
            print(f"Job IDs: {shown_ids}")
            print(f"Type: {type}")

            # Passive declare only inspects the queue; it never creates it.
            queue_info = self.channel.queue_declare(queue=self.queue_name, passive=True)
            message_count = queue_info.method.message_count
            if message_count == 0:
                print("No messages in queue.")
                # Reset the spool so stale content is not republished later.
                self._write_spool([])
                return
            print(f"Found {message_count} messages in queue.")

            messages_to_save = []
            ack_tags = []      # deliveries we understood (kept or removed)
            requeue_tags = []  # deliveries we could not interpret
            messages_read = 0

            while True:
                method_frame, _, body = self.channel.basic_get(
                    queue=self.queue_name,
                    auto_ack=False
                )
                if method_frame is None:
                    break  # queue is drained
                messages_read += 1
                tag = method_frame.delivery_tag

                try:
                    message = json.loads(body.decode())
                except (UnicodeDecodeError, json.JSONDecodeError):
                    print(f"⚠️ Error decoding message: {body}")
                    requeue_tags.append(tag)
                    continue
                if not isinstance(message, dict):
                    print(f"⚠️ Error processing message: not a JSON object: {body}")
                    requeue_tags.append(tag)
                    continue

                ack_tags.append(tag)
                is_target = (
                    message.get("type") == type
                    and str(message.get("job_id")) in excluded_ids
                )
                if not is_target:
                    messages_to_save.append(message)

            self._write_spool(messages_to_save)

            # Survivors are on disk now, so it is safe to settle deliveries.
            for tag in ack_tags:
                self.channel.basic_ack(delivery_tag=tag)
            for tag in requeue_tags:
                self.channel.basic_nack(delivery_tag=tag, requeue=True)

            removed = len(ack_tags) - len(messages_to_save)
            print(f"\n✅ Read {messages_read} messages")
            print(
                f"✅ Saved {len(messages_to_save)} messages to {self.SPOOL_FILE} "
                f"(excluding job_id={shown_ids})"
            )
            print(f"✅ Removed {removed} messages matching type={type}")
            if requeue_tags:
                print(f"⚠️ Left {len(requeue_tags)} unreadable messages in the queue")
        except Exception as e:
            print(f"❌ Error reading messages: {e}")
            raise

    def publish_messages(self) -> None:
        """Publish every message stored in the spool file to the queue.

        Each entry is serialised with ``json.dumps`` and published through the
        default exchange with the queue name as routing key, marked persistent.

        Raises:
            RuntimeError: If ``connect()`` has not been called.
            Exception: Any file, JSON or broker error; printed, then re-raised.
        """
        if not self.channel:
            raise RuntimeError("Not connected to RabbitMQ. Call connect() first.")
        try:
            with open(self.SPOOL_FILE, "r", encoding="utf-8") as f:
                messages = json.load(f)
            if not messages:
                print(f"No messages found in {self.SPOOL_FILE}")
                return

            total = len(messages)
            print(f"Found {total} messages in {self.SPOOL_FILE}")

            messages_published = 0
            for messages_published, message in enumerate(messages, start=1):
                self.channel.basic_publish(
                    exchange="",
                    routing_key=self.queue_name,
                    body=json.dumps(message),
                    properties=pika.BasicProperties(
                        delivery_mode=2  # make message persistent
                    )
                )
                print(f"Published message {messages_published}/{total}: {message}")
            print(f"\n✅ Successfully published {messages_published} messages to queue '{self.queue_name}'")
        except Exception as e:
            print(f"❌ Error publishing messages: {e}")
            raise
def parse_args():
    """Build the command-line parser and return the parsed arguments.

    The connection options (``--host``, ``--user``, ``--password``,
    ``--queue``) are optional and carry defaults; ``--job-id`` and ``--type``
    must always be supplied.
    """
    cli = argparse.ArgumentParser(description='RabbitMQ message purger and manager')

    # Optional settings as (flag, default value, help text).
    defaulted_options = (
        ('--host', 'localhost', 'RabbitMQ host (default: localhost)'),
        ('--user', 'admin', 'RabbitMQ username (default: admin)'),
        ('--password', 'admin', 'RabbitMQ password (default: admin)'),
        ('--queue', 'my_queue', 'Queue name (default: my_queue)'),
    )
    for flag, fallback, description in defaulted_options:
        cli.add_argument(flag, default=fallback, help=description)

    # Mandatory settings as (flag, help text).
    mandatory_options = (
        ('--job-id', 'Comma-separated list of job IDs to exclude'),
        ('--type', 'Type of operation (example: createClusters)'),
    )
    for flag, description in mandatory_options:
        cli.add_argument(flag, required=True, help=description)

    return cli.parse_args()
def main():
    """Run the cleanup from the command line.

    Parses the CLI options, connects, calls ``read_unacked_messages`` with
    the requested job IDs and type, then ``publish_messages``. The connection
    is closed in all cases, including when a step raises.
    """
    options = parse_args()

    # Map CLI options onto the cleaner's constructor parameters.
    connection_settings = {
        "host": options.host,
        "user": options.user,
        "password": options.password,
        "queue_name": options.queue,
    }
    rmq = RMQCleaner(**connection_settings)

    try:
        rmq.connect()
        # Step 1: read the queue and spool the messages to keep.
        rmq.read_unacked_messages(options.job_id, options.type)
        # Step 2: put the spooled messages back on the queue.
        rmq.publish_messages()
    finally:
        rmq.disconnect()
# Run the cleanup only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()