diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..47217b3 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +venv/ +*.json +*.dev.* diff --git a/main.py b/main.py index 100b710..b20e2e1 100644 --- a/main.py +++ b/main.py @@ -4,7 +4,7 @@ import sys import argparse from typing import Optional, Tuple, Dict, Any -class RmqPurger: +class RMQCleaner: def __init__( self, host: str = "localhost", @@ -38,7 +38,7 @@ class RmqPurger: self.connection.close() print("✅ Disconnected from RabbitMQ") - def read_unacked_messages(self, job_id: str) -> None: + def read_unacked_messages(self, job_id: str, type: str) -> None: """ Read all unacknowledged messages from the queue, excluding messages with specified job_id, and save them to unack.json @@ -48,6 +48,8 @@ class RmqPurger: try: job_id = job_id.split(',') + print(f"Job IDs: {job_id}") + print(f"Type: {type}") # Get queue information queue_info = self.channel.queue_declare(queue=self.queue_name, passive=True) message_count = queue_info.method.message_count @@ -75,9 +77,12 @@ class RmqPurger: try: message = json.loads(body.decode()) # Save message if job_id doesn't match - if str(message.get("job_id")) not in job_id: + if message.get("type") != type: messages_to_save.append(message) - messages_filtered += 1 + else: + if str(message.get("job_id")) not in job_id: + messages_to_save.append(message) + messages_filtered += 1 except json.JSONDecodeError: print(f"⚠️ Error decoding message: {body}") except Exception as e: @@ -153,13 +158,17 @@ def parse_args(): required=True, help='Comma-separated list of job IDs to exclude') + parser.add_argument('--type', + required=True, + help='Type of operation (example: createClusters)') + return parser.parse_args() def main(): args = parse_args() - # Initialize RmqPurger with command line arguments - purger = RmqPurger( + # Initialize RMQCleaner with command line arguments + cleaner = RMQCleaner( host=args.host, user=args.user, password=args.password, @@ -167,15 +176,15 @@ def main(): ) try: - purger.connect() + cleaner.connect() # First read and save unacked messages (excluding specified job_ids) - purger.read_unacked_messages(args.job_id) + cleaner.read_unacked_messages(args.job_id, args.type) # Then publish saved messages back to the queue - purger.publish_messages() + cleaner.publish_messages() finally: - purger.disconnect() + cleaner.disconnect() if __name__ == "__main__": main()