Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Ejemplos Adicionales: Ejecutar Scripts en la Nube

🧭 Navegación:

Ejemplos Prácticos para Despliegue en la Nube

En esta sección, encontrarás ejemplos adicionales y más detallados para desplegar tus scripts de Python en diferentes entornos de nube.

Ejemplo 1: Script de Monitoreo de Sitio Web

Este script verifica si un sitio web está funcionando correctamente y envía una notificación por correo electrónico si detecta algún problema.

#!/usr/bin/env python3
# monitor_sitio.py - Script para monitorear la disponibilidad de un sitio web

import requests
import smtplib
import time
import logging
import os
from email.mime.text import MIMEText
from datetime import datetime

# Configuración de logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(os.path.dirname(os.path.abspath(__file__)), "monitor_log.txt")),
        logging.StreamHandler()
    ]
)

# Configuración (mejor usar variables de entorno en producción)
SITIOS_A_MONITOREAR = [
    {"url": "https://ejemplo.com", "nombre": "Sitio Principal"},
    {"url": "https://api.ejemplo.com", "nombre": "API"},
    {"url": "https://blog.ejemplo.com", "nombre": "Blog"}
]
INTERVALO_VERIFICACION = 300  # segundos (5 minutos)
TIEMPO_ESPERA = 10  # segundos para timeout
DESTINATARIOS = ["admin@ejemplo.com", "soporte@ejemplo.com"]

# Configuración de correo (usar variables de entorno en producción)
EMAIL_HOST = "smtp.gmail.com"
EMAIL_PORT = 587
EMAIL_USER = os.environ.get("EMAIL_USER", "")
EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD", "")

def verificar_sitio(url, nombre):
    """Verifica si un sitio web está funcionando correctamente."""
    try:
        inicio = time.time()
        respuesta = requests.get(url, timeout=TIEMPO_ESPERA)
        tiempo_respuesta = time.time() - inicio
        
        if respuesta.status_code == 200:
            logging.info(f"✅ {nombre} ({url}) está funcionando. Tiempo de respuesta: {tiempo_respuesta:.2f}s")
            return True, tiempo_respuesta
        else:
            logging.error(f"❌ {nombre} ({url}) devolvió código de estado: {respuesta.status_code}")
            return False, tiempo_respuesta
    except requests.RequestException as e:
        logging.error(f"❌ Error al verificar {nombre} ({url}): {str(e)}")
        return False, 0

def enviar_alerta(sitio, error):
    """Envía una alerta por correo electrónico."""
    try:
        asunto = f"🚨 ALERTA: {sitio['nombre']} no está disponible"
        
        cuerpo = f"""
        Se ha detectado un problema con {sitio['nombre']}.
        
        URL: {sitio['url']}
        Fecha y hora: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
        Error: {error}
        
        Por favor, verifique el estado del servicio.
        
        Este es un mensaje automático del sistema de monitoreo.
        """
        
        msg = MIMEText(cuerpo)
        msg['Subject'] = asunto
        msg['From'] = EMAIL_USER
        msg['To'] = ", ".join(DESTINATARIOS)
        
        servidor = smtplib.SMTP(EMAIL_HOST, EMAIL_PORT)
        servidor.starttls()
        servidor.login(EMAIL_USER, EMAIL_PASSWORD)
        servidor.send_message(msg)
        servidor.quit()
        
        logging.info(f"✉️ Alerta enviada para {sitio['nombre']}")
    except Exception as e:
        logging.error(f"❌ Error al enviar alerta: {str(e)}")

def main():
    """Función principal del script."""
    logging.info("🚀 Iniciando sistema de monitoreo de sitios web")
    
    while True:
        for sitio in SITIOS_A_MONITOREAR:
            funcionando, tiempo = verificar_sitio(sitio['url'], sitio['nombre'])
            
            if not funcionando:
                enviar_alerta(sitio, f"Sitio no disponible o con respuesta incorrecta")
        
        # Esperar hasta la próxima verificación
        logging.info(f"💤 Esperando {INTERVALO_VERIFICACION} segundos hasta la próxima verificación")
        time.sleep(INTERVALO_VERIFICACION)

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        logging.info("👋 Monitoreo detenido manualmente")
    except Exception as e:
        logging.critical(f"❌ Error crítico: {str(e)}")
        # En producción, aquí podrías enviar una alerta sobre el error del script

Preparación para Despliegue

  1. Crear archivo de requisitos:

    # requirements.txt
    requests==2.28.1
    
  2. Configurar variables de entorno en el servidor:

    # En el servidor
    export EMAIL_USER="tu_correo@gmail.com"
    export EMAIL_PASSWORD="tu_contraseña_o_token"
    
    # Para hacerlas permanentes, añadir a ~/.bashrc
    echo 'export EMAIL_USER="tu_correo@gmail.com"' >> ~/.bashrc
    echo 'export EMAIL_PASSWORD="tu_contraseña_o_token"' >> ~/.bashrc
    
  3. Script de inicio:

    #!/bin/bash
    # start_monitor.sh
    cd /opt/monitoring
    source venv/bin/activate
    nohup python monitor_sitio.py > monitor.log 2>&1 &
    echo $! > monitor.pid
    echo "Monitor iniciado con PID $(cat monitor.pid)"
    
  4. Script de detención:

    #!/bin/bash
    # stop_monitor.sh
    if [ -f monitor.pid ]; then
      PID=$(cat monitor.pid)
      if ps -p $PID > /dev/null; then
        echo "Deteniendo monitor con PID $PID"
        kill $PID
      else
        echo "El proceso no está en ejecución"
      fi
      rm monitor.pid
    else
      echo "Archivo PID no encontrado"
    fi
    

Ejemplo 2: Servicio Web Simple con Flask

Este ejemplo muestra cómo desplegar un servicio web simple usando Flask, que podría servir como API para tus scripts de automatización.

#!/usr/bin/env python3
# api_servicio.py - API simple para gestionar tareas

from flask import Flask, request, jsonify
import os
import json
import logging
from datetime import datetime

# Configuración de logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("api_log.txt"),
        logging.StreamHandler()
    ]
)

app = Flask(__name__)

# Ruta al archivo de datos
DATOS_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "tareas.json")

def cargar_tareas():
    """Carga las tareas desde el archivo JSON."""
    try:
        if os.path.exists(DATOS_PATH):
            with open(DATOS_PATH, 'r') as f:
                return json.load(f)
        else:
            return {"tareas": []}
    except Exception as e:
        logging.error(f"Error al cargar tareas: {e}")
        return {"tareas": []}

def guardar_tareas(datos):
    """Guarda las tareas en el archivo JSON."""
    try:
        with open(DATOS_PATH, 'w') as f:
            json.dump(datos, f, indent=2)
        return True
    except Exception as e:
        logging.error(f"Error al guardar tareas: {e}")
        return False

@app.route('/api/tareas', methods=['GET'])
def obtener_tareas():
    """Endpoint para obtener todas las tareas."""
    datos = cargar_tareas()
    return jsonify(datos)

@app.route('/api/tareas', methods=['POST'])
def crear_tarea():
    """Endpoint para crear una nueva tarea."""
    try:
        nueva_tarea = request.json
        
        if not nueva_tarea or 'titulo' not in nueva_tarea:
            return jsonify({"error": "Se requiere un título para la tarea"}), 400
        
        datos = cargar_tareas()
        
        # Generar ID para la nueva tarea
        tarea_id = 1
        if datos["tareas"]:
            tarea_id = max(tarea["id"] for tarea in datos["tareas"]) + 1
        
        # Crear la tarea con datos adicionales
        tarea_completa = {
            "id": tarea_id,
            "titulo": nueva_tarea["titulo"],
            "descripcion": nueva_tarea.get("descripcion", ""),
            "completada": False,
            "fecha_creacion": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        
        datos["tareas"].append(tarea_completa)
        
        if guardar_tareas(datos):
            return jsonify(tarea_completa), 201
        else:
            return jsonify({"error": "Error al guardar la tarea"}), 500
    
    except Exception as e:
        logging.error(f"Error al crear tarea: {e}")
        return jsonify({"error": str(e)}), 500

@app.route('/api/tareas/<int:tarea_id>', methods=['GET'])
def obtener_tarea(tarea_id):
    """Endpoint para obtener una tarea específica."""
    datos = cargar_tareas()
    
    for tarea in datos["tareas"]:
        if tarea["id"] == tarea_id:
            return jsonify(tarea)
    
    return jsonify({"error": "Tarea no encontrada"}), 404

@app.route('/api/tareas/<int:tarea_id>', methods=['PUT'])
def actualizar_tarea(tarea_id):
    """Endpoint para actualizar una tarea existente."""
    try:
        actualizacion = request.json
        datos = cargar_tareas()
        
        for i, tarea in enumerate(datos["tareas"]):
            if tarea["id"] == tarea_id:
                # Actualizar campos permitidos
                if "titulo" in actualizacion:
                    datos["tareas"][i]["titulo"] = actualizacion["titulo"]
                if "descripcion" in actualizacion:
                    datos["tareas"][i]["descripcion"] = actualizacion["descripcion"]
                if "completada" in actualizacion:
                    datos["tareas"][i]["completada"] = actualizacion["completada"]
                
                datos["tareas"][i]["fecha_actualizacion"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                
                if guardar_tareas(datos):
                    return jsonify(datos["tareas"][i])
                else:
                    return jsonify({"error": "Error al guardar la tarea"}), 500
        
        return jsonify({"error": "Tarea no encontrada"}), 404
    
    except Exception as e:
        logging.error(f"Error al actualizar tarea: {e}")
        return jsonify({"error": str(e)}), 500

@app.route('/api/tareas/<int:tarea_id>', methods=['DELETE'])
def eliminar_tarea(tarea_id):
    """Endpoint para eliminar una tarea."""
    try:
        datos = cargar_tareas()
        
        for i, tarea in enumerate(datos["tareas"]):
            if tarea["id"] == tarea_id:
                tarea_eliminada = datos["tareas"].pop(i)
                
                if guardar_tareas(datos):
                    return jsonify({"mensaje": f"Tarea '{tarea_eliminada['titulo']}' eliminada"})
                else:
                    return jsonify({"error": "Error al guardar cambios"}), 500
        
        return jsonify({"error": "Tarea no encontrada"}), 404
    
    except Exception as e:
        logging.error(f"Error al eliminar tarea: {e}")
        return jsonify({"error": str(e)}), 500

@app.route('/api/estado', methods=['GET'])
def estado_servicio():
    """Endpoint para verificar el estado del servicio."""
    return jsonify({
        "estado": "activo",
        "version": "1.0.0",
        "fecha_hora": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    })

if __name__ == '__main__':
    # En desarrollo
    # app.run(debug=True)
    
    # En producción
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 5000)))

Preparación para Despliegue

  1. Crear archivo de requisitos:

    # requirements.txt
    flask==2.2.3
    gunicorn==20.1.0
    
  2. Archivo de configuración para Gunicorn:

    # gunicorn_config.py
    bind = "0.0.0.0:5000"
    workers = 4
    accesslog = "access.log"
    errorlog = "error.log"
    capture_output = True
    
  3. Script de inicio:

    #!/bin/bash
    # start_api.sh
    cd /opt/api_service
    source venv/bin/activate
    nohup gunicorn -c gunicorn_config.py api_servicio:app > api.log 2>&1 &
    echo $! > api.pid
    echo "API iniciada con PID $(cat api.pid)"
    
  4. Configuración de Nginx como proxy inverso:

    # /etc/nginx/sites-available/api_service
    server {
        listen 80;
        server_name api.tudominio.com;
    
        location / {
            proxy_pass http://localhost:5000;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
        }
    }
    

Ejemplo 3: Script de Respaldo en AWS S3

Este script realiza copias de seguridad de archivos y los sube a Amazon S3.

#!/usr/bin/env python3
# s3_backup.py - Script para hacer respaldos en AWS S3

import os
import sys
import boto3
import logging
import tarfile
import tempfile
from datetime import datetime
from botocore.exceptions import ClientError

# Configuración de logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(os.path.dirname(os.path.abspath(__file__)), "backup_log.txt")),
        logging.StreamHandler()
    ]
)

# Configuración (mejor usar variables de entorno en producción)
S3_BUCKET = os.environ.get("S3_BUCKET", "mi-bucket-de-respaldos")
S3_PREFIX = os.environ.get("S3_PREFIX", "backups/")
BACKUP_DIRS = [
    "/var/www/html",
    "/etc/nginx",
    "/home/usuario/datos"
]
MAX_BACKUPS = 7  # Mantener solo los últimos 7 respaldos

def crear_archivo_tar(directorios):
    """Crea un archivo tar.gz con los directorios especificados."""
    fecha_actual = datetime.now().strftime("%Y%m%d_%H%M%S")
    nombre_archivo = f"backup_{fecha_actual}.tar.gz"
    ruta_temporal = os.path.join(tempfile.gettempdir(), nombre_archivo)
    
    try:
        with tarfile.open(ruta_temporal, "w:gz") as tar:
            for directorio in directorios:
                if os.path.exists(directorio):
                    logging.info(f"Añadiendo {directorio} al archivo...")
                    
                    # Obtener ruta base para mantener estructura relativa
                    base_dir = os.path.dirname(directorio)
                    
                    # Añadir al tar manteniendo estructura relativa
                    tar.add(directorio, arcname=os.path.relpath(directorio, base_dir))
                else:
                    logging.warning(f"El directorio {directorio} no existe, se omitirá")
        
        logging.info(f"Archivo tar creado: {ruta_temporal}")
        return ruta_temporal, nombre_archivo
    except Exception as e:
        logging.error(f"Error al crear archivo tar: {e}")
        return None, None

def subir_a_s3(archivo_local, nombre_archivo):
    """Sube un archivo a Amazon S3."""
    try:
        s3_client = boto3.client('s3')
        s3_key = f"{S3_PREFIX}{nombre_archivo}"
        
        logging.info(f"Subiendo {archivo_local} a s3://{S3_BUCKET}/{s3_key}...")
        
        with open(archivo_local, "rb") as file:
            s3_client.upload_fileobj(file, S3_BUCKET, s3_key)
        
        logging.info("Archivo subido exitosamente")
        return True
    except ClientError as e:
        logging.error(f"Error de AWS al subir archivo: {e}")
        return False
    except Exception as e:
        logging.error(f"Error al subir archivo: {e}")
        return False

def listar_backups():
    """Lista los backups existentes en S3."""
    try:
        s3_client = boto3.client('s3')
        response = s3_client.list_objects_v2(
            Bucket=S3_BUCKET,
            Prefix=S3_PREFIX
        )
        
        if 'Contents' in response:
            return sorted([
                {
                    'key': obj['Key'],
                    'size': obj['Size'],
                    'last_modified': obj['LastModified']
                }
                for obj in response['Contents']
            ], key=lambda x: x['last_modified'])
        else:
            return []
    except Exception as e:
        logging.error(f"Error al listar backups: {e}")
        return []

def eliminar_backups_antiguos():
    """Elimina los backups más antiguos, manteniendo solo los últimos MAX_BACKUPS."""
    try:
        backups = listar_backups()
        
        if len(backups) <= MAX_BACKUPS:
            logging.info(f"Solo hay {len(backups)} backups, no es necesario eliminar ninguno")
            return
        
        # Calcular cuántos backups eliminar
        a_eliminar = len(backups) - MAX_BACKUPS
        backups_a_eliminar = backups[:a_eliminar]
        
        s3_client = boto3.client('s3')
        
        for backup in backups_a_eliminar:
            logging.info(f"Eliminando backup antiguo: {backup['key']}")
            s3_client.delete_object(
                Bucket=S3_BUCKET,
                Key=backup['key']
            )
        
        logging.info(f"Se eliminaron {a_eliminar} backups antiguos")
    except Exception as e:
        logging.error(f"Error al eliminar backups antiguos: {e}")

def main():
    """Función principal del script."""
    logging.info("🚀 Iniciando proceso de respaldo")
    
    # Crear archivo tar
    archivo_local, nombre_archivo = crear_archivo_tar(BACKUP_DIRS)
    if not archivo_local:
        logging.error("❌ No se pudo crear el archivo de respaldo")
        sys.exit(1)
    
    # Subir a S3
    if subir_a_s3(archivo_local, nombre_archivo):
        logging.info("✅ Respaldo completado exitosamente")
        
        # Eliminar archivo temporal
        os.remove(archivo_local)
        logging.info(f"🧹 Archivo temporal eliminado: {archivo_local}")
        
        # Eliminar backups antiguos
        eliminar_backups_antiguos()
    else:
        logging.error("❌ No se pudo completar el respaldo")
        sys.exit(1)

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        logging.info("👋 Proceso interrumpido manualmente")
        sys.exit(0)
    except Exception as e:
        logging.critical(f"❌ Error crítico: {str(e)}")
        sys.exit(1)

Preparación para Despliegue

  1. Crear archivo de requisitos:

    # requirements.txt
    boto3==1.26.90
    
  2. Configurar credenciales de AWS:

    # En el servidor
    mkdir -p ~/.aws
    
    # Crear archivo de credenciales
    cat > ~/.aws/credentials << EOF
    [default]
    aws_access_key_id = TU_ACCESS_KEY
    aws_secret_access_key = TU_SECRET_KEY
    EOF
    
    # Crear archivo de configuración
    cat > ~/.aws/config << EOF
    [default]
    region = us-east-1
    output = json
    EOF
    
    # Establecer permisos adecuados
    chmod 600 ~/.aws/credentials
    
  3. Configurar variables de entorno:

    export S3_BUCKET="mi-bucket-de-respaldos"
    export S3_PREFIX="servidor1/backups/"
    
    # Para hacerlas permanentes
    echo 'export S3_BUCKET="mi-bucket-de-respaldos"' >> ~/.bashrc
    echo 'export S3_PREFIX="servidor1/backups/"' >> ~/.bashrc
    

Consejos Adicionales para Despliegue en la Nube

Monitoreo y Alertas

Para asegurar que tus scripts funcionen correctamente en la nube:

  1. Implementa logging detallado:

    import logging
    
    # Configuración básica
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler("app.log"),
            logging.StreamHandler()
        ]
    )
    
    # Uso en diferentes partes del código
    logger = logging.getLogger("mi_script")
    logger.info("Iniciando proceso")
    logger.warning("Advertencia: recurso no encontrado")
    logger.error("Error al procesar datos")
    
  2. Configura alertas por correo o SMS para errores críticos.

  3. Utiliza servicios de monitoreo como:

    • AWS CloudWatch
    • Google Cloud Monitoring
    • Datadog
    • Prometheus + Grafana

Seguridad

  1. Nunca almacenes credenciales en el código:

    # MAL
    password = "mi_contraseña_secreta"  # No hagas esto
    
    # BIEN
    import os
    password = os.environ.get("MI_PASSWORD")
    
  2. Usa un usuario con privilegios limitados para ejecutar tus scripts.

  3. Implementa autenticación para APIs y servicios web:

    # Ejemplo simple con Flask
    from functools import wraps
    from flask import request, jsonify
    
    def require_api_key(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            api_key = request.headers.get('X-API-Key')
            if api_key and api_key == os.environ.get('API_KEY'):
                return f(*args, **kwargs)
            return jsonify({"error": "Acceso no autorizado"}), 401
        return decorated
    
    @app.route('/api/datos')
    @require_api_key
    def obtener_datos():
        return jsonify({"datos": "información confidencial"})
    

Recuperación ante Fallos

  1. Implementa reintentos para operaciones que pueden fallar:

    def operacion_con_reintentos(max_intentos=3, espera=5):
        for intento in range(max_intentos):
            try:
                # Operación que puede fallar
                resultado = operacion_riesgosa()
                return resultado
            except Exception as e:
                logging.warning(f"Intento {intento+1}/{max_intentos} falló: {e}")
                if intento < max_intentos - 1:
                    logging.info(f"Esperando {espera} segundos antes de reintentar...")
                    time.sleep(espera)
                else:
                    logging.error("Se agotaron los reintentos")
                    raise
    
  2. Guarda el estado para poder recuperarte de interrupciones:

    def procesar_items(items):
        # Cargar estado anterior si existe
        estado_archivo = "estado_procesamiento.json"
        if os.path.exists(estado_archivo):
            with open(estado_archivo, 'r') as f:
                estado = json.load(f)
            items_procesados = estado.get("procesados", [])
            logging.info(f"Recuperando estado: {len(items_procesados)} items ya procesados")
        else:
            items_procesados = []
        
        for item in items:
            if item["id"] in items_procesados:
                logging.info(f"Item {item['id']} ya procesado, omitiendo")
                continue
            
            try:
                procesar_item(item)
                items_procesados.append(item["id"])
                
                # Guardar estado después de cada item
                with open(estado_archivo, 'w') as f:
                    json.dump({"procesados": items_procesados}, f)
            except Exception as e:
                logging.error(f"Error al procesar item {item['id']}: {e}")
    

Recursos Adicionales


🧭 Navegación: