Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions bats_ai/core/management/commands/examplelog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from django.conf import settings
import djclick as click
import mlflow

from bats_ai.tasks.tasks import example_train


@click.command()
@click.option('--experiment-name', type=click.STRING, required=False, default='Default')
def command(experiment_name):
click.echo('Finding experiment')
mlflow.set_tracking_uri(settings.MLFLOW_ENDPOINT)
experiment = mlflow.get_experiment_by_name(experiment_name)
if experiment:
click.echo(f'Creating a log for experiment {experiment_name}')
example_train.delay(experiment_name)
else:
click.echo(
f'Could not find experiment {experiment_name}.'
' Use the create experiment command to create a new experiement.'
)
23 changes: 23 additions & 0 deletions bats_ai/core/management/commands/makeexperiment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from django.conf import settings
import djclick as click
from mlflow import MlflowClient


@click.option('--name', type=click.STRING, required=True, help='a name for the mlflow experiment')
@click.option(
'--description',
type=click.STRING,
required=False,
help='a description for the mlflow experiment',
)
@click.command()
def command(name, description: str | None = None):
mlflow_client = MlflowClient(tracking_uri=settings.MLFLOW_ENDPOINT)

experiment_tags = {
'project-name': 'batsai',
}
if description:
experiment_tags['mlflow.note.content'] = description

mlflow_client.create_experiment(name=name, tags=experiment_tags)
31 changes: 31 additions & 0 deletions bats_ai/core/management/commands/registeronnxmodel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import os
from pathlib import Path

from django.conf import settings
import djclick as click
import mlflow
import mlflow.onnx as mlflow_onnx
import onnx


@click.command()
def command():
relative = ('..',) * 5
asset_path = os.path.abspath(os.path.join(__file__, *relative, 'assets'))
onnx_filename = os.path.join(asset_path, 'model.mobilenet.onnx')
assert Path(onnx_filename).exists()
onnx_model = onnx.load(onnx_filename)

mlflow.set_tracking_uri(settings.MLFLOW_ENDPOINT)
mlflow.set_experiment('Default')
with mlflow.start_run() as run:
run_id = run.info.run_id
click.echo(f'Run ID: {run_id}')
mlflow_onnx.log_model(
onnx_model=onnx_model,
name='onnx_model',
registered_model_name="onnx-prototype"
)
# model_uri = 'models:/onnx_model'
# result = mlflow.register_model(model_uri=model_uri, name='prototype')
# click.echo(f'Registered {model_uri} version {result.version}')
46 changes: 46 additions & 0 deletions bats_ai/core/management/commands/setupmlflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import boto3
from django.conf import settings
import djclick as click
import psycopg2
from psycopg2 import extensions, sql


@click.command()
def setupmlflow():
db_name = settings.MLFLOW_DB if settings.MLFLOW_DB else 'mlflow'
bucket_name = settings.MLFLOW_BUCKET if settings.MLFLOW_BUCKET else 'mlflow'

click.echo(f'Creating database {db_name} for mlflow')
default_db = settings.DATABASES['default']
host = default_db['HOST']
user = default_db['USER']
password = default_db['PASSWORD']
conn = psycopg2.connect(f"dbname='postgres' user='{user}' password='{password}' host='{host}'")
# We cannot use CREATE DATABASE in a transaction
conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
with conn.cursor() as cursor:
cursor.execute(sql.SQL('SELECT 1 FROM pg_database WHERE datname = %s'), (db_name,))
if not cursor.fetchone():
cursor.execute(f'CREATE DATABASE {sql.Identifier(db_name).as_string(cursor)}')
click.echo(f'Created database {db_name} for mlflow')
else:
click.echo(f'Database {db_name} already exists')
conn.close()

click.echo(f'Creating storage bucket {bucket_name} for mlflow artifacts')
access_key = settings.MINIO_STORAGE_ACCESS_KEY
secret_key = settings.MINIO_STORAGE_SECRET_KEY
storage_endpoint: str = settings.MINIO_STORAGE_ENDPOINT
if not storage_endpoint.startswith('http'):
storage_endpoint = f'http://{storage_endpoint}'
s3_client = boto3.client(
's3',
endpoint_url=storage_endpoint,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
)
if not any([bucket['Name'] == bucket_name for bucket in s3_client.list_buckets()['Buckets']]):
s3_client.create_bucket(Bucket=bucket_name)
click.echo(f'Created bucket {bucket_name} for mlflow artifacts')
else:
click.echo(f'Bucket {bucket_name} already exists')
12 changes: 12 additions & 0 deletions bats_ai/settings/development.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@
staticfiles_index = INSTALLED_APPS.index('django.contrib.staticfiles')
INSTALLED_APPS.insert(staticfiles_index, 'whitenoise.runserver_nostatic')

DATABASES = {
**DATABASES,
'mlflow': {
**env.db_url('DJANGO_MLFLOW_DB_URL', engine='django.contrib.gis.db.backends.postgis'),
'CONN_MAX_AGE': timedelta(minutes=10).total_seconds(),
}
}

MLFLOW_BUCKET: str = env.str('DJANGO_MLFLOW_BUCKET')
MLFLOW_DB: str = env.str('DJANGO_MLFLOW_DB')
MLFLOW_ENDPOINT: str = env.str('DJANGO_MLFLOW_ENDPOINT')

# Include Debug Toolbar middleware as early as possible in the list.
# However, it must come after any other middleware that encodes the response's content,
# such as GZipMiddleware.
Expand Down
172 changes: 170 additions & 2 deletions bats_ai/tasks/tasks.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import io
import logging
import math
import os
import tempfile

from PIL import Image
from celery import shared_task
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.core.files import File

Expand All @@ -30,7 +33,7 @@ def image_compute_checksum(image_id: int):


@shared_task
def recording_compute_spectrogram(recording_id: int):
def recording_compute_spectrogram(recording_id: int, inference_mode: int = 0):
recording = Recording.objects.get(pk=recording_id)

with tempfile.TemporaryDirectory() as tmpdir:
Expand Down Expand Up @@ -87,8 +90,16 @@ def recording_compute_spectrogram(recording_id: int):
)

config = Configuration.objects.first()
inference_mode = 1 if os.getenv("USE_MLFLOW") else inference_mode
if inference_mode == 1:
logger.info("Using MLFlow model repository")
from mlflow.tracking import MlflowClient
client = MlflowClient(tracking_uri="http://localhost:5000")
registered_models = client.search_registered_models()
for model in registered_models:
logger.info(model.name)
if config and config.run_inference_on_upload:
predict_results = predict_from_compressed(compressed_obj)
predict_results = predict_from_compressed(compressed_obj, inference_mode)
label = predict_results['label']
score = predict_results['score']
confs = predict_results['confs']
Expand All @@ -113,3 +124,160 @@ def recording_compute_spectrogram(recording_id: int):
recording_annotation.save()

return {'spectrogram_id': spectrogram.id, 'compressed_id': compressed_obj.id}


def _fully_local_inference(image_file, use_mlflow_model):
import json

import onnx
import onnxruntime as ort
import tqdm

img = Image.open(image_file)

if not use_mlflow_model:
relative = ('..',) * 3
asset_path = os.path.abspath(os.path.join(__file__, *relative, 'assets'))

onnx_filename = os.path.join(asset_path, 'model.mobilenet.onnx')
assert os.path.exists(onnx_filename)

session = ort.InferenceSession(
onnx_filename,
providers=[
(
'CUDAExecutionProvider',
{
'cudnn_conv_use_max_workspace': '1',
'device_id': 0,
'cudnn_conv_algo_search': 'HEURISTIC',
},
),
'CPUExecutionProvider',
],
)
model = onnx.load(onnx_filename)
else:
import mlflow
import mlflow.onnx

MODEL_URI = 'models:/prototype/1'
mlflow.set_tracking_uri(settings.MLFLOW_ENDPOINT)
model = mlflow.onnx.load_model(model_uri=MODEL_URI)
session = ort.InferenceSession(
model.SerializeToString(),
providers=[
(
'CUDAExecutionProvider',
{
'cudnn_conv_use_max_workspace': '1',
'device_id': 0,
'cudnn_conv_algo_search': 'HEURISTIC',
},
),
'CPUExecutionProvider',
],
)

img = np.array(img)

h, w, c = img.shape
ratio_y = 224 / h
ratio_x = ratio_y * 0.5
raw = cv2.resize(img, None, fx=ratio_x, fy=ratio_y, interpolation=cv2.INTER_LANCZOS4)

h, w, c = raw.shape
if w <= h:
canvas = np.zeros((h, h + 1, 3), dtype=raw.dtype)
canvas[:, :w, :] = raw
raw = canvas
h, w, c = raw.shape

inputs_ = []
for index in range(0, w - h, 100):
inputs_.append(raw[:, index : index + h, :])
inputs_.append(raw[:, -h:, :])
inputs_ = np.array(inputs_)

chunksize = 1
chunks = np.array_split(inputs_, np.arange(chunksize, len(inputs_), chunksize))
outputs = []
for chunk in tqdm.tqdm(chunks, desc='Inference'):
outputs_ = session.run(
None,
{'input': chunk},
)
outputs.append(outputs_[0])
outputs = np.vstack(outputs)
outputs = outputs.mean(axis=0)

mapping = json.loads(model.metadata_props[0].value)
labels = [mapping['forward'][str(index)] for index in range(len(mapping['forward']))]

prediction = np.argmax(outputs)
label = labels[prediction]
score = outputs[prediction]

confs = dict(zip(labels, outputs))

return label, score, confs


def predict_compressed(image_file):
# 0: use the local file and do inference with that
# 1: get the file from mlflow and do inference locally
# 2: do inference from deployed mlflow model
inference_mode = int(os.getenv('INFERENCE_MODE', 0))
if inference_mode == 1:
print('Using inference mode 1: file from mlflow')
return _fully_local_inference(image_file, True)
elif inference_mode == 2:
print('Using inference mode 2: deployed mlflow model')
else:
print('Using inference mode 0: local file')
return _fully_local_inference(image_file, False)


@shared_task
def example_train(experiment_name: str):
import mlflow
from mlflow.models import infer_signature
from sklearn import datasets
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

X, y = datasets.load_iris(return_X_y=True)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

params = {
'solver': 'lbfgs',
'max_iter': 1000,
'multi_class': 'auto',
'random_state': 8888,
}

lr = LogisticRegression(**params)
lr.fit(X_train, y_train)

y_pred = lr.predict(X_test)

accuracy = accuracy_score(y_test, y_pred)

mlflow.set_tracking_uri(settings.MLFLOW_ENDPOINT)
mlflow.set_experiment(experiment_name)

with mlflow.start_run():
mlflow.log_params(params)
mlflow.log_metric('accuracy', accuracy)
mlflow.set_tag('Training Info', 'Basic LR model for iris data')

signature = infer_signature(X_train, lr.predict(X_train))
_ = mlflow.sklearn.log_model(
sk_model=lr,
artifact_path='iris_model',
signature=signature,
input_example=X_train,
registered_model_name='tracking-quickstart',
)
18 changes: 17 additions & 1 deletion bats_ai/utils/spectrogram_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from PIL import Image
import cv2
from django.conf import settings
import librosa
import librosa.display
import matplotlib.pyplot as plt
Expand Down Expand Up @@ -58,6 +59,7 @@ class PredictionOutput(TypedDict):

def predict_from_compressed(
compressed_object: CompressedSpectrogram | NABatCompressedSpectrogram,
inference_mode: int = 0
) -> PredictionOutput:
"""
Predict label, score, and confidences from an image file.
Expand All @@ -82,8 +84,22 @@ def predict_from_compressed(
onnx_filename = current_file.parents[2] / 'assets' / 'model.mobilenet.onnx'
assert os.path.exists(onnx_filename), f'ONNX model file not found at {onnx_filename}'

if inference_mode == 1:
# Get onnx model as a file from Mlflow
import mlflow
import mlflow.onnx as mlflow_onnx
from onnx import ModelProto
logger.info("Loading model from MLFlow artifact store")
mlflow.set_tracking_uri(settings.MLFLOW_ENDPOINT)

onnx_model: ModelProto = mlflow_onnx.load_model("models:/onnx-prototype/1")
onnx_input: Path | bytes = onnx_model.SerializeToString()
else:
logger.info("Loading model via local file")
onnx_input = onnx_filename

session = ort.InferenceSession(
onnx_filename,
onnx_input,
providers=[
(
'CUDAExecutionProvider',
Expand Down
Loading
Loading