Connect to MLflow

Send agent traces from any framework to MLflow on Red Hat OpenShift AI.

MLflow tracing captures every LLM call, tool invocation, and agent state transition as structured spans. On OpenShift AI, MLflow runs as a managed service that your agent connects to via environment variables — no code changes needed when moving between standalone and operator-managed deployments.

The pattern is the same across all frameworks: read the tracking URI from the environment and, in CR mode, authenticate with a service account token. Most frameworks enable tracing with a single autolog() call. Google ADK emits OpenTelemetry spans natively, so you instead configure a TracerProvider with an OTLP exporter pointed at MLflow. Either way, traces are collected without manual instrumentation and include LLM inputs and outputs, latency, token counts, and tool results.

OpenShift AI Setup

MLflow on OpenShift AI supports two deployment modes: standalone (a Deployment + Service + PVC managed by your Helm chart) and CR mode (an MLflow custom resource managed by the MLflow operator). Both expose the same tracking API — only the connection details differ.

Environment variables

Your agent reads these environment variables at startup. In standalone mode, only MLFLOW_TRACKING_URI is required. In CR mode, the workspace, token file, and CA bundle are also needed.

Environment variables
# Required
MLFLOW_TRACKING_URI=http://mlflow:5500

# Optional
MLFLOW_EXPERIMENT_NAME=my-agent

# OpenShift AI / CR mode only
MLFLOW_WORKSPACE=my-namespace
MLFLOW_TRACKING_TOKEN_FILE=/var/run/secrets/kubernetes.io/serviceaccount/token
REQUESTS_CA_BUNDLE=/tmp/ca-bundle/combined-ca.crt

| Variable | Required | Description |
| --- | --- | --- |
| MLFLOW_TRACKING_URI | Yes | MLflow server URL. Set automatically by the Helm chart. |
| MLFLOW_EXPERIMENT_NAME | No | Experiment name. Defaults to the agent name. |
| MLFLOW_WORKSPACE | CR only | Namespace for multi-tenant isolation via the operator gateway. |
| MLFLOW_TRACKING_TOKEN_FILE | CR only | Path to the service account token for gateway authentication. |
| REQUESTS_CA_BUNDLE | CR only | CA bundle for TLS to the operator-managed MLflow gateway. |
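
The variables in the table can be gathered into one validated object at startup. The following is a minimal sketch rather than part of the reference code: MlflowSettings and load_settings are hypothetical names, the "my-agent" fallback mirrors the default used in the LangGraph example, and treating a non-empty MLFLOW_WORKSPACE as the signal for CR mode is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class MlflowSettings:
    """Connection settings read from the environment (hypothetical helper)."""

    tracking_uri: str
    experiment_name: str
    workspace: Optional[str] = None
    token_file: Optional[str] = None
    ca_bundle: Optional[str] = None

    @property
    def cr_mode(self) -> bool:
        # Assumption: a non-empty MLFLOW_WORKSPACE means CR mode.
        return self.workspace is not None


def load_settings(env: Mapping[str, str] = os.environ) -> Optional[MlflowSettings]:
    """Return settings, or None when MLFLOW_TRACKING_URI is unset (tracing off)."""

    def get(name: str) -> Optional[str]:
        value = env.get(name, "").strip()
        return value or None

    uri = get("MLFLOW_TRACKING_URI")
    if uri is None:
        return None

    settings = MlflowSettings(
        tracking_uri=uri,
        # "my-agent" is a placeholder default, as in the LangGraph example.
        experiment_name=get("MLFLOW_EXPERIMENT_NAME") or "my-agent",
        workspace=get("MLFLOW_WORKSPACE"),
        token_file=get("MLFLOW_TRACKING_TOKEN_FILE"),
        ca_bundle=get("REQUESTS_CA_BUNDLE"),
    )
    # Fail early if CR mode is requested without a token file path.
    if settings.cr_mode and settings.token_file is None:
        raise ValueError("CR mode needs MLFLOW_TRACKING_TOKEN_FILE")
    return settings
```

Returning None when the tracking URI is missing keeps tracing optional, which matches how the framework examples skip MLflow setup when the variable is empty.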

Authentication

In CR mode, the agent authenticates to the MLflow operator gateway using a Kubernetes service account token. The token is mounted at the standard path and read at startup:

Token file authentication
import os

_token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "").strip()
if _token_file and os.path.isfile(_token_file):
    with open(_token_file) as f:
        os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

The operator gateway also requires a merged CA bundle (system CAs + Kubernetes service CA) for TLS verification. This is handled by an init container in the deployment.

Gateway RBAC

In CR mode, the MLflow operator gateway authorizes requests using Kubernetes RBAC. The pod's service account must have the mlflow-operator-mlflow-integration ClusterRole bound in the agent's namespace. This grants permission to use the gateway endpoint (gatewayendpoints/use) and manage experiments, datasets, and registered models.

Grant gateway access
# Grant the pod's service account access to the MLflow operator gateway
oc create rolebinding agent-mlflow \
  --clusterrole=mlflow-operator-mlflow-integration \
  --serviceaccount=<namespace>:default \
  -n <namespace>

Replace <namespace> with the namespace where your agent runs (e.g. basic-agents). If your pod uses a named service account instead of default, substitute that name. This is only required for CR mode — standalone deployments connect directly to MLflow without going through the operator gateway.
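
If you prefer a declarative manifest over the imperative command, the same binding can be generated as a RoleBinding object. The sketch below is illustrative only: gateway_rolebinding is a hypothetical helper, and the binding name agent-mlflow and the default service account are taken from the command above.

```python
import json


def gateway_rolebinding(namespace: str,
                        service_account: str = "default",
                        name: str = "agent-mlflow") -> dict:
    """Build a RoleBinding that grants a service account gateway access."""
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": name, "namespace": namespace},
        # Bind the ClusterRole named in this section within one namespace.
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "ClusterRole",
            "name": "mlflow-operator-mlflow-integration",
        },
        "subjects": [
            {
                "kind": "ServiceAccount",
                "name": service_account,
                "namespace": namespace,
            }
        ],
    }


if __name__ == "__main__":
    # "basic-agents" is the example namespace used in the text.
    print(json.dumps(gateway_rolebinding("basic-agents"), indent=2))
```

The JSON output can be saved to a file and applied with oc apply -f, which keeps the binding under version control alongside the rest of the deployment.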

Agent Frameworks

LangGraph

LangGraph is a widely used framework for building stateful, multi-actor agent applications. MLflow's mlflow.langchain.autolog() automatically traces all LangChain and LangGraph components: LLM calls, tool executions, graph node transitions, and state checkpoints.

This example is from the bank-voice-agent reference architecture, which runs a multi-agent banking assistant on OpenShift AI with full MLflow observability.

mlflow.langchain.autolog() — Traces LLM calls, tool use, and graph state transitions
import os
import mlflow
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import START, StateGraph

from src.graph import build_graph

# ── Optional MLflow tracing ──────────────────────────────────────
_mlflow_uri = os.environ.get("MLFLOW_TRACKING_URI", "").strip()
_mlflow_enabled = False
_mlflow_experiment_id: str | None = None

if _mlflow_uri:
    try:
        # Token file auth (OpenShift AI / MLflow operator CR mode)
        _token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "").strip()
        if _token_file and os.path.isfile(_token_file):
            with open(_token_file) as f:
                os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

        mlflow.set_tracking_uri(_mlflow_uri)

        # Workspace support (MLflow operator CR mode)
        _workspace = os.environ.get("MLFLOW_WORKSPACE", "").strip()
        if _workspace:
            mlflow.set_workspace(_workspace)

        experiment_name = os.environ.get(
            "MLFLOW_EXPERIMENT_NAME", "my-agent"
        )

        if _workspace:
            # MLflow operator gateway blocks get_experiment.
            # Use search_experiments to find or create.
            import mlflow.tracking.fluent as _fluent

            client = mlflow.MlflowClient()
            exps = client.search_experiments(
                filter_string=f"name = '{experiment_name}'"
            )
            if exps:
                _mlflow_experiment_id = exps[0].experiment_id
            else:
                _mlflow_experiment_id = client.create_experiment(
                    experiment_name
                )
            _fluent._active_experiment_id = _mlflow_experiment_id
        else:
            mlflow.set_experiment(experiment_name)

        mlflow.langchain.autolog()
        _mlflow_enabled = True
        print(f"[mlflow] Tracing enabled → {_mlflow_uri}")
    except Exception as exc:
        print(f"[mlflow] Failed to initialise: {exc}")

With autolog() enabled, every call to graph.invoke() or graph.stream() produces a trace with spans for each node, LLM invocation, and tool call. No manual callbacks are needed.
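
The workspace branch in the setup code (search for the experiment by name, create it if nothing matches) recurs in every framework example on this page, so it can be factored into one helper. This is a sketch, not part of the reference architecture: find_or_create_experiment is a hypothetical name, and it relies only on the search_experiments() and create_experiment() calls already used above.

```python
from typing import Any


def find_or_create_experiment(client: Any, name: str) -> str:
    """Return the ID of the experiment called `name`, creating it if absent.

    `client` is expected to be an mlflow.MlflowClient; only its
    search_experiments() and create_experiment() methods are used, which
    avoids the get_experiment call that the operator gateway blocks.
    """
    matches = client.search_experiments(filter_string=f"name = '{name}'")
    if matches:
        return matches[0].experiment_id
    return client.create_experiment(name)


# Usage (requires a reachable MLflow server):
#   import mlflow
#   experiment_id = find_or_create_experiment(mlflow.MlflowClient(), "my-agent")
```

Passing the client in as an argument keeps the helper free of module-level state and lets you exercise it with a stub client in unit tests.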

CrewAI

CrewAI orchestrates role-based AI agents working together as a crew. MLflow's mlflow.crewai.autolog() captures each agent's task execution, tool calls, and crew-level orchestration.

mlflow.crewai.autolog() — Traces crew orchestration, agent tasks, and tool calls
import os
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
import mlflow
from crewai import Agent, Task, Crew, Process

# ── Connect to MLflow on OpenShift AI ──────────────────────────
_mlflow_uri = os.environ.get("MLFLOW_TRACKING_URI", "").strip()

if _mlflow_uri:
    try:
        _token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "").strip()
        if _token_file and os.path.isfile(_token_file):
            with open(_token_file) as f:
                os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

        mlflow.set_tracking_uri(_mlflow_uri)

        # Workspace support (MLflow operator CR mode)
        _workspace = os.environ.get("MLFLOW_WORKSPACE", "").strip()
        if _workspace:
            mlflow.set_workspace(_workspace)

        experiment_name = os.environ.get(
            "MLFLOW_EXPERIMENT_NAME", "crewai-agent"
        )

        if _workspace:
            import mlflow.tracking.fluent as _fluent

            client = mlflow.MlflowClient()
            exps = client.search_experiments(
                filter_string=f"name = '{experiment_name}'"
            )
            if exps:
                _fluent._active_experiment_id = exps[0].experiment_id
            else:
                _fluent._active_experiment_id = client.create_experiment(
                    experiment_name
                )
        else:
            mlflow.set_experiment(experiment_name)

        mlflow.crewai.autolog()
        print(f"[mlflow] CrewAI tracing enabled → {_mlflow_uri}")
    except Exception as exc:
        print(f"[mlflow] Failed to initialise: {exc}")

# ── Define agents and tasks ────────────────────────────────────
researcher = Agent(
    role="Researcher",
    goal="Find accurate information on the given topic",
    backstory="You are an expert research analyst.",
    verbose=True,
)

writer = Agent(
    role="Writer",
    goal="Write a clear summary based on the research",
    backstory="You are a technical writer.",
    verbose=True,
)

research_task = Task(
    description="Research the topic: {topic}",
    expected_output="A detailed summary of findings",
    agent=researcher,
)

write_task = Task(
    description="Write a report based on the research",
    expected_output="A well-structured report",
    agent=writer,
)

crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    process=Process.sequential,
    verbose=True,
)

# All CrewAI traces are automatically sent to MLflow
result = crew.kickoff(inputs={"topic": "AI agent observability"})

AutoGen

AutoGen enables multi-agent conversations where agents collaborate, debate, and solve problems together. MLflow's mlflow.autogen.autolog() traces each agent turn, message exchange, and termination condition.

mlflow.autogen.autolog() — Traces agent conversations, turns, and group chat flow
import os
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
import mlflow
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient

# ── Connect to MLflow on OpenShift AI ──────────────────────────
_mlflow_uri = os.environ.get("MLFLOW_TRACKING_URI", "").strip()

if _mlflow_uri:
    try:
        _token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "").strip()
        if _token_file and os.path.isfile(_token_file):
            with open(_token_file) as f:
                os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

        mlflow.set_tracking_uri(_mlflow_uri)

        # Workspace support (MLflow operator CR mode)
        _workspace = os.environ.get("MLFLOW_WORKSPACE", "").strip()
        if _workspace:
            mlflow.set_workspace(_workspace)

        experiment_name = os.environ.get(
            "MLFLOW_EXPERIMENT_NAME", "autogen-agent"
        )

        if _workspace:
            import mlflow.tracking.fluent as _fluent

            client = mlflow.MlflowClient()
            exps = client.search_experiments(
                filter_string=f"name = '{experiment_name}'"
            )
            if exps:
                _fluent._active_experiment_id = exps[0].experiment_id
            else:
                _fluent._active_experiment_id = client.create_experiment(
                    experiment_name
                )
        else:
            mlflow.set_experiment(experiment_name)

        mlflow.autogen.autolog()
        print(f"[mlflow] AutoGen tracing enabled → {_mlflow_uri}")
    except Exception as exc:
        print(f"[mlflow] Failed to initialise: {exc}")

# ── Define agents ──────────────────────────────────────────────
model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")

planner = AssistantAgent(
    name="planner",
    model_client=model_client,
    system_message="You are a planning agent. Break tasks into steps.",
)

executor = AssistantAgent(
    name="executor",
    model_client=model_client,
    system_message="You execute the plan. Say TERMINATE when done.",
)

termination = TextMentionTermination("TERMINATE")

team = RoundRobinGroupChat(
    participants=[planner, executor],
    termination_condition=termination,
    max_turns=6,
)

# All AutoGen traces are automatically sent to MLflow
import asyncio
result = asyncio.run(
    team.run(task="Summarize best practices for agent tracing")
)

Note: AutoGen records tool calls as inner_messages within the trace (ToolCallRequestEvent and ToolCallExecutionEvent) rather than as separate child spans with span_type=TOOL. This means tool invocations appear in the trace Summary and Details view, but MLflow's Overview Tool calls tab will show a count of zero. The trace data is complete — only the overview aggregation is affected.

LlamaIndex

LlamaIndex specializes in RAG pipelines and data-connected agents. MLflow's mlflow.llama_index.autolog() captures document loading, embedding, retrieval, and query engine execution.

mlflow.llama_index.autolog() — Traces RAG retrieval, embedding, and query execution
import os
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
import mlflow
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core import Settings
from llama_index.llms.openai import OpenAI

# ── Connect to MLflow on OpenShift AI ──────────────────────────
_mlflow_uri = os.environ.get("MLFLOW_TRACKING_URI", "").strip()

if _mlflow_uri:
    try:
        _token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "").strip()
        if _token_file and os.path.isfile(_token_file):
            with open(_token_file) as f:
                os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

        mlflow.set_tracking_uri(_mlflow_uri)

        # Workspace support (MLflow operator CR mode)
        _workspace = os.environ.get("MLFLOW_WORKSPACE", "").strip()
        if _workspace:
            mlflow.set_workspace(_workspace)

        experiment_name = os.environ.get(
            "MLFLOW_EXPERIMENT_NAME", "llamaindex-agent"
        )

        if _workspace:
            import mlflow.tracking.fluent as _fluent

            client = mlflow.MlflowClient()
            exps = client.search_experiments(
                filter_string=f"name = '{experiment_name}'"
            )
            if exps:
                _fluent._active_experiment_id = exps[0].experiment_id
            else:
                _fluent._active_experiment_id = client.create_experiment(
                    experiment_name
                )
        else:
            mlflow.set_experiment(experiment_name)

        mlflow.llama_index.autolog()
        print(f"[mlflow] LlamaIndex tracing enabled → {_mlflow_uri}")
    except Exception as exc:
        print(f"[mlflow] Failed to initialise: {exc}")

# ── Build a RAG pipeline ──────────────────────────────────────
Settings.llm = OpenAI(model="gpt-4o-mini")

documents = SimpleDirectoryReader("./data").load_data()
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine()

# All LlamaIndex traces are automatically sent to MLflow
response = query_engine.query(
    "What are the key concepts in agent tracing?"
)
print(response)

Google ADK

Google Agent Development Kit (ADK) builds agents using Gemini models with built-in tool use. ADK emits OpenTelemetry spans natively — you configure a TracerProvider with an OTLPSpanExporter pointed at MLflow's /v1/traces endpoint. This requires mlflow>=3.6, opentelemetry-sdk, and opentelemetry-exporter-otlp-proto-http. Set MLFLOW_USE_DEFAULT_TRACER_PROVIDER=false before importing mlflow to avoid duplicate traces.

OpenTelemetry export — Traces agent runs, tool calls, and Gemini model interactions
import os
os.environ["MLFLOW_USE_DEFAULT_TRACER_PROVIDER"] = "false"
import mlflow
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

# ── Connect to MLflow on OpenShift AI ──────────────────────────
mlflow_uri = os.environ.get("MLFLOW_TRACKING_URI", "").strip()

if mlflow_uri:
    from opentelemetry import trace
    from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import SimpleSpanProcessor

    _token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "").strip()
    if _token_file and os.path.isfile(_token_file):
        with open(_token_file) as f:
            os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

    mlflow.set_tracking_uri(mlflow_uri)

    # Workspace support (MLflow operator CR mode)
    _workspace = os.environ.get("MLFLOW_WORKSPACE", "").strip()
    if _workspace:
        mlflow.set_workspace(_workspace)

    experiment_name = os.environ.get(
        "MLFLOW_EXPERIMENT_NAME", "google-adk-agent"
    )

    if _workspace:
        import mlflow.tracking.fluent as _fluent

        client = mlflow.MlflowClient()
        exps = client.search_experiments(
            filter_string=f"name = '{experiment_name}'"
        )
        if exps:
            _exp_id = exps[0].experiment_id
        else:
            _exp_id = client.create_experiment(experiment_name)
        _fluent._active_experiment_id = _exp_id
    else:
        _exp_id = mlflow.set_experiment(experiment_name).experiment_id

    # ADK uses OpenTelemetry — configure OTLP exporter for MLflow
    _otel_endpoint = f"{mlflow_uri.rstrip('/')}/v1/traces"
    _otel_headers = {"x-mlflow-experiment-id": _exp_id}
    if _workspace:
        _otel_headers["x-mlflow-workspace"] = _workspace
    _token = os.environ.get("MLFLOW_TRACKING_TOKEN", "")
    if _token:
        _otel_headers["Authorization"] = f"Bearer {_token}"

    _tracer_provider = TracerProvider()
    _tracer_provider.add_span_processor(
        SimpleSpanProcessor(OTLPSpanExporter(
            endpoint=_otel_endpoint,
            headers=_otel_headers,
        ))
    )
    trace.set_tracer_provider(_tracer_provider)

    print(f"[mlflow] Google ADK tracing enabled → {mlflow_uri}")

# ── Define an ADK agent ───────────────────────────────────────
def get_weather(city: str) -> dict:
    """Get current weather for a city."""
    return {
        "city": city,
        "temperature": "72°F",
        "condition": "Sunny",
    }

agent = Agent(
    name="weather_agent",
    model="gemini-2.0-flash",
    description="A helpful weather assistant",
    instruction="Help users check the weather. Use the get_weather "
                "tool when they ask about weather in a specific city.",
    tools=[get_weather],
)

# ── Run the agent ─────────────────────────────────────────────
import asyncio

session_service = InMemorySessionService()
runner = Runner(agent=agent, app_name="weather_app",
                session_service=session_service)

message = types.Content(
    role="user",
    parts=[types.Part(text="What's the weather in San Francisco?")],
)

async def run():
    # create_session is a coroutine in ADK 1.0 and later, so it must be
    # awaited; on older releases it is synchronous (drop the await there).
    session = await session_service.create_session(
        app_name="weather_app", user_id="user1"
    )
    async for event in runner.run_async(
        user_id="user1", session_id=session.id, new_message=message
    ):
        if event.is_final_response():
            print(event.content.parts[0].text)

asyncio.run(run())

Note: ADK traces tool calls as named spans in the trace timeline (e.g. execute_tool get_weather), but MLflow's Overview Tool calls tab may show a count of zero. This happens because the OTel span attributes that drive the overview aggregation depend on ADK and MLflow versions aligning on the gen_ai.operation.name semantic convention. The trace data itself is complete — tool inputs, outputs, and timing are all captured in the Details view.
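
The endpoint and header construction from the ADK example can be isolated into a pure function, which makes it easy to check what the exporter will send before wiring up OpenTelemetry. This is a sketch: otlp_export_config is a hypothetical name, while the /v1/traces path and the x-mlflow-experiment-id and x-mlflow-workspace headers are the ones used in the example above.

```python
from typing import Dict, Optional, Tuple


def otlp_export_config(tracking_uri: str,
                       experiment_id: str,
                       workspace: Optional[str] = None,
                       token: Optional[str] = None) -> Tuple[str, Dict[str, str]]:
    """Return the OTLP endpoint and headers for exporting spans to MLflow."""
    # Strip a trailing slash so the path is not doubled.
    endpoint = f"{tracking_uri.rstrip('/')}/v1/traces"
    headers = {"x-mlflow-experiment-id": str(experiment_id)}
    if workspace:
        # Only needed in CR mode.
        headers["x-mlflow-workspace"] = workspace
    if token:
        headers["Authorization"] = f"Bearer {token}"
    return endpoint, headers
```

The returned pair maps directly onto the endpoint and headers arguments of OTLPSpanExporter.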

OpenShift Deployment

The Helm chart handles MLflow deployment and injects the correct environment variables into your agent's pod. The configuration differs between standalone and CR mode.

Standalone mode

Deploys MLflow as a Deployment + Service + PVC in your namespace. The agent connects directly via HTTP. This is the simplest setup and works on any OpenShift cluster. See the chart deployment template for a full working example.

values.yaml
mlflow:
  enabled: true
  # "standalone" deploys MLflow as Deployment+Service+PVC
  # "cr" uses the MLflow operator custom resource
  deployMode: "standalone"
  crServiceUrl: "https://mlflow.redhat-ods-applications.svc.cluster.local:8443"
  image:
    repository: ghcr.io/mlflow/mlflow
    tag: v3.10.1
  port: 5500
  args:
    - "mlflow"
    - "server"
    - "--host"
    - "0.0.0.0"
    - "--port"
    - "5500"
    - "--backend-store-uri"
    - "sqlite:////mlflow/mlflow.db"
    - "--default-artifact-root"
    - "/mlflow/mlartifacts"
  persistence:
    size: 5Gi

CR mode (MLflow Operator)

Uses the MLflow operator to manage MLflow as a custom resource. The operator provides a gateway that handles multi-tenant workspace isolation and service account authentication. An init container merges CA certificates for TLS. See the chart CR template for a full working example.

mlflow-cr.yaml
apiVersion: mlflow.opendatahub.io/v1
kind: MLflow
metadata:
  name: mlflow
  labels:
    {{- include "ai-voice-agent.mlflow.labels" . | nindent 4 }}
spec:
  artifactsDestination: 'file:///mlflow/artifacts'
  backendStoreUri: 'sqlite:////mlflow/mlflow.db'
  image:
    image: quay.io/opendatahub/mlflow:odh-stable
  replicas: 1
  serveArtifacts: true
  serviceAccountName: mlflow-sa
  storage:
    accessModes:
      - ReadWriteOnce
    resources:
      requests:
        storage: {{ .Values.mlflow.persistence.size | default "10Gi" }}
  workers: 1

backend-deployment.yaml
# backend-deployment.yaml (Helm template excerpt)
containers:
  - name: backend
    env:
      {{- if eq (.Values.mlflow.deployMode | default "standalone") "cr" }}
      - name: MLFLOW_TRACKING_URI
        value: {{ .Values.mlflow.crServiceUrl | default
          "https://mlflow.redhat-ods-applications.svc.cluster.local:8443" }}
      - name: MLFLOW_WORKSPACE
        value: {{ .Release.Namespace | quote }}
      - name: REQUESTS_CA_BUNDLE
        value: "/tmp/ca-bundle/combined-ca.crt"
      - name: MLFLOW_TRACKING_TOKEN_FILE
        value: "/var/run/secrets/kubernetes.io/serviceaccount/token"
      {{- else }}
      - name: MLFLOW_TRACKING_URI
        value: {{ printf "http://%s-mlflow:%v"
          (include "app.fullname" $) .Values.mlflow.port }}
      {{- end }}
      - name: MLFLOW_EXPERIMENT_NAME
        value: {{ .Values.mlflow.experimentName | default "my-agent" }}

Init container: CA bundle merge
# Merge CA certificates for TLS to MLflow operator gateway
initContainers:
  - name: merge-ca
    image: "{{ .Values.backend.image.repository }}:{{ .Values.backend.image.tag }}"
    command: ["/bin/sh", "-c"]
    args:
      - |
        cat /etc/pki/tls/certs/ca-bundle.crt \
          > /tmp/ca-bundle/combined-ca.crt
        if [ -f /var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt ]; then
          cat /var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt \
            >> /tmp/ca-bundle/combined-ca.crt
        fi
    volumeMounts:
      - name: ca-bundle
        mountPath: /tmp/ca-bundle