Connect to MLflow
Send agent traces from any framework to MLflow on Red Hat OpenShift AI.
MLflow tracing captures every LLM call, tool invocation, and agent state transition as structured spans. On OpenShift AI, MLflow runs as a managed service that your agent connects to through environment variables, so no code changes are needed when moving between standalone and operator-managed deployments.
The pattern is the same across all frameworks: read the tracking URI from the environment
and optionally authenticate with a service account token. Most frameworks use
autolog() for automatic trace collection. Google ADK uses OpenTelemetry natively —
you configure a TracerProvider with an OTLP exporter pointed at MLflow instead.
Either way, every trace is automatically collected, including LLM inputs/outputs,
latency, token counts, and tool results.
OpenShift AI Setup
MLflow on OpenShift AI supports two deployment modes: standalone (a Deployment + Service + PVC managed by your Helm chart) and CR mode (an MLflow custom resource managed by the MLflow operator). Both expose the same tracking API — only the connection details differ.
Environment variables
Your agent reads these environment variables at startup. In standalone mode, only
MLFLOW_TRACKING_URI is required. In CR mode, the workspace and token file are also needed.
```shell
# Required
MLFLOW_TRACKING_URI=http://mlflow:5500
# Optional
MLFLOW_EXPERIMENT_NAME=my-agent
# OpenShift AI / CR mode only
MLFLOW_WORKSPACE=my-namespace
MLFLOW_TRACKING_TOKEN_FILE=/var/run/secrets/kubernetes.io/serviceaccount/token
REQUESTS_CA_BUNDLE=/tmp/ca-bundle/combined-ca.crt
```

| Variable | Required | Description |
|---|---|---|
| MLFLOW_TRACKING_URI | Yes | MLflow server URL. Set automatically by the Helm chart. |
| MLFLOW_EXPERIMENT_NAME | No | Experiment name. Defaults to the agent name. |
| MLFLOW_WORKSPACE | CR only | Namespace for multi-tenant isolation via the operator gateway. |
| MLFLOW_TRACKING_TOKEN_FILE | CR only | Path to the service account token for gateway authentication. |
| REQUESTS_CA_BUNDLE | CR only | CA bundle for TLS to the operator-managed MLflow gateway. |
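One way to keep the startup logic readable is to collect these variables into a single settings object before touching MLflow. The sketch below is illustrative only: `MlflowSettings` and `load_mlflow_settings` are hypothetical names, and treating a non-empty `MLFLOW_WORKSPACE` as the signal for CR mode is an assumption drawn from the table above, not something the chart enforces.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class MlflowSettings:
    """Hypothetical container for the variables listed in the table."""

    tracking_uri: str
    experiment_name: str
    workspace: Optional[str]
    token_file: Optional[str]
    ca_bundle: Optional[str]

    @property
    def cr_mode(self) -> bool:
        # Assumption: a workspace is only set when the operator gateway is used.
        return self.workspace is not None


def load_mlflow_settings(
    env: Mapping[str, str] = os.environ,
) -> Optional[MlflowSettings]:
    """Return the settings, or None when MLFLOW_TRACKING_URI is unset."""

    def get(name: str) -> Optional[str]:
        # Empty or whitespace-only values count as unset.
        value = env.get(name, "").strip()
        return value or None

    uri = get("MLFLOW_TRACKING_URI")
    if uri is None:
        return None
    return MlflowSettings(
        tracking_uri=uri,
        experiment_name=get("MLFLOW_EXPERIMENT_NAME") or "my-agent",
        workspace=get("MLFLOW_WORKSPACE"),
        token_file=get("MLFLOW_TRACKING_TOKEN_FILE"),
        ca_bundle=get("REQUESTS_CA_BUNDLE"),
    )
```

Passing a plain dictionary instead of `os.environ` makes the function easy to exercise in unit tests without changing the process environment.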
Authentication
In CR mode, the agent authenticates to the MLflow operator gateway using a Kubernetes service account token. The token is mounted at the standard path and read at startup:
```python
import os

_token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "").strip()
if _token_file and os.path.isfile(_token_file):
    with open(_token_file) as f:
        os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()
```

The operator gateway also requires a merged CA bundle (system CAs + Kubernetes service CA) for TLS verification. This is handled by an init container in the deployment.
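If TLS verification fails at startup, a quick sanity check on the merged bundle can narrow things down. The helper below is a hypothetical sketch, not part of the chart or of MLflow: it only counts PEM certificate markers in the file and does not validate the certificates themselves.

```python
import os


def count_pem_certificates(path: str) -> int:
    """Count PEM certificate blocks in a bundle file (0 if the file is missing)."""
    if not os.path.isfile(path):
        return 0
    with open(path, encoding="utf-8", errors="replace") as f:
        # Each certificate in a PEM bundle starts with this marker line.
        return sum(
            1 for line in f if line.strip() == "-----BEGIN CERTIFICATE-----"
        )
```

A caller might run this against the path in `REQUESTS_CA_BUNDLE` and log a warning when the count is zero, which would suggest the init container did not write the bundle.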
Gateway RBAC
In CR mode, the MLflow operator gateway authorizes requests using Kubernetes RBAC.
The pod's service account must have the mlflow-operator-mlflow-integration ClusterRole
bound in the agent's namespace. This grants permission to use the gateway endpoint
(gatewayendpoints/use) and manage experiments, datasets, and registered models.
```shell
# Grant the pod's service account access to the MLflow operator gateway
oc create rolebinding agent-mlflow \
  --clusterrole=mlflow-operator-mlflow-integration \
  --serviceaccount=<namespace>:default \
  -n <namespace>
```

Replace <namespace> with the namespace where your agent runs (e.g. basic-agents).
If your pod uses a named service account instead of default, substitute that name.
This is only required for CR mode; standalone deployments connect directly to MLflow
without going through the operator gateway.
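For teams that keep RBAC in version control rather than running the command by hand, the same binding can be written as a manifest. The sketch below builds the RoleBinding as a plain dictionary; `mlflow_rolebinding` is a hypothetical helper, and the binding name `agent-mlflow` is simply carried over from the command above.

```python
import json


def mlflow_rolebinding(
    namespace: str,
    service_account: str = "default",
    name: str = "agent-mlflow",
) -> dict:
    """Build a RoleBinding equivalent to the `oc create rolebinding` command."""
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": name, "namespace": namespace},
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "ClusterRole",
            "name": "mlflow-operator-mlflow-integration",
        },
        "subjects": [
            {
                "kind": "ServiceAccount",
                "name": service_account,
                "namespace": namespace,
            }
        ],
    }


if __name__ == "__main__":
    # JSON is valid input for `oc apply -f -`.
    print(json.dumps(mlflow_rolebinding("basic-agents"), indent=2))
```

The printed JSON can be piped to `oc apply -f -` or committed alongside the other manifests for the agent.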
Agent Frameworks
LangGraph
LangGraph is a widely used framework for
building stateful, multi-actor agent applications. MLflow's mlflow.langchain.autolog()
automatically traces all LangChain and LangGraph components: LLM calls, tool executions,
graph node transitions, and state checkpoints.
This example is from the bank-voice-agent reference architecture, which runs a multi-agent banking assistant on OpenShift AI with full MLflow observability.
```python
import os

import mlflow
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import START, StateGraph

from src.graph import build_graph

# ── Optional MLflow tracing ──────────────────────────────────────
_mlflow_uri = os.environ.get("MLFLOW_TRACKING_URI", "").strip()
_mlflow_enabled = False
_mlflow_experiment_id: str | None = None

if _mlflow_uri:
    try:
        # Token file auth (OpenShift AI / MLflow operator CR mode)
        _token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "").strip()
        if _token_file and os.path.isfile(_token_file):
            with open(_token_file) as f:
                os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

        mlflow.set_tracking_uri(_mlflow_uri)

        # Workspace support (MLflow operator CR mode)
        _workspace = os.environ.get("MLFLOW_WORKSPACE", "").strip()
        if _workspace:
            mlflow.set_workspace(_workspace)

        experiment_name = os.environ.get(
            "MLFLOW_EXPERIMENT_NAME", "my-agent"
        )

        if _workspace:
            # MLflow operator gateway blocks get_experiment.
            # Use search_experiments to find or create.
            import mlflow.tracking.fluent as _fluent

            client = mlflow.MlflowClient()
            exps = client.search_experiments(
                filter_string=f"name = '{experiment_name}'"
            )
            if exps:
                _mlflow_experiment_id = exps[0].experiment_id
            else:
                _mlflow_experiment_id = client.create_experiment(
                    experiment_name
                )
            _fluent._active_experiment_id = _mlflow_experiment_id
        else:
            mlflow.set_experiment(experiment_name)

        mlflow.langchain.autolog()
        _mlflow_enabled = True
        print(f"[mlflow] Tracing enabled → {_mlflow_uri}")
    except Exception as exc:
        print(f"[mlflow] Failed to initialise: {exc}")
```

With autolog() enabled, every call to graph.invoke() or graph.stream() produces
a trace with spans for each node, LLM invocation, and tool call. No manual callbacks
are needed.
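The find-or-create step for CR mode recurs in every framework example on this page, so it may be worth factoring into one function. The following is a minimal sketch under stated assumptions: `find_or_create_experiment` is a hypothetical helper, `client` is expected to behave like `mlflow.MlflowClient` (only `search_experiments` and `create_experiment` are used, as in the example above), and rejecting names that contain a single quote is a precaution added here rather than documented MLflow behaviour.

```python
from typing import Any


def find_or_create_experiment(client: Any, name: str) -> str:
    """Return the ID of the experiment called `name`, creating it if absent.

    Uses search_experiments rather than a get-by-name lookup, mirroring the
    workaround shown above for the operator gateway.
    """
    if "'" in name:
        # The name is embedded in a quoted filter string; refuse names that
        # would break the quoting instead of guessing at an escape syntax.
        raise ValueError("experiment name must not contain a single quote")

    matches = client.search_experiments(filter_string=f"name = '{name}'")
    if matches:
        return matches[0].experiment_id
    return client.create_experiment(name)
```

With a helper like this, each framework block would only need to call it once and assign the result wherever the active experiment ID is stored.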
CrewAI
CrewAI orchestrates role-based AI agents working together as a crew.
MLflow's mlflow.crewai.autolog() captures each agent's task execution, tool calls, and
crew-level orchestration.
```python
import os

import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

import mlflow
from crewai import Agent, Task, Crew, Process

# ── Connect to MLflow on OpenShift AI ──────────────────────────
_mlflow_uri = os.environ.get("MLFLOW_TRACKING_URI", "").strip()

if _mlflow_uri:
    try:
        _token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "").strip()
        if _token_file and os.path.isfile(_token_file):
            with open(_token_file) as f:
                os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

        mlflow.set_tracking_uri(_mlflow_uri)

        # Workspace support (MLflow operator CR mode)
        _workspace = os.environ.get("MLFLOW_WORKSPACE", "").strip()
        if _workspace:
            mlflow.set_workspace(_workspace)

        experiment_name = os.environ.get(
            "MLFLOW_EXPERIMENT_NAME", "crewai-agent"
        )

        if _workspace:
            import mlflow.tracking.fluent as _fluent

            client = mlflow.MlflowClient()
            exps = client.search_experiments(
                filter_string=f"name = '{experiment_name}'"
            )
            if exps:
                _fluent._active_experiment_id = exps[0].experiment_id
            else:
                _fluent._active_experiment_id = client.create_experiment(
                    experiment_name
                )
        else:
            mlflow.set_experiment(experiment_name)

        mlflow.crewai.autolog()
        print(f"[mlflow] CrewAI tracing enabled → {_mlflow_uri}")
    except Exception as exc:
        print(f"[mlflow] Failed to initialise: {exc}")

# ── Define agents and tasks ────────────────────────────────────
researcher = Agent(
    role="Researcher",
    goal="Find accurate information on the given topic",
    backstory="You are an expert research analyst.",
    verbose=True,
)
writer = Agent(
    role="Writer",
    goal="Write a clear summary based on the research",
    backstory="You are a technical writer.",
    verbose=True,
)
research_task = Task(
    description="Research the topic: {topic}",
    expected_output="A detailed summary of findings",
    agent=researcher,
)
write_task = Task(
    description="Write a report based on the research",
    expected_output="A well-structured report",
    agent=writer,
)
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    process=Process.sequential,
    verbose=True,
)

# All CrewAI traces are automatically sent to MLflow
result = crew.kickoff(inputs={"topic": "AI agent observability"})
```

AutoGen
AutoGen enables multi-agent conversations where
agents collaborate, debate, and solve problems together. MLflow's mlflow.autogen.autolog()
traces each agent turn, message exchange, and termination condition.
```python
import asyncio
import os

import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

import mlflow
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient

# ── Connect to MLflow on OpenShift AI ──────────────────────────
_mlflow_uri = os.environ.get("MLFLOW_TRACKING_URI", "").strip()

if _mlflow_uri:
    try:
        _token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "").strip()
        if _token_file and os.path.isfile(_token_file):
            with open(_token_file) as f:
                os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

        mlflow.set_tracking_uri(_mlflow_uri)

        # Workspace support (MLflow operator CR mode)
        _workspace = os.environ.get("MLFLOW_WORKSPACE", "").strip()
        if _workspace:
            mlflow.set_workspace(_workspace)

        experiment_name = os.environ.get(
            "MLFLOW_EXPERIMENT_NAME", "autogen-agent"
        )

        if _workspace:
            import mlflow.tracking.fluent as _fluent

            client = mlflow.MlflowClient()
            exps = client.search_experiments(
                filter_string=f"name = '{experiment_name}'"
            )
            if exps:
                _fluent._active_experiment_id = exps[0].experiment_id
            else:
                _fluent._active_experiment_id = client.create_experiment(
                    experiment_name
                )
        else:
            mlflow.set_experiment(experiment_name)

        mlflow.autogen.autolog()
        print(f"[mlflow] AutoGen tracing enabled → {_mlflow_uri}")
    except Exception as exc:
        print(f"[mlflow] Failed to initialise: {exc}")

# ── Define agents ──────────────────────────────────────────────
model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
planner = AssistantAgent(
    name="planner",
    model_client=model_client,
    system_message="You are a planning agent. Break tasks into steps.",
)
executor = AssistantAgent(
    name="executor",
    model_client=model_client,
    system_message="You execute the plan. Say TERMINATE when done.",
)
termination = TextMentionTermination("TERMINATE")
team = RoundRobinGroupChat(
    participants=[planner, executor],
    termination_condition=termination,
    max_turns=6,
)

# All AutoGen traces are automatically sent to MLflow
result = asyncio.run(
    team.run(task="Summarize best practices for agent tracing")
)
```

Note: AutoGen records tool calls as inner_messages within the trace (ToolCallRequestEvent and ToolCallExecutionEvent) rather than as separate child spans with span_type=TOOL. This means tool invocations appear in the trace Summary and Details view, but MLflow's Overview Tool calls tab will show a count of zero. The trace data is complete; only the overview aggregation is affected.
LlamaIndex
LlamaIndex specializes in RAG pipelines and data-connected agents.
MLflow's mlflow.llama_index.autolog() captures document loading, embedding, retrieval, and
query engine execution.
```python
import os

import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

import mlflow
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core import Settings
from llama_index.llms.openai import OpenAI

# ── Connect to MLflow on OpenShift AI ──────────────────────────
_mlflow_uri = os.environ.get("MLFLOW_TRACKING_URI", "").strip()

if _mlflow_uri:
    try:
        _token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "").strip()
        if _token_file and os.path.isfile(_token_file):
            with open(_token_file) as f:
                os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

        mlflow.set_tracking_uri(_mlflow_uri)

        # Workspace support (MLflow operator CR mode)
        _workspace = os.environ.get("MLFLOW_WORKSPACE", "").strip()
        if _workspace:
            mlflow.set_workspace(_workspace)

        experiment_name = os.environ.get(
            "MLFLOW_EXPERIMENT_NAME", "llamaindex-agent"
        )

        if _workspace:
            import mlflow.tracking.fluent as _fluent

            client = mlflow.MlflowClient()
            exps = client.search_experiments(
                filter_string=f"name = '{experiment_name}'"
            )
            if exps:
                _fluent._active_experiment_id = exps[0].experiment_id
            else:
                _fluent._active_experiment_id = client.create_experiment(
                    experiment_name
                )
        else:
            mlflow.set_experiment(experiment_name)

        mlflow.llama_index.autolog()
        print(f"[mlflow] LlamaIndex tracing enabled → {_mlflow_uri}")
    except Exception as exc:
        print(f"[mlflow] Failed to initialise: {exc}")

# ── Build a RAG pipeline ──────────────────────────────────────
Settings.llm = OpenAI(model="gpt-4o-mini")
documents = SimpleDirectoryReader("./data").load_data()
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine()

# All LlamaIndex traces are automatically sent to MLflow
response = query_engine.query(
    "What are the key concepts in agent tracing?"
)
print(response)
```

Google ADK
Google Agent Development Kit (ADK) builds agents using
Gemini models with built-in tool use. ADK emits OpenTelemetry spans natively — you configure
a TracerProvider with an OTLPSpanExporter pointed at MLflow's /v1/traces endpoint.
This requires mlflow>=3.6, opentelemetry-sdk, and opentelemetry-exporter-otlp-proto-http.
Set MLFLOW_USE_DEFAULT_TRACER_PROVIDER=false before importing mlflow to avoid duplicate traces.
```python
import asyncio
import os

# Must be set before mlflow is imported to avoid duplicate traces.
os.environ["MLFLOW_USE_DEFAULT_TRACER_PROVIDER"] = "false"

import mlflow
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

# ── Connect to MLflow on OpenShift AI ──────────────────────────
mlflow_uri = os.environ.get("MLFLOW_TRACKING_URI", "").strip()

if mlflow_uri:
    from opentelemetry import trace
    from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import SimpleSpanProcessor

    _token_file = os.environ.get("MLFLOW_TRACKING_TOKEN_FILE", "").strip()
    if _token_file and os.path.isfile(_token_file):
        with open(_token_file) as f:
            os.environ["MLFLOW_TRACKING_TOKEN"] = f.read().strip()

    mlflow.set_tracking_uri(mlflow_uri)

    # Workspace support (MLflow operator CR mode)
    _workspace = os.environ.get("MLFLOW_WORKSPACE", "").strip()
    if _workspace:
        mlflow.set_workspace(_workspace)

    experiment_name = os.environ.get(
        "MLFLOW_EXPERIMENT_NAME", "google-adk-agent"
    )

    if _workspace:
        import mlflow.tracking.fluent as _fluent

        client = mlflow.MlflowClient()
        exps = client.search_experiments(
            filter_string=f"name = '{experiment_name}'"
        )
        if exps:
            _exp_id = exps[0].experiment_id
        else:
            _exp_id = client.create_experiment(experiment_name)
        _fluent._active_experiment_id = _exp_id
    else:
        _exp_id = mlflow.set_experiment(experiment_name).experiment_id

    # ADK uses OpenTelemetry: configure an OTLP exporter for MLflow
    _otel_endpoint = f"{mlflow_uri.rstrip('/')}/v1/traces"
    _otel_headers = {"x-mlflow-experiment-id": _exp_id}
    if _workspace:
        _otel_headers["x-mlflow-workspace"] = _workspace
    _token = os.environ.get("MLFLOW_TRACKING_TOKEN", "")
    if _token:
        _otel_headers["Authorization"] = f"Bearer {_token}"

    _tracer_provider = TracerProvider()
    _tracer_provider.add_span_processor(
        SimpleSpanProcessor(OTLPSpanExporter(
            endpoint=_otel_endpoint,
            headers=_otel_headers,
        ))
    )
    trace.set_tracer_provider(_tracer_provider)
    print(f"[mlflow] Google ADK tracing enabled → {mlflow_uri}")


# ── Define an ADK agent ───────────────────────────────────────
def get_weather(city: str) -> dict:
    """Get current weather for a city."""
    return {
        "city": city,
        "temperature": "72°F",
        "condition": "Sunny",
    }


agent = Agent(
    name="weather_agent",
    model="gemini-2.0-flash",
    description="A helpful weather assistant",
    instruction="Help users check the weather. Use the get_weather "
    "tool when they ask about weather in a specific city.",
    tools=[get_weather],
)

# ── Run the agent ─────────────────────────────────────────────
session_service = InMemorySessionService()
runner = Runner(agent=agent, app_name="weather_app",
                session_service=session_service)
message = types.Content(
    role="user",
    parts=[types.Part(text="What's the weather in San Francisco?")],
)


async def run():
    # Assumption: ADK 1.0 or later, where create_session is a coroutine
    # and must be awaited before session.id can be read.
    session = await session_service.create_session(
        app_name="weather_app", user_id="user1"
    )
    async for event in runner.run_async(
        user_id="user1", session_id=session.id, new_message=message
    ):
        if event.is_final_response():
            print(event.content.parts[0].text)


asyncio.run(run())
```

Note: ADK traces tool calls as named spans in the trace timeline (e.g.
execute_tool get_weather), but MLflow's Overview Tool calls tab may show a count of zero. This happens because the OTel span attributes that drive the overview aggregation depend on ADK and MLflow versions aligning on the gen_ai.operation.name semantic convention. The trace data itself is complete: tool inputs, outputs, and timing are all captured in the Details view.
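The endpoint and header construction from the ADK example can be isolated into a pure function, which makes it easier to unit test without a running MLflow server. This is a sketch only: `otlp_trace_config` is a hypothetical name, and the header names are the ones used in the example above.

```python
from typing import Dict, Optional, Tuple


def otlp_trace_config(
    tracking_uri: str,
    experiment_id: str,
    workspace: Optional[str] = None,
    token: Optional[str] = None,
) -> Tuple[str, Dict[str, str]]:
    """Return the OTLP endpoint and headers for sending spans to MLflow."""
    # Strip any trailing slash so the path is not doubled.
    endpoint = f"{tracking_uri.rstrip('/')}/v1/traces"

    headers = {"x-mlflow-experiment-id": str(experiment_id)}
    if workspace:
        # Only needed when going through the operator gateway (CR mode).
        headers["x-mlflow-workspace"] = workspace
    if token:
        headers["Authorization"] = f"Bearer {token}"
    return endpoint, headers
```

The returned pair maps directly onto the `endpoint` and `headers` arguments passed to `OTLPSpanExporter` in the example.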
OpenShift Deployment
The Helm chart handles MLflow deployment and injects the correct environment variables into your agent's pod. The configuration differs between standalone and CR mode.
Standalone mode
Deploys MLflow as a Deployment + Service + PVC in your namespace. The agent connects directly via HTTP. This is the simplest setup and works on any OpenShift cluster. See the chart deployment template for a full working example.
```yaml
mlflow:
  enabled: true
  # "standalone" deploys MLflow as Deployment+Service+PVC
  # "cr" uses the MLflow operator custom resource
  deployMode: "standalone"
  crServiceUrl: "https://mlflow.redhat-ods-applications.svc.cluster.local:8443"
  image:
    repository: ghcr.io/mlflow/mlflow
    tag: v3.10.1
  port: 5500
  args:
    - "mlflow"
    - "server"
    - "--host"
    - "0.0.0.0"
    - "--port"
    - "5500"
    - "--backend-store-uri"
    - "sqlite:////mlflow/mlflow.db"
    - "--default-artifact-root"
    - "/mlflow/mlartifacts"
  persistence:
    size: 5Gi
```

CR mode (MLflow Operator)
Uses the MLflow operator to manage MLflow as a custom resource. The operator provides a gateway that handles multi-tenant workspace isolation and service account authentication. An init container merges CA certificates for TLS. See the chart CR template for a full working example.
```yaml
apiVersion: mlflow.opendatahub.io/v1
kind: MLflow
metadata:
  name: mlflow
  labels:
    {{- include "ai-voice-agent.mlflow.labels" . | nindent 4 }}
spec:
  artifactsDestination: 'file:///mlflow/artifacts'
  backendStoreUri: 'sqlite:////mlflow/mlflow.db'
  image:
    image: quay.io/opendatahub/mlflow:odh-stable
  replicas: 1
  serveArtifacts: true
  serviceAccountName: mlflow-sa
  storage:
    accessModes:
      - ReadWriteOnce
    resources:
      requests:
        storage: {{ .Values.mlflow.persistence.size | default "10Gi" }}
  workers: 1
```

```yaml
# backend-deployment.yaml (Helm template excerpt)
containers:
  - name: backend
    env:
      {{- if eq (.Values.mlflow.deployMode | default "standalone") "cr" }}
      - name: MLFLOW_TRACKING_URI
        value: {{ .Values.mlflow.crServiceUrl | default
          "https://mlflow.redhat-ods-applications.svc.cluster.local:8443" }}
      - name: MLFLOW_WORKSPACE
        value: {{ .Release.Namespace | quote }}
      - name: REQUESTS_CA_BUNDLE
        value: "/tmp/ca-bundle/combined-ca.crt"
      - name: MLFLOW_TRACKING_TOKEN_FILE
        value: "/var/run/secrets/kubernetes.io/serviceaccount/token"
      {{- else }}
      - name: MLFLOW_TRACKING_URI
        value: {{ printf "http://%s-mlflow:%v"
          (include "app.fullname" $) .Values.mlflow.port }}
      {{- end }}
      - name: MLFLOW_EXPERIMENT_NAME
        value: {{ .Values.mlflow.experimentName | default "my-agent" }}
```

```yaml
# Merge CA certificates for TLS to MLflow operator gateway
initContainers:
  - name: merge-ca
    image: "{{ .Values.backend.image.repository }}:{{ .Values.backend.image.tag }}"
    command: ["/bin/sh", "-c"]
    args:
      - |
        cat /etc/pki/tls/certs/ca-bundle.crt \
          > /tmp/ca-bundle/combined-ca.crt
        if [ -f /var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt ]; then
          cat /var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt \
            >> /tmp/ca-bundle/combined-ca.crt
        fi
    volumeMounts:
      - name: ca-bundle
        mountPath: /tmp/ca-bundle
```
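To make the branching in the backend-deployment template easier to follow, the same decision can be mirrored in a few lines of Python. This is purely illustrative: `mlflow_env` is a hypothetical function, not something the chart ships, and its defaults are copied from the values and template excerpts above.

```python
from typing import Dict

DEFAULT_CR_URL = "https://mlflow.redhat-ods-applications.svc.cluster.local:8443"


def mlflow_env(
    deploy_mode: str,
    namespace: str,
    fullname: str,
    port: int = 5500,
    cr_service_url: str = DEFAULT_CR_URL,
    experiment_name: str = "my-agent",
) -> Dict[str, str]:
    """Return the MLflow environment variables the template would inject."""
    if deploy_mode == "cr":
        # CR mode: talk to the operator gateway over TLS with a token.
        env = {
            "MLFLOW_TRACKING_URI": cr_service_url,
            "MLFLOW_WORKSPACE": namespace,
            "REQUESTS_CA_BUNDLE": "/tmp/ca-bundle/combined-ca.crt",
            "MLFLOW_TRACKING_TOKEN_FILE": (
                "/var/run/secrets/kubernetes.io/serviceaccount/token"
            ),
        }
    else:
        # Standalone mode: plain HTTP to the in-namespace Service.
        env = {"MLFLOW_TRACKING_URI": f"http://{fullname}-mlflow:{port}"}

    # The experiment name is set in both modes.
    env["MLFLOW_EXPERIMENT_NAME"] = experiment_name
    return env
```

For example, `mlflow_env("standalone", "basic-agents", "my-agent")` yields a tracking URI of `http://my-agent-mlflow:5500`, while the `"cr"` mode adds the workspace, CA bundle, and token file variables.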