Developing an ETL Pipeline for Massive NOAA Sensor Data: From Raw Files to Predictive Insights
Marcos Cedenilla
November 5, 2024
Building upon the foundational project overview introduced in the first article and the Kubernetes infrastructure detailed in the second installment, this article delves into the heart of our data processing workflow — the ETL (Extract, Transform, Load) pipeline. This pipeline is meticulously designed to handle over 31 GB of NOAA’s Global Historical Climatology Network-Daily (GHCN-Daily) data, transforming raw sensor data into structured information ready for predictive analytics and interactive visualizations.
In this section, we will explore the comprehensive ETL process that processes massive text-based datasets, ensuring efficient data handling, transformation, and storage. By leveraging Apache Spark for distributed data processing and orchestrating the workflow with Kubernetes, the pipeline not only manages vast amounts of data but also ensures scalability, reliability, and optimal performance. Additionally, the integration of spatial metadata from sensor information enhances our ability to perform geospatial queries, providing deeper insights into climatic patterns based on location.
Project Objective
The primary goal of this project is to develop a robust ETL pipeline that processes extensive sensor data to generate accurate predictions and interactive visualizations. By doing so, users can explore and analyze climatic trends with precision and ease. The pipeline is engineered to handle the sheer volume of data efficiently, transforming raw text files into structured data stored in a PostgreSQL database, enriched with spatial information for geospatial queries.
Motivation and Advantages of the Approach
1. Handling Massive Data Volumes
The GHCN-Daily dataset, distributed by the National Oceanic and Atmospheric Administration as the ghcnd_all.tar.gz archive, encompasses a comprehensive collection of daily climatic records totaling approximately 31 GB of text data. Processing such a vast amount of information calls for a solution that offers both high processing capacity and flexibility in data transformation.
2. Leveraging Apache Spark
Apache Spark is chosen as the cornerstone for data processing due to its capabilities in distributed computing and its efficiency in managing large datasets. The advantages of utilizing Spark include:
- Scalability: Spark can seamlessly scale horizontally to accommodate increasing data loads without performance degradation.
- Speed: Its in-memory processing significantly accelerates ETL tasks compared to traditional disk-based methods.
- Flexibility: Spark supports multiple programming languages and offers a rich set of libraries for diverse data analysis needs.
3. Orchestration with Kubernetes
Kubernetes serves as the orchestration platform for managing Spark jobs within Docker containers. The integration of Kubernetes provides several benefits:
- Automated Deployment: Kubernetes simplifies the deployment and updating of applications, ensuring consistency and repeatability.
- Dynamic Scalability: It allows for automatic scaling of resources based on demand, crucial for handling data processing spikes.
- High Availability and Fault Tolerance: Kubernetes ensures that critical tasks remain operational even in the event of hardware or software failures.
4. Custom Library of ETL Functions
To optimize the ETL process, a custom library of utility functions has been developed. This library standardizes and modularizes the extraction, transformation, and loading operations, enhancing code maintainability and reusability.
Detailed ETL Process
The ETL pipeline is meticulously crafted to transform raw .dly
files and sensor metadata into a structured and queryable database format. Below, I outline each stage of the ETL process, highlighting the transformations applied to the data.
1. Configuration Setup (config.py
)
The config.py
file centralizes all configurations required for the ETL process, including file patterns, Spark settings, data directories, and JDBC connection details. This modular approach allows for easy adjustments and scalability.
# config.py
from pathlib import Path
FILE_PATTERN = ".dly"
# Spark configuration
MEASURES_APP_NAME = "NOAA ETL"
SPARK_MASTER = "local[*]" # Use all cores in the container
# Add shuffle partition configuration
SPARK_CONFIG = {
"spark.sql.shuffle.partitions": "200",
"spark.executor.memoryOverhead": "1g", # Extra memory for heavy execution
}
# Path to the folder with .dly files
DATA_DIR = Path("/data/")
JDBC_WRITE_OPTIONS = {
"batchsize": "100000", # Increased batch size to optimize large inserts
"isolationLevel": "NONE",
"numPartitions": "20", # Increase partitions to utilize 8 CPU cores
"customSql": "ON CONFLICT DO NOTHING"
}
# Column specifications for .dly file
MEASURES_COLUMN_SPECS = [
(0, 11, 'ID'),
(11, 15, 'YEAR'),
(15, 17, 'MONTH'),
(17, 21, 'ELEMENT'),
]
MEASURES_COLUMN_TYPES = {
'YEAR': "integer",
'MONTH': "integer"
}
# Add specifications for VALUE1 to VALUE31
for i in range(31):
start_pos = 21 + i * 8
end_pos = start_pos + 5 # Each value occupies 5 characters
col_name = f'VALUE{i+1}'
MEASURES_COLUMN_SPECS.append((start_pos, end_pos, col_name))
# Add data types for VALUE1 to VALUE31
for i in range(31):
MEASURES_COLUMN_TYPES[f'VALUE{i+1}'] = "integer"
# Number of value columns
NUM_VALUE_COLUMNS = 31
STATION_FILE = DATA_DIR / "stations.txt"
# Spark application name
STATION_APP_NAME = 'StationDataETL'
# URL to download station data
STATION_DATA_URL = 'https://www.ncei.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt'
# Column specifications for extracting data from the station file
STATION_COLUMN_SPECS = [
(0, 11, 'ID'),
(12, 20, 'LATITUDE'),
(21, 30, 'LONGITUDE'),
(31, 37, 'ELEVATION'),
(41, 71, 'NAME')
]
# Data types for columns
STATION_COLUMN_TYPES = {
'LATITUDE': 'double',
'LONGITUDE': 'double',
'ELEVATION': 'double',
}
# Logging configuration
LOG_LEVEL = 'INFO'
2. ETL Task Functions (etl_task.py
)
The etl_task.py
file encapsulates the core functions required for each stage of the ETL process. These functions handle the creation of Spark sessions, downloading files, reading and parsing data, transforming data formats, and loading data into the PostgreSQL database.
# jobs/etl_task.py
from pathlib import Path
from typing import List, Optional
import urllib.request
import os
import logging
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import (
col, when, substring, expr, concat_ws, lpad, to_date
)
from pyspark.sql.types import IntegerType, DataType
import config.config as config
def create_spark_session(name) -> SparkSession:
"""
Create and return a SparkSession.
"""
spark_builder = SparkSession.builder.appName(name)
    # Apply any additional Spark configuration options
for key, value in config.SPARK_CONFIG.items():
spark_builder = spark_builder.config(key, value)
return spark_builder.getOrCreate()
def download_file(url: str, destination: Path) -> Path:
"""
Download a file from a URL to a local destination.
"""
if not destination.exists():
urllib.request.urlretrieve(url, str(destination))
logging.info(f'Downloaded file from {url} to {destination}')
else:
logging.info(f'File {destination} already exists.')
return destination
def read_text_file(spark: SparkSession, file_path: Path) -> DataFrame:
"""
Read a text file into a Spark DataFrame.
"""
df_raw = spark.read.text(str(file_path))
return df_raw
def get_file_list(data_dir: Path = None) -> Optional[List[str]]:
"""
Get the list of files matching the given pattern in the specified directory.
"""
if data_dir is None:
data_dir = config.DATA_DIR
    file_list = list(data_dir.glob(f"*{config.FILE_PATTERN}"))  # e.g. "*.dly"
if not file_list:
logging.info(f'No files matching pattern found in {data_dir}.')
return None
logging.info(f'Found {len(file_list)} files.')
return [str(file) for file in file_list]
def read_raw_data(spark: SparkSession, file_list: List[str]) -> DataFrame:
"""
Read the data files and return a raw DataFrame.
Args:
spark (SparkSession): The SparkSession object.
file_list (List[str]): List of file paths to read.
Returns:
DataFrame: The raw DataFrame containing the data from the files.
"""
df_raw = spark.read.text(file_list)
return df_raw
def extract_columns(df_raw: DataFrame, column_specs: List[tuple]) -> DataFrame:
"""
Extract columns from the raw DataFrame according to the specifications.
Args:
df_raw (DataFrame): The raw DataFrame.
column_specs (List[tuple]): List of tuples specifying (start_pos,
end_pos, column_name).
Returns:
DataFrame: DataFrame with the extracted columns.
"""
df = df_raw
for start_pos, end_pos, col_name in column_specs:
length = end_pos - start_pos
df = df.withColumn(col_name, substring('value', start_pos + 1, length))
return df
def cast_columns_to_type(df: DataFrame, columns: dict[str, DataType]) -> DataFrame:
"""
Casts specified columns of a PySpark DataFrame to the given data types.
Args:
df (DataFrame): Input PySpark DataFrame.
columns (dict[str, DataType]): A dictionary where the keys are the column names and the values are the desired data types.
Returns:
DataFrame: A new DataFrame with specified columns cast to the given data types.
"""
for column, dtype in columns.items():
df = df.withColumn(column, col(column).cast(dtype))
return df
def handle_value_missing(df: DataFrame) -> DataFrame:
"""
Casts columns that start with 'VALUE' to IntegerType and replaces -9999 with None.
Args:
df (DataFrame): Input PySpark DataFrame.
Returns:
DataFrame: A new DataFrame with 'VALUE' columns cast to IntegerType and -9999 replaced with None.
"""
for col_name in df.columns:
if col_name.startswith('VALUE'):
# Cast the column to IntegerType
df = df.withColumn(col_name, col(col_name).cast(IntegerType()))
# Replace -9999 with None (null)
df = df.withColumn(
col_name,
when(col(col_name) == -9999, None).otherwise(col(col_name))
)
return df
def convert_columns(df: DataFrame, types: dict) -> DataFrame:
"""
Convert columns to their corresponding data types.
Args:
        df (DataFrame): The DataFrame with extracted columns.
        types (dict): Mapping of column names to their target data types.
Returns:
DataFrame: DataFrame with columns converted to appropriate data types.
"""
df = df.drop('value') # Remove the original 'value' column
df = cast_columns_to_type(df, types)
df = handle_value_missing(df)
return df
def unpivot_data(df: DataFrame, num_value_columns: int) -> DataFrame:
"""
Convert the DataFrame from wide to long format using stack.
Args:
df (DataFrame): The DataFrame with converted columns.
num_value_columns (int): Number of value columns to unpivot.
Returns:
DataFrame: Unpivoted DataFrame in long format.
"""
value_cols = [f'VALUE{i+1}' for i in range(num_value_columns)]
num_values = len(value_cols)
expr_str = ", ".join(
[f"'{i+1}', {col}" for i, col in enumerate(value_cols)]
)
df_long = df.select(
'ID', 'YEAR', 'MONTH', 'ELEMENT',
expr(f"stack({num_values}, {expr_str}) as (DAY, VALUE)")
)
return df_long
def format_date_columns(df_long: DataFrame) -> DataFrame:
"""
Format date-related columns and create the DATE column.
Args:
df_long (DataFrame): The unpivoted DataFrame in long format.
Returns:
DataFrame: DataFrame with formatted date columns and DATE column.
"""
# Convert 'DAY' to IntegerType
df_long = df_long.withColumn('DAY', col('DAY').cast(IntegerType()))
# Ensure 'MONTH' and 'DAY' have two digits
df_long = df_long.withColumn('MONTH', lpad(col('MONTH'), 2, '0'))
df_long = df_long.withColumn('DAY', lpad(col('DAY'), 2, '0'))
# Create 'DATE' column in 'yyyy-MM-dd' format
df_long = df_long.withColumn(
'DATE',
to_date(concat_ws('-', col('YEAR'), col('MONTH'), col('DAY')), 'yyyy-MM-dd')
)
return df_long
def clean_data(df_long: DataFrame) -> DataFrame:
"""
Remove rows with null values and select final columns.
Args:
df_long (DataFrame): The DataFrame with formatted date columns.
Returns:
DataFrame: Cleaned DataFrame ready for loading.
"""
df_clean = df_long.filter(
col('VALUE').isNotNull() & col('DATE').isNotNull()
).select('ID', 'DATE', 'ELEMENT', 'VALUE')
return df_clean
def repartition_data(df_clean: DataFrame, num_partitions: int) -> DataFrame:
"""
Repartition the DataFrame to optimize writing.
Args:
df_clean (DataFrame): The cleaned DataFrame.
num_partitions (int): Number of partitions to repartition the
DataFrame.
Returns:
DataFrame: Repartitioned DataFrame.
"""
df_repartitioned = df_clean.repartition(num_partitions, 'ID')
return df_repartitioned
def extract_etl(spark: SparkSession) -> tuple[Optional[DataFrame], Optional[List[str]]]:
    """
    Extract step of ETL: Read raw data from files.
    Args:
        spark (SparkSession): The SparkSession object.
    Returns:
        tuple: (raw DataFrame, list of file paths), or (None, None) if no files are found.
    """
file_list = get_file_list(config.DATA_DIR)
if not file_list:
return None, None
df_raw = read_raw_data(spark, file_list)
return df_raw, file_list
def transform_measures(df_raw: DataFrame) -> DataFrame:
"""
Transform step of ETL: Process raw DataFrame into clean DataFrame.
Args:
df_raw (DataFrame): The raw DataFrame.
Returns:
DataFrame: The transformed and repartitioned DataFrame ready for
loading.
"""
df_extracted = extract_columns(df_raw, config.MEASURES_COLUMN_SPECS)
df_converted = convert_columns(df_extracted, config.MEASURES_COLUMN_TYPES)
df_unpivoted = unpivot_data(df_converted, config.NUM_VALUE_COLUMNS)
df_formatted = format_date_columns(df_unpivoted)
df_clean = clean_data(df_formatted)
df_repartitioned = repartition_data(
df_clean, int(config.JDBC_WRITE_OPTIONS['numPartitions'])
)
return df_repartitioned
def load_etl(
df: DataFrame,
url: str,
table: str,
properties: dict,
write_options: dict
) -> None:
"""
Load step of ETL: Write the DataFrame to the database using JDBC.
Args:
df (DataFrame): The DataFrame to write to the database.
        url (str): JDBC URL for the database connection.
        table (str): Name of the target database table.
properties (dict): JDBC connection properties.
write_options (dict): Additional write options.
Returns:
None
"""
write_properties = {**properties, **write_options}
df.write.jdbc(
url=url,
table=table,
mode='append',
properties=write_properties
)
def delete_processed_files(file_list: List[str]) -> None:
"""
Delete the processed raw files.
Args:
file_list (List[str]): List of file paths to delete.
Returns:
None
"""
for file_path in file_list:
try:
os.remove(file_path)
logging.info(f"Deleted file: {file_path}")
except Exception as e:
logging.error(f"Error deleting file {file_path}: {e}")
3. Pipeline for Sensor Metadata (pipeline_sensors.py
)
The pipeline_sensors.py
script manages the ETL process for sensor metadata. It downloads the station data, processes it, and loads it into the sensors
table in the PostgreSQL database. This metadata includes spatial information crucial for geospatial queries and visualizations.
# pipelines/pipeline_sensors.py
import logging
from pathlib import Path
import config.config as config
from jobs.etl_task import (
create_spark_session,
delete_processed_files,
download_file,
read_text_file,
extract_columns,
convert_columns,
load_etl
)
import os
if __name__ == '__main__':
# Configure logging
logging.basicConfig(level=getattr(logging, config.LOG_LEVEL))
    jdbc_url = os.getenv("DATABASE_URL")  # JDBC URL without embedded credentials
jdbc_properties = {
"user": os.getenv("POSTGRES_USER"),
"password": os.getenv("POSTGRES_PASSWORD"),
"driver": "org.postgresql.Driver"
}
# Create Spark session
spark = create_spark_session(config.STATION_APP_NAME)
download_file(config.STATION_DATA_URL, config.STATION_FILE)
# Read the station data into a DataFrame
df_raw = read_text_file(spark, config.STATION_FILE)
# Extract columns according to specifications
column_specs = config.STATION_COLUMN_SPECS
df_extracted = extract_columns(df_raw, column_specs)
# Convert columns to appropriate data types
column_types = config.STATION_COLUMN_TYPES
df_converted = convert_columns(df_extracted, column_types)
# Select relevant columns
df_station = df_converted.select('ID', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 'NAME')
# Write the DataFrame to the SQL database
load_etl(
df=df_station,
url=jdbc_url,
table="sensors",
properties=jdbc_properties,
write_options=config.JDBC_WRITE_OPTIONS
)
delete_processed_files([config.STATION_FILE])
logging.info('Station data ETL process completed successfully.')
4. Pipeline for Measurements (pipeline.py
)
The pipeline.py
script orchestrates the ETL process for climatic measurements. It handles the extraction of raw .dly
files, transforms the data into a structured format, and loads it into the measurements
table in the PostgreSQL database.
# pipelines/pipeline.py
from jobs.etl_task import (
create_spark_session,
extract_etl,
transform_measures,
load_etl,
delete_processed_files
)
import os, sys
import logging
from config.config import JDBC_WRITE_OPTIONS as write_options
import config.config as config
def main():
"""
Run the ETL pipeline by calling extract, transform, and load functions.
"""
    # Configure the database connection properties
    jdbc_url = os.getenv("DATABASE_URL")  # JDBC URL without embedded credentials
jdbc_properties = {
"user": os.getenv("POSTGRES_USER"),
"password": os.getenv("POSTGRES_PASSWORD"),
"driver": "org.postgresql.Driver"
}
spark = create_spark_session(config.MEASURES_APP_NAME)
logging.info("MEASUREMENTS SESSION CREATED")
try:
# Extract step
df_raw, file_list = extract_etl(spark)
if df_raw is None:
# No data to process
logging.info("No raw data found. Exiting ETL.")
spark.stop()
return
logging.info("Measurement Data Extracted")
# Transform step
df_transformed = transform_measures(df_raw)
logging.info("Measurement Data Transformed")
# Load step
load_etl(
df_transformed,
jdbc_url,
"measurements",
jdbc_properties,
write_options
)
logging.info("Measurement Data Loaded")
        # Delete the processed files
if file_list:
delete_processed_files(file_list)
logging.info("Measurement files deleted")
logging.info("ETL measurements pipeline completed successfully.")
except Exception as e:
logging.error(f"ETL pipeline failed: {e}")
sys.exit(1)
finally:
# Stop the Spark session
spark.stop()
if __name__ == "__main__":
main()
Detailed ETL Transformation Process
The ETL pipeline meticulously processes raw .dly
files and sensor metadata to transform them into a structured and queryable format. Below is a step-by-step breakdown of the transformations applied to the data.
1. Extraction
a. Downloading Data Files
The pipeline begins by downloading the necessary metadata from NOAA's NCEI server. Specifically, it retrieves the ghcnd-stations.txt file, which holds metadata for every station. The .dly measurement files of the GHCN-Daily dataset are read from the mounted /data/ volume.
b. Reading Raw Data
Using Apache Spark, the pipeline reads the raw text data into DataFrames. For the .dly
files, each line represents a month of daily values for a single station and element. The stations.txt
file contains spatial metadata such as latitude and longitude, essential for geospatial queries.
2. Transformation
a. Parsing Fixed-Width Columns
The raw .dly
files are in a fixed-width format, where specific character positions correspond to different data fields. The pipeline utilizes predefined column specifications (MEASURES_COLUMN_SPECS
) to extract relevant fields such as ID
, YEAR
, MONTH
, ELEMENT
, and daily VALUE
fields (e.g., VALUE1
to VALUE31
).
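To make the fixed-width layout concrete, here is a minimal, Spark-free sketch that slices a single made-up .dly record with plain Python, using the same positions as MEASURES_COLUMN_SPECS (the sample record and its values are fabricated purely for illustration). Note that extract_columns passes start_pos + 1 because Spark's substring function is 1-based, whereas Python slicing is 0-based.
# Positions mirror MEASURES_COLUMN_SPECS: (0-based start, exclusive end, name)
COLUMN_SPECS = [(0, 11, "ID"), (11, 15, "YEAR"), (15, 17, "MONTH"), (17, 21, "ELEMENT")]
COLUMN_SPECS += [(21 + i * 8, 21 + i * 8 + 5, f"VALUE{i + 1}") for i in range(31)]

day_field = "  250" + "   "  # 5-character value followed by 3 flag characters
sample = "US000000001" + "2020" + "01" + "TMAX" + day_field * 31  # 269 characters in total

record = {name: sample[start:end].strip() for start, end, name in COLUMN_SPECS}
print(record["ID"], record["YEAR"], record["MONTH"], record["ELEMENT"], record["VALUE1"])
# US000000001 2020 01 TMAX 250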
b. Data Type Casting
Once the columns are extracted, the pipeline casts them to appropriate data types as defined in MEASURES_COLUMN_TYPES
. For example, YEAR
and MONTH
are cast to integers, while VALUE
fields are also cast to integers to facilitate numerical operations.
c. Handling Missing Values
In the dataset, missing values are denoted by -9999
. The pipeline identifies these placeholder values and replaces them with None
(null) to ensure data integrity during analysis and storage.
d. Unpivoting Data
The .dly
files contain multiple VALUE
columns representing daily measurements within a month. To normalize the data, the pipeline unpivots these wide-format columns into a long-format DataFrame using Spark's stack
function. This transformation results in a DataFrame where each row represents a single day's measurement, with columns for ID, YEAR, MONTH, ELEMENT, DAY, and VALUE; the DATE column is derived from these in the next step.
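The following standalone sketch (assuming a local PySpark installation) reproduces the same wide-to-long unpivot that unpivot_data performs, reduced to three value columns for brevity; the single input row is made up, and the final step shows the -9999 sentinel being nulled exactly as handle_value_missing does.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, when

spark = SparkSession.builder.master("local[1]").appName("unpivot-sketch").getOrCreate()

# One fabricated station-month with only three day columns instead of 31
df = spark.createDataFrame(
    [("US000000001", 2020, 1, "TMAX", 250, 261, -9999)],
    ["ID", "YEAR", "MONTH", "ELEMENT", "VALUE1", "VALUE2", "VALUE3"],
)

# stack(n, 'label1', col1, ..., 'labelN', colN) emits one (DAY, VALUE) row per pair
df_long = df.select(
    "ID", "YEAR", "MONTH", "ELEMENT",
    expr("stack(3, '1', VALUE1, '2', VALUE2, '3', VALUE3) as (DAY, VALUE)"),
)

# Replace the -9999 missing-value sentinel with null, as handle_value_missing does
df_long = df_long.withColumn(
    "VALUE", when(col("VALUE") == -9999, None).otherwise(col("VALUE"))
)
df_long.show()
spark.stop()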
e. Date Formatting
The pipeline constructs a DATE
column by combining YEAR
, MONTH
, and DAY
, ensuring that dates are in the yyyy-MM-dd
format. This standardized date format is crucial for time-series analysis and querying.
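As a quick mental model, this pure-Python check (no Spark involved) mirrors what lpad, concat_ws, and to_date accomplish together: zero-pad the month and day, join the pieces with hyphens, and parse the result as yyyy-MM-dd. Because every .dly record carries 31 value slots regardless of month length, impossible combinations such as February 31st appear routinely; to_date turns them into null under Spark's default (non-ANSI) settings, and clean_data drops them.
from datetime import datetime

def build_date(year: int, month: int, day: int):
    # Mirror lpad(..., 2, '0') + concat_ws('-', ...) + to_date(..., 'yyyy-MM-dd')
    text = f"{year}-{str(month).zfill(2)}-{str(day).zfill(2)}"
    try:
        return datetime.strptime(text, "%Y-%m-%d").date()
    except ValueError:
        return None  # analogous to to_date() yielding null for an impossible date

print(build_date(2020, 1, 5))   # 2020-01-05
print(build_date(2020, 2, 31))  # None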
f. Cleaning Data
Rows with null values in critical columns (VALUE
and DATE
) are filtered out to maintain data quality. The final cleaned DataFrame contains only complete and valid records, ready for loading into the database.
g. Repartitioning for Optimal Load Performance
To optimize the loading process into PostgreSQL, the DataFrame is repartitioned based on the ID
column. This ensures that data is evenly distributed across partitions, leveraging Spark's parallel processing capabilities for faster insertion.
3. Loading
a. Writing to PostgreSQL
The transformed DataFrame is loaded into the PostgreSQL database using JDBC. The pipeline is configured to handle large batch inserts (batchsize
set to 100000
) and manage conflicts gracefully (ON CONFLICT DO NOTHING
). By partitioning the data (numPartitions
set to 20
), the pipeline maximizes throughput and efficiently utilizes CPU resources.
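A quick way to verify the load is to read the table back over the same JDBC connection and inspect row counts. The sketch below is a verification aid rather than part of the pipeline: it assumes the same DATABASE_URL and credential environment variables the pipelines use, that the measurements table already exists, and that the PostgreSQL JDBC driver is on the Spark classpath (for example, inside the same container image).
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("load-check").getOrCreate()

df = spark.read.jdbc(
    url=os.getenv("DATABASE_URL"),
    table="measurements",
    properties={
        "user": os.getenv("POSTGRES_USER"),
        "password": os.getenv("POSTGRES_PASSWORD"),
        "driver": "org.postgresql.Driver",
    },
)

# Rough sanity checks: total volume and per-element distribution
print(f"Rows loaded: {df.count()}")
df.groupBy("ELEMENT").count().show()
spark.stop()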
b. Deleting Processed Files
Post-loading, the pipeline deletes the processed raw files to free up storage space and prevent redundant processing in future runs. This cleanup step ensures that the pipeline remains efficient and maintains a clean working environment.
Benefits of the Implemented ETL Process
1. Efficiency in Data Processing
Apache Spark’s in-memory processing significantly reduces the time required to handle large datasets. By distributing the workload across multiple cores and leveraging Spark’s optimized operations, the pipeline works through the full 31 GB of raw text swiftly and reliably.
2. Scalability and Flexibility
Deploying the ETL pipeline on Kubernetes ensures that it can scale horizontally to meet increasing data demands. Kubernetes’ orchestration capabilities allow for dynamic resource allocation, enabling the pipeline to handle data spikes without manual intervention.
3. Modularity and Code Reusability
The development of a custom library of ETL functions promotes modularity and reusability. This approach not only simplifies maintenance but also facilitates the extension of the pipeline to accommodate additional data sources or transformation requirements in the future.
4. Seamless Integration with PostgreSQL
The pipeline’s configuration ensures efficient and reliable data loading into PostgreSQL. By optimizing batch sizes and partitioning, the pipeline maximizes data insertion speed while maintaining data integrity and consistency.
5. Geospatial Data Handling for Enhanced Queries
Incorporating spatial metadata (latitude and longitude) into the sensors
table allows for advanced geospatial queries. This capability is essential for generating location-based visualizations and conducting spatial analyses, providing users with deeper insights into climatic patterns.
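For example, even without a dedicated geospatial extension, a simple bounding-box query over the sensors and measurements tables enables location-based retrieval. The sketch below uses the psycopg2 driver purely for illustration; the bounding box, element, connection settings, and identifier casing are assumptions, and a later article will look at optimizing such queries.
import os
import psycopg2  # illustrative client choice; any PostgreSQL driver works

# Hypothetical query: TMAX readings from stations inside a bounding box
# (coordinates are illustrative; casing assumes unquoted, case-folded identifiers)
SQL = """
SELECT m.id, s.name, m.date, m.value
FROM measurements AS m
JOIN sensors AS s ON s.id = m.id
WHERE m.element = 'TMAX'
  AND s.latitude  BETWEEN 36.0 AND 44.0
  AND s.longitude BETWEEN -9.5 AND 3.5
LIMIT 10;
"""

conn = psycopg2.connect(
    host=os.getenv("DB_HOST", "localhost"),   # hypothetical connection settings
    dbname=os.getenv("DB_NAME", "noaa"),
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
)
with conn, conn.cursor() as cur:
    cur.execute(SQL)
    for row in cur.fetchall():
        print(row)
conn.close()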
6. Automation and Error Reduction
Automating the ETL process through scripts minimizes manual intervention, reducing the likelihood of human errors. Additionally, Kubernetes manages the execution environment, ensuring that the pipeline runs consistently across different deployments.
Docker and Kubernetes Configuration
To ensure that our ETL pipeline operates efficiently and can scale as needed, we utilize Docker for containerization and Kubernetes for orchestration. This section provides a detailed explanation of the Dockerfile used to create the Spark environment and the Kubernetes CronJob configuration that manages the ETL jobs.
1. Docker Configuration
Containerizing our ETL pipeline with Docker ensures consistency across different environments and simplifies the deployment process. Below is the Dockerfile used to build our Spark-based ETL image:
# Use the base Spark image provided by Bitnami
FROM bitnami/spark:latest
# Switch to root user to install additional packages
USER root
# Install Python 3, pip, and curl
RUN apt-get update && \
apt-get install -y python3 python3-pip curl && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# Set the working directory
WORKDIR /app
# Copy requirements.txt and install dependencies
COPY etl/requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt
# Download the PostgreSQL JDBC driver and place it in the Spark JARs directory
RUN mkdir -p /opt/bitnami/spark/jars && \
curl -L -o /opt/bitnami/spark/jars/postgresql-42.6.0.jar https://jdbc.postgresql.org/download/postgresql-42.6.0.jar
# Copy the entire 'etl' folder to the container
COPY etl/ /app/etl/
# Set PYTHONPATH
ENV PYTHONPATH="/app/etl:${PYTHONPATH}"
# Create the '/data' directory and adjust permissions
RUN mkdir /data && \
chown -R 1001:1001 /app /data
# Switch to a non-root user
USER 1001
# Command to execute the ETL based on the ETL_MODE variable
CMD ["sh", "-c", "if [ \"$ETL_MODE\" = 'sensors' ]; then spark-submit etl/pipelines/pipeline_sensors.py; elif [ \"$ETL_MODE\" = 'measurements' ]; then spark-submit etl/pipelines/pipeline.py; else spark-submit etl/pipelines/pipeline_sensors.py && spark-submit etl/pipelines/pipeline.py; fi"]
Explanation of the Dockerfile Components
- Base Image: The Dockerfile starts with the Bitnami Spark image, which provides a ready-to-use Spark environment optimized for containerization.
- User Permissions: We switch to the root user to install additional packages such as Python 3, pip, and curl. After installing the necessary packages, we switch back to a non-root user (UID 1001) to enhance security.
- Working Directory: The working directory is set to /app, where the ETL scripts and related files will reside.
- Dependency Installation: The requirements.txt file, which lists all Python dependencies, is copied into the container and installed using pip3. This ensures that all necessary libraries are available for the ETL scripts to run.
- JDBC Driver: The PostgreSQL JDBC driver is downloaded and placed in Spark’s JARs directory. This driver is essential for enabling Spark to communicate with the PostgreSQL database.
- ETL Scripts: The entire etl directory, containing all ETL scripts and modules, is copied into the container. The PYTHONPATH is set to include the /app/etl directory, allowing Python to locate and import the ETL modules seamlessly.
- Data Directory: A /data directory is created to store any input or output data files. Permissions are adjusted to ensure that the non-root user can read and write to this directory.
- Entrypoint Command: The CMD directive uses a shell command to determine which ETL pipeline to execute based on the ETL_MODE environment variable. This allows for flexible execution of either the sensors pipeline, the measurements pipeline, or both.
2. Kubernetes Orchestration
Managing the ETL jobs at scale requires a robust orchestration platform. Kubernetes is employed to schedule and manage the ETL containers, ensuring that they run reliably and can scale as needed. Below is the Kubernetes CronJob configuration used to schedule and execute the ETL pipeline:
apiVersion: batch/v1
kind: CronJob
metadata:
name: etl-job-manual
namespace: noaa-analytics
spec:
schedule: "0 0 31 2 *" # Run the job manually
jobTemplate:
spec:
template:
metadata:
labels:
app: etl
spec:
containers:
- name: etl
image: mrcedee/etl:latest
imagePullPolicy: Always
env:
- name: DATABASE_URL
valueFrom:
configMapKeyRef:
name: noaa-config
key: DATABASE_URL
- name: POSTGRES_USER
valueFrom:
secretKeyRef:
name: timescaledb-secret
key: POSTGRES_USER
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: timescaledb-secret
key: POSTGRES_PASSWORD
- name: ETL_MODE
value: "both" # Switch between "measurements" and "sensors" as needed
volumeMounts:
- name: data-volume
mountPath: /data
resources:
requests:
cpu: "6" # Minimum CPU request
memory: "6Gi" # Minimum memory request
limits:
cpu: "10" # Maximum CPU limit
memory: "10Gi" # Maximum memory limit
volumes:
- name: data-volume
hostPath:
path: /run/desktop/mnt/host/c/Users/marco/Escritorio/ghcnd_all/Weather/data/new
type: Directory
restartPolicy: OnFailure
Explanation of the Kubernetes CronJob Configuration
- Metadata: The CronJob is named etl-job-manual and is placed within the noaa-analytics namespace, isolating it from other workloads.
- Schedule: The schedule is set to "0 0 31 2 *", which is an invalid date (February 31st) and effectively disables automatic scheduling. This configuration is intended for manual triggering of the ETL job as needed.
- Job Template: The jobTemplate section defines the pod template that Kubernetes will use to create jobs. It includes metadata labels to identify the pod as part of the ETL application.
- Containers:
  - Name and Image: The container is named etl and uses the mrcedee/etl:latest image, which is built from the aforementioned Dockerfile.
  - Image Pull Policy: Set to Always to ensure that the latest image is pulled each time the job runs.
  - Environment Variables:
    - DATABASE_URL: Retrieved from the noaa-config ConfigMap, ensuring that database connection details are managed centrally.
    - POSTGRES_USER and POSTGRES_PASSWORD: Retrieved from the timescaledb-secret Secret, securing sensitive credentials.
    - ETL_MODE: Set to "both" to execute both the sensors and measurements pipelines. This can be adjusted to "measurements" or "sensors" as needed.
  - Volume Mounts: The data-volume is mounted at /data inside the container, allowing the ETL scripts to access the necessary data files.
  - Resource Requests and Limits:
    - Requests: The job requests a minimum of 6 CPUs and 6Gi of memory, ensuring sufficient resources for processing.
    - Limits: The job is limited to a maximum of 10 CPUs and 10Gi of memory to prevent resource exhaustion.
- Volumes: The data-volume uses a hostPath to mount the directory containing the raw data files. This allows the containerized ETL scripts to access the data stored on the host machine.
- Restart Policy: Set to OnFailure to automatically retry the job if it fails, enhancing reliability.
3. Deploying the ETL Job
To deploy the CronJob configuration to your Kubernetes cluster, use the following kubectl
command:
kubectl apply -f deployment/kubernetes/etl/job.yaml
This command applies the CronJob configuration defined in job.yaml
, setting up the ETL job within the noaa-analytics
namespace.
Manually Triggering the ETL Job
Since the CronJob is configured with an invalid schedule to prevent automatic execution, you can manually trigger the ETL job using the following command:
kubectl create job --from=cronjob/etl-job-manual etl-job-manual-run-1 -n noaa-analytics
Explanation:
- kubectl create job: Creates a new Kubernetes job based on an existing CronJob.
- --from=cronjob/etl-job-manual: Specifies that the new job should be created from the etl-job-manual CronJob.
- etl-job-manual-run-1: Assigns a unique name to the newly created job.
- -n noaa-analytics: Indicates that the job should be created within the noaa-analytics namespace.
By executing this command, you manually initiate the ETL process, allowing you to control when data processing occurs.
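If you prefer to trigger runs from code rather than from the command line, the same operation can be performed with the official kubernetes Python client. The sketch below is a hedged equivalent of the kubectl command above: it assumes a working kubeconfig, a batch/v1 CronJob, and a reasonably recent version of the client library.
from kubernetes import client, config

# Load credentials from the local kubeconfig
# (use config.load_incluster_config() when running inside the cluster).
config.load_kube_config()
batch = client.BatchV1Api()

namespace = "noaa-analytics"
cronjob = batch.read_namespaced_cron_job("etl-job-manual", namespace)

# Build a one-off Job that reuses the CronJob's pod template,
# mimicking `kubectl create job --from=cronjob/etl-job-manual ...`
job = client.V1Job(
    metadata=client.V1ObjectMeta(name="etl-job-manual-run-1", namespace=namespace),
    spec=cronjob.spec.job_template.spec,
)
batch.create_namespaced_job(namespace=namespace, body=job)
print("Created job etl-job-manual-run-1 in namespace noaa-analytics")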
Conclusion
The development of a scalable and efficient ETL pipeline is a critical component in transforming massive raw datasets into actionable insights. Building upon the project’s initial presentation and the robust Kubernetes infrastructure, this ETL pipeline leverages Apache Spark’s distributed processing capabilities to handle over 31 GB of NOAA sensor data effectively. By meticulously designing each stage of the ETL process — from extraction and transformation to loading — we ensure data integrity, scalability, and high performance.
Moreover, the integration of spatial metadata from sensor information empowers advanced geospatial analyses, enabling location-based queries and enhancing the overall utility of the dataset for comprehensive climatic studies. Deploying the ETL pipeline within a Kubernetes-managed environment further ensures that the system remains flexible, resilient, and capable of handling increasing data demands seamlessly.
The utilization of Docker for containerization and Kubernetes for orchestration brings numerous benefits, including consistency across environments, automated scaling, efficient resource management, fault tolerance, enhanced security, and streamlined deployment processes. This integration not only optimizes the performance and scalability of the ETL pipeline but also simplifies maintenance and future enhancements.
By adopting this robust infrastructure, we can confidently manage and process large volumes of climatic data, paving the way for accurate predictions and insightful visualizations. As we continue to refine and expand this ETL pipeline, future enhancements will include integrating more sophisticated machine learning models for predictive analytics, implementing real-time data ingestion, and bolstering security measures to protect sensitive information. These advancements will solidify the pipeline’s role as a cornerstone in environmental data analysis, driving informed decision-making and strategic planning.
Stay tuned for upcoming articles where we will delve deeper into the database system, optimize geospatial queries, and integrate AI-driven predictions to further enrich our visualizations and analytical capabilities.