Building ReAct AI agents with sandboxed Python code execution using AWS Bedrock and LangGraph

In industrial environments, data analysis is crucial for optimizing processes, detecting anomalies, and making informed decisions. Manufacturing plants, energy systems, and industrial IoT generate massive amounts of data from sensors, machines, and control systems. Traditionally, analyzing this data requires specialized knowledge in both industrial processes and data science, creating a bottleneck for quick insights.

I’ve been exploring agentic AI frameworks lately, particularly for complex data analysis tasks. While working on industrial data problems, I realized that combining the reasoning capabilities of Large Language Models with specialized tools could create a powerful solution for industrial data analysis. This project demonstrates how to build a ReAct (Reasoning and Acting) AI agent using LangGraph that can analyze manufacturing data, understand industrial processes, and provide actionable insights.

The goal of this project is to create an AI agent that can analyze industrial datasets (manufacturing metrics, sensor readings, process control data) and provide expert-level insights about production optimization, quality control, and process efficiency. Using LangGraph’s ReAct agent framework with AWS Bedrock, the system can execute Python code dynamically in a sandboxed environment, process large datasets, and reason about industrial contexts.

The dataset is a fake sample of industrial data with manufacturing metrics such as temperature, speed, humidity, pressure, operator experience, scrap rates, and unplanned stops. In fact, I generated the dataset using ChatGPT.

This project uses several key components:

  • LangGraph ReAct Agent: For building the multi-tool AI agent with ReAct (Reasoning and Acting) patterns that can dynamically choose tools and reason about results
  • AWS Bedrock: Claude Sonnet 4 as the underlying LLM for reasoning and code generation
  • Sandboxed Code Interpreter: Secure execution of Python code for data analysis using AWS Agent Core, via a tool from the strands-agents-tools library
  • Industrial Domain Expertise: Specialized system prompts with knowledge of manufacturing processes, quality control, and industrial IoT

The agent has access to powerful tools:

  • Code Interpreter: Executes Python code safely in a sandboxed AWS environment using pandas, numpy, scipy, and other scientific libraries
  • Data Processing: Handles large industrial datasets with memory-efficient strategies
  • Industrial Context: Understands manufacturing processes, sensor data, and quality metrics

The system uses AWS Agent Core’s sandboxed code interpreter, which means:

  • Python code is executed in an isolated environment
  • No risk to the host system
  • Access to scientific computing libraries (pandas, numpy, scipy)
  • Memory management for large datasets

The core of the system is surprisingly simple. The ReAct agent is built using LangGraph’s create_react_agent with custom tools:

from langgraph.prebuilt import create_react_agent
from typing import List
import pandas as pd
from langchain_core.callbacks import BaseCallbackHandler

# CodeInterpreter, get_llm and DEFAULT_MODEL come from the project's own modules


def analyze_df(df: pd.DataFrame, system_prompt: str, user_prompt: str,
               callbacks: List[BaseCallbackHandler], streaming: bool = False):
    code_interpreter_tools = CodeInterpreter()
    tools = code_interpreter_tools.get_tools()

    agent = create_react_agent(
        model=get_llm(model=DEFAULT_MODEL, streaming=streaming,
                      budget_tokens=12288, callbacks=callbacks),
        tools=tools,
        prompt=system_prompt
    )

    agent_prompt = f"""
    I have a DataFrame with the following data:
    - Columns: {list(df.columns)}
    - Shape: {df.shape}
    - data: {df}
    
    The output must be an executive summary with the key points.
    The response must be only markdown, not plots.
    """
    messages = [
        ("user", agent_prompt),
        ("user", user_prompt)
    ]
    agent_input = {"messages": messages}
    # stream state chunks when streaming is enabled; otherwise return the final state
    if streaming:
        return agent.stream(agent_input)
    return agent.invoke(agent_input)

The ReAct pattern (Reasoning and Acting) allows the agent to:

  1. Reason about what analysis is needed
  2. Act by calling the appropriate tools (in this case: code interpreter)
  3. Observe the results of code execution
  4. Re-reason and potentially call more tools if needed

This creates a dynamic loop where the agent can iteratively analyze data, examine results, and refine its approach – much more powerful than a single code execution.
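
To make that loop concrete, here is a toy sketch of the iteration in plain Python. The llm_reason callable and the tools dict are illustrative stand-ins for what LangGraph manages internally; this is not the LangGraph implementation:

def react_loop(task: str, llm_reason, tools: dict, max_steps: int = 5):
    """Toy ReAct loop: reason -> act -> observe, repeated until the model stops."""
    observations = []
    for _ in range(max_steps):
        # 1. Reason: decide the next action given the task and past observations
        thought, action, args = llm_reason(task, observations)
        if action is None:
            # The model decided that no more tool calls are needed
            return thought
        # 2. Act: run the chosen tool
        observation = tools[action](*args)
        # 3. Observe: feed the result back into the next reasoning step
        observations.append((thought, action, observation))
    return observations[-1][2] if observations else None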

The magic happens in the system prompt, which provides the agent with industrial domain expertise:

SYSTEM_PROMPT = """
# Industrial Data Analysis Agent - System Prompt

You are an expert AI agent specialized in industrial data analysis and programming. 
You excel at solving complex data problems in manufacturing, process control, 
energy systems, and industrial IoT environments.

## Core Capabilities
- Execute Python code using pandas, numpy, scipy
- Handle large datasets with chunking strategies  
- Process time-series data, sensor readings, production metrics
- Perform statistical analysis, anomaly detection, predictive modeling

## Industrial Domain Expertise
- Manufacturing processes and production optimization
- Process control systems (PID controllers, SCADA, DCS)
- Industrial IoT sensor data and telemetry
- Quality control and Six Sigma methodologies
- Energy consumption analysis and optimization
- Predictive maintenance and failure analysis
"""

The code interpreter tool is wrapped with safety validations:

import ast

from langchain_core.tools import tool  # assuming LangChain's @tool decorator

# code_tool (the sandboxed interpreter tool), session_name and UnsafeCodeError
# are defined elsewhere in the project


def validate_code_ast(code: str) -> bool:
    """Validate Python code using AST to ensure safety."""
    try:
        ast.parse(code)
        return True
    except SyntaxError:
        return False


@tool
def code_interpreter(code: str) -> str:
    """Executes Python code in a sandboxed environment."""
    if not validate_code_ast(code):
        raise UnsafeCodeError("Unsafe code or syntax errors.")

    return code_tool(code_interpreter_input={
        "action": {
            "type": "executeCode",
            "session_name": session_name,
            "code": code,
            "language": "python"
        }
    })
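
A note on safety: ast.parse only guarantees that the code is syntactically valid Python; the sandbox is what actually contains the execution. If you want the validator to do more, it can walk the tree and reject suspicious constructs. A minimal sketch, with a purely illustrative blocklist:

BLOCKED_MODULES = {"os", "subprocess", "shutil"}  # illustrative blocklist
BLOCKED_CALLS = {"eval", "exec", "open"}


def validate_code_strict(code: str) -> bool:
    """Reject code that imports blocked modules or calls blocked builtins."""
    try:
        tree = ast.parse(code)
    except SyntaxError:
        return False
    for node in ast.walk(tree):
        if isinstance(node, (ast.Import, ast.ImportFrom)):
            names = [alias.name.split(".")[0] for alias in node.names]
            module = (node.module or "").split(".")[0] if isinstance(node, ast.ImportFrom) else ""
            if BLOCKED_MODULES.intersection(names) or module in BLOCKED_MODULES:
                return False
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            if node.func.id in BLOCKED_CALLS:
                return False
    return True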

The system uses Claude Sonnet 4 through AWS Bedrock with optimized parameters for industrial analysis:

from langchain_aws import ChatBedrock
from langchain_core.language_models import BaseChatModel

# aws_get_service and the TemperatureLevel/TopKLevel/TopPLevel constants are
# project helpers; this get_llm is a simplified version of the one called
# from analyze_df above


def get_llm(model: str = DEFAULT_MODEL, max_tokens: int = 4096,
            temperature: float = TemperatureLevel.BALANCED,
            top_k: int = TopKLevel.DIVERSE,
            top_p: float = TopPLevel.CREATIVE) -> BaseChatModel:
    model_kwargs = {
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_k": top_k,
        "top_p": top_p
    }

    return ChatBedrock(
        model=model,
        client=aws_get_service('bedrock-runtime'),
        model_kwargs=model_kwargs
    )

The project includes a fake sample of industrial data with the following manufacturing metrics:

- `machine_id`: Equipment identifier
- `shift`: Production shift (A/M/N for morning/afternoon/night)
- `temperature`, `speed`, `humidity`, `pressure`: Process parameters
- `operator_experience`: Years of operator experience
- `scrap_kg`: Quality metric (waste produced)
- `unplanned_stop`: Equipment failure indicator
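
For reference, a dataset with this shape can be generated in a few lines of numpy/pandas (the value ranges below are illustrative, not the ones used to build the real sample):

import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
n = 1_000
fake = pd.DataFrame({
    "machine_id": rng.integers(1, 11, n),        # 10 machines
    "shift": rng.choice(["A", "M", "N"], n),     # morning/afternoon/night
    "temperature": rng.normal(75, 5, n).round(1),
    "speed": rng.normal(1200, 100, n).round(0),
    "humidity": rng.uniform(30, 70, n).round(1),
    "pressure": rng.normal(5, 0.5, n).round(2),
    "operator_experience": rng.integers(0, 25, n),
    "scrap_kg": rng.exponential(2, n).round(2),
    "unplanned_stop": rng.integers(0, 2, n),
})
fake.to_csv("fake_data.csv", index=False)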

A typical analysis query might be: "Do temperature and speed setpoints vary across shifts?"
The agent will stream the response as it generates it.

The agent will:

1. Load and examine the dataset structure
2. Generate appropriate Python code for analysis
3. Execute the code in a sandboxed environment
4. Provide insights about shift-based variations
5. Suggest process optimization recommendations
Here is the entry point that wires everything together:

import logging

import pandas as pd
from langchain_core.callbacks import StreamingStdOutCallbackHandler

from modules.df_analyzer import analyze_df
from prompts import SYSTEM_PROMPT

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


class StreamingCallbackHandler(StreamingStdOutCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs):
        print(token, end='', flush=True)


df = pd.read_csv('fake_data.csv')

user_prompt = "Do temperature and speed setpoints vary across shifts?"
for chunk in analyze_df(
        user_prompt=user_prompt,
        df=df,
        system_prompt=SYSTEM_PROMPT,
        callbacks=[StreamingCallbackHandler()],
        streaming=True):
    logger.debug(chunk)

This project demonstrates the power of agentic AI for specialized domains. Instead of building custom analytics dashboards or writing specific analysis scripts, we provide the agent with:

  1. Domain Knowledge: Through specialized system prompts
  2. Tools: Safe code execution capabilities
  3. Context: The actual data to analyze

The agent can then:

  • Generate appropriate analysis code
  • Execute it safely
  • Interpret results with industrial context
  • Provide actionable recommendations

The result is a flexible system that can handle various industrial analysis tasks without pre-programmed solutions. The agent reasons about the problem, writes the necessary code (sandboxed), and provides expert-level insights.

Full code in my GitHub account.

Implementing OAuth2 with a Vue Frontend and Python Backend using Nginx as a Reverse Proxy

We’ve seen in other posts that we can use OAuth2-proxy to provide OAuth2 authentication in our application. Today, for example, we will protect a Vue application, but instead of using oauth2-proxy, we will implement the functionality provided by oauth2-proxy directly in Python.

Our Vue application is very simple: it has only one button that shows some information. The entire Vue application, as well as the backend that serves this information (developed with Flask), will be protected and authenticated with OAuth2. For this example, we will use GitHub as the authentication provider.

<script setup lang="ts">
import {ref} from "vue";

defineProps<{
  msg: string
}>()

const data = ref(null);
const showModal = ref(false);

const fetchData = async () => {
  try {
    const response = await fetch("/app/api/userinfo", { redirect: "manual" });

    const logoutStatuses = [401, 403, 302, 303];
    if (response.type === "opaqueredirect" || logoutStatuses.includes(response.status)) {
      window.location.href = "/app/oauth2/logout";
    } else {
      data.value = await response.json();
      showModal.value = true;
    }
  } catch (error) {
    console.error("Error fetching data", error);
  }
};

</script>

<template>
  <div class="greetings">
    <h1 class="green">{{ msg }}</h1>
    <h3>
      You’ve successfully created a project with
      <button @click="fetchData" class="bg-blue-500 text-white p-2 rounded">
        Load data
      </button>
    </h3>
    <div v-if="showModal" class="fixed inset-0 flex items-center justify-center bg-gray-800 bg-opacity-50">
      <div class="bg-white p-6 rounded shadow-lg w-1/3">
        <h2 class="text-xl font-bold mb-4">Backend Data</h2>
        <pre class="bg-gray-100 p-3 rounded text-sm">{{ data }}</pre>
        <button @click="showModal = false" class="mt-4 bg-red-500 text-white p-2 rounded">Close</button>
      </div>
    </div>
  </div>
</template>

<style scoped>
h1 {
  font-weight: 500;
  font-size: 2.6rem;
  position: relative;
  top: -10px;
}

h3 {
  font-size: 1.2rem;
}

.greetings h1,
.greetings h3 {
  text-align: center;
}

@media (min-width: 1024px) {
  .greetings h1,
  .greetings h3 {
    text-align: left;
  }
}
</style>

The Flask backend is shown below; it returns user data on a protected route and also exposes a public one.

import logging
from datetime import datetime

from flask import Flask, session
from flask_compress import Compress

from core.oauth_proxy import setup_oauth
from settings import APP_PATH, SECRET, SESSION, DEBUG, OAUTH

app = Flask(__name__)
app.debug = DEBUG
app.secret_key = SECRET
app.config.update(SESSION)
Compress(app)
for logger_name in ['werkzeug', ]:
    logging.getLogger(logger_name).setLevel(logging.WARNING)

setup_oauth(app, OAUTH, APP_PATH)


@app.get(f"/{APP_PATH}/api/userinfo")
def protected_route():
    now = datetime.now().isoformat()
    return dict(
        session=session['user'],
        now=now
    )


@app.get(f"/{APP_PATH}/api/no_auth")
def public_route():
    return "public, route!"

To wire OAuth2 into the request flow, we use a reverse proxy, in this case NGINX with its auth_request directive. The NGINX configuration file is shown below.

upstream app {
    server host.docker.internal:5000;
}

upstream front {
    server host.docker.internal:5173;
}


server {
    listen 8000;
    server_tokens off;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;
    proxy_set_header X-Forwarded-Host $host:$server_port;

    location / {
        auth_request /app/oauth2/auth;
        error_page 401 = @error401;
        auth_request_set $auth_cookie $upstream_http_set_cookie;
        try_files $uri @proxy_to_front;
    }

    location /app/api/ {
        auth_request /app/oauth2/auth;
        error_page 401 = @error401;
        auth_request_set $auth_cookie $upstream_http_set_cookie;
        try_files $uri @proxy_to_app;
    }

    location /app/api/no_auth {
        try_files $uri @proxy_to_app;
    }

    location /app/oauth2/ {
        proxy_set_header X-Real-IP               $remote_addr;
        proxy_set_header X-Auth-Request-Redirect $request_uri;
        proxy_pass http://app;
    }

    location @proxy_to_app {
        proxy_pass http://app;
    }

    location @proxy_to_front {
        proxy_pass http://front;
    }

    location @error401 {
        auth_request_set $auth_cookie $upstream_http_set_cookie;
        add_header Set-Cookie $auth_cookie;
        return 302 /app/oauth2/sign_in;
    }
}

The authentication flow is managed as follows, using a small blueprint that handles OAuth2 redirections, token exchange, and user session storage.

import logging
import secrets
from urllib.parse import urlencode

import requests
from flask import Blueprint, jsonify
from flask import request, session, redirect

logger = logging.getLogger(__name__)


def _clean_session():
    session.pop('user', None)
    session.pop('state', None)
    session.pop('referer', None)


def get_oauth_proxy_blueprint(oauth_conf, app_path, *, sub_path="oauth2", callback_url='/callback',
                              signin_url='/sign_in', auth_url='/auth', logout_url='/logout'):
    blueprint = Blueprint('oauth_proxy', __name__, url_prefix=f'/{app_path}/{sub_path}')

    @blueprint.get(callback_url)
    def callback():
        referer = session.get('referer')
        state = request.args.get('state')
        session_state = session.get('state')

        if 'state' not in session:
            return redirect(referer)
        if state == session_state:
            authorization_code = request.args.get('code')
            token_data = {
                'grant_type': oauth_conf.get('GRANT_TYPE', 'authorization_code'),
                'code': authorization_code,
                'redirect_uri': oauth_conf['REDIRECT_URL'],
                'client_id': oauth_conf['CLIENT_ID'],
                'client_secret': oauth_conf['CLIENT_SECRET']
            }
            response = requests.post(oauth_conf['TOKEN_URL'],
                                     data=token_data,
                                     headers={'Accept': 'application/json'})
            response_data = response.json()
            headers = {
                "Authorization": f"Bearer {response_data.get('access_token')}",
                'Accept': 'application/json'
            }
            user_response = requests.get(oauth_conf['USER_URL'],
                                         headers=headers)
            if user_response.ok:
                user_data = user_response.json()
                session['user'] = dict(
                    username=user_data['login'],
                    name=user_data['name'],
                    email=user_data['email']
                )
                session.pop('state', None)
                session.pop('referer', None)
            else:
                _clean_session()
            return redirect(referer)
        # state mismatch: clean the session and send the user back
        _clean_session()
        return redirect(referer)

    @blueprint.get(signin_url)
    def sign_in():
        state = secrets.token_urlsafe(32)
        session['state'] = state
        authorize = oauth_conf['AUTHORIZE_URL']
        query_string = urlencode({
            'scope': oauth_conf.get('SCOPE', 'read write'),
            'prompt': oauth_conf.get('PROMPT', 'login'),
            'approval_prompt': oauth_conf.get('APPROVAL_PROMPT', 'auto'),
            'state': state,
            'response_type': oauth_conf.get('RESPONSE_TYPE', 'code'),
            'redirect_uri': oauth_conf['REDIRECT_URL'],
            'client_id': oauth_conf['CLIENT_ID']
        })
        return redirect(f"{authorize}?{query_string}")

    @blueprint.get(auth_url)
    def auth():
        if not session.get("user"):
            referer = request.headers.get('X-Auth-Request-Redirect')
            session['referer'] = referer
            return redirect(f"oauth2/sign_in", 401)
        else:

            return jsonify(dict(error='OK')), 200

    @blueprint.get(logout_url)
    def logout():
        _clean_session()
        return redirect(logout_url)

    return blueprint


def setup_oauth(app, oauth_conf, app_path, *, sub_path="oauth2", callback_url='/callback',
                signin_url='/sign_in', auth_url='/auth', logout_url='/logout'):
    app.register_blueprint(get_oauth_proxy_blueprint(oauth_conf, app_path, sub_path=sub_path, callback_url=callback_url,
                                                     signin_url=signin_url, auth_url=auth_url, logout_url=logout_url))


You can see the full code of the project in my GitHub account.

Agentic AI for movie recommendations with Python and Strands Agents

Context 1: I like to go to the cinema. I normally go to the cinema on Saturday afternoons, at the first showing. In the city where I live there are three cinemas and all belong to the same company called Sade. I normally check the cinema schedules on their website, SadeCines.com, to see what’s playing. Also, I track the movies I see on Letterboxd. There I have my diary and also a list with the movies I see in the cinema. I rate the movies when I finish watching them. My first impression. I do that not to share with others, only to have a personal record of what I like and dislike.

Context 2: I’m on holidays and I also like to code, so I decided to build an AI agent that helps me decide what movie to watch on Saturday afternoons. This project is an example of over-engineering, I know, but I’ve done it as an exercise using Strands Agents, a framework for building multi-tool LLM agents that I’m using these days.

The aim of the project is to create an AI agent that can access the internet to check the cinema schedules and my Letterboxd profile, and then recommend a movie to watch on Saturday afternoons. LLMs are normally good at reasoning, but they don’t have access to the internet. They are also not good at mathematical operations, but with agents we can use tools for that. So I decided to build an agent that can access the internet (to check the cinema schedules, my Letterboxd profile, and IMDb/Metacritic scores) and generate the code needed for the mathematical operations.

Strands Agents (similar to LangChain) allows us to build multi-tool LLM agents. In this example I’m using the pre-built tools provided by the framework, such as:

  • calculator: for performing mathematical operations
  • think: for reasoning and decision-making
  • current_time: to get the current date and time
  • file_write: to write the recommendations to a file
  • batch: to execute multiple tools in parallel
  • code_interpreter: to execute Python code dynamically (sandboxed in an AWS environment)
  • browser: to scrape the cinema schedules from SadeCines.com and my Letterboxd profile (also sandboxed in an AWS environment)

The code interpreter is a powerful tool that allows us to execute Python code dynamically, which is useful for performing mathematical operations and data processing. For me, it is the key to pushing agents to the next level. LLMs can generate Python code very well: code to build a Pandas dataframe, to filter the data, to calculate an average rating, etc. But they can also generate harmful code, like code that deletes files or accesses sensitive data, so we need to be careful with what we execute. This issue is especially important when we are using prompts from users (in a chat, for example). Strands Agents provides a tool called python_repl that executes Python code locally, within our environment. If you trust your prompts, it can be an option (I’ve sent a pull request to Strands Agents to make it a bit safer). But in this project I’m using the code_interpreter tool, a sandboxed environment provided by AWS. This allows us to execute Python code safely, without the risk of running harmful code in the host environment.

In this project we need to scrape webpages to retrieve information from the internet. Strands Agents provides a built-in tool, called use_browser, that drives a headless browser locally. Here, however, I’m using the browser tool, which is also a sandboxed environment provided by AWS Bedrock. This allows us to scrape websites (using Playwright) and interact with web pages without the risk of executing harmful code in the host environment.

With this information, building the agent is pretty straightforward. The idea of agents is not to code everything from scratch, but to give the agent the tools it needs to solve the problem and let it figure out how to use them through the prompts. When we work with LLMs we have two kinds of prompts: the system prompt, which defines the agent’s behavior, and the user prompt, which provides the input data.

In this project I’m using those prompts:

from settings import BASE_DIR

SYSTEM_PROMPT = f"""
You are an expert movie recommendation assistant to help me decide what to watch.

You have access to the following URLs and available movie analyses:
- https://sadecines.com/ With the movie schedules in my city's cinemas.
    Sadecines has a checkbox to filter the day of the week, so you can select Saturday.
- https://letterboxd.com/gonzalo123/films/diary/ Movies I have watched and rated.
- https://letterboxd.com/gonzalo123/list/cine-2025/detail/ Movies I have already seen in theaters in 2025.

You must take into account the user's preferences:
- Avoid movies in the "children" and "family" genres.
- I don't really like intimate or drama movies, except for rare exceptions.
- I like entertaining movies, action, science fiction, adventure, and comedies.

Take into account when making recommendations:
- The ratings of the movies on IMDb and Metacritic.
- But mainly consider my personal preferences,
    which can be seen in the list of movies I have watched and rated on Letterboxd.
"""

QUESTION = f"""
Analyze the movies showing this Saturday in the first session.

Present only those you recommend, excluding those not relevant according to my preferences,
and order them from best to worst according to your criteria.

Show the result in a table with the following columns:
- Title
- Genre
- IMDb Rating
- Metacritic Rating
- Summary
- Start Time
- End Time

Save the final report in a file named YYYYMMDD.md, following this structure:
{BASE_DIR}/
    └ reports/
        └ YYYYMMDD.md       # Movie analysis of the day, format `YYYYMMDD`
"""

And the code of the agent is very simple (I’m using AWS Bedrock to run the agent):

import logging

from botocore.config import Config
from strands import Agent
from strands.models import BedrockModel
from strands_tools import calculator, current_time, think, file_write, batch
from strands_tools.browser import AgentCoreBrowser
from strands_tools.code_interpreter import AgentCoreCodeInterpreter

from promts import SYSTEM_PROMPT, QUESTION
from settings import AWS_REGION, MODEL_TEMPERATURE, MODEL, LLM_READ_TIMEOUT, LLM_CONNECT_TIMEOUT, LLM_MAX_ATTEMPTS

logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level="INFO",
    datefmt="%d/%m/%Y %X",
)

logger = logging.getLogger(__name__)

agent = Agent(
    system_prompt=SYSTEM_PROMPT,
    model=BedrockModel(
        model_id=MODEL,
        temperature=MODEL_TEMPERATURE,
        boto_client_config=Config(
            read_timeout=LLM_READ_TIMEOUT,
            connect_timeout=LLM_CONNECT_TIMEOUT,
            retries={'max_attempts': LLM_MAX_ATTEMPTS}
        )
    ),
    tools=[
        calculator, think, current_time, file_write, batch,
        AgentCoreCodeInterpreter(region=AWS_REGION).code_interpreter,
        AgentCoreBrowser(region=AWS_REGION).browser]
)

result = agent(QUESTION)
logger.info(f"Total tokens: {result.metrics.accumulated_usage['totalTokens']}")
logger.info(f"Execution time: {sum(result.metrics.cycle_durations):.2f} seconds")
logger.info(f"Tools used: {list(result.metrics.tool_metrics.keys())}")

The number of lines of code is never a goal (we only need to write readable and maintainable code), but in this example we have more code in the prompts than in the code itself. Maybe it’s a sign of our times.

And that’s all. I must say again that this project is just an example, and an over-engineered one. Scaling it would be very expensive: a custom scraper plus a bit of custom Python code could solve this specific problem without using, and paying for, the AI (cheap for a single user, but expensive at scale). Still, I think it is a good example of the power of agents, the code interpreter, and the browser tools in a few lines of code. And remember, I’m on holidays and I like to code (don’t blame me for that).

Full code in my GitHub account.

Building Production-Ready AI Agents with Strands-Agents and Python

Today we’re going to build an AI agent that can predict the weather using the Strands-Agents framework and Python. This project is designed to show how to integrate external data sources, advanced computational tools, and AI capabilities into a cohesive system. Strands-Agents provides a robust foundation for building intelligent agents, and it comes with built-in tools that let agents perform complex tasks by orchestrating multiple tools and APIs. For this project we’re going to use the following tools:

  • calculator: for performing mathematical and financial calculations.
  • think: for reflecting on data and generating ideas.
  • file_write: for saving results and analyses to files.
  • python_repl: for executing Python code and performing advanced analyses.
  • current_time: for getting the current date and time.

The python_repl tool is particularly useful for overcoming the limitations of large language models (LLMs) when it comes to deterministic calculations. By using a Python REPL, we can ensure that our agent performs precise computations without relying solely on the LLM’s probabilistic outputs. We also have Pandas and Scikit-learn available for statistical analysis, so the agent can perform advanced data manipulation and machine learning tasks on the weather data. In addition, I’ve created a custom tool to fetch hourly weather data from the Open-Meteo API, which provides real-time weather information for specific locations.

import logging
from datetime import datetime, date
from typing import List

import requests
from strands import tool

from modules.weather.models import (
    TemperatureReading, HumidityReading, ApparentTemperatureReading,
    PrecipitationReading, EvapotranspirationReading, SurfacePressureReading, MeteoData)

logger = logging.getLogger(__name__)


class Tools:
    def __init__(self, latitude: float, longitude: float):
        self.latitude = latitude
        self.longitude = longitude

    def get_tools(self) -> List[tool]:
        @tool
        def get_hourly_weather_data(from_date: date, to_date: date) -> MeteoData:
            """
            Get hourly weather data for a specific date range.
            Notes:
                - The response is a MeteoData object containing lists of readings for temperature, humidity,
                  apparent temperature, precipitation, evapotranspiration, and surface pressure.
                - Each reading has a timestamp and a value.

            Returns:
                MeteoData: Object containing weather readings for the specified date range
            """

            start_date = from_date.strftime('%Y-%m-%d')
            end_date = to_date.strftime('%Y-%m-%d')
            url = (f"https://api.open-meteo.com/v1/forecast?"
                   f"latitude={self.latitude}&"
                   f"longitude={self.longitude}&"
                   f"hourly=temperature_2m,relative_humidity_2m,apparent_temperature,precipitation,evapotranspiration,surface_pressure&"
                   f"start_date={start_date}&"
                   f"end_date={end_date}")
            response = requests.get(url)

            meteo = MeteoData(
                temperature=[],
                humidity=[],
                apparent_temperature=[],
                precipitation=[],
                evapotranspiration=[],
                surface_pressure=[]
            )
            data = response.json()

            weather_data_time = data['hourly']['time']

            logger.info(f"[get_hourly_weather_data] Fetched weather data from {start_date} to {end_date}. {len(weather_data_time)} records found.")
            for idx, iso in enumerate(weather_data_time):
                time = datetime.fromisoformat(iso)
                meteo.temperature.append(TemperatureReading(
                    time=time,
                    value=data['hourly']['temperature_2m'][idx]))
                meteo.humidity.append(HumidityReading(
                    time=time,
                    value=data['hourly']['relative_humidity_2m'][idx]))
                meteo.apparent_temperature.append(ApparentTemperatureReading(
                    time=time,
                    value=data['hourly']['apparent_temperature'][idx]))
                meteo.precipitation.append(PrecipitationReading(
                    time=time,
                    value=data['hourly']['precipitation'][idx]))
                meteo.evapotranspiration.append(EvapotranspirationReading(
                    time=time,
                    value=data['hourly']['evapotranspiration'][idx]))
                meteo.surface_pressure.append(SurfacePressureReading(
                    time=time,
                    value=data['hourly']['surface_pressure'][idx]))
            return meteo

        return [get_hourly_weather_data, ]

To allow the LLM to interact with this tool, we define a Pydantic model that describes the expected input and output formats. This ensures that the agent can correctly interpret the data it receives from the API and use it effectively in its analyses.

from datetime import datetime

from pydantic import BaseModel, Field


class TemperatureReading(BaseModel):
    """Temperature reading at 2 meters"""
    time: datetime = Field(..., description="Timestamp")
    value: float = Field(description="Temperature in °C")


class HumidityReading(BaseModel):
    """Relative humidity reading at 2 meters"""
    time: datetime = Field(..., description="Timestamp")
    value: int = Field(..., ge=0, le=100, description="Relative humidity in %")


class ApparentTemperatureReading(BaseModel):
    """Apparent temperature reading"""
    time: datetime = Field(..., description="Timestamp")
    value: float = Field(..., description="Apparent temperature in °C")


class PrecipitationReading(BaseModel):
    """Precipitation reading"""
    time: datetime = Field(..., description="Timestamp")
    value: float = Field(..., ge=0, description="Precipitation in mm")


class EvapotranspirationReading(BaseModel):
    """Evapotranspiration reading"""
    time: datetime = Field(..., description="Timestamp")
    value: float = Field(..., description="Evapotranspiration in mm")


class SurfacePressureReading(BaseModel):
    """Surface pressure reading"""
    time: datetime = Field(..., description="Timestamp")
    value: float = Field(..., gt=0, description="Surface pressure in hPa")


class MeteoData(BaseModel):
    """Model to store meteorological data"""
    temperature: list[TemperatureReading] = Field(..., description="List of temperature readings")
    humidity: list[HumidityReading] = Field(..., description="List of humidity readings")
    apparent_temperature: list[ApparentTemperatureReading] = Field(..., description="List of apparent temperature readings")
    precipitation: list[PrecipitationReading] = Field(..., description="List of precipitation readings")
    evapotranspiration: list[EvapotranspirationReading] = Field(..., description="List of evapotranspiration readings")
    surface_pressure: list[SurfacePressureReading] = Field(..., description="List of surface pressure readings")

The use of Strands-Agents is very simple. I’ve encapsulated the agent logic in a single function that initializes the agent with the necessary tools and prompts. The agent can then be used to generate weather forecasts or answer specific weather-related questions.

_ = ai(
    system_prompt=SYSTEM_PROMPT,
    user_prompt="What will the weather be like tomorrow?")

If I run this code, the agent will use the provided tools to generate the answer. In the output of the command, you will see the agent’s reasoning, the tools it used, and the final answer. First it uses the current_time tool to get the current date and time, then the get_hourly_weather_data tool to fetch the weather data; after that it uses the think tool to analyze the data, and finally the python_repl tool to generate the needed calculations with Pandas and produce the final weather forecast. Here’s an example of the output you might see when running the agent:

12/07/2025 15:28:18 [INFO] Check agent weather.
12/07/2025 15:28:18 [INFO] Found credentials in shared credentials file: ~/.aws/credentials
12/07/2025 15:28:18 [INFO] Creating Strands MetricsClient
I'll help you get the weather forecast for tomorrow. First, let me get the current time to determine what "tomorrow" is, and then I'll fetch the weather data for you.
Tool #1: current_time
Now I'll get the hourly weather data for tomorrow (July 13, 2025):
Tool #2: get_hourly_weather_data
12/07/2025 15:28:24 [INFO] [get_hourly_weather_data] Fetched weather data from 2025-07-13 to 2025-07-13. 24 records found.
Let me analyze this weather data to provide you with a comprehensive forecast for tomorrow (July 13, 2025):
Tool #3: think
Let me analyze the weather data directly to provide you with tomorrow's forecast:
Tool #4: python_repl
WEATHER FORECAST FOR TOMORROW (July 13, 2025)
==================================================
Temperature Range: 20.5°C to 23.6°C
Average Temperature: 22.2°C
Feels Like Temperature: Up to 24.6°C

Humidity Range: 70% to 83%
Average Humidity: 77%

Total Precipitation: 1.0mm
Precipitation Timing: Brief shower 4-5 PM

Pressure: Rising from 1012.1 to 1019.9 hPa
Pressure Change: +7.8 hPa (improving conditions)

Tool #5: python_repl

DETAILED HOURLY BREAKDOWN
==============================

MORNING (6 AM - 12 PM):
Temperature: 20.6°C to 23.0°C
Humidity: 70% to 82%
Conditions: Clear, comfortable temperatures

AFTERNOON (12 PM - 6 PM):
Temperature: 22.6°C to 23.6°C
Humidity: 71% to 81%
Precipitation: 1.0mm (brief shower 4-5 PM)
Conditions: Warmest part of day, brief afternoon shower

EVENING (6 PM - 12 AM):
Temperature: 22.0°C to 22.4°C
Humidity: 75% to 82%
Conditions: Pleasant evening temperatures, increasing humidity

SUMMARY & RECOMMENDATIONS:
- Mild and pleasant day with comfortable temperatures
- Brief shower expected between 4-5 PM (carry umbrella)
- Rising atmospheric pressure indicates stable weather
- High humidity in morning and evening, may feel muggy
- Good day for outdoor activities, except during afternoon shower
- Light layers recommended due to temperature variation
## Weather Forecast for Tomorrow (July 13, 2025)

**Overall Conditions:** A pleasant, mild day with mostly clear skies and a brief afternoon shower.

### **Temperature**
- **Range:** 20.5°C to 23.6°C (69°F to 74°F)
- **Feels like:** Up to 24.6°C (76°F)
- **Coolest:** Early morning around 2-6 AM
- **Warmest:** Mid-afternoon around 2 PM

### **Precipitation**
- **Total:** 1.0mm of rain
- **Timing:** Brief shower between 4-5 PM
- **Duration:** Short-lived, about 1 hour
- **Recommendation:** Carry an umbrella for the afternoon

### **Atmospheric Conditions**
- **Humidity:** 70-83% (higher in morning/evening)
- **Pressure:** Rising from 1012 to 1020 hPa (+7.8 hPa increase)
- **Trend:** Improving and stabilizing weather conditions

### **Time-Based Breakdown**

**Morning (6 AM - 12 PM):** Clear and comfortable, 20.6-23°C
**Afternoon (12 PM - 6 PM):** Warmest period with brief shower, 22.6-23.6°C
**Evening (6 PM - 12 AM):** Pleasant and stable, 22-22.4°C

### **Recommendations**
- **Clothing:** Light layers, as temperatures vary throughout the day
- **Activities:** Great for outdoor plans, just avoid 4-5 PM for the shower
- **Comfort:** May feel slightly muggy due to higher humidity
- **Overall:** A very pleasant summer day with stable, improving weather conditions

The rising atmospheric pressure indicates this will be a stable weather day with good conditions for most outdoor activities!
Process finished with exit code 0

Here you can see the ai function.

import logging

from botocore.config import Config
from strands import Agent
from strands.agent import AgentResult
from strands.models import BedrockModel
from strands_tools import calculator, file_write, current_time, think, python_repl

from core.aws import get_aws_session
from modules.weather.tools import Tools
from settings import (
    IA_MODEL, IA_TEMPERATURE, LLM_READ_TIMEOUT, LLM_CONNECT_TIMEOUT,
    LLM_MAX_ATTEMPTS, MY_LATITUDE, MY_LONGITUDE, )

logger = logging.getLogger(__name__)


def get_agent(
        system_prompt: str,
        read_timeout: int = LLM_READ_TIMEOUT,
        connect_timeout: int = LLM_CONNECT_TIMEOUT,
        max_attempts: int = LLM_MAX_ATTEMPTS) -> Agent:
    config = Config(
        read_timeout=read_timeout,
        connect_timeout=connect_timeout,
        retries={'max_attempts': max_attempts}
    )
    session = get_aws_session()

    base_tools = [calculator, think, python_repl, file_write, current_time]
    custom_tools = Tools(latitude=MY_LATITUDE, longitude=MY_LONGITUDE).get_tools()
    all_tools = base_tools + custom_tools

    bedrock_model = BedrockModel(
        model_id=IA_MODEL,
        temperature=IA_TEMPERATURE,
        boto_session=session,
        boto_client_config=config,
    )
    return Agent(
        model=bedrock_model,
        tools=all_tools,
        system_prompt=system_prompt
    )


def ai(
        system_prompt: str,
        user_prompt: str,
        read_timeout: int = 300,
        connect_timeout: int = 60,
        max_attempts: int = 5) -> AgentResult:
    agent = get_agent(
        system_prompt=system_prompt,
        read_timeout=read_timeout,
        connect_timeout=connect_timeout,
        max_attempts=max_attempts)

    return agent(user_prompt)

As you can see, the agent is only a few lines of code. The magic is in the prompts and the tools that it uses. The agent can be used to generate weather forecasts, analyze historical weather data, and provide practical recommendations based on the weather conditions. This is the main prompt:

FORECAST_PROMPT = f"""
## Instructions for the weather forecast
Your mission is to analyze weather data and provide accurate and useful forecasts for the next {{days}} days.
You have access to a tool called `get_hourly_weather_data` that allows you to obtain hourly weather data.
As a meteorology expert, you must thoroughly analyze the data and provide accurate and useful forecasts.

Take into account possible extreme heat days, especially in summer.
Remember that extreme heat is considered when maximum and minimum temperatures exceed local temperature thresholds for several consecutive days,
often during a heatwave. These temperatures, along with humidity, can be harmful to health, especially for vulnerable groups.

## Report style
All reports must be written in English.
The report must be clear, concise, and easy to understand.
It should include:
- A summary of current weather conditions.
- A detailed forecast for the coming days, including temperature, precipitation, wind, and any other relevant data.
- Practical recommendations based on the forecast, such as precautions to take or recommended activities.
- Be creative and innovative in your approach, using advanced data visualization techniques to enhance the report.

## Data visualization
The report, in markdown, must be visually appealing and innovative.
You will use tables, lists, and other formatting elements to enhance readability.

### Graph format
- Generate the graph configuration in JSON format, compatible with the Vegalite library.
- Ensure the JSON is valid and compatible with the Vegalite library.
- The graphs must be innovative, leveraging the library's potential. Do not limit yourself to simple bar or line charts. Aim for a wow effect.

- Required JSON structure:
    * title: main title of the graph, at the top of the graph. The title must be brief and descriptive.
    * the title must be in the layout.title.text directive
    * layout.showlegend will be true/false, to show the graph legend. Some graphs do not need a legend, such as simple line charts.
- After each graph, generate a blockquote briefly explaining what the graph shows and its context.

...

For the visualization I’m using MkDocs, a simple static site generator for Markdown files. For more advanced visualizations, I’m using the Vega-Lite library, which allows you to create interactive and visually appealing charts. The agent generates the JSON configuration for the graphs in a format compatible with Vega-Lite, which can then be rendered in the Markdown reports.

For AI, I’m using Claude 3.5 Sonnet, provided by Amazon Bedrock. For the experiment it’s enough, but if you create a cron job to run the agent every day, you’ll have your 5-day forecasting system ready to go. The project tries to show how to use AI agents to solve real-world problems, and how to integrate them with external data sources and tools. The agent can be extended to include more advanced features, such as integrating with other APIs or using more complex machine learning models for weather prediction.

Full code in my GitHub account.

Building an Agentic AI with Python, LangChain, AWS Bedrock and Claude 4 Sonnet

Today we are going to build an agent with AI. It is just an example of how to build an agent with LangChain, AWS Bedrock, and Claude 4 Sonnet. The agent will be a “mathematical expert” capable of performing complex calculations and providing detailed explanations of its reasoning process. The idea is to give the agent the ability to perform basic mathematical operations: addition and subtraction. In fact, with addition and subtraction we can build all the other operations: multiplication, division, exponentiation, square roots, etc. The agent will perform these operations step by step, providing a detailed explanation of its reasoning. I know we don’t need AI to perform these operations, but the idea is to show how to build an agent with LangChain, AWS Bedrock, and Claude 4 Sonnet.
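
For instance, multiplication reduces to repeated addition. This is the kind of decomposition the agent is asked to perform, shown here in plain Python just for illustration:

def multiply(a: int, b: int) -> int:
    """Multiplication as repeated addition (sign handled separately)."""
    result = 0
    for _ in range(abs(b)):
        result += a  # each step maps to one sum_values tool call
    return result if b >= 0 else -result


print(multiply(7, -3))  # -21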

The mathematical agent implements the tool-calling pattern, allowing the LLM to dynamically select and execute mathematical operations:

import logging

from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain.prompts import ChatPromptTemplate

from core.llm.aws import get_llm, Models
from modules.prompts import AGENT_SYSTEM_PROMPT
from modules.tools import MathTools
from settings import MAX_TOKENS

logger = logging.getLogger(__name__)


def run(question: str, model: Models = Models.CLAUDE_4):
    prompt = ChatPromptTemplate.from_messages([
        ("system", AGENT_SYSTEM_PROMPT),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}")
    ])
    math_tools = MathTools()
    tools = math_tools.get_tools()

    llm = get_llm(model=model, max_tokens=MAX_TOKENS)
    agent = create_tool_calling_agent(llm, tools, prompt)
    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        verbose=True,
        max_iterations=10
    )

    response = agent_executor.invoke({
        "input": question
    })

    logger.info(f"Agent response: {response['output']}")

Tools are defined using LangChain’s @tool decorator, providing automatic schema generation and type validation. Strictly speaking, we don’t need to create a class for the tools, but I have done it because I want to add an extra feature to the agent: the ability to keep a history of the operations performed. This allows the agent to provide a detailed explanation of its reasoning process, showing the steps taken to arrive at the final result.

import logging
from typing import List

from langchain.tools import tool

logger = logging.getLogger(__name__)


class MathTools:

    def __init__(self):
        self.history = []

    def _diff_values(self, a: int, b: int) -> int:
        result = a - b
        self.history.append(f"{a} - {b} = {result}")
        return result

    def _sum_values(self, a: int, b: int) -> int:
        result = a + b
        self.history.append(f"{a} + {b} = {result}")
        return result

    def _get_history(self) -> str:
        if not self.history:
            return "No previous operations"
        return "\n".join(self.history[-5:])  # Last 5

    def get_tools(self) -> List:
        @tool
        def diff_values(a: int, b: int) -> int:
            """Calculates the difference between two numbers
            Args:
                a (int): first number
                b (int): second number
            Returns:
                int: difference of a - b
            """
            logger.info(f"Calculating difference: {a} - {b}")
            return self._diff_values(a, b)

        @tool
        def sum_values(a: int, b: int) -> int:
            """Sums two numbers
            Args:
                a (int): first number
                b (int): second number
            Returns:
                int: sum of a + b
            """
            logger.info(f"Calculating sum: {a} + {b}")
            return self._sum_values(a, b)

        @tool
        def get_history() -> str:
            """Gets the operation history
            Returns:
                str: last operations
            """
            logger.info("Retrieving operation history")
            return self._get_history()

        return [diff_values, sum_values, get_history]

The system prompt is carefully crafted to guide the agent’s behavior and establish clear operational boundaries:

AGENT_SYSTEM_PROMPT = """
You are an expert mathematical agent specialized in calculations.

You have access to the following tools:
- diff_values: Calculates the difference between two numbers
- sum_values: Sums two numbers
- get_history: Gets the operation history

Guidelines:
1. Only answer questions related to mathematical operations.
2. For complex operations, use step-by-step calculations:
   - Multiplication: Repeated addition
   - Division: Repeated subtraction
   - Exponentiation: Repeated multiplication
   - Square root: Use methods like Babylonian method or prime factorization.
"""

Now we can invoke our agent by asking questions such as ‘What’s the square root of 16 divided by two, squared?’. The agent will iterate using only the provided tools to obtain the result.
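
For example, assuming the run function above lives in a module called modules.agent (hypothetical path):

from modules.agent import run  # hypothetical module path

run("What's the square root of 16 divided by two, squared?")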

And that’s all. This project demonstrates how to build a production-ready AI agent using LangChain and AWS Bedrock. It’s just a boilerplate, but it can be extended to create more complex agents with additional capabilities and understand how AI agents work.

Full code in my GitHub account.

OAuth2 Authentication in Streamlit Applications with Nginx and OAuth2-Proxy

Normally, when I want to provide authentication to a service, I use OAuth2. There are libraries to integrate this authentication mechanism into a web application, but sometimes we cannot do this easily because it is a third-party service over which we have no control. It is possible that this third-party service supports OAuth2 login, but sometimes it does not, or it is too complicated. In these cases, a solution is to use a proxy that handles the authentication and communicates with the third-party service. In this example, we will use a Streamlit application as if it were a third-party application.

import streamlit as st

st.set_page_config(
    page_title="Home",
    page_icon="??",
)
st.write("# Welcome to Streamlit! ??")
st.markdown(
    """
    Streamlit is an open-source app framework built specifically for
    Machine Learning and Data Science projects.
    **👈 Select a demo from the sidebar** to see some examples
    of what Streamlit can do!
    ### Want to learn more?
    - Check out [streamlit.io](https://streamlit.io)
    - Jump into our [documentation](https://docs.streamlit.io)
    - Ask a question in our [community
        forums](https://discuss.streamlit.io)
    ### See more complex demos
    - Use a neural net to [analyze the Udacity Self-driving Car Image
        Dataset](https://github.com/streamlit/demo-self-driving)
    - Explore a [New York City rideshare dataset](https://github.com/streamlit/demo-uber-nyc-pickups)
"""
)

st.sidebar.success("Select a demo above.")

Our Streamlit application also has a plotting demo page.

from random import randint

import streamlit as st

st.set_page_config(
    page_title="Hello",
    page_icon="??",
)

st.markdown("# Plotting Demo")
st.sidebar.header("Plotting Demo")
st.write("This demo illustrates a combination of plotting with Streamlit. Enjoy!")

data = [dict(name=f"name{i}", value=randint(1, 1000)) for i in range(1, 101)]

progress_bar = st.sidebar.progress(0)
status_text = st.sidebar.empty()
chart = st.line_chart([item['value'] for item in data])

progress_bar.empty()

st.button("Re-run")

To use OAuth authentication in the Streamlit application, we are using Nginx as a reverse proxy with the auth_request directive to direct requests to an OAuth2-proxy service deployed in our stack. OAuth2-proxy can be configured to authenticate any OAuth2 server compatible with OpenID. In my example, I am using GitHub, but you can use ActiveDirectory, Google, Keycloak, or even your own OAuth2 server. This is my Nginx configuration:

This is my Nginx configuration:

upstream app {
    server streamlit:8501;
}

upstream oauth2 {
    server oauth2-proxy:4180;
}

server {
    listen 8000;

    location / {
        auth_request /oauth2/auth;
        error_page 401 = @error401;
        try_files $uri @proxy_to_app;
    }

    location /_stcore/stream {
        auth_request /oauth2/auth;
        error_page 401 = @error401;
        proxy_pass http://app/_stcore/stream;
        proxy_http_version 1.1;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $host;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_read_timeout 86400;
    }

    location @error401 {
        return 302 /oauth2/sign_in;
    }

    location /oauth2/ {
        try_files $uri @proxy_to_oauth2;
    }

    location @proxy_to_oauth2 {
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_pass http://oauth2;
    }

    location @proxy_to_app {
        proxy_set_header X-Forwarded-Proto https;
        proxy_set_header X-Url-Scheme $scheme;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_redirect off;
        proxy_pass http://app;
    }
}

The complete stack can be seen in the docker-compose.yml:

version: '3.9'

services:
  streamlit:
    build: .
    environment:
      - ENVIRONMENT=docker
    command: ["streamlit", "run", "st.py", "--server.port=8501", "--server.address=0.0.0.0"]

  nginx:
    build: .docker/nginx
    ports:
      - "8000:8000"

  oauth2-proxy:
    image: quay.io/oauth2-proxy/oauth2-proxy:v7.8.1
    env_file:
      - .env
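
The .env file holds the oauth2-proxy settings. For GitHub it would look roughly like this (all values are placeholders; the variable names are standard oauth2-proxy options):

OAUTH2_PROXY_PROVIDER=github
OAUTH2_PROXY_CLIENT_ID=your-github-client-id
OAUTH2_PROXY_CLIENT_SECRET=your-github-client-secret
OAUTH2_PROXY_COOKIE_SECRET=a-random-16-or-32-byte-string
OAUTH2_PROXY_HTTP_ADDRESS=0.0.0.0:4180
OAUTH2_PROXY_REDIRECT_URL=http://localhost:8000/oauth2/callback
OAUTH2_PROXY_EMAIL_DOMAINS=*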

And that’s all. The advantage of using oauth2-proxy is that we don’t need to do anything within the Streamlit application to have OAuth2 authentication. This greatly simplifies the integration process, as all the authentication logic is handled outside the main application. Additionally, oauth2-proxy is compatible with any OAuth2 server that complies with OpenID, giving us the flexibility to use different authentication providers. By using Nginx as a reverse proxy, we can efficiently redirect and manage authentication requests, ensuring that only authenticated users can access our Streamlit application.

Full code available in my GitHub account.

Real-time Data Visualization with R Shiny and External APIs

Today we will create a simple R Shiny frontend application that fetches real-time data from an external API. First, we have a backend API that provides a simple JSON response protected by a bearer token.

import random
from functools import wraps

from flask import Flask, jsonify, request

app = Flask(__name__)

VALID_TOKEN = "api-secret-token"


def token_required(valid_token=VALID_TOKEN):
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            token = None
            auth_header = request.headers.get('Authorization')
            if auth_header and auth_header.startswith('Bearer '):
                token = auth_header.split(' ')[1]

            if not token or token != valid_token:
                return jsonify({'message': 'Not valid token'}), 401

            return f(*args, **kwargs)

        return decorated

    return decorator


def generate_phone_number():
    area_code = random.randint(200, 999)
    prefix = random.randint(100, 999)
    line = random.randint(1000, 9999)
    return f"+1 {area_code}-{prefix}-{line}"


@app.get('/data')
@token_required(valid_token=VALID_TOKEN)
def get_data():
    return jsonify([
        {"phone": generate_phone_number(), "name": "Juan Pérez", "email": "juan@example.com"},
        {"phone": generate_phone_number(), "name": "María García", "email": "maria@example.com"},
        {"phone": generate_phone_number(), "name": "Carlos López", "email": "carlos@example.com"},
        {"phone": generate_phone_number(), "name": "Ana Martínez", "email": "ana@example.com"},
        {"phone": generate_phone_number(), "name": "Pablo Sánchez", "email": "pablo@example.com"},
        {"phone": generate_phone_number(), "name": "Laura Rodríguez", "email": "laura@example.com"},
        {"phone": generate_phone_number(), "name": "Diego Fernández", "email": "diego@example.com"},
        {"phone": generate_phone_number(), "name": "Carmen Gómez", "email": "carmen@example.com"},
        {"phone": generate_phone_number(), "name": "Javier Díaz", "email": "javier@example.com"},
        {"phone": generate_phone_number(), "name": "Sofía Ruiz", "email": "sofia@example.com"},
        {"phone": generate_phone_number(), "name": "Miguel álvarez", "email": "miguel@example.com"},
        {"phone": generate_phone_number(), "name": "Lucía Jiménez", "email": "lucia@example.com"},
        {"phone": generate_phone_number(), "name": "Alejandro Moreno", "email": "alejandro@example.com"},
        {"phone": generate_phone_number(), "name": "Elena Mu?oz", "email": "elena@example.com"},
        {"phone": generate_phone_number(), "name": "David Alonso", "email": "david@example.com"},
        {"phone": generate_phone_number(), "name": "Natalia Torres", "email": "natalia@example.com"},
        {"phone": generate_phone_number(), "name": "Roberto Gutiérrez", "email": "roberto@example.com"},
        {"phone": generate_phone_number(), "name": "Cristina Navarro", "email": "cristina@example.com"},
        {"phone": generate_phone_number(), "name": "Antonio Ramos", "email": "antonio@example.com"},
        {"phone": generate_phone_number(), "name": "Isabel Ortega", "email": "isabel@example.com"}
    ])
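
We can smoke-test the endpoint with a short Python script (assuming the Flask app is listening on http://localhost:5000, which matches the .env file shown later):

# Manual test of the bearer-token-protected endpoint
import requests

response = requests.get(
    "http://localhost:5000/data",
    headers={"Authorization": "Bearer api-secret-token"},
)
response.raise_for_status()
print(response.json()[:2])  # show the first two records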

The frontend application is a simple R Shiny app that fetches data from the API and displays it in a table. The app also includes a button to refresh the data.

We’ll use renv to manage the R package dependencies. To set up the environment, run the following commands in your R console:

install.packages("renv")
renv::init()

Then, install the required packages:

install.packages("shiny")
install.packages("readxl")
install.packages("dplyr")
install.packages("qcc")
install.packages("ggplot2")
install.packages("shinyWidgets")
install.packages("dotenv")
install.packages("DT")
install.packages("httr")
install.packages("jsonlite")

renv::snapshot()

That’s the main.R file.

library(shiny)

source("api_client.R")

args <- commandArgs(trailingOnly = TRUE)
port <- if (length(args) >= 1) as.numeric(args[1]) else 3838
host <- if (length(args) >= 2) args[2] else "0.0.0.0"
launch_browser <- if (length(args) >= 3) as.logical(args[3]) else TRUE

runApp("app.R", port = port, host = host, launch.browser = launch_browser)

And the Shiny app is in the app.R file.

library(shiny)

library(ggplot2)
library(dplyr)
library(shinyWidgets)
library(dotenv)
library(DT)

df <- data.frame()
load_dot_env()

config <- list(
  api_url = Sys.getenv("API_URL"),
  api_token = Sys.getenv("API_KEY")
)

ui <- fluidPage(
  titlePanel("R api call example"),

  fluidRow(
    column(12,
           actionButton("refresh", "Refresh data", icon = icon("refresh"), class = "btn-primary")
    )
  ),

  mainPanel(
    DTOutput("table")
  )
)

server <- function(input, output, session) {
  data <- reactiveVal(df)

  refreshData(data, config$api_url, config$api_token)

  observeEvent(input$refresh, {
    refreshData(data, config$api_url, config$api_token)
  })

  output$table <- renderDT({
    data()
  })
}

shinyApp(ui = ui, server = server)

The api_client.R file contains the function to fetch data from the API.

get_data <- function(uri, token = NULL) {
  library(httr)
  library(jsonlite)

  showNotification("Updating data...", type = "message")

  headers <- c(`Content-Type` = "application/json", `Accept` = "application/json")
  if (!is.null(token)) {
    headers <- c(headers, Authorization = paste("Bearer", token))
  }

  response <- GET(url = uri, add_headers(.headers = headers))

  if (http_error(response)) {
    stop(sprintf("Error en la petición: %s", status_code(response)))
  }

  content_text <- content(response, "text", encoding = "UTF-8")
  df <- fromJSON(content_text, flatten = TRUE)

  if (is.list(df) && "data" %in% names(df)) {
    df <- df$data
  }

  if (!is.data.frame(df)) {
    df <- as.data.frame(df)
  }

  return(df)
}

# refreshData is called from app.R: it fetches fresh data from the API and
# stores it in the reactive value
refreshData <- function(data, uri, token = NULL) {
  data(get_data(uri, token))
}

We’re also using dotenv to manage environment variables. Create a .env file in the root of your project with the following content:

API_URL=http://localhost:5000/data
API_KEY=api-secret-token

Full code in my github account.

Implementing Industrial OPC UA Communication with Python and Asyncio

Today we’re going to work with an industrial protocol called OPC UA. We’ll use the opcua-asyncio library to create a simple OPC UA server and client, and the `asyncio` library to handle the asynchronous communication between them. The idea is to build an OPC UA server that exposes a variable and a client that reads it.

To simulate a changing variable, I’ve created a simple script that changes one variable every second with the value of the current time and persists it to a Redis database.

import logging
import time

import redis

from settings import REDIS_HOST, REDIS_PORT

logger = logging.getLogger(__name__)


def update_redis_variable_loop():
    r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
    while True:
        timestamp_ms = int(time.time() * 1_000)
        r.set('ts', timestamp_ms)
        logger.info(f"Updated variable: {timestamp_ms}")
        time.sleep(1)

The server will have an authentication mechanism using a username and password, and it will also have a self-signed certificate and a private key to encrypt the communication. To generate the self-signed certificate and private key, you can use the following commands:

openssl genpkey -algorithm RSA -out private_key.pem
openssl req -new -key private_key.pem -out certificate.csr
openssl x509 -req -days 365 -in certificate.csr -signkey private_key.pem -out certificate.pem

This OPC UA server will expose the variable that we’re updating in the Redis database.

import asyncio
import logging

import redis.asyncio as redis
from asyncua import Server, ua
from asyncua.server.users import User, UserRole

from settings import (OPC_CERTIFICATE, OPC_ENDPOINT, OPC_NAMESPACE,
                      OPC_PRIVATE_KEY, OPC_USERS_DB, REDIS_HOST, REDIS_PORT)

logger = logging.getLogger(__name__)


class UserManager:
    def get_user(self, iserver, username=None, password=None, certificate=None):
        if certificate and OPC_USERS_DB.get(username, False) == password:
            logger.info(f"User '{username}' authenticated")
            return User(role=UserRole.User)
        return None


async def main():
    server = Server(user_manager=UserManager())
    await server.init()
    server.set_endpoint(OPC_ENDPOINT)

    await server.load_certificate(OPC_CERTIFICATE)
    await server.load_private_key(OPC_PRIVATE_KEY)
    server.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])

    namespace_idx = await server.register_namespace(OPC_NAMESPACE)
    obj = await server.nodes.objects.add_object(namespace_idx, "Gonzalo")
    # Millisecond epoch timestamps overflow Int32, so expose the variable as Int64
    var = await obj.add_variable(namespace_idx, "T", 0, varianttype=ua.VariantType.Int64)
    await var.set_writable(False)  # clients can read but not write the value

    redis_client = await redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
    logger.info(f"Starting server on {OPC_ENDPOINT}")

    async with server:
        while True:
            await asyncio.sleep(1)
            value = await redis_client.get('ts')
            if value is not None:
                value = int(value)
                logger.info(f"Set value of {var} to {value}")
                await var.write_value(value)


def server(debug: bool = False):
    asyncio.run(main(), debug=debug)

And now we create an OPC UA client that reads the variable from the server and logs it to the console.

import asyncio
import logging

from asyncua import Client

from settings import OPC_ENDPOINT, OPC_CERTIFICATE, OPC_PRIVATE_KEY, OPC_USERNAME, OPC_PASSWORD

logger = logging.getLogger(__name__)


async def main():
    c = Client(url=OPC_ENDPOINT)
    c.set_user(OPC_USERNAME)
    c.set_password(OPC_PASSWORD)
    await c.set_security_string(f"Basic256Sha256,SignAndEncrypt,{OPC_CERTIFICATE},{OPC_PRIVATE_KEY}")

    async with c:
        # NodeId of the "T" variable registered by the server
        node = c.get_node("ns=2;i=2")
        value = await node.read_value()
        logger.info(f"Value: {value}")


def client(debug: bool = False):
    asyncio.run(main(), debug=debug)
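
Both the server and the client read their configuration from a settings module, which isn’t shown above. A minimal sketch, where every value is a placeholder for local development:

# settings.py: all values here are assumptions
REDIS_HOST = 'localhost'
REDIS_PORT = 6379

OPC_ENDPOINT = 'opc.tcp://0.0.0.0:4840/server/'
OPC_NAMESPACE = 'http://examples.freeopcua.github.io'
OPC_CERTIFICATE = 'certificate.pem'
OPC_PRIVATE_KEY = 'private_key.pem'
OPC_USERNAME = 'user'
OPC_PASSWORD = 'password'

# username/password pairs accepted by the server's UserManager
OPC_USERS_DB = {OPC_USERNAME: OPC_PASSWORD}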

In our example we’re using click to create a CLI interface that runs the backend, the server, and the client.
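
The actual cli.py isn’t included in the snippets above, so here’s a minimal sketch of what it could look like (the module names are assumptions):

# cli.py: wires the backend loop, the OPC UA server and the client
# into a single command-line entrypoint
import click

from backend import update_redis_variable_loop
from opc_server import server as run_server
from opc_client import client as run_client


@click.group()
def cli():
    pass


@cli.command(name='backend')
def backend_command():
    update_redis_variable_loop()


@cli.command(name='server')
@click.option('--debug', is_flag=True, default=False)
def server_command(debug):
    run_server(debug=debug)


@cli.command(name='client')
@click.option('--debug', is_flag=True, default=False)
def client_command(debug):
    run_client(debug=debug)


if __name__ == '__main__':
    cli()

With that in place, the whole stack runs like this: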

# Start Redis server
docker-compose up

# Start the process that updates the variable in Redis
python cli.py backend

# Run the server
python cli.py server

# Run the client
python cli.py client

Full code available in my github account.

Creating a standalone WebSocket Server with FastAPI and JWT Authentication in Python

In this post, I will show you how to create a WebSocket server in Python that uses JWT tokens for authentication. The server is designed to be independent of the main process, making it easy to integrate into existing applications. The client-side JavaScript handles reconnections with incremental backoff.

The WebSocket server will be created using FastAPI, a web framework built on top of Starlette. This is the entrypoint:

import logging

from fastapi import FastAPI

from asgi_ws import setup_app

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)
SECRET_KEY = "your_secret_key"

app = FastAPI()

app = setup_app(
    app=app,
    base_path='/ws',
    jwt_secret_key=SECRET_KEY,
)

The `setup_app` function is defined in the `asgi_ws` package. This function sets up the WebSocket server and the necessary routes.

from pathlib import Path

from fastapi import APIRouter, HTTPException, Request, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from starlette.types import Message


def setup_app(app, jwt_secret_key: str, base_path='/ws', jwt_algorithm: str = "HS256"):
    ws_router = get_ws_router(
        jwt_secret_key=jwt_secret_key,
        jwt_algorithm=jwt_algorithm,
        base_path=base_path
    )
    current_dir = Path(__file__).parent
    static_dir = current_dir / "static/js"

    app.mount("/js", StaticFiles(directory=static_dir), name="js")
    app.include_router(ws_router)

    return app

The `get_ws_router` function is defined in the same module. This function creates the WebSocket router and the necessary routes.

def get_ws_router(jwt_secret_key: str, base_path='/ws', jwt_algorithm: str = "HS256"):
    ws_router = APIRouter()

    current_dir = Path(__file__).parent
    static_dir = current_dir / "static/js"
    ws_router.mount(f"{base_path}/js", StaticFiles(directory=static_dir), name="js")

    manager = ConnectionManager(jwt_secret_key=jwt_secret_key, jwt_algorithm=jwt_algorithm)

    @ws_router.post(f"{base_path}/emmit")
    async def emmit_endpoint(request: Request):
        payload = await request.json()
        await manager.broadcast(payload["channel"], payload["payload"])
        return True

    @ws_router.websocket(f"{base_path}/")
    async def websocket_endpoint(websocket: WebSocket):
        token = websocket.query_params.get("token")
        if not token:
            await websocket.close(code=1008)
            raise HTTPException(status_code=401, detail="Token required")

        await manager.connect(websocket, token)
        try:
            while True:
                message: Message = await websocket.receive()
                if message["type"] == "websocket.disconnect":
                    manager.disconnect(websocket)
                    break
        except WebSocketDisconnect:
            manager.disconnect(websocket)

    return ws_router
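
The `ConnectionManager` validates the JWT and broadcasts messages. The real implementation ships with the asgi_ws package, but a minimal sketch, using PyJWT, could look like this:

# Illustrative ConnectionManager: validates the JWT on connect and
# broadcasts messages to every connected client
from typing import List

import jwt
from fastapi import WebSocket


class ConnectionManager:
    def __init__(self, jwt_secret_key: str, jwt_algorithm: str = "HS256"):
        self.jwt_secret_key = jwt_secret_key
        self.jwt_algorithm = jwt_algorithm
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket, token: str):
        # Reject the handshake if the JWT is invalid or expired
        try:
            jwt.decode(token, self.jwt_secret_key, algorithms=[self.jwt_algorithm])
        except jwt.PyJWTError:
            await websocket.close(code=1008)  # policy violation
            return
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, channel: str, payload):
        # The real wire format may differ; here we send channel and payload as JSON
        for connection in self.active_connections:
            await connection.send_json({"channel": channel, "payload": payload})
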
WebSockets are bidirectional channels that allow real-time data transfer between client and server, but I prefer to avoid communication from the client to the server over the socket itself. When a client wants to send a message, it issues an HTTP POST request to the `/emit` endpoint (via the main process), and the server then broadcasts the message to all connected clients; the client only receives messages over the WebSocket. Because of that, we need a main ASGI process, using FastAPI or another web framework, to handle the HTTP requests.

This is an example with FastAPI:
<pre class="wp-block-syntaxhighlighter-code"><!DOCTYPE html>
<html>
<head>
    <title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>

<input type="text" id="messageText" autocomplete="off"/>
<button onclick="sendMessage()">Send</button>

<ul id='messages'>
</ul>
<a href="8000/js/websockets.js">//localhost:8000/js/websockets.js</a>
<script>
    async function sendMessage() {
        const channel = 'chat';
        const url = `/emit/${channel}`;
        const input = document.getElementById("messageText");
        const message = input.value;
        input.value = '';
        const body = JSON.stringify({channel: channel, payload: message});
        const headers = {'Content-Type': 'application/json'};

        try {
            const response = await fetch(url, {method: 'POST', headers: headers, body: body});
        } catch (error) {
            console.error('Error:', error);
        }
    }

    (async function () {
        const getToken = async () => {
            const response = await fetch('/token');
            const {token} = await response.json();
            return token;
        };

        const messageCallback = (event) => {
            const messages = document.getElementById('messages');
            const message = document.createElement('li');
            message.textContent = event.data;
            messages.appendChild(message);
        };

        const wsManager = new WebSocketManager('ws://localhost:8000/ws/', getToken, messageCallback);
        await wsManager.connect();
    })();

</script>
</body>
</html></pre>

The library is available on PyPI:

poetry add asgi_ws
pip install asgi_ws

Full code available in my github account.

Creating a Real-Time Flask Application with Flask-SocketIO and Redis

Today, we’re going to create a simple Flask application with real-time communication using websockets and the SocketIO library. We’ll leverage the Flask-SocketIO extension for integration.

Here’s the plan: while websockets support bidirectional communication, we’ll use them exclusively for server-to-client messages. For client-to-server interactions, we’ll stick with traditional HTTP communication.

Our application will include session-based authentication. To simulate login, we’ve created a route called /login that establishes a session. This session-based authentication will also apply to our websocket connections.

A key objective of this tutorial is to enable sending websocket messages from outside the web application. For instance, you might want to send messages from a cron job or an external service. To achieve this, we’ll use a message queue to facilitate communication between the SocketIO server and the client application. We’ll utilize Redis as our message queue.

That’s the main application:

from flask import Flask, render_template, session, request

from lib.ws import register_ws, emit_event, EmitWebsocketRequest
from settings import REDIS_HOST, WS_PATH

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'

register_ws(app=app, socketio_path=WS_PATH, redis_host=REDIS_HOST)


@app.route('/')
def index():
    return render_template('index.html')


@app.route('/login')
def login():
    session['user'] = 'Gonzalo'
    return dict(name=session['user'])


@app.post('/api/')
def api():
    data = EmitWebsocketRequest(**request.json)
    emit_event(data.channel, data.body)

    return dict(status=True)
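
Any process that can make HTTP requests can push a message to the connected browsers through this endpoint. For example, with requests (assuming the app runs on http://localhost:5000):

# Trigger a websocket broadcast via the Flask HTTP API
import requests

requests.post(
    "http://localhost:5000/api/",
    json={"channel": "message", "body": "Hi from an external HTTP client"},
)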

That’s the HTML template:

<pre class="wp-block-syntaxhighlighter-code"><!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Flask-SocketIO Websocket Example</title>
    <a href="//cdn.socket.io/4.0.0/socket.io.min.js">//cdn.socket.io/4.0.0/socket.io.min.js</a>
</head>
<body>

<h1>Flask-SocketIO Websocket Example</h1>
<label for="message">Message:</label>
<input type="text" id="message" placeholder="type a message...">

<button onclick="sendMessage()">Send</button>
<ul id="messages"></ul>

<script>
    document.addEventListener("DOMContentLoaded", function () {
        let host = location.protocol + '//' + location.hostname + ':' + location.port
        let socket = io.connect(host, {
            path: '/ws/socket.io',
            reconnection: true,
            reconnectionDelayMax: 5000,
            reconnectionDelay: 1000
        });

        socket.on('connect', function () {
            console.log('Connected to ws');
        });

        socket.on('disconnect', function () {
            console.log('Disconnected from ws');
        });

        socket.on('message', function (msg) {
            let messages = document.getElementById('messages');
            let messageItem = document.createElement('li');
            messageItem.textContent = msg;
            messages.appendChild(messageItem);
        });

        window.sendMessage = async function () {
            const url = '/api/';
            const payload = {"channel": "message", "body": this.message.value};

            try {
                const response = await fetch(url, {
                    method: 'POST',
                    headers: {'Content-Type': 'application/json'},
                    body: JSON.stringify(payload)
                });

                if (!response.ok) {
                    console.error('Error: ' + response.statusText);
                }

                await response.json();
            } catch (error) {
                console.error('Error:', error);
            }
        };
    });
</script>
</body>
</html></pre>

The register_ws function binds SocketIO to our Flask server. To enable sending messages from outside our Flask application, we need to instantiate SocketIO in two different ways. For this purpose, I’ve created a ws.py file. Note: I’m using Pydantic to validate the HTTP requests.

import logging
from typing import Dict, Any, Union

from flask import session
from flask_socketio import SocketIO
from pydantic import BaseModel

logger = logging.getLogger(__name__)


class Conf:
    def __init__(self, socketio=None):
        self._socketio = socketio

    @property
    def socketio(self):
        return self._socketio

    @socketio.setter
    def socketio(self, value):
        self._socketio = value


conf = Conf()


def emit_event(channel, body):
    conf.socketio.emit(channel, body)


class EmitWebsocketRequest(BaseModel):
    channel: str
    body: Union[Dict[str, Any], str]


def setup_ws(redis_host, redis_port=6379):
    conf.socketio = SocketIO(message_queue=f'redis://{redis_host}:{redis_port}')


def register_ws(
        app,
        redis_host,
        socketio_path='/ws/socket.io',
        redis_port=6379
):
    redis_url = f'redis://{redis_host}:{redis_port}' if redis_host else None
    conf.socketio = SocketIO(app, path=socketio_path, message_queue=redis_url)

    @conf.socketio.on('connect')
    def handle_connect():
        if not session.get("user"):
            raise ConnectionRefusedError('unauthorized!')
        logger.debug(f'Client connected: {session["user"]}')

    @conf.socketio.on('disconnect')
    def handle_disconnect():
        logger.debug('Client disconnected')

    return conf.socketio

Now, we can emit an event from outside the Flask application.

from lib.ws import emit_event, setup_ws
from settings import REDIS_HOST

setup_ws(redis_host=REDIS_HOST)
emit_event('message', 'Hi')

The application needs a Redis server. I set up the server using Docker:

services:
  redis:
    image: redis:latest
    ports:
      - "6379:6379"

Source code available in my github account.