191 lines
6.5 KiB
Python
191 lines
6.5 KiB
Python
import pika
|
|
import json
|
|
import sys
|
|
import argparse
|
|
from typing import Optional, Tuple, Dict, Any
|
|
|
|
class RMQCleaner:
    """Remove selected messages from a RabbitMQ queue by draining and republishing it.

    Workflow: ``connect()``, then ``read_unacked_messages()`` pulls every
    message off the queue and stores the ones to keep in ``unack.json``, then
    ``publish_messages()`` pushes the stored messages back onto the same
    queue, and finally ``disconnect()``.
    """

    def __init__(
        self,
        host: str = "localhost",
        user: str = "admin",
        password: str = "admin",
        queue_name: str = "my_queue"
    ):
        """Store connection settings; no network activity happens here.

        Args:
            host: RabbitMQ host name passed to ``pika.ConnectionParameters``.
            user: User name for plain credentials.
            password: Password for plain credentials.
            queue_name: Queue to drain and to republish into.
        """
        self.host = host
        self.user = user
        self.password = password
        self.queue_name = queue_name
        # Both stay None until connect() succeeds.
        self.connection: Optional[pika.BlockingConnection] = None
        self.channel: Optional["pika.adapters.blocking_connection.BlockingChannel"] = None

    def connect(self) -> None:
        """Establish connection to RabbitMQ server and open a channel.

        Raises:
            Exception: whatever pika raised; it is reported and re-raised.
        """
        try:
            credentials = pika.PlainCredentials(self.user, self.password)
            self.connection = pika.BlockingConnection(
                pika.ConnectionParameters(host=self.host, credentials=credentials)
            )
            self.channel = self.connection.channel()
            print(f"✅ Connected to RabbitMQ at {self.host}")
        except Exception as e:
            print(f"❌ Failed to connect to RabbitMQ: {e}")
            raise

    def disconnect(self) -> None:
        """Close the RabbitMQ connection if it is open; safe to call repeatedly."""
        if self.connection and not self.connection.is_closed:
            self.connection.close()
            print("✅ Disconnected from RabbitMQ")
        # Forget the channel so later calls hit the "Not connected" guard
        # instead of failing inside pika on a closed channel.
        self.channel = None

    @staticmethod
    def _parse_job_ids(job_id: str) -> list:
        """Split a comma-separated job-id string into trimmed, non-empty IDs."""
        return [part.strip() for part in job_id.split(",") if part.strip()]

    @staticmethod
    def _is_excluded(message, excluded_ids, message_type: str) -> bool:
        """Return True when *message* is a dict of the given type with an excluded job_id."""
        if not isinstance(message, dict):
            # Lists/strings/numbers cannot carry type/job_id, so never match.
            return False
        if message.get("type") != message_type:
            return False
        return str(message.get("job_id")) in excluded_ids

    def read_unacked_messages(self, job_id: str, type: str) -> None:
        """Drain the queue and save every message that should be kept to unack.json.

        Messages are fetched one at a time with ``basic_get``. A message is
        left out of the file only when its ``type`` equals *type* and its
        ``job_id`` (compared as a string) is one of the given IDs. Every
        other JSON message, including non-object payloads, is saved.

        Messages are fetched with manual acknowledgement and acknowledged in
        one batch only after ``unack.json`` has been written, so a failure
        while draining or writing leaves them unacknowledged on the broker
        instead of losing them.

        Args:
            job_id: Comma-separated job IDs to exclude; whitespace around
                each ID is ignored.
            type: Value of the message ``type`` field the exclusion applies
                to. (The name shadows the builtin but is kept for callers
                that pass it by keyword.)

        Raises:
            RuntimeError: if ``connect()`` has not been called.
            Exception: any broker or file error; reported and re-raised.
        """
        if not self.channel:
            raise RuntimeError("Not connected to RabbitMQ. Call connect() first.")

        try:
            job_ids = self._parse_job_ids(job_id)
            excluded_ids = frozenset(job_ids)
            print(f"Job IDs: {job_ids}")
            print(f"Type: {type}")

            # Passive declare only inspects the queue; it never creates it.
            queue_info = self.channel.queue_declare(queue=self.queue_name, passive=True)
            message_count = queue_info.method.message_count

            if message_count == 0:
                # NOTE(review): unack.json is deliberately left untouched here.
                # A later publish_messages() will therefore republish whatever
                # an earlier run left in the file. Truncating it instead could
                # destroy a snapshot that a previous failed publish still needs.
                print("No messages in queue.")
                return

            print(f"Found {message_count} messages in queue.")
            messages_to_save = []
            messages_read = 0
            messages_undecodable = 0
            last_delivery_tag = None

            # Read all available messages; basic_get yields (None, None, None)
            # once nothing is left to fetch.
            while True:
                method_frame, _, body = self.channel.basic_get(
                    queue=self.queue_name,
                    auto_ack=False
                )

                if method_frame is None:
                    break

                messages_read += 1
                last_delivery_tag = method_frame.delivery_tag

                try:
                    # ValueError covers both invalid UTF-8 and invalid JSON.
                    message = json.loads(body.decode())
                except ValueError:
                    # NOTE: such bodies cannot be stored in the JSON snapshot,
                    # so they are not republished (same outcome as before).
                    messages_undecodable += 1
                    print(f"⚠️ Error decoding message: {body}")
                    continue

                if not self._is_excluded(message, excluded_ids, type):
                    messages_to_save.append(message)

            # Write filtered messages to unack.json
            with open("unack.json", "w", encoding="utf-8") as f:
                json.dump(messages_to_save, f, indent=2)

            # The snapshot is on disk; only now settle everything fetched above.
            if last_delivery_tag is not None:
                self.channel.basic_ack(delivery_tag=last_delivery_tag, multiple=True)

            print(f"\n✅ Read {messages_read} messages")
            print(f"✅ Saved {len(messages_to_save)} messages to unack.json (excluding job_id={job_ids})")
            if messages_undecodable:
                print(f"⚠️ Dropped {messages_undecodable} undecodable messages")

        except Exception as e:
            print(f"❌ Error reading messages: {e}")
            raise

    def publish_messages(self) -> None:
        """Read messages from unack.json and publish them to the queue.

        Each stored message is serialised with ``json.dumps`` and published
        through the default exchange with ``delivery_mode=2``. Only the JSON
        body is stored in the file, so other message properties are not
        carried over.

        Raises:
            RuntimeError: if ``connect()`` has not been called.
            Exception: any file or broker error; reported and re-raised.
        """
        if not self.channel:
            raise RuntimeError("Not connected to RabbitMQ. Call connect() first.")

        try:
            # Read messages from unack.json
            with open("unack.json", "r", encoding="utf-8") as f:
                messages = json.load(f)

            if not messages:
                print("No messages found in unack.json")
                return

            total = len(messages)
            print(f"Found {total} messages in unack.json")

            # Identical for every message, so build it once.
            persistent = pika.BasicProperties(
                delivery_mode=2  # make message persistent
            )

            messages_published = 0
            for messages_published, message in enumerate(messages, start=1):
                self.channel.basic_publish(
                    exchange="",
                    routing_key=self.queue_name,
                    body=json.dumps(message),
                    properties=persistent
                )
                print(f"Published message {messages_published}/{total}: {message}")

            print(f"\n✅ Successfully published {messages_published} messages to queue '{self.queue_name}'")
        except Exception as e:
            print(f"❌ Error publishing messages: {e}")
            raise
|
|
|
|
|
|
def parse_args(argv: Optional[list] = None) -> argparse.Namespace:
    """Parse command line arguments.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, as before; passing a list lets
            the parser be driven programmatically.

    Returns:
        Namespace with ``host``, ``user``, ``password``, ``queue``,
        ``job_id`` and ``type`` attributes.
    """
    parser = argparse.ArgumentParser(description='RabbitMQ message purger and manager')

    # Connection settings, all optional.
    parser.add_argument('--host',
                        default='localhost',
                        help='RabbitMQ host (default: localhost)')

    parser.add_argument('--user',
                        default='admin',
                        help='RabbitMQ username (default: admin)')

    parser.add_argument('--password',
                        default='admin',
                        help='RabbitMQ password (default: admin)')

    parser.add_argument('--queue',
                        default='my_queue',
                        help='Queue name (default: my_queue)')

    # Filter settings, both mandatory.
    parser.add_argument('--job-id',
                        required=True,
                        help='Comma-separated list of job IDs to exclude')

    parser.add_argument('--type',
                        required=True,
                        help='Type of operation (example: createClusters)')

    return parser.parse_args(argv)
|
|
|
|
def main():
    """Script entry point: drain the queue, then republish the kept messages."""
    options = parse_args()

    # Build the cleaner from the command line settings.
    cleaner = RMQCleaner(
        options.host,
        options.user,
        options.password,
        options.queue,
    )

    try:
        cleaner.connect()
        # Step 1: snapshot everything except the excluded job IDs.
        cleaner.read_unacked_messages(options.job_id, options.type)
        # Step 2: push the snapshot back onto the queue.
        cleaner.publish_messages()
    finally:
        # Runs on success and on failure alike so the connection is closed.
        cleaner.disconnect()
|
|
|
|
# Run the cleaner only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|