Connect to MLflow

Send agent traces from any framework to MLflow on Red Hat OpenShift AI.


MLflow tracing captures every LLM call, tool invocation, and agent state transition as structured spans. On OpenShift AI, MLflow runs as a managed service that your agent connects to via environment variables — no code changes needed when moving between standalone and operator-managed deployments.

The pattern is the same across all frameworks: read the tracking URI from the environment, optionally authenticate with a service account token, and call the framework's autolog() function. Every trace is then automatically collected, including LLM inputs/outputs, latency, token counts, and tool results.
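
A minimal sketch of that pattern, independent of framework (this mirrors the per-framework examples below; the autolog call on the last line is the only framework-specific piece, so substitute the one for your framework):

Connection pattern
import os
import mlflow

uri = os.environ.get("MLFLOW_TRACKING_URI", "").strip()
if uri:
    # Token file auth is only present in OpenShift AI / CR mode
    token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "")
    if token_file and os.path.isfile(token_file):
        with open(token_file) as f:
            os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

    mlflow.set_tracking_uri(uri)
    mlflow.set_experiment(os.environ.get("MLFLOW_EXPERIMENT_NAME", "my-agent"))
    mlflow.langchain.autolog()  # or mlflow.crewai.autolog(), etc.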

OpenShift AI Setup

MLflow on OpenShift AI supports two deployment modes: standalone (a Deployment + Service + PVC managed by your Helm chart) and CR mode (an MLflow custom resource managed by the MLflow operator). Both expose the same tracking API — only the connection details differ.

Environment variables

Your agent reads these environment variables at startup. In standalone mode, only MLFLOW_TRACKING_URI is required. In CR mode, the workspace and token file are also needed.

Environment variables
# Required
MLFLOW_TRACKING_URI=http://mlflow:5500

# Optional
MLFLOW_EXPERIMENT_NAME=my-agent

# OpenShift AI / CR mode only
MLFLOW_WORKSPACE=my-namespace
MLFLOW_TRACKING_TOKEN_FILE=/var/run/secrets/kubernetes.io/serviceaccount/token
REQUESTS_CA_BUNDLE=/tmp/ca-bundle/combined-ca.crt

| Variable | Required | Description |
| --- | --- | --- |
| MLFLOW_TRACKING_URI | Yes | MLflow server URL. Set automatically by the Helm chart. |
| MLFLOW_EXPERIMENT_NAME | No | Experiment name. Defaults to the agent name. |
| MLFLOW_WORKSPACE | CR only | Namespace for multi-tenant isolation via the operator gateway. |
| MLFLOW_TRACKING_TOKEN_FILE | CR only | Path to the service account token for gateway authentication. |
| REQUESTS_CA_BUNDLE | CR only | CA bundle for TLS to the operator-managed MLflow gateway. |

Authentication

In CR mode, the agent authenticates to the MLflow operator gateway using a Kubernetes service account token. The token is mounted at the standard path and read at startup:

Token file authentication
_token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "").strip()
if _token_file and os.path.isfile(_token_file):
    with open(_token_file) as f:
        os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

The operator gateway also requires a merged CA bundle (system CAs + Kubernetes service CA) for TLS verification. This is handled by an init container in the deployment.
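
The agent itself needs no TLS-specific code: the MLflow client issues HTTP calls through the requests library, which honors REQUESTS_CA_BUNDLE automatically. A small startup check (a sketch, not part of the reference code) can surface a missing bundle early:

CA bundle check
import os

ca_bundle = os.environ.get("REQUESTS_CA_BUNDLE", "")
if ca_bundle and not os.path.isfile(ca_bundle):
    # The init container writes this file before the agent starts;
    # if it is missing, TLS verification to the gateway will fail.
    print(f"[mlflow] Warning: REQUESTS_CA_BUNDLE not found: {ca_bundle}")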

Agent Frameworks

LangGraph

LangGraph is a widely used framework for building stateful, multi-actor agent applications. MLflow's mlflow.langchain.autolog() automatically traces all LangChain and LangGraph components — LLM calls, tool executions, graph node transitions, and state checkpoints.

This example is from the bank-voice-agent reference architecture, which runs a multi-agent banking assistant on OpenShift AI with full MLflow observability.

mlflow.langchain.autolog() — Traces LLM calls, tool use, and graph state transitions
import os
import mlflow
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import START, StateGraph

from src.graph import build_graph

# ── Optional MLflow tracing ──────────────────────────────────────
_mlflow_uri = os.environ.get("MLFLOW_TRACKING_URI", "").strip()
_mlflow_enabled = False
_mlflow_experiment_id: str | None = None

if _mlflow_uri:
    try:
        # Token file auth (OpenShift AI / MLflow operator CR mode)
        _token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "").strip()
        if _token_file and os.path.isfile(_token_file):
            with open(_token_file) as f:
                os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

        mlflow.set_tracking_uri(_mlflow_uri)

        # Workspace support (MLflow operator CR mode)
        _workspace = os.environ.get("MLFLOW_WORKSPACE", "").strip()
        if _workspace:
            mlflow.set_workspace(_workspace)

        experiment_name = os.environ.get(
            "MLFLOW_EXPERIMENT_NAME", "my-agent"
        )

        if _workspace:
            # MLflow operator gateway blocks get_experiment.
            # Use search_experiments to find or create.
            import mlflow.tracking.fluent as _fluent

            client = mlflow.MlflowClient()
            exps = client.search_experiments(
                filter_string=f"name = '{experiment_name}'"
            )
            if exps:
                _mlflow_experiment_id = exps[0].experiment_id
            else:
                _mlflow_experiment_id = client.create_experiment(
                    experiment_name
                )
            _fluent._active_experiment_id = _mlflow_experiment_id
        else:
            mlflow.set_experiment(experiment_name)

        mlflow.langchain.autolog()
        _mlflow_enabled = True
        print(f"[mlflow] Tracing enabled → {_mlflow_uri}")
    except Exception as exc:
        print(f"[mlflow] Failed to initialise: {exc}")

With autolog() enabled, every call to graph.invoke() or graph.stream() produces a trace with spans for each node, LLM invocation, and tool call. No manual callbacks are needed.
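
For example, a single traced run might look like this (a sketch; the input schema and build_graph() signature come from the reference architecture and may differ in your project):

Traced invocation
graph = build_graph()

# Produces one trace with child spans for each graph node, LLM call, and tool
result = graph.invoke(
    {"messages": [("user", "What's my current account balance?")]},
    config={"configurable": {"thread_id": "session-1"}},
)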

CrewAI

CrewAI orchestrates role-based AI agents working together as a crew. MLflow's mlflow.crewai.autolog() captures each agent's task execution, tool calls, and crew-level orchestration.

mlflow.crewai.autolog() — Traces crew orchestration, agent tasks, and tool calls
import os
import mlflow
from crewai import Agent, Task, Crew, Process

# ── Connect to MLflow on OpenShift AI ──────────────────────────
mlflow_uri = os.environ.get("MLFLOW_TRACKING_URI", "").strip()

if mlflow_uri:
    _token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "")
    if _token_file and os.path.isfile(_token_file):
        with open(_token_file) as f:
            os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

    mlflow.set_tracking_uri(mlflow_uri)
    mlflow.set_experiment(
        os.environ.get("MLFLOW_EXPERIMENT_NAME", "crewai-agent")
    )
    mlflow.crewai.autolog()
    print(f"[mlflow] CrewAI tracing enabled → {mlflow_uri}")

# ── Define agents and tasks ────────────────────────────────────
researcher = Agent(
    role="Researcher",
    goal="Find accurate information on the given topic",
    backstory="You are an expert research analyst.",
    verbose=True,
)

writer = Agent(
    role="Writer",
    goal="Write a clear summary based on the research",
    backstory="You are a technical writer.",
    verbose=True,
)

research_task = Task(
    description="Research the topic: {topic}",
    expected_output="A detailed summary of findings",
    agent=researcher,
)

write_task = Task(
    description="Write a report based on the research",
    expected_output="A well-structured report",
    agent=writer,
)

crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    process=Process.sequential,
    verbose=True,
)

# All CrewAI traces are automatically sent to MLflow
result = crew.kickoff(inputs={"topic": "AI agent observability"})
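
To confirm the traces arrived, recent MLflow releases expose mlflow.search_traces(), which returns recent traces for the active experiment as a DataFrame (a sketch; the exact columns vary by MLflow version):

Verify traces
# Continuing the example above, after crew.kickoff() completes
traces = mlflow.search_traces(max_results=5)
print(traces.head())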

AutoGen

AutoGen enables multi-agent conversations where agents collaborate, debate, and solve problems together. MLflow's mlflow.autogen.autolog() traces each agent turn, message exchange, and termination condition.

mlflow.autogen.autolog() — Traces agent conversations, turns, and group chat flow
import os
import mlflow
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient

# ── Connect to MLflow on OpenShift AI ──────────────────────────
mlflow_uri = os.environ.get("MLFLOW_TRACKING_URI", "").strip()

if mlflow_uri:
    _token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "")
    if _token_file and os.path.isfile(_token_file):
        with open(_token_file) as f:
            os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

    mlflow.set_tracking_uri(mlflow_uri)
    mlflow.set_experiment(
        os.environ.get("MLFLOW_EXPERIMENT_NAME", "autogen-agent")
    )
    mlflow.autogen.autolog()
    print(f"[mlflow] AutoGen tracing enabled → {mlflow_uri}")

# ── Define agents ──────────────────────────────────────────────
model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")

planner = AssistantAgent(
    name="planner",
    model_client=model_client,
    system_message="You are a planning agent. Break tasks into steps.",
)

executor = AssistantAgent(
    name="executor",
    model_client=model_client,
    system_message="You execute the plan. Say TERMINATE when done.",
)

termination = TextMentionTermination("TERMINATE")

team = RoundRobinGroupChat(
    participants=[planner, executor],
    termination_condition=termination,
    max_turns=6,
)

# All AutoGen traces are automatically sent to MLflow
import asyncio
result = asyncio.run(
    team.run(task="Summarize best practices for agent tracing")
)
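
Streaming runs produce the same traces. As an alternative to team.run(), a sketch using AgentChat's console helper (continuing the example above):

Streaming variant
from autogen_agentchat.ui import Console

async def main():
    # run_stream() yields messages as they are produced; tracing is unchanged
    await Console(
        team.run_stream(task="Summarize best practices for agent tracing")
    )

asyncio.run(main())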

LlamaIndex

LlamaIndex specializes in RAG pipelines and data-connected agents. MLflow's mlflow.llama_index.autolog() captures document loading, embedding, retrieval, and query engine execution.

mlflow.llama_index.autolog() — Traces RAG retrieval, embedding, and query execution
import os
import mlflow
from llama_index.core import Settings, SimpleDirectoryReader, VectorStoreIndex
from llama_index.llms.openai import OpenAI

# ── Connect to MLflow on OpenShift AI ──────────────────────────
mlflow_uri = os.environ.get("MLFLOW_TRACKING_URI", "").strip()

if mlflow_uri:
    _token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "")
    if _token_file and os.path.isfile(_token_file):
        with open(_token_file) as f:
            os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

    mlflow.set_tracking_uri(mlflow_uri)
    mlflow.set_experiment(
        os.environ.get("MLFLOW_EXPERIMENT_NAME", "llamaindex-agent")
    )
    mlflow.llama_index.autolog()
    print(f"[mlflow] LlamaIndex tracing enabled → {mlflow_uri}")

# ── Build a RAG pipeline ──────────────────────────────────────
Settings.llm = OpenAI(model="gpt-4o-mini")

documents = SimpleDirectoryReader("./data").load_data()
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine()

# All LlamaIndex traces are automatically sent to MLflow
response = query_engine.query(
    "What are the key concepts in agent tracing?"
)
print(response)

Google ADK

Google Agent Development Kit (ADK) builds agents using Gemini models with built-in tool use. ADK uses OpenTelemetry natively — traces can be exported to MLflow's OTLP endpoint or via the mlflow.tracing API.

OpenTelemetry export — Traces agent runs, tool calls, and Gemini model interactions
import os
import mlflow
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

# ── Connect to MLflow on OpenShift AI ──────────────────────────
mlflow_uri = os.environ.get("MLFLOW_TRACKING_URI", "").strip()

if mlflow_uri:
    _token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "")
    if _token_file and os.path.isfile(_token_file):
        with open(_token_file) as f:
            os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

    mlflow.set_tracking_uri(mlflow_uri)
    mlflow.set_experiment(
        os.environ.get("MLFLOW_EXPERIMENT_NAME", "google-adk-agent")
    )
    # ADK uses OpenTelemetry — export spans to MLflow
    os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = mlflow_uri
    os.environ["OTEL_EXPORTER_OTLP_PROTOCOL"] = "http/protobuf"
    print(f"[mlflow] Google ADK tracing enabled → {mlflow_uri}")

# ── Define an ADK agent ───────────────────────────────────────
def get_weather(city: str) -> dict:
    """Get current weather for a city."""
    return {
        "city": city,
        "temperature": "72°F",
        "condition": "Sunny",
    }

agent = Agent(
    name="weather_agent",
    model="gemini-2.0-flash",
    description="A helpful weather assistant",
    instruction="Help users check the weather. Use the get_weather "
                "tool when they ask about weather in a specific city.",
    tools=[get_weather],
)

# ── Run the agent ─────────────────────────────────────────────
session_service = InMemorySessionService()
runner = Runner(agent=agent, app_name="weather_app",
                session_service=session_service)

message = types.Content(
    role="user",
    parts=[types.Part(text="What's the weather in San Francisco?")],
)

import asyncio

async def run():
    # create_session is a coroutine in recent ADK releases, so await it
    session = await session_service.create_session(
        app_name="weather_app", user_id="user1"
    )
    async for event in runner.run_async(
        user_id="user1", session_id=session.id, new_message=message
    ):
        if event.is_final_response():
            print(event.content.parts[0].text)

asyncio.run(run())

OpenShift Deployment

The Helm chart handles MLflow deployment and injects the correct environment variables into your agent's pod. The configuration differs between standalone and CR mode.

Standalone mode

Deploys MLflow as a Deployment + Service + PVC in your namespace. The agent connects directly via HTTP. This is the simplest setup and works on any OpenShift cluster. See the chart deployment template for a full working example.

values.yaml
mlflow:
  enabled: true
  # "standalone" deploys MLflow as Deployment+Service+PVC
  # "cr" uses the MLflow operator custom resource
  deployMode: "standalone"
  crServiceUrl: "https://mlflow.redhat-ods-applications.svc.cluster.local:8443"
  image:
    repository: ghcr.io/mlflow/mlflow
    tag: v3.10.1
  port: 5500
  args:
    - "mlflow"
    - "server"
    - "--host"
    - "0.0.0.0"
    - "--port"
    - "5500"
    - "--backend-store-uri"
    - "sqlite:////mlflow/mlflow.db"
    - "--default-artifact-root"
    - "/mlflow/mlartifacts"
  persistence:
    size: 5Gi

CR mode (MLflow Operator)

Uses the MLflow operator to manage MLflow as a custom resource. The operator provides a gateway that handles multi-tenant workspace isolation and service account authentication. An init container merges CA certificates for TLS. See the chart CR template for a full working example.

backend-deployment.yaml
# backend-deployment.yaml (Helm template excerpt)
containers:
  - name: backend
    env:
      {{- if eq (.Values.mlflow.deployMode | default "standalone") "cr" }}
      - name: MLFLOW_TRACKING_URI
        value: {{ .Values.mlflow.crServiceUrl | default
          "https://mlflow.redhat-ods-applications.svc.cluster.local:8443" }}
      - name: MLFLOW_WORKSPACE
        value: {{ .Release.Namespace | quote }}
      - name: REQUESTS_CA_BUNDLE
        value: "/tmp/ca-bundle/combined-ca.crt"
      - name: MLFLOW_TRACKING_TOKEN_FILE
        value: "/var/run/secrets/kubernetes.io/serviceaccount/token"
      {{- else }}
      - name: MLFLOW_TRACKING_URI
        value: {{ printf "http://%s-mlflow:%v"
          (include "app.fullname" $) .Values.mlflow.port }}
      {{- end }}
      - name: MLFLOW_EXPERIMENT_NAME
        value: {{ .Values.mlflow.experimentName | default "my-agent" }}
Init container — CA bundle merge
# Merge CA certificates for TLS to MLflow operator gateway
initContainers:
  - name: merge-ca
    image: "{{ .Values.backend.image.repository }}:{{ .Values.backend.image.tag }}"
    command: ["/bin/sh", "-c"]
    args:
      - |
        cat /etc/pki/tls/certs/ca-bundle.crt \
          > /tmp/ca-bundle/combined-ca.crt
        if [ -f /var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt ]; then
          cat /var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt \
            >> /tmp/ca-bundle/combined-ca.crt
        fi
    volumeMounts:
      - name: ca-bundle
        mountPath: /tmp/ca-bundle
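
Once deployed, you can verify connectivity from inside the agent pod with a short check against the tracking API (a sketch reusing the environment variables the chart injects; it lists experiments, which works through both the standalone server and the CR gateway):

Connectivity check
import os
import mlflow

token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "")
if token_file and os.path.isfile(token_file):
    with open(token_file) as f:
        os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

mlflow.set_tracking_uri(os.environ["MLFLOW_TRACKING_URI"])

workspace = os.environ.get("MLFLOW_WORKSPACE", "").strip()
if workspace:
    mlflow.set_workspace(workspace)  # CR mode only

client = mlflow.MlflowClient()

# search_experiments works in both modes (the CR gateway blocks get_experiment)
for exp in client.search_experiments(max_results=5):
    print(exp.experiment_id, exp.name)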