import pika import json import sys import argparse from typing import Optional, Tuple, Dict, Any class RMQCleaner: def __init__( self, host: str = "localhost", user: str = "admin", password: str = "admin", queue_name: str = "my_queue" ): self.host = host self.user = user self.password = password self.queue_name = queue_name self.connection: Optional[pika.BlockingConnection] = None self.channel: Optional[pika.channel.Channel] = None def connect(self) -> None: """Establish connection to RabbitMQ server""" try: credentials = pika.PlainCredentials(self.user, self.password) self.connection = pika.BlockingConnection( pika.ConnectionParameters(host=self.host, credentials=credentials) ) self.channel = self.connection.channel() print(f"✅ Connected to RabbitMQ at {self.host}") except Exception as e: print(f"❌ Failed to connect to RabbitMQ: {e}") raise def disconnect(self) -> None: """Close the RabbitMQ connection""" if self.connection and not self.connection.is_closed: self.connection.close() print("✅ Disconnected from RabbitMQ") def read_unacked_messages(self, job_id: str, type: str) -> None: """ Read all unacknowledged messages from the queue, excluding messages with specified job_id, and save them to unack.json """ if not self.channel: raise RuntimeError("Not connected to RabbitMQ. Call connect() first.") try: job_id = job_id.split(',') print(f"Job IDs: {job_id}") print(f"Type: {type}") # Get queue information queue_info = self.channel.queue_declare(queue=self.queue_name, passive=True) message_count = queue_info.method.message_count if message_count == 0: print("No messages in queue.") return print(f"Found {message_count} messages in queue.") messages_to_save = [] messages_read = 0 messages_filtered = 0 # Read all available messages while True: method_frame, _, body = self.channel.basic_get( queue=self.queue_name, auto_ack=True ) if method_frame is None: break messages_read += 1 try: message = json.loads(body.decode()) # Save message if job_id doesn't match if message.get("type") != type: messages_to_save.append(message) else: if str(message.get("job_id")) not in job_id: messages_to_save.append(message) messages_filtered += 1 except json.JSONDecodeError: print(f"⚠️ Error decoding message: {body}") except Exception as e: print(f"⚠️ Error processing message: {e}") # Write filtered messages to unack.json with open("unack.json", "w") as f: json.dump(messages_to_save, f, indent=2) print(f"\n✅ Read {messages_read} messages") print(f"✅ Saved {messages_filtered} messages to unack.json (excluding job_id={job_id})") except Exception as e: print(f"❌ Error reading messages: {e}") raise def publish_messages(self) -> None: """Read messages from unack.json and publish them to the queue""" if not self.channel: raise RuntimeError("Not connected to RabbitMQ. Call connect() first.") try: # Read messages from unack.json with open("unack.json", "r") as f: messages = json.load(f) if not messages: print("No messages found in unack.json") return print(f"Found {len(messages)} messages in unack.json") messages_published = 0 # Publish each message for message in messages: self.channel.basic_publish( exchange="", routing_key=self.queue_name, body=json.dumps(message), properties=pika.BasicProperties( delivery_mode=2 # make message persistent ) ) messages_published += 1 print(f"Published message {messages_published}/{len(messages)}: {message}") print(f"\n✅ Successfully published {messages_published} messages to queue '{self.queue_name}'") except Exception as e: print(f"❌ Error publishing messages: {e}") raise def parse_args(): """Parse command line arguments""" parser = argparse.ArgumentParser(description='RabbitMQ message purger and manager') parser.add_argument('--host', default='localhost', help='RabbitMQ host (default: localhost)') parser.add_argument('--user', default='admin', help='RabbitMQ username (default: admin)') parser.add_argument('--password', default='admin', help='RabbitMQ password (default: admin)') parser.add_argument('--queue', default='my_queue', help='Queue name (default: my_queue)') parser.add_argument('--job-id', required=True, help='Comma-separated list of job IDs to exclude') parser.add_argument('--type', required=True, help='Type of operation (example: createClusters)') return parser.parse_args() def main(): args = parse_args() # Initialize RMQCleaner with command line arguments cleaner = RMQCleaner( host=args.host, user=args.user, password=args.password, queue_name=args.queue ) try: cleaner.connect() # First read and save unacked messages (excluding specified job_ids) cleaner.read_unacked_messages(args.job_id, args.type) # Then publish saved messages back to the queue cleaner.publish_messages() finally: cleaner.disconnect() if __name__ == "__main__": main()