Learning Python
I'm starting to understand how awesome Python is for some specific tasks. This would be the place for me to learn to make stuff.
Extracting PDF to text
Instead of sending the PDF file itself to an LLM, it's better to extract the text first and send only that, so the input is lighter.
import pdfplumber
def extract_text_from_pdfs(pdf_files):
    """Extract the text of each PDF as one string per file.

    Args:
        pdf_files: Iterable of PDF sources; each item is passed as-is to
            ``pdfplumber.open``.

    Returns:
        A list with one string per input PDF, in input order. The text of
        every page is followed by a newline.
    """
    extracted_text = []
    for pdf_file in pdf_files:
        with pdfplumber.open(pdf_file) as pdf:
            # extract_text() may return None for a page with no extractable
            # text (older pdfplumber releases do this); fall back to '' so
            # the concatenation cannot fail with a TypeError.
            page_texts = [
                (page.extract_text() or '') + '\n' for page in pdf.pages
            ]
        extracted_text.append(''.join(page_texts))
    return extracted_text
Converting using FFMPEG
Download FFmpeg and add its location to the PATH environment variable so the `ffmpeg` command can be run from anywhere.
import subprocess
# Convert the input audio to mono, 16 kHz by shelling out to ffmpeg.
CurrentFileName = 'input/audio.m4a'
FinalFileName = 'output/audio.mp3'

try:
    # check=True turns a non-zero ffmpeg exit status into
    # CalledProcessError. subprocess.call() only returned the exit code, so
    # a failed conversion never reached the error branch below.
    subprocess.run(
        [
            'ffmpeg',
            '-i', CurrentFileName,  # Input file
            '-ac', '1',             # Set audio channels to 1 (mono)
            '-ar', '16000',         # Set sample rate to 16000 Hz
            FinalFileName,          # Output file
        ],
        check=True,
    )
except (subprocess.CalledProcessError, OSError) as e:
    # OSError covers the case where the ffmpeg executable cannot be started
    # (for example, it is not on PATH).
    print(e)
    print('Error While Converting Audio')

# Keep the console window open until the user confirms.
input('Press Enter to Close')
Extracting audio to text using free Google Web Speech API
With the free service, only 60 seconds of audio can be processed at a time.
import speech_recognition as sr
# Transcribe a local audio file with the free Google Web Speech API through
# the SpeechRecognition package (imported as `sr`).

# Path to your audio file
audio_file = "output/audio.wav"

# Initialize the recognizer
recognizer = sr.Recognizer()

try:
    # Load the audio file
    with sr.AudioFile(audio_file) as source:
        print("Processing audio...")
        # duration=60 limits what is read to at most 60 seconds of audio,
        # not the whole file.
        audio_data = recognizer.record(source, duration=60)

        # Recognize the speech
        text = recognizer.recognize_google(audio_data, language="id-ID")

        print("Text extracted from audio:")
        print(text)
except sr.UnknownValueError:
    # The service returned no usable transcription.
    print("Google Web Speech API could not understand the audio.")
except sr.RequestError as err:
    # The request to the service itself failed.
    print(f"Error with the request to Google API: {err}")
except Exception as err:
    # Anything else (e.g. a problem opening the audio file).
    print(f"Unexpected error: {err}")
Speech-to-text Google Cloud (paid)
There's a lot to be done here. First, convert the audio file to mono with a 16000 Hz sample rate; this is better because it takes less storage than a 46000 Hz sample rate. I had to choose between splitting the audio into separate files or processing one long file. I wanted to get it done in one go and use the long audio, so we need to use an asynchronous request and store the file in GCS (Google Cloud Storage) for processing.
def speech_to_text_google(gcs_uri, output_file, language_code="id-ID", poll_interval=30):
    """Transcribe a long audio file stored in GCS and save the transcript.

    Submits an asynchronous (long-running) recognition request, polls until
    it completes, then writes the first alternative of every result to
    ``output_file`` (one per line) while echoing it to the console.

    NOTE(review): relies on a module-level ``speech`` name that this snippet
    never imports - presumably ``from google.cloud import speech``; confirm.

    Args:
        gcs_uri: ``gs://`` URI of the audio to transcribe.
        output_file: Path of the UTF-8 text file to write.
        language_code: Language tag passed to the recognizer.
        poll_interval: Seconds to wait between completion checks.

    Returns:
        None.
    """
    # Local import: the snippet calls time.sleep() without importing time.
    import time

    client = speech.SpeechClient()
    audio_config = speech.RecognitionAudio(uri=gcs_uri)
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        language_code=language_code,
    )

    # Asynchronous request
    operation = client.long_running_recognize(config=config, audio=audio_config)
    print("Waiting for operation to complete...")

    # Poll instead of blocking on result() so progress is visible.
    while not operation.done():
        print("Still processing...")
        time.sleep(poll_interval)

    # The loop exits only once the operation is done, so the result can be
    # fetched unconditionally.
    response = operation.result()

    with open(output_file, "w", encoding="utf-8") as file:
        for result in response.results:
            transcript = result.alternatives[0].transcript
            print("Transcript:", transcript)  # Print to console (optional)
            file.write(transcript + "\n")  # One transcript per line
# gcs_uri and output_file_path are defined by the "Upload to GCS" example.
speech_to_text_google(gcs_uri=gcs_uri, output_file=output_file_path)
Upload to GCS
Large files (in my case, an audio file) cannot be processed straight from my local machine because of the default timeout, so we need to put the file in Google Cloud Storage for my other function to work.
import os
# Tell the Google client libraries which credentials JSON file to use.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "filename.json"


def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a GCS bucket unless the object already exists.

    Args:
        bucket_name: Name of the target bucket.
        source_file_name: Local path of the file to upload.
        destination_blob_name: Object name to use inside the bucket.

    Returns:
        The ``gs://<bucket>/<blob>`` URI of the object, whether it was just
        uploaded or was already present.
    """
    target = storage.Client().bucket(bucket_name).blob(destination_blob_name)

    # Existing objects are left untouched; only the URI is reported back.
    if target.exists():
        print(f"File {destination_blob_name} already exists in GCS.")
        return f"gs://{bucket_name}/{destination_blob_name}"

    target.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")
    return f"gs://{bucket_name}/{destination_blob_name}"
# Example usage: every path below is derived from one base name.
filename = "audio-1"
audio_file_path = f"output/{filename}.wav"  # local audio file to upload
output_file_path = f"input/{filename}.txt"  # text file path built from the same name

# Google Cloud Storage settings
bucket_name = "bucket_name"  # Replace with your GCS bucket name
destination_blob_name = f"{filename}.wav"  # File name in GCS

# Upload the audio file to GCS (or skip if already exists)
gcs_uri = upload_to_gcs(
    bucket_name=bucket_name,
    source_file_name=audio_file_path,
    destination_blob_name=destination_blob_name,
)
Using OpenAI
At some point, we'll need to use OpenAI, here's the way
import openai
# Placeholder key: replace it with your own before running.
openai.api_key = 'YOUR_OPENAI_API_KEY'


def summarize_text(text):
    """Ask the chat model for a summary of ``text``.

    NOTE(review): ``openai.ChatCompletion`` and ``openai.error`` look like
    the pre-1.0 interface of the ``openai`` package - confirm the installed
    version still provides them.

    Args:
        text: The text to summarize.

    Returns:
        The stripped content of the first completion choice, or the string
        ``"An error occurred: <details>"`` when the call raises
        ``openai.error.OpenAIError``.
    """
    conversation = [
        {"role": "system", "content": "You are an AI assistant that summarizes texts."},
        {"role": "user", "content": f"Please provide a summary for the following text:\n{text}"},
    ]
    try:
        completion = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",  # Use GPT-3.5-turbo or newer available model
            messages=conversation,
            max_tokens=150,
            temperature=0.7,
        )
        first_choice = completion['choices'][0]
        return first_choice['message']['content'].strip()
    except openai.error.OpenAIError as exc:
        # Errors are reported as text in place of the summary, not raised.
        return f"An error occurred: {str(exc)}"
# Summarize every entry of pdf_texts, keeping the input order.
# NOTE(review): pdf_texts is not defined in this snippet - presumably the
# list returned by extract_text_from_pdfs; verify.
summarized_data = list(map(summarize_text, pdf_texts))
Sending Email Using SMTP
Sending email using SMTP, this is one option as we don't have another transactional email service to use.
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
def send_email(subject, body, to_email):
    """Send a plain-text email through smtp.gmail.com.

    The sender address and password are the placeholders hard-coded below.

    Args:
        subject: Subject line of the message.
        body: Plain-text body of the message.
        to_email: Recipient address.

    Returns:
        None.
    """
    sender = "your_email@example.com"
    sender_password = "your_password"

    # Build a multipart message carrying a single plain-text part.
    msg = MIMEMultipart()
    for header, value in (('From', sender), ('To', to_email), ('Subject', subject)):
        msg[header] = value
    msg.attach(MIMEText(body, 'plain'))

    # Connect, switch the connection to TLS, authenticate, then send.
    with smtplib.SMTP('smtp.gmail.com', 587) as smtp:
        smtp.starttls()
        smtp.login(sender, sender_password)
        smtp.send_message(msg)
# Join the summaries with a blank line between them and mail the result.
email_body = str.join("\n\n", summarized_data)
send_email(
    subject="Summarized PDF Data",
    body=email_body,
    to_email="recipient@example.com",
)
Make exe from .py
Make sure you have your icon ready
py -m venv venv
.\venv\Scripts\activate
pip install pyinstaller
pyinstaller --onefile --icon=icon-console.ico index.py
Adding requirements.txt
requirements.txt lists all the packages the project needs. After adding the required packages to the file, run this to install them.
pip install -r requirements.txt
Changing Python version
I've had trouble with some packages and needed to change the Python version to one that supports them.
After installing, run the command below to check which Python versions are installed.
py -0
Forecasting
Time Series Models - Traditional statistical methods that explicitly use temporal data patterns such as trends, seasonality, and autoregressive behavior.
Ex: ARIMA (AutoRegressive Integrated Moving Average), SARIMA (Seasonal ARIMA), ETS (Exponential Smoothing State Space)
Machine Learning Models - Predictive models that learn patterns from historical data without explicitly modeling time dependencies.
Ex: Linear Regression, Random Forests, Gradient Boosting (XGBoost, LightGBM, CatBoost)
Deep Learning Models - Neural network-based methods that excel in capturing complex patterns in data, including long-term temporal dependencies.
Ex: LSTM (Long Short-Term Memory), GRU (Gated Recurrent Unit), Transformer-based models (e.g., Attention mechanisms)
Hybrid Models - Combines the strengths of traditional time series models with machine learning or deep learning models.
Ex: ARIMA combined with a neural network for residual forecasting.
SARIMAX for trend/seasonality and gradient boosting for residuals.
pip install statsmodels
# Per-machine multivariate forecast of production metrics.
# NOTE(review): this snippet assumes `pd` (pandas), `VAR` (presumably the
# statsmodels vector autoregression class) and a DataFrame named `data` are
# already defined - none of them is created here.

# Select the columns to use
columns_to_use = ['date', 'machine_id', 'cycle_time', 'downtime', 'production_count']
# .copy() so the column assignment below acts on an independent frame and
# not on a selection of the original.
data = data[columns_to_use].copy()

# Convert 'date' to datetime format
data['date'] = pd.to_datetime(data['date'])

# Sort data by date and machine_id
data = data.sort_values(by=['date', 'machine_id'])

# Series that are modelled jointly, and how many daily steps to forecast.
# Defined once so the model input and the forecast columns cannot drift apart.
target_columns = ['production_count', 'cycle_time', 'downtime']
forecast_steps = 30

# Collect one forecast DataFrame per machine
all_forecasts = []

# Group the data by 'machine_id' and fit a separate VAR model per machine
grouped = data.groupby('machine_id')
for machine_id, group in grouped:
    # Sort the group by date
    group = group.sort_values(by='date')
    history = group[target_columns]

    # Train a VAR model for multivariate forecasting
    model = VAR(history)
    model_fit = model.fit(maxlags=15)
    forecast = model_fit.forecast(y=history.values, steps=forecast_steps)

    # Forecast dates start the day after the last observed date
    forecast_dates = pd.date_range(
        start=group['date'].max() + pd.Timedelta(days=1),
        periods=forecast_steps,
        freq='D',
    )

    # Create a new DataFrame for forecasted data
    forecast_df = pd.DataFrame(forecast, columns=target_columns)
    forecast_df['date'] = forecast_dates
    forecast_df['machine_id'] = machine_id

    # Append the forecast_df to the list
    all_forecasts.append(forecast_df)

# Concatenate all forecasts into a single DataFrame
forecast_result = pd.concat(all_forecasts)

# Round values for better readability
forecast_result['cycle_time'] = forecast_result['cycle_time'].round(2)
forecast_result['downtime'] = forecast_result['downtime'].round(2)
forecast_result['production_count'] = forecast_result['production_count'].round(1)

# Rearrange columns to the same order as the input selection
forecast_result = forecast_result[['date', 'machine_id', 'cycle_time', 'downtime', 'production_count']]

# Sort the forecasted data by date and machine_id
forecast_result = forecast_result.sort_values(by=['date', 'machine_id'])

# Save the forecasted data to a new CSV file with the correct column order
forecast_result.to_csv('assets/result/forecasted_production_data_reordered.csv', index=False)
Setting up docker
# Use an official Python runtime as a parent image
FROM python:3.10-slim

# Disable Python output buffering so prints show up in the container logs
# immediately. (This sets one variable; it does not read a .env file.)
ENV PYTHONUNBUFFERED=1

# Set the working directory in the container
WORKDIR /app

# Install dependencies before copying the sources so this layer is reused
# from the build cache until requirements.txt itself changes
COPY requirements.txt .
RUN pip install --upgrade pip
RUN pip install --no-cache-dir -r requirements.txt

# Copy the project files into the container
COPY . .

# Run the Python script manually (modify as per your need)
CMD ["python", "manual.py"]
docker build -t takehome .
docker run -d --name takehome_container takehome
docker run -it takehome /bin/bash