first commit
This commit is contained in:
commit
cdf381192e
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
venv/
|
||||||
|
*.json
|
||||||
|
*.dev.*
|
||||||
6
Dockerfile
Normal file
6
Dockerfile
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
FROM python:3.12-slim
|
||||||
|
WORKDIR /app
|
||||||
|
COPY requirements.txt .
|
||||||
|
RUN pip install -r requirements.txt
|
||||||
|
COPY . .
|
||||||
|
ENTRYPOINT ["python", "main.py"]
|
||||||
181
main.py
Normal file
181
main.py
Normal file
@ -0,0 +1,181 @@
|
|||||||
|
import pika
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
import argparse
|
||||||
|
from typing import Optional, Tuple, Dict, Any
|
||||||
|
|
||||||
|
class RmqPurger:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
host: str = "localhost",
|
||||||
|
user: str = "admin",
|
||||||
|
password: str = "admin",
|
||||||
|
queue_name: str = "my_queue"
|
||||||
|
):
|
||||||
|
self.host = host
|
||||||
|
self.user = user
|
||||||
|
self.password = password
|
||||||
|
self.queue_name = queue_name
|
||||||
|
self.connection: Optional[pika.BlockingConnection] = None
|
||||||
|
self.channel: Optional[pika.channel.Channel] = None
|
||||||
|
|
||||||
|
def connect(self) -> None:
|
||||||
|
"""Establish connection to RabbitMQ server"""
|
||||||
|
try:
|
||||||
|
credentials = pika.PlainCredentials(self.user, self.password)
|
||||||
|
self.connection = pika.BlockingConnection(
|
||||||
|
pika.ConnectionParameters(host=self.host, credentials=credentials)
|
||||||
|
)
|
||||||
|
self.channel = self.connection.channel()
|
||||||
|
print(f"✅ Connected to RabbitMQ at {self.host}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"❌ Failed to connect to RabbitMQ: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
def disconnect(self) -> None:
|
||||||
|
"""Close the RabbitMQ connection"""
|
||||||
|
if self.connection and not self.connection.is_closed:
|
||||||
|
self.connection.close()
|
||||||
|
print("✅ Disconnected from RabbitMQ")
|
||||||
|
|
||||||
|
def read_unacked_messages(self, job_id: str) -> None:
|
||||||
|
"""
|
||||||
|
Read all unacknowledged messages from the queue, excluding messages with specified job_id,
|
||||||
|
and save them to unack.json
|
||||||
|
"""
|
||||||
|
if not self.channel:
|
||||||
|
raise RuntimeError("Not connected to RabbitMQ. Call connect() first.")
|
||||||
|
|
||||||
|
try:
|
||||||
|
job_id = job_id.split(',')
|
||||||
|
# Get queue information
|
||||||
|
queue_info = self.channel.queue_declare(queue=self.queue_name, passive=True)
|
||||||
|
message_count = queue_info.method.message_count
|
||||||
|
|
||||||
|
if message_count == 0:
|
||||||
|
print("No messages in queue.")
|
||||||
|
return
|
||||||
|
|
||||||
|
print(f"Found {message_count} messages in queue.")
|
||||||
|
messages_to_save = []
|
||||||
|
messages_read = 0
|
||||||
|
messages_filtered = 0
|
||||||
|
|
||||||
|
# Read all available messages
|
||||||
|
while True:
|
||||||
|
method_frame, _, body = self.channel.basic_get(
|
||||||
|
queue=self.queue_name,
|
||||||
|
auto_ack=True
|
||||||
|
)
|
||||||
|
|
||||||
|
if method_frame is None:
|
||||||
|
break
|
||||||
|
|
||||||
|
messages_read += 1
|
||||||
|
try:
|
||||||
|
message = json.loads(body.decode())
|
||||||
|
# Save message if job_id doesn't match
|
||||||
|
if str(message.get("job_id")) not in job_id:
|
||||||
|
messages_to_save.append(message)
|
||||||
|
messages_filtered += 1
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
print(f"⚠️ Error decoding message: {body}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"⚠️ Error processing message: {e}")
|
||||||
|
|
||||||
|
# Write filtered messages to unack.json
|
||||||
|
with open("unack.json", "w") as f:
|
||||||
|
json.dump(messages_to_save, f, indent=2)
|
||||||
|
|
||||||
|
print(f"\n✅ Read {messages_read} messages")
|
||||||
|
print(f"✅ Saved {messages_filtered} messages to unack.json (excluding job_id={job_id})")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"❌ Error reading messages: {e}")
|
||||||
|
raise
|
||||||
|
def publish_messages(self) -> None:
|
||||||
|
"""Read messages from unack.json and publish them to the queue"""
|
||||||
|
if not self.channel:
|
||||||
|
raise RuntimeError("Not connected to RabbitMQ. Call connect() first.")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Read messages from unack.json
|
||||||
|
with open("unack.json", "r") as f:
|
||||||
|
messages = json.load(f)
|
||||||
|
|
||||||
|
if not messages:
|
||||||
|
print("No messages found in unack.json")
|
||||||
|
return
|
||||||
|
|
||||||
|
print(f"Found {len(messages)} messages in unack.json")
|
||||||
|
messages_published = 0
|
||||||
|
|
||||||
|
# Publish each message
|
||||||
|
for message in messages:
|
||||||
|
self.channel.basic_publish(
|
||||||
|
exchange="",
|
||||||
|
routing_key=self.queue_name,
|
||||||
|
body=json.dumps(message),
|
||||||
|
properties=pika.BasicProperties(
|
||||||
|
delivery_mode=2 # make message persistent
|
||||||
|
)
|
||||||
|
)
|
||||||
|
messages_published += 1
|
||||||
|
print(f"Published message {messages_published}/{len(messages)}: {message}")
|
||||||
|
|
||||||
|
print(f"\n✅ Successfully published {messages_published} messages to queue '{self.queue_name}'")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"❌ Error publishing messages: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def parse_args():
|
||||||
|
"""Parse command line arguments"""
|
||||||
|
parser = argparse.ArgumentParser(description='RabbitMQ message purger and manager')
|
||||||
|
|
||||||
|
parser.add_argument('--host',
|
||||||
|
default='localhost',
|
||||||
|
help='RabbitMQ host (default: localhost)')
|
||||||
|
|
||||||
|
parser.add_argument('--user',
|
||||||
|
default='admin',
|
||||||
|
help='RabbitMQ username (default: admin)')
|
||||||
|
|
||||||
|
parser.add_argument('--password',
|
||||||
|
default='admin',
|
||||||
|
help='RabbitMQ password (default: admin)')
|
||||||
|
|
||||||
|
parser.add_argument('--queue',
|
||||||
|
default='my_queue',
|
||||||
|
help='Queue name (default: my_queue)')
|
||||||
|
|
||||||
|
parser.add_argument('--job-id',
|
||||||
|
required=True,
|
||||||
|
help='Comma-separated list of job IDs to exclude')
|
||||||
|
|
||||||
|
return parser.parse_args()
|
||||||
|
|
||||||
|
def main():
|
||||||
|
args = parse_args()
|
||||||
|
|
||||||
|
# Initialize RmqPurger with command line arguments
|
||||||
|
purger = RmqPurger(
|
||||||
|
host=args.host,
|
||||||
|
user=args.user,
|
||||||
|
password=args.password,
|
||||||
|
queue_name=args.queue
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
purger.connect()
|
||||||
|
|
||||||
|
# First read and save unacked messages (excluding specified job_ids)
|
||||||
|
purger.read_unacked_messages(args.job_id)
|
||||||
|
|
||||||
|
# Then publish saved messages back to the queue
|
||||||
|
purger.publish_messages()
|
||||||
|
finally:
|
||||||
|
purger.disconnect()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
1
requirements.txt
Normal file
1
requirements.txt
Normal file
@ -0,0 +1 @@
|
|||||||
|
pika==1.3.2
|
||||||
Loading…
x
Reference in New Issue
Block a user