Source code for service.preclass.main

from bson import ObjectId
import pika
import sys
import os

from pymongo import MongoClient

from service import get_services
from utils import get_logger, now, change_file_path, get_channel
from config import MONGO

from data.lecture import create_lecture

class PRECLASS_MAIN:
    """Main service class for handling the pre-class lecture processing pipeline.

    This class manages a multi-stage workflow for processing lecture materials,
    including converting presentations to different formats and generating
    various educational materials.

    Job state lives in the ``preclass.main`` MongoDB collection; work is driven
    by messages on the ``preclass-main`` RabbitMQ queue whose body is the job id.
    Every message makes the worker advance the job by one stage and trigger the
    sub-service responsible for that stage.
    """

    class STAGE:
        """Enumeration of processing stages for the pre-class pipeline."""

        START = 0
        PPTX2PDF = 1
        PDF2PNG = 2
        PPT2TEXT = 3
        GEN_DESCRIPTION = 4
        GEN_STRUCTURE = 5
        GEN_SHOWFILE = 6
        GEN_READSCRIPT = 7
        GEN_ASKQUESTION = 8
        PUSH_AGENDA = 99
        FINISHED = 100

    _collection = MongoClient(MONGO.HOST, MONGO.PORT).preclass.main
    _queue_name = "preclass-main"
    _logger = get_logger(
        __name__=__name__,
        __file__=__file__,
    )

    # current stage -> (stage to record, key into get_services(), label used in logs)
    _TRANSITIONS = {
        STAGE.START: (STAGE.PPTX2PDF, "preclass_pptx2pdf", "PPTX2PDF"),
        STAGE.PPTX2PDF: (STAGE.PDF2PNG, "preclass_pdf2png", "PDF2PNG"),
        STAGE.PDF2PNG: (STAGE.PPT2TEXT, "preclass_ppt2text", "PPT2TEXT"),
        STAGE.PPT2TEXT: (
            STAGE.GEN_DESCRIPTION, "preclass_gen_description", "GEN_DESCRIPTION",
        ),
        STAGE.GEN_DESCRIPTION: (
            STAGE.GEN_STRUCTURE, "preclass_gen_structure", "GEN_STRUCTURE",
        ),
        STAGE.GEN_STRUCTURE: (
            STAGE.GEN_SHOWFILE, "preclass_gen_showfile", "GEN_SHOWFILE",
        ),
        STAGE.GEN_SHOWFILE: (
            STAGE.GEN_READSCRIPT, "preclass_gen_readscript", "GEN_READSCRIPT",
        ),
        STAGE.GEN_READSCRIPT: (
            STAGE.GEN_ASKQUESTION, "preclass_gen_askquestion", "GEN_ASKQUESTION",
        ),
    }

    @staticmethod
    def get_status(job_id: str):
        """Return the current stage of a job.

        Args:
            job_id (str): Identifier of the job; it is converted with ``ObjectId``.

        Returns:
            The value stored in the job's ``stage`` field, or the dict
            ``{"status": "job not found"}`` when no job has that id.
        """
        job = PRECLASS_MAIN._collection.find_one(dict(_id=ObjectId(job_id)))
        if job is None:
            return {"status": "job not found"}
        return job["stage"]

    @staticmethod
    def trigger(parent_service: str, source_file: str) -> ObjectId:
        """Initiates a new pre-class processing job.

        Args:
            parent_service (str): Identifier of the parent service initiating this job
            source_file (str): Path to the source presentation file to be processed

        Returns:
            ObjectId: The ``inserted_id`` of the created job document. Use
            ``str()`` on it when a plain string is needed.

        Notes:
            Creates a lecture named after the source file (extension removed),
            moves the source file to ``buffer/<lecture_id>`` as ``seed_file``,
            inserts a job document at stage ``START`` and publishes the job id
            to the RabbitMQ queue. The RabbitMQ connection is closed even when
            one of these steps raises.
        """
        connection, channel = get_channel(PRECLASS_MAIN._queue_name)
        try:
            PRECLASS_MAIN._logger.info("Moving File To Buffer Location")
            filename = os.path.basename(source_file)
            basename, _ = os.path.splitext(filename)
            lecture_id = create_lecture(source_file_name=basename)
            buffer_path = f"buffer/{lecture_id}"
            change_file_path(source_file, buffer_path, "seed_file")

            PRECLASS_MAIN._logger.info("Pushing job to MONGO")
            job_id = PRECLASS_MAIN._collection.insert_one(
                dict(
                    parent_service=parent_service,
                    created_time=now(),
                    lecture_id=lecture_id,
                    stage=PRECLASS_MAIN.STAGE.START,
                    value=dict(),
                )
            ).inserted_id

            PRECLASS_MAIN._logger.info("Pushing job to RabbitMQ")
            channel.basic_publish(
                exchange="",
                routing_key=PRECLASS_MAIN._queue_name,
                body=str(job_id),
            )
        finally:
            # Do not leak the broker connection when any step above fails.
            connection.close()

        PRECLASS_MAIN._logger.info("Job pushed to RabbitMQ")
        return job_id

    @staticmethod
    def _on_message(ch, method, properties, body):
        """Advance one job by one stage in response to a queue message.

        The message body is the job id. The job's ``stage`` is updated to the
        following stage, the matching sub-service is triggered, and the message
        is acknowledged. Messages for unknown jobs, or for stages that have no
        follow-up step, are logged and acknowledged so they are not redelivered
        indefinitely.
        """
        job_id = ObjectId(body.decode())
        job = PRECLASS_MAIN._collection.find_one(dict(_id=job_id))
        if job is None:
            # Nothing can be done for a job that does not exist; ack so the
            # message does not crash the worker again on redelivery.
            PRECLASS_MAIN._logger.error(
                f"PreClass Main Job {job_id} not found, dropping message"
            )
            ch.basic_ack(delivery_tag=method.delivery_tag)
            return

        lecture_id, stage = job["lecture_id"], job["stage"]
        PRECLASS_MAIN._logger.debug(f"Recieved PreClass Main Job - {lecture_id}")

        transition = PRECLASS_MAIN._TRANSITIONS.get(stage)
        if transition is None:
            # No follow-up step for this stage: report it and release the message.
            PRECLASS_MAIN._logger.info(f"Stage: {stage}")
            ch.basic_ack(delivery_tag=method.delivery_tag)
            return

        next_stage, service_key, label = transition
        # Record the new stage before triggering, as the sub-service is expected
        # to report back through this queue.  # NOTE(review): confirm with sub-services
        PRECLASS_MAIN._collection.update_one(
            dict(_id=job_id),
            {"$set": dict(stage=next_stage)},
        )
        sub_job_id = get_services()[service_key].trigger(
            parent_service=PRECLASS_MAIN._queue_name,
            lecture_id=lecture_id,
            parent_job_id=job_id,
        )
        PRECLASS_MAIN._logger.debug(
            f"{job_id} Trigger {label} Service {sub_job_id}"
        )
        ch.basic_ack(delivery_tag=method.delivery_tag)

    @staticmethod
    def launch_worker():
        """Launches the worker process for handling pre-class processing jobs.

        Establishes a connection to RabbitMQ on ``localhost``, declares the
        durable job queue and begins consuming messages from it with manual
        acknowledgement. Each message triggers the appropriate stage of
        processing based on the job's current stage.

        The worker can be terminated using CTRL+C: the ``KeyboardInterrupt`` is
        caught, a warning is logged and the process exits with status 0.
        """
        try:
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(host='localhost')
            )
            channel = connection.channel()
            channel.queue_declare(
                queue=PRECLASS_MAIN._queue_name,
                durable=True,
            )
            channel.basic_consume(
                queue=PRECLASS_MAIN._queue_name,
                on_message_callback=PRECLASS_MAIN._on_message,
                auto_ack=False,
            )
            PRECLASS_MAIN._logger.info('Worker Launched. To exit press CTRL+C')
            channel.start_consuming()
        except KeyboardInterrupt:
            PRECLASS_MAIN._logger.warning('Shutting Off Worker')
            try:
                sys.exit(0)
            except SystemExit:
                # Force termination even if something intercepts SystemExit.
                os._exit(0)
if __name__ == "__main__":
    # Script entry point: announce start-up, then block consuming queue jobs.
    PRECLASS_MAIN._logger.warning("STARTING PRECLASS-MAIN SERVICE")
    PRECLASS_MAIN.launch_worker()