commit cdf381192ec924b7970a8b86ffe23a337ae7956f Author: syed.shafin Date: Wed Sep 17 22:01:58 2025 +0600 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..47217b3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +venv/ +*.json +*.dev.* diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..aa483e9 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,6 @@ +FROM python:3.12-slim +WORKDIR /app +COPY requirements.txt . +RUN pip install -r requirements.txt +COPY . . +ENTRYPOINT ["python", "main.py"] \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..100b710 --- /dev/null +++ b/main.py @@ -0,0 +1,181 @@ +import pika +import json +import sys +import argparse +from typing import Optional, Tuple, Dict, Any + +class RmqPurger: + def __init__( + self, + host: str = "localhost", + user: str = "admin", + password: str = "admin", + queue_name: str = "my_queue" + ): + self.host = host + self.user = user + self.password = password + self.queue_name = queue_name + self.connection: Optional[pika.BlockingConnection] = None + self.channel: Optional[pika.channel.Channel] = None + + def connect(self) -> None: + """Establish connection to RabbitMQ server""" + try: + credentials = pika.PlainCredentials(self.user, self.password) + self.connection = pika.BlockingConnection( + pika.ConnectionParameters(host=self.host, credentials=credentials) + ) + self.channel = self.connection.channel() + print(f"✅ Connected to RabbitMQ at {self.host}") + except Exception as e: + print(f"❌ Failed to connect to RabbitMQ: {e}") + raise + + def disconnect(self) -> None: + """Close the RabbitMQ connection""" + if self.connection and not self.connection.is_closed: + self.connection.close() + print("✅ Disconnected from RabbitMQ") + + def read_unacked_messages(self, job_id: str) -> None: + """ + Read all unacknowledged messages from the queue, excluding messages with specified job_id, + and save them to unack.json + """ + if not self.channel: + raise RuntimeError("Not connected to RabbitMQ. Call connect() first.") + + try: + job_id = job_id.split(',') + # Get queue information + queue_info = self.channel.queue_declare(queue=self.queue_name, passive=True) + message_count = queue_info.method.message_count + + if message_count == 0: + print("No messages in queue.") + return + + print(f"Found {message_count} messages in queue.") + messages_to_save = [] + messages_read = 0 + messages_filtered = 0 + + # Read all available messages + while True: + method_frame, _, body = self.channel.basic_get( + queue=self.queue_name, + auto_ack=True + ) + + if method_frame is None: + break + + messages_read += 1 + try: + message = json.loads(body.decode()) + # Save message if job_id doesn't match + if str(message.get("job_id")) not in job_id: + messages_to_save.append(message) + messages_filtered += 1 + except json.JSONDecodeError: + print(f"⚠️ Error decoding message: {body}") + except Exception as e: + print(f"⚠️ Error processing message: {e}") + + # Write filtered messages to unack.json + with open("unack.json", "w") as f: + json.dump(messages_to_save, f, indent=2) + + print(f"\n✅ Read {messages_read} messages") + print(f"✅ Saved {messages_filtered} messages to unack.json (excluding job_id={job_id})") + + except Exception as e: + print(f"❌ Error reading messages: {e}") + raise + def publish_messages(self) -> None: + """Read messages from unack.json and publish them to the queue""" + if not self.channel: + raise RuntimeError("Not connected to RabbitMQ. Call connect() first.") + + try: + # Read messages from unack.json + with open("unack.json", "r") as f: + messages = json.load(f) + + if not messages: + print("No messages found in unack.json") + return + + print(f"Found {len(messages)} messages in unack.json") + messages_published = 0 + + # Publish each message + for message in messages: + self.channel.basic_publish( + exchange="", + routing_key=self.queue_name, + body=json.dumps(message), + properties=pika.BasicProperties( + delivery_mode=2 # make message persistent + ) + ) + messages_published += 1 + print(f"Published message {messages_published}/{len(messages)}: {message}") + + print(f"\n✅ Successfully published {messages_published} messages to queue '{self.queue_name}'") + except Exception as e: + print(f"❌ Error publishing messages: {e}") + raise + + +def parse_args(): + """Parse command line arguments""" + parser = argparse.ArgumentParser(description='RabbitMQ message purger and manager') + + parser.add_argument('--host', + default='localhost', + help='RabbitMQ host (default: localhost)') + + parser.add_argument('--user', + default='admin', + help='RabbitMQ username (default: admin)') + + parser.add_argument('--password', + default='admin', + help='RabbitMQ password (default: admin)') + + parser.add_argument('--queue', + default='my_queue', + help='Queue name (default: my_queue)') + + parser.add_argument('--job-id', + required=True, + help='Comma-separated list of job IDs to exclude') + + return parser.parse_args() + +def main(): + args = parse_args() + + # Initialize RmqPurger with command line arguments + purger = RmqPurger( + host=args.host, + user=args.user, + password=args.password, + queue_name=args.queue + ) + + try: + purger.connect() + + # First read and save unacked messages (excluding specified job_ids) + purger.read_unacked_messages(args.job_id) + + # Then publish saved messages back to the queue + purger.publish_messages() + finally: + purger.disconnect() + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..cde8834 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +pika==1.3.2 \ No newline at end of file