In a recent Machine Learning project, I decided on an integration between FastAPI and MLFlow for flexible model serving. A key feature of this setup is the ability to initialise the API even without an available model. Instead, the system periodically checks and waits for a new model before loading it.
The entire codebase for this project can be accessed in my GitHub repo.
Architecture
The system consists of three primary components, each housed in a Docker container.
To keep the code simple, it can also be started without Docker.
- MLFlow Server: Serves as the primary model repository
- API: Hosted via FastAPI, this handles model serving and manual reloading
- Training Container: Once triggered, this trains the ML model, subsequently uploading it to the MLFlow server
Workflow Breakdown
- The API starts without a preloaded model
- Upon receiving data, the Training Container initiates the model’s training process
- Once training completes, the model is uploaded to the MLFlow server (sketched below)
- A GET request to the /reload endpoint signals the FastAPI application to fetch and load the new model
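For context, the training container's upload step could look roughly like the sketch below. It assumes a scikit-learn model; the registered model name ModelName, the tracking URI and the 'Production' stage come from the rest of this post, while the dataset and model are placeholders. The sketch promotes the new version straight to Production, whereas in the real workflow (see below) that promotion is a manual decision.

import mlflow
from mlflow.tracking import MlflowClient
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression

mlflow.set_tracking_uri("http://127.0.0.1:5001")

# Train a toy model (placeholder for the real training step)
X, y = load_iris(return_X_y=True)
model = LogisticRegression(max_iter=200).fit(X, y)

with mlflow.start_run():
    # Registering under a name creates a new version in the model registry
    mlflow.sklearn.log_model(model, artifact_path="model",
                             registered_model_name="ModelName")

# Promote the newest version to the 'Production' stage so the API can load it
client = MlflowClient()
version = client.get_latest_versions("ModelName", stages=["None"])[0].version
client.transition_model_version_stage("ModelName", version, stage="Production")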
API Lifecycle & Reloading
- On API startup, a ModelLoader object is initialised and attempts to load the model artifacts. If no model is available, a warning is logged:
(WARNING): app: model is not available!
- A background task (check_status_and_reload) continuously checks if a model reload is required
- An initial request to /reload is required after model training to signal the API to attempt a reload
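Triggering the reload is just a plain GET request, for example with curl against the FastAPI port used later in this post:

curl http://127.0.0.1:5002/reload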
The asynchronous lifespan function starts by initialising the ModelLoader object and then creates a task that monitors the status and triggers a model reload when necessary. The part after the yield statement is executed on API shutdown.
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan handler responsible for loading the model dynamically."""
    server_uri = "http://127.0.0.1:5001"
    model_name = "ModelName"
    stage = "Production"
    # Try to load the current Production model; logs a warning if none exists yet
    app.model_loader = ModelLoader(server_uri, model_name, stage)
    app.model_loader.load_artifacts()
    # Background task that periodically checks whether a reload was requested
    app.model_loader_task = asyncio.create_task(check_status_and_reload(app.model_loader))
    yield
    # Executed on API shutdown: cancel the model-status monitoring task
    app.model_loader_task.cancel()
The ModelLoader object starts with self.is_prod_model = True, which means the MLFlow server will not be queried for a model.
Once we know a model is available, the /reload endpoint sets self.is_prod_model = False, which makes the background task query the MLFlow server for a new model. Once a model is successfully loaded, the flag is set back to self.is_prod_model = True.
If we request a reload via /reload but no model is available yet, MLFlow is queried every 10 seconds until a model can be loaded.
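To make the polling behaviour concrete, here is a minimal sketch of what ModelLoader and check_status_and_reload could look like. The real implementation lives in the repo; the 10-second interval, the is_prod_model flag and the models:/<name>/<stage> registry URI match the description above, everything else is illustrative.

import asyncio
import logging

import mlflow
import mlflow.pyfunc

logger = logging.getLogger("app")

class ModelLoader:
    def __init__(self, server_uri: str, model_name: str, stage: str):
        self.server_uri = server_uri
        self.model_name = model_name
        self.stage = stage
        self.model = None
        # True means "nothing pending": the MLFlow server is not queried
        self.is_prod_model = True

    def load_artifacts(self) -> bool:
        """Try to fetch the current stage model from the MLFlow registry."""
        try:
            mlflow.set_tracking_uri(self.server_uri)
            self.model = mlflow.pyfunc.load_model(
                f"models:/{self.model_name}/{self.stage}"
            )
            self.is_prod_model = True
            return True
        except Exception:
            logger.warning("model is not available!")
            return False

async def check_status_and_reload(model_loader: ModelLoader) -> None:
    """Background task: while a reload is pending, retry every 10 seconds."""
    while True:
        if not model_loader.is_prod_model:
            model_loader.load_artifacts()
        await asyncio.sleep(10)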
API Endpoints
The model has to be in the 'Production' stage to be loaded. Once models are trained and one of them is promoted to Production, we manually trigger a reload with:
- /reload: Signals that a new model is ready. This changes the model status, prompting a reload during the next check
- /predict: Accepts input data and returns predictions from the loaded model
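Roughly, the two endpoints can be wired up as in the sketch below. The InputData schema and the response shapes are placeholders, while lifespan, ModelLoader and the background task are the ones described earlier.

import logging

import pandas as pd
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel

logger = logging.getLogger("app")
app = FastAPI(lifespan=lifespan)  # lifespan from the snippet above

class InputData(BaseModel):
    # Placeholder schema; replace with the real feature names/types
    features: list[float]

@app.get("/reload")
async def reload_model(request: Request):
    logger.info("model reload request received")
    # Flipping the flag makes the background task query MLFlow for a new model
    request.app.model_loader.is_prod_model = False
    return {"status": "reload scheduled"}

@app.post("/predict")
async def predict(data: InputData, request: Request):
    loader = request.app.model_loader
    if loader.model is None:
        raise HTTPException(status_code=503, detail="model is not available")
    # pyfunc models accept a DataFrame; the exact return type depends on the model flavour
    prediction = loader.model.predict(pd.DataFrame([data.features]))
    return {"prediction": prediction.tolist()}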
Running the Code
Launch MLFlow Server
mlflow server -p 5001 --host 0.0.0.0
Launch the FastAPI Server
gunicorn app:app -k uvicorn.workers.UvicornWorker -b 0.0.0.0:5002 --timeout 120
Upon initialisation, you may notice a warning about a missing model:
[2023-10-20 07:52:24 +0200] [9776] [INFO] Waiting for application startup.
(WARNING): app: model is not available!
[2023-10-20 07:52:24 +0200] [9776] [INFO] Application startup complete.
If you trigger a model reload with /reload, the logs will show:
(INFO): app: model reload request received
(WARNING): app: model is not available!
(WARNING): app: model is not available!
(WARNING): app: model is not available!
This keeps going until a model is loaded, which results in a success message in the logs.
I decided on manual model-reload requests, which can of course be changed in the code.
To complete my code, I followed the suggestions in https://github.com/tiangolo/fastapi/issues/4257
In another post I will talk more about Docker configs so stay tuned!