Ejemplos Adicionales: Ejecutar Scripts en la Nube
🧭 Navegación:
- Volver a Ejecutar Scripts en la Nube
- Siguiente: Programar Tareas Automáticas
Ejemplos Prácticos para Despliegue en la Nube
En esta sección, encontrarás ejemplos adicionales y más detallados para desplegar tus scripts de Python en diferentes entornos de nube.
Ejemplo 1: Script de Monitoreo de Sitio Web
Este script verifica si un sitio web está funcionando correctamente y envía una notificación por correo electrónico si detecta algún problema.
#!/usr/bin/env python3
# monitor_sitio.py - Script para monitorear la disponibilidad de un sitio web
import requests
import smtplib
import time
import logging
import os
from email.mime.text import MIMEText
from datetime import datetime
# Configuración de logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(os.path.dirname(os.path.abspath(__file__)), "monitor_log.txt")),
logging.StreamHandler()
]
)
# Configuración (mejor usar variables de entorno en producción)
SITIOS_A_MONITOREAR = [
{"url": "https://ejemplo.com", "nombre": "Sitio Principal"},
{"url": "https://api.ejemplo.com", "nombre": "API"},
{"url": "https://blog.ejemplo.com", "nombre": "Blog"}
]
INTERVALO_VERIFICACION = 300 # segundos (5 minutos)
TIEMPO_ESPERA = 10 # segundos para timeout
DESTINATARIOS = ["admin@ejemplo.com", "soporte@ejemplo.com"]
# Configuración de correo (usar variables de entorno en producción)
EMAIL_HOST = "smtp.gmail.com"
EMAIL_PORT = 587
EMAIL_USER = os.environ.get("EMAIL_USER", "")
EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD", "")
def verificar_sitio(url, nombre):
"""Verifica si un sitio web está funcionando correctamente."""
try:
inicio = time.time()
respuesta = requests.get(url, timeout=TIEMPO_ESPERA)
tiempo_respuesta = time.time() - inicio
if respuesta.status_code == 200:
logging.info(f"✅ {nombre} ({url}) está funcionando. Tiempo de respuesta: {tiempo_respuesta:.2f}s")
return True, tiempo_respuesta
else:
logging.error(f"❌ {nombre} ({url}) devolvió código de estado: {respuesta.status_code}")
return False, tiempo_respuesta
except requests.RequestException as e:
logging.error(f"❌ Error al verificar {nombre} ({url}): {str(e)}")
return False, 0
def enviar_alerta(sitio, error):
"""Envía una alerta por correo electrónico."""
try:
asunto = f"🚨 ALERTA: {sitio['nombre']} no está disponible"
cuerpo = f"""
Se ha detectado un problema con {sitio['nombre']}.
URL: {sitio['url']}
Fecha y hora: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Error: {error}
Por favor, verifique el estado del servicio.
Este es un mensaje automático del sistema de monitoreo.
"""
msg = MIMEText(cuerpo)
msg['Subject'] = asunto
msg['From'] = EMAIL_USER
msg['To'] = ", ".join(DESTINATARIOS)
servidor = smtplib.SMTP(EMAIL_HOST, EMAIL_PORT)
servidor.starttls()
servidor.login(EMAIL_USER, EMAIL_PASSWORD)
servidor.send_message(msg)
servidor.quit()
logging.info(f"✉️ Alerta enviada para {sitio['nombre']}")
except Exception as e:
logging.error(f"❌ Error al enviar alerta: {str(e)}")
def main():
"""Función principal del script."""
logging.info("🚀 Iniciando sistema de monitoreo de sitios web")
while True:
for sitio in SITIOS_A_MONITOREAR:
funcionando, tiempo = verificar_sitio(sitio['url'], sitio['nombre'])
if not funcionando:
enviar_alerta(sitio, f"Sitio no disponible o con respuesta incorrecta")
# Esperar hasta la próxima verificación
logging.info(f"💤 Esperando {INTERVALO_VERIFICACION} segundos hasta la próxima verificación")
time.sleep(INTERVALO_VERIFICACION)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
logging.info("👋 Monitoreo detenido manualmente")
except Exception as e:
logging.critical(f"❌ Error crítico: {str(e)}")
# En producción, aquí podrías enviar una alerta sobre el error del script
Preparación para Despliegue
-
Crear archivo de requisitos:
# requirements.txt requests==2.28.1
-
Configurar variables de entorno en el servidor:
# En el servidor export EMAIL_USER="tu_correo@gmail.com" export EMAIL_PASSWORD="tu_contraseña_o_token" # Para hacerlas permanentes, añadir a ~/.bashrc echo 'export EMAIL_USER="tu_correo@gmail.com"' >> ~/.bashrc echo 'export EMAIL_PASSWORD="tu_contraseña_o_token"' >> ~/.bashrc
-
Script de inicio:
#!/bin/bash # start_monitor.sh cd /opt/monitoring source venv/bin/activate nohup python monitor_sitio.py > monitor.log 2>&1 & echo $! > monitor.pid echo "Monitor iniciado con PID $(cat monitor.pid)"
-
Script de detención:
#!/bin/bash # stop_monitor.sh if [ -f monitor.pid ]; then PID=$(cat monitor.pid) if ps -p $PID > /dev/null; then echo "Deteniendo monitor con PID $PID" kill $PID else echo "El proceso no está en ejecución" fi rm monitor.pid else echo "Archivo PID no encontrado" fi
Ejemplo 2: Servicio Web Simple con Flask
Este ejemplo muestra cómo desplegar un servicio web simple usando Flask, que podría servir como API para tus scripts de automatización.
#!/usr/bin/env python3
# api_servicio.py - API simple para gestionar tareas
from flask import Flask, request, jsonify
import os
import json
import logging
from datetime import datetime
# Configuración de logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("api_log.txt"),
logging.StreamHandler()
]
)
app = Flask(__name__)
# Ruta al archivo de datos
DATOS_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "tareas.json")
def cargar_tareas():
"""Carga las tareas desde el archivo JSON."""
try:
if os.path.exists(DATOS_PATH):
with open(DATOS_PATH, 'r') as f:
return json.load(f)
else:
return {"tareas": []}
except Exception as e:
logging.error(f"Error al cargar tareas: {e}")
return {"tareas": []}
def guardar_tareas(datos):
"""Guarda las tareas en el archivo JSON."""
try:
with open(DATOS_PATH, 'w') as f:
json.dump(datos, f, indent=2)
return True
except Exception as e:
logging.error(f"Error al guardar tareas: {e}")
return False
@app.route('/api/tareas', methods=['GET'])
def obtener_tareas():
"""Endpoint para obtener todas las tareas."""
datos = cargar_tareas()
return jsonify(datos)
@app.route('/api/tareas', methods=['POST'])
def crear_tarea():
"""Endpoint para crear una nueva tarea."""
try:
nueva_tarea = request.json
if not nueva_tarea or 'titulo' not in nueva_tarea:
return jsonify({"error": "Se requiere un título para la tarea"}), 400
datos = cargar_tareas()
# Generar ID para la nueva tarea
tarea_id = 1
if datos["tareas"]:
tarea_id = max(tarea["id"] for tarea in datos["tareas"]) + 1
# Crear la tarea con datos adicionales
tarea_completa = {
"id": tarea_id,
"titulo": nueva_tarea["titulo"],
"descripcion": nueva_tarea.get("descripcion", ""),
"completada": False,
"fecha_creacion": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
datos["tareas"].append(tarea_completa)
if guardar_tareas(datos):
return jsonify(tarea_completa), 201
else:
return jsonify({"error": "Error al guardar la tarea"}), 500
except Exception as e:
logging.error(f"Error al crear tarea: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/api/tareas/<int:tarea_id>', methods=['GET'])
def obtener_tarea(tarea_id):
"""Endpoint para obtener una tarea específica."""
datos = cargar_tareas()
for tarea in datos["tareas"]:
if tarea["id"] == tarea_id:
return jsonify(tarea)
return jsonify({"error": "Tarea no encontrada"}), 404
@app.route('/api/tareas/<int:tarea_id>', methods=['PUT'])
def actualizar_tarea(tarea_id):
"""Endpoint para actualizar una tarea existente."""
try:
actualizacion = request.json
datos = cargar_tareas()
for i, tarea in enumerate(datos["tareas"]):
if tarea["id"] == tarea_id:
# Actualizar campos permitidos
if "titulo" in actualizacion:
datos["tareas"][i]["titulo"] = actualizacion["titulo"]
if "descripcion" in actualizacion:
datos["tareas"][i]["descripcion"] = actualizacion["descripcion"]
if "completada" in actualizacion:
datos["tareas"][i]["completada"] = actualizacion["completada"]
datos["tareas"][i]["fecha_actualizacion"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if guardar_tareas(datos):
return jsonify(datos["tareas"][i])
else:
return jsonify({"error": "Error al guardar la tarea"}), 500
return jsonify({"error": "Tarea no encontrada"}), 404
except Exception as e:
logging.error(f"Error al actualizar tarea: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/api/tareas/<int:tarea_id>', methods=['DELETE'])
def eliminar_tarea(tarea_id):
"""Endpoint para eliminar una tarea."""
try:
datos = cargar_tareas()
for i, tarea in enumerate(datos["tareas"]):
if tarea["id"] == tarea_id:
tarea_eliminada = datos["tareas"].pop(i)
if guardar_tareas(datos):
return jsonify({"mensaje": f"Tarea '{tarea_eliminada['titulo']}' eliminada"})
else:
return jsonify({"error": "Error al guardar cambios"}), 500
return jsonify({"error": "Tarea no encontrada"}), 404
except Exception as e:
logging.error(f"Error al eliminar tarea: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/api/estado', methods=['GET'])
def estado_servicio():
"""Endpoint para verificar el estado del servicio."""
return jsonify({
"estado": "activo",
"version": "1.0.0",
"fecha_hora": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
})
if __name__ == '__main__':
# En desarrollo
# app.run(debug=True)
# En producción
app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 5000)))
Preparación para Despliegue
-
Crear archivo de requisitos:
# requirements.txt flask==2.2.3 gunicorn==20.1.0
-
Archivo de configuración para Gunicorn:
# gunicorn_config.py bind = "0.0.0.0:5000" workers = 4 accesslog = "access.log" errorlog = "error.log" capture_output = True
-
Script de inicio:
#!/bin/bash # start_api.sh cd /opt/api_service source venv/bin/activate nohup gunicorn -c gunicorn_config.py api_servicio:app > api.log 2>&1 & echo $! > api.pid echo "API iniciada con PID $(cat api.pid)"
-
Configuración de Nginx como proxy inverso:
# /etc/nginx/sites-available/api_service server { listen 80; server_name api.tudominio.com; location / { proxy_pass http://localhost:5000; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; } }
Ejemplo 3: Script de Respaldo en AWS S3
Este script realiza copias de seguridad de archivos y los sube a Amazon S3.
#!/usr/bin/env python3
# s3_backup.py - Script para hacer respaldos en AWS S3
import os
import sys
import boto3
import logging
import tarfile
import tempfile
from datetime import datetime
from botocore.exceptions import ClientError
# Configuración de logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(os.path.dirname(os.path.abspath(__file__)), "backup_log.txt")),
logging.StreamHandler()
]
)
# Configuración (mejor usar variables de entorno en producción)
S3_BUCKET = os.environ.get("S3_BUCKET", "mi-bucket-de-respaldos")
S3_PREFIX = os.environ.get("S3_PREFIX", "backups/")
BACKUP_DIRS = [
"/var/www/html",
"/etc/nginx",
"/home/usuario/datos"
]
MAX_BACKUPS = 7 # Mantener solo los últimos 7 respaldos
def crear_archivo_tar(directorios):
"""Crea un archivo tar.gz con los directorios especificados."""
fecha_actual = datetime.now().strftime("%Y%m%d_%H%M%S")
nombre_archivo = f"backup_{fecha_actual}.tar.gz"
ruta_temporal = os.path.join(tempfile.gettempdir(), nombre_archivo)
try:
with tarfile.open(ruta_temporal, "w:gz") as tar:
for directorio in directorios:
if os.path.exists(directorio):
logging.info(f"Añadiendo {directorio} al archivo...")
# Obtener ruta base para mantener estructura relativa
base_dir = os.path.dirname(directorio)
# Añadir al tar manteniendo estructura relativa
tar.add(directorio, arcname=os.path.relpath(directorio, base_dir))
else:
logging.warning(f"El directorio {directorio} no existe, se omitirá")
logging.info(f"Archivo tar creado: {ruta_temporal}")
return ruta_temporal, nombre_archivo
except Exception as e:
logging.error(f"Error al crear archivo tar: {e}")
return None, None
def subir_a_s3(archivo_local, nombre_archivo):
"""Sube un archivo a Amazon S3."""
try:
s3_client = boto3.client('s3')
s3_key = f"{S3_PREFIX}{nombre_archivo}"
logging.info(f"Subiendo {archivo_local} a s3://{S3_BUCKET}/{s3_key}...")
with open(archivo_local, "rb") as file:
s3_client.upload_fileobj(file, S3_BUCKET, s3_key)
logging.info("Archivo subido exitosamente")
return True
except ClientError as e:
logging.error(f"Error de AWS al subir archivo: {e}")
return False
except Exception as e:
logging.error(f"Error al subir archivo: {e}")
return False
def listar_backups():
"""Lista los backups existentes en S3."""
try:
s3_client = boto3.client('s3')
response = s3_client.list_objects_v2(
Bucket=S3_BUCKET,
Prefix=S3_PREFIX
)
if 'Contents' in response:
return sorted([
{
'key': obj['Key'],
'size': obj['Size'],
'last_modified': obj['LastModified']
}
for obj in response['Contents']
], key=lambda x: x['last_modified'])
else:
return []
except Exception as e:
logging.error(f"Error al listar backups: {e}")
return []
def eliminar_backups_antiguos():
"""Elimina los backups más antiguos, manteniendo solo los últimos MAX_BACKUPS."""
try:
backups = listar_backups()
if len(backups) <= MAX_BACKUPS:
logging.info(f"Solo hay {len(backups)} backups, no es necesario eliminar ninguno")
return
# Calcular cuántos backups eliminar
a_eliminar = len(backups) - MAX_BACKUPS
backups_a_eliminar = backups[:a_eliminar]
s3_client = boto3.client('s3')
for backup in backups_a_eliminar:
logging.info(f"Eliminando backup antiguo: {backup['key']}")
s3_client.delete_object(
Bucket=S3_BUCKET,
Key=backup['key']
)
logging.info(f"Se eliminaron {a_eliminar} backups antiguos")
except Exception as e:
logging.error(f"Error al eliminar backups antiguos: {e}")
def main():
"""Función principal del script."""
logging.info("🚀 Iniciando proceso de respaldo")
# Crear archivo tar
archivo_local, nombre_archivo = crear_archivo_tar(BACKUP_DIRS)
if not archivo_local:
logging.error("❌ No se pudo crear el archivo de respaldo")
sys.exit(1)
# Subir a S3
if subir_a_s3(archivo_local, nombre_archivo):
logging.info("✅ Respaldo completado exitosamente")
# Eliminar archivo temporal
os.remove(archivo_local)
logging.info(f"🧹 Archivo temporal eliminado: {archivo_local}")
# Eliminar backups antiguos
eliminar_backups_antiguos()
else:
logging.error("❌ No se pudo completar el respaldo")
sys.exit(1)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
logging.info("👋 Proceso interrumpido manualmente")
sys.exit(0)
except Exception as e:
logging.critical(f"❌ Error crítico: {str(e)}")
sys.exit(1)
Preparación para Despliegue
-
Crear archivo de requisitos:
# requirements.txt boto3==1.26.90
-
Configurar credenciales de AWS:
# En el servidor mkdir -p ~/.aws # Crear archivo de credenciales cat > ~/.aws/credentials << EOF [default] aws_access_key_id = TU_ACCESS_KEY aws_secret_access_key = TU_SECRET_KEY EOF # Crear archivo de configuración cat > ~/.aws/config << EOF [default] region = us-east-1 output = json EOF # Establecer permisos adecuados chmod 600 ~/.aws/credentials
-
Configurar variables de entorno:
export S3_BUCKET="mi-bucket-de-respaldos" export S3_PREFIX="servidor1/backups/" # Para hacerlas permanentes echo 'export S3_BUCKET="mi-bucket-de-respaldos"' >> ~/.bashrc echo 'export S3_PREFIX="servidor1/backups/"' >> ~/.bashrc
Consejos Adicionales para Despliegue en la Nube
Monitoreo y Alertas
Para asegurar que tus scripts funcionen correctamente en la nube:
-
Implementa logging detallado:
import logging # Configuración básica logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("app.log"), logging.StreamHandler() ] ) # Uso en diferentes partes del código logger = logging.getLogger("mi_script") logger.info("Iniciando proceso") logger.warning("Advertencia: recurso no encontrado") logger.error("Error al procesar datos")
-
Configura alertas por correo o SMS para errores críticos.
-
Utiliza servicios de monitoreo como:
- AWS CloudWatch
- Google Cloud Monitoring
- Datadog
- Prometheus + Grafana
Seguridad
-
Nunca almacenes credenciales en el código:
# MAL password = "mi_contraseña_secreta" # No hagas esto # BIEN import os password = os.environ.get("MI_PASSWORD")
-
Usa un usuario con privilegios limitados para ejecutar tus scripts.
-
Implementa autenticación para APIs y servicios web:
# Ejemplo simple con Flask from functools import wraps from flask import request, jsonify def require_api_key(f): @wraps(f) def decorated(*args, **kwargs): api_key = request.headers.get('X-API-Key') if api_key and api_key == os.environ.get('API_KEY'): return f(*args, **kwargs) return jsonify({"error": "Acceso no autorizado"}), 401 return decorated @app.route('/api/datos') @require_api_key def obtener_datos(): return jsonify({"datos": "información confidencial"})
Recuperación ante Fallos
-
Implementa reintentos para operaciones que pueden fallar:
def operacion_con_reintentos(max_intentos=3, espera=5): for intento in range(max_intentos): try: # Operación que puede fallar resultado = operacion_riesgosa() return resultado except Exception as e: logging.warning(f"Intento {intento+1}/{max_intentos} falló: {e}") if intento < max_intentos - 1: logging.info(f"Esperando {espera} segundos antes de reintentar...") time.sleep(espera) else: logging.error("Se agotaron los reintentos") raise
-
Guarda el estado para poder recuperarte de interrupciones:
def procesar_items(items): # Cargar estado anterior si existe estado_archivo = "estado_procesamiento.json" if os.path.exists(estado_archivo): with open(estado_archivo, 'r') as f: estado = json.load(f) items_procesados = estado.get("procesados", []) logging.info(f"Recuperando estado: {len(items_procesados)} items ya procesados") else: items_procesados = [] for item in items: if item["id"] in items_procesados: logging.info(f"Item {item['id']} ya procesado, omitiendo") continue try: procesar_item(item) items_procesados.append(item["id"]) # Guardar estado después de cada item with open(estado_archivo, 'w') as f: json.dump({"procesados": items_procesados}, f) except Exception as e: logging.error(f"Error al procesar item {item['id']}: {e}")
Recursos Adicionales
- AWS Lambda Developer Guide
- Google Cloud Functions Documentation
- DigitalOcean App Platform Documentation
- Heroku Dev Center
- Flask Deployment Options
🧭 Navegación:
- Volver a Ejecutar Scripts en la Nube
- Siguiente: Programar Tareas Automáticas