mcp-65-tools.mdcd ../blog
mcp-65-tools.md
blog / mcp-65-tools.md · 847 lines · 18 min read

~/asynclabs/blog $ cat mcp-65-tools.md | head -1

# Cómo construimos un MCP que conecta IA con 65 herramientas operacionales

~/asynclabs/blog $ stat --format='%s lines, %Y modified' mcp-65-tools.md

847 lines, 2026-03-23 · tags: [mcp, claude-ai, fastapi, sse, telecom]

~/asynclabs/blog $ asynclabs render --format=terminal mcp-65-tools.md

Un operador telecom líder tenía un problema que cualquier empresa con más de 3 sistemas internos reconoce: la información estaba fragmentada en 6+ aplicaciones distintas. Sistema de tareas para gestión de incidencias. Sistema de sitios para inventario de infraestructura. Monitoreo de red para alarmas en tiempo real. Field service para gestión de cuadrillas en terreno. Un panel Excel de priorización. Email para coordinación. Un operador necesitaba 10-15 minutos y 6 pestañas abiertas para responder una pregunta como "¿cuáles son las tareas críticas estancadas del sitio SITE-001?"

El costo no era solo tiempo. Era context switching, errores de transcripción, y decisiones basadas en datos incompletos porque nadie tiene la paciencia de cruzar 6 sistemas para cada consulta.

# Arquitectura: Model Context Protocol

Diseñamos un servidor MCP (Model Context Protocol) que conecta Claude a toda la infraestructura operacional del cliente. El operador pregunta en lenguaje natural. Claude orquesta las herramientas necesarias, ejecuta las queries, consolida los resultados y responde en <2 segundos.

MCP SERVER (FastAPI)
════════════════════════════════════

  SSE/HTTP ──▶ NLP Pipeline ──▶ Agentic Loop
  Endpoint     (Entity Ext)     (max 10 iter)
                                     │
                    ┌────────────────┤
                    ▼                ▼
              Tool Registry    Tool Executor
              (65 active)      (asyncio.gather)
                                     │
         ┌───────────────────────────┤
         │      Guardrails Layer     │
         │  Anti-hallucination       │
         │  Write protection         │
         │  Scope restriction        │
         └───────────────────────────┤
                                     ▼
         External Systems (HTTP/2 + CB)
         Tasks · Sites · Panel · KPIs
         Field · Network · Email · WA

El servidor expone 65 herramientas activas organizadas en 13 módulos. Cada herramienta hereda de BaseTool con validación Pydantic v2, flag cacheable para Redis, y schema auto-generado para Claude.

## Capa 1: NLP Pipeline (Entity Extraction)

Cada mensaje del operador pasa primero por extracción de entidades. Claude Haiku identifica en ~200ms: sitios (SITE-001, SITE-002), números de tarea, estados, fechas, zonas, gestores, tipos de trabajo, niveles de criticidad. Para mensajes cortos (<80 chars), un fast-path regex lo resuelve en <1ms.

# Entity extraction prompt (simplified)
SYSTEM_PROMPT = """
Extract operational entities from the operator message.
Return JSON with these fields:
sites: list[str] # e.g. ["SITE-001", "SITE-002"]
task_ids: list[str] # e.g. ["T-12345"]
statuses: list[str] # e.g. ["open", "stalled"]
date_range: {from, to} # ISO 8601
zones: list[str] # geographic zones
managers: list[str] # assigned managers
criticality: str # "critical" | "high" | "medium" | "low"
work_type: str # "POP" | "TOT" | "preventive" | ...
Rules:
- site IDs are alphanumeric: SITE-XXX, XX-YYYY
- task IDs always start with T-
- dates relative to {current_date}
- if ambiguous, return null (never guess)
"""

El SmartParameterMapper toma las entidades extraídas y las mapea a los parámetros de cada herramienta usando el JSON Schema de cada tool. Si Claude necesita get_tasks_by_site, el mapper sabe que site_id viene de entities.sites[0] y status de entities.statuses.

class SmartParameterMapper:
"""Maps extracted entities to tool parameter schemas."""
def map(self, entities: dict, tool: BaseTool) -> dict:
schema = tool.get_input_schema()
params = {}
for field_name, field_info in schema["properties"].items():
# Direct entity match
if field_name in entities and entities[field_name]:
params[field_name] = entities[field_name]
continue
# Alias resolution (site_id ← sites[0])
alias = self.ALIASES.get(field_name)
if alias and alias in entities:
value = entities[alias]
params[field_name] = value[0] if isinstance(value, list) else value
# Validate with Pydantic
return tool.InputModel(**params).model_dump()

## Capa 2: Agentic Loop (Tool Orchestration)

Claude recibe el mensaje del operador, las entidades extraídas, y el catálogo de 65 herramientas con sus schemas. Decide qué herramientas llamar en cada iteración. Max 10 iteraciones. Si necesita más datos, encadena llamadas automáticamente.

# Agentic loop trace (real example, redacted)
# Query: "resumen de tareas críticas estancadas en zona norte"
[iter 1] Claude → tool_call: search_tasks(
status="stalled", criticality="critical", zone="norte"
)
→ result: 12 tasks found
[iter 2] Claude → tool_call: get_kpi_summary(zone="norte", period="7d")
→ result: {mttr: 4.2h, open: 45, closed: 31, sla_breach: 8}
[iter 3] Claude → tool_call: get_panel_summary(zone="norte", criticality="critical")
→ result: {escalated: 3, aging_gt_48h: 5, unassigned: 2}
[iter 4] Claude → final_response:
"En zona norte hay 12 tareas críticas estancadas.
MTTR últimos 7 días: 4.2h. 8 en breach de SLA.
3 escaladas, 5 con más de 48h sin movimiento,
2 sin gestor asignado."
Total time: 1.8s (4 iterations, 3 parallel tool calls)

El LoopDetector previene loops infinitos con 3 reglas: max 5 llamadas idénticas (same tool + same params), max 4 errores consecutivos, y detección de ciclos (A→B→A→B). Si cualquier regla se activa, el loop termina con el mejor resultado disponible.

class LoopDetector:
def __init__(self, max_identical: int = 5, max_errors: int = 4):
self.call_history: list[str] = []
self.error_count: int = 0
self.seen_cycles: set[tuple] = set()
def check(self, tool_name: str, params: dict) -> bool:
call_sig = f"{tool_name}:{hash(frozenset(params.items()))}"
# Rule 1: identical calls
identical = self.call_history.count(call_sig)
if identical >= self.max_identical:
logger.warning(f"Loop detected: {tool_name} called {identical}x")
return True # STOP
# Rule 2: consecutive errors
if self.error_count >= self.max_errors:
return True # STOP
# Rule 3: A→B→A→B cycle detection
if len(self.call_history) >= 4:
last_4 = tuple(self.call_history[-4:])
if last_4[:2] == last_4[2:]:
return True # STOP
self.call_history.append(call_sig)
return False # CONTINUE

## Capa 3: Parallel Execution + Circuit Breaker

Cuando Claude decide llamar múltiples herramientas en una iteración, se ejecutan con asyncio.gather(). HTTP client con httpx HTTP/2 (connection pooling, multiplexing), retry con exponential backoff, y circuit breaker por servicio externo.

# Circuit breaker implementation (per external service)
class CircuitBreaker:
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject all calls
HALF_OPEN = "half_open" # Testing recovery
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.state = self.CLOSED
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.last_failure_time = None
async def call(self, func, *args, **kwargs):
if self.state == self.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = self.HALF_OPEN # Try one request
else:
raise CircuitOpenError(
f"Circuit open, retry in "
f"{self.recovery_timeout - elapsed:.0f}s"
)
try:
result = await func(*args, **kwargs)
if self.state == self.HALF_OPEN:
self.state = self.CLOSED # Recovery confirmed
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = self.OPEN # Trip the breaker
logger.error(f"Circuit OPEN after {self.failure_count} failures")
raise
# State transitions:
# CLOSED ──[5 failures]──▶ OPEN ──[60s]──▶ HALF_OPEN ──[success]──▶ CLOSED
# │
# └──[failure]──▶ OPEN

Cada servicio externo tiene su propio circuit breaker. Si el API de tareas se cae, las herramientas de KPIs y sitios siguen operando. El operador recibe una respuesta parcial con un aviso de qué servicio está degradado.

## Capa 4: Guardrails (Anti-Hallucination + Write Protection)

Los guardrails no son un "nice to have". En un centro de control de operaciones, una respuesta inventada puede despachar una cuadrilla al sitio equivocado.

GUARDRAILS MATRIX
═════════════════════════════════════

Anti-hallucination
  Tool calls OBLIGATORIOS para datos
  operacionales. Claude NO puede decir
  "hay 5 tareas" sin search_tasks().
  Correction: contador, no booleano.

Write protection
  dry_run=true por defecto en las 5
  herramientas de escritura. Preview
  primero, confirma, luego ejecuta.

Scope restriction
  Solo consultas operacionales del
  centro de control. Fuera de scope
  → respuesta educada + redirect.

Data sanitization
  INT64_MAX → "N/D"
  Epoch 0 → "Sin registro"
  Nulls → manejo explícito

Rate limiting
  10 tool calls / iteración
  50 tool calls / conversación
  3 write ops / conversación

PII filtering
  Nombres → iniciales en logs
  RUTs → masked
  Teléfonos → últimos 4 dígitos

La verificación anti-hallucination funciona con un contador de correcciones, no un booleano. Si Claude genera un dato sin tool call respaldándolo, el sistema incrementa el contador y fuerza re-generación. Esto permite tracking de qué tipos de preguntas tienden a producir hallucinations.

class AntiHallucinationGuard:
"""Ensures operational data is always backed by tool calls."""
OPERATIONAL_PATTERNS = [
r"\d+ tareas?", # "hay 5 tareas"
r"MTTR.*\d+", # "MTTR es 4.2h"
r"SLA.*\d+%", # "cumplimiento SLA 92%"
r"sitio.*(?:activo|down)", # "sitio SITE-001 activo"
]
def validate(self, response: str, tool_calls: list[ToolCall]) -> ValidationResult:
corrections = 0
for pattern in self.OPERATIONAL_PATTERNS:
if re.search(pattern, response):
# Check if any tool call supports this claim
if not self._find_supporting_call(pattern, tool_calls):
corrections += 1
logger.warning(
f"Unsupported claim detected: {pattern}"
)
if corrections > 0:
return ValidationResult(
valid=False,
corrections=corrections,
action="regenerate_with_tool_calls"
)
return ValidationResult(valid=True, corrections=0)

## Los 13 módulos: 65 tools activas, 35 deshabilitadas

El registry final tiene 100 herramientas definidas. 65 activas, 35 deshabilitadas con razón documentada en DISABLED_TOOLS. Las deshabilitaciones no se borran, se mantienen para auditoría.

# Tool registry structure
MODULE ACTIVE DISABLED NOTES
─────────────────────────────────────────────────────────
tasks 20 8 Core search + detail + SLA + timeline
tasks_extra 10 5 Aging, stalled, stats aggregations
kpis 16 4 Realtime, period, MTTR, patterns
field 12 6 GPS, distance, task creation
panel 11 3 Multi-criteria filter, escalations
sites 11 4 Consolidated info (replaced 3 legacy)
analysis 5 0 Briefing, classification
network 5 2 Health, alarm-task correlation
email 3 1 Send, templates, drafts
whatsapp 3 0 Send, media, templates
health 3 0 System health, uptime, latency
monitors 4 1 Status, alerts, thresholds
quick 2 1 Fast lookups, shortcuts
─────────────────────────────────────────────────────────
TOTAL 65 35 100 tools defined

Ejemplo de deshabilitación documentada:

DISABLED_TOOLS = {
"get_task_details": {
"reason": "Latency 30-90s. Replaced by get_field_task_details (<1s)",
"disabled_date": "2025-11-15",
"replacement": "get_field_task_details",
},
"get_site_basic_info": {
"reason": "Consolidated into get_site_info (single call vs 3)",
"disabled_date": "2025-12-01",
"replacement": "get_site_info",
},
"get_kpi_raw_data": {
"reason": "0 operational usage in 30 days. Heavy payload (>2MB)",
"disabled_date": "2026-01-10",
"replacement": None,
},
}

Cada herramienta sigue la misma estructura:

class SearchTasks(BaseTool):
"""Search tasks with multi-criteria filters."""
name = "search_tasks"
description = "Search operational tasks by status, site, zone, criticality..."
cacheable = True
cache_ttl = 30 # seconds
class InputModel(BaseModel):
status: str | None = Field(None, description="Task status filter")
site_id: str | None = Field(None, description="Site identifier")
zone: str | None = Field(None, description="Geographic zone")
criticality: str | None = Field(None, enum=["critical","high","medium","low"])
date_from: str | None = Field(None, description="ISO 8601 start date")
date_to: str | None = Field(None, description="ISO 8601 end date")
limit: int = Field(50, ge=1, le=200)
async def execute(self, params: dict) -> ToolResult:
validated = self.InputModel(**params)
async with self.http_client() as client:
response = await client.get(
f"{self.base_url}/api/tasks/search",
params=validated.model_dump(exclude_none=True),
timeout=10.0,
)
response.raise_for_status()
tasks = response.json()["data"]
return ToolResult(
data=tasks,
count=len(tasks),
source="tasks_api",
cached=False,
)

## WhatsApp Relay: IA en terreno

Los técnicos en campo no tienen acceso a la web app. Integramos un relay WhatsApp via Twilio que conecta al mismo motor MCP.

# WhatsApp integration flow

Técnico → Twilio → Relay → MCP Server
                              │
  "estado sitio SITE-002"     │
  ──────────────────────────▶ │
                              │
     NLP + Agentic Loop       │
     (same engine as web)     │
                              │
  ◀────────────────────────── │
  "SITE-002:                  │
   3 tareas activas           │
   0 alarmas críticas         │
   Acceso: autorizado"        │

# Polling: every 5s
# Apache reverse proxy + Let's Encrypt
# Media: images/docs via MediaProcessor
# Response: max 1500 chars (WA limit)

El mismo motor MCP procesa mensajes de WhatsApp y de la web. La única diferencia es el ResponseFormatter: la web recibe markdown con tablas, WhatsApp recibe texto plano truncado a 1500 caracteres con priorización de información crítica primero.

## Observabilidad: Prometheus + Structured Logging

# Key metrics exposed to Prometheus
mcp_tool_calls_total{tool, status} # Counter per tool
mcp_tool_latency_seconds{tool} # Histogram (p50, p95, p99)
mcp_agentic_loop_iterations{conversation} # Gauge per conversation
mcp_circuit_breaker_state{service} # 0=closed, 1=open, 2=half_open
mcp_entity_extraction_seconds # Histogram NLP latency
mcp_hallucination_corrections_total # Counter anti-halluc triggers
mcp_cache_hit_ratio{module} # Gauge per module
mcp_whatsapp_messages_total{direction} # Counter in/out
# Alert rules (Prometheus → AlertManager)
- alert: MCPHighLatency
expr: histogram_quantile(0.95, mcp_tool_latency_seconds) > 5
for: 5m
- alert: MCPCircuitOpen
expr: mcp_circuit_breaker_state == 1
for: 1m
- alert: MCPHighHallucinationRate
expr: rate(mcp_hallucination_corrections_total[5m]) > 0.1
for: 10m

Structured logging con JSON lines permite búsqueda rápida en Grafana Loki. Cada log entry incluye conversation_id, tool_name, latency_ms, cache_hit, y user_id (hashed).

## Certificación: 123/123 tests en 19 categorías

CATEGORY TESTS PASS FAIL TIME(s)
────────────────────────────────────────────────────
Health 3 3 0 12
Task Search 10 10 0 145
Task Detail 10 10 0 132
Panel 8 8 0 98
Sites 6 6 0 67
KPIs 8 8 0 104
Autorización de acceso 6 6 0 78
Employees 5 5 0 54
Write Operations 8 8 0 112
Monitoreo de red 4 4 0 48
Emojis 5 5 0 35
Dates 4 4 0 42
Security 3 3 0 28
Anti-Hallucination 5 5 0 89
Conversational Flow 20 20 0 287
Parity 8 8 0 156
Email 2 2 0 24
Analysis 3 3 0 67
Format 5 5 0 45
────────────────────────────────────────────────────
TOTAL 123 123 0 1623s
Verdict: ✓ GO FOR PRODUCTION

Los tests de Anti-Hallucination son los más interesantes: preguntas diseñadas para inducir respuestas inventadas. "¿Cuántas tareas resolvió el gestor Martinez ayer?", si Claude responde sin llamar a get_tasks_by_manager(), el test falla. Los 5 pasaron: el sistema siempre ejecuta tool calls antes de afirmar datos operacionales.

Los tests de Conversational Flow (20 tests) verifican conversaciones multi-turno: preguntas de seguimiento ("¿y las de zona sur?"), cambios de contexto, referencias anafóricas ("dame más detalles de la tercera"), y edge cases como preguntas fuera de scope ("¿cuál es el clima hoy?").

## El patrón MCP como arquitectura reutilizable

Lo más valioso de este proyecto no es la implementación específica, es el patrón. Cualquier empresa con APIs internas puede implementar el mismo enfoque:

# The MCP pattern (generalized)
1. TOOL REGISTRY
- Pydantic v2 schemas for input validation
- Auto-generated JSON Schema for LLM consumption
- Cacheable flag + TTL per tool
- Disabled tools with documented reasons
2. NLP PIPELINE
- Entity extraction (LLM or regex fast-path)
- Parameter mapping (entities → tool params)
- Context management (conversation history)
3. AGENTIC LOOP
- LLM decides tool calls per iteration
- Loop detection (identical calls, errors, cycles)
- Max iterations cap
- Partial result assembly
4. EXECUTION LAYER
- asyncio.gather() for parallel tool calls
- httpx HTTP/2 with connection pooling
- Circuit breaker per external service
- Retry with exponential backoff
5. GUARDRAILS
- Anti-hallucination (tool call verification)
- Write protection (dry_run default)
- Scope restriction
- Data sanitization
- Rate limiting
6. OBSERVABILITY
- Prometheus metrics per tool
- Structured logging (JSON lines)
- Alerting on latency, errors, hallucinations

La IA deja de ser un chatbot que sugiere respuestas genéricas. Se convierte en un operador autónomo de infraestructura que ejecuta queries reales contra sistemas reales, con guardrails que garantizan que cada dato está respaldado por una fuente verificable. El operador humano pasa de copiar datos entre pestañas a tomar decisiones con información consolidada en <2 segundos.

mainblog/mcp-65-tools.md
MarkdownUTF-8847 lines18 min

¿Quieres un MCP con IA integrado a tus sistemas?

Diseñamos servidores MCP a medida que integran tus APIs, bases de datos y herramientas en un solo asistente con guardrails enterprise.

Conversemos sobre tu caso