1. Extract, Transform and Load

As mentioned in the architecture flow, Mage does the heavy lifting of orchestrating the ETL process. In the case of Mage, we have three major components:

  1. Data Loader: The data loader component is responsible for extracting data from the source. It can read data from various sources, including databases, files, APIs, and message queues. The data loader can also filter and select specific data based on criteria such as time ranges or other data attributes. In this tutorial, we are going to extract data from PostgreSQL and from a CSV file.

Below are the steps to create a data loader that loads data from PostgreSQL. Go to the Mage data loader tab as shown below, click on Python, and then click on the PostgreSQL option.

Screenshot 2023-02-18 at 3.27.00 PM.png

Below is the code snippet from the use case for loading one of the tables from PostgreSQL.

from mage_ai.data_preparation.repo_manager import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from os import path
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

@data_loader
def load_data_from_postgres(*args, **kwargs):
    """
    Template for loading data from a PostgreSQL database.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: <https://docs.mage.ai/design/data-loading#postgresql>
    """
    query = 'SELECT * FROM public.departments'  # Specify your SQL query here
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        return loader.load(query)

@test
def test_output(df, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert df is not None, 'The output is undefined'
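
The loader above is hard-coded to the public.departments table. If you want to reuse one block for several tables, or restrict rows to a time range as mentioned in the Data Loader description, one option is to build the query from the block's runtime variables. This is only a sketch: table_name, start_date and the order_date column are hypothetical names, not part of the Mage template or this use case.

from os import path
from mage_ai.data_preparation.repo_manager import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader

@data_loader
def load_table_from_postgres(*args, **kwargs):
    # 'table_name' and 'start_date' are hypothetical runtime variables set on the pipeline or trigger
    table_name = kwargs.get('table_name', 'public.departments')
    start_date = kwargs.get('start_date')

    query = f'SELECT * FROM {table_name}'
    if start_date:
        # Restrict the extract to a time range; 'order_date' is an assumed column name
        query += f" WHERE order_date >= '{start_date}'"

    config_path = path.join(get_repo_path(), 'io_config.yaml')
    with Postgres.with_config(ConfigFileLoader(config_path, 'default')) as loader:
        return loader.load(query)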

Below are the steps to create a data loader that loads data from a CSV file. Go to the Mage data loader tab as shown below, click on Python, and then click on the generic option.

Screenshot 2023-02-18 at 3.27.00 PM.png

Below is the code snippet from the use case for loading one of the tables from a local CSV file.

from pyspark.sql import SparkSession
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

spark = SparkSession.builder.getOrCreate()

@data_loader
def load_data(*args, **kwargs):
    """
    Template code for loading data from any source.

    Returns:
        Anything
    """
    # Specify your data loading logic here

    # Spark DDL schema for the order_items CSV; the file is read without a header row (Spark's default)
    order_items_schema = "order_item_id integer,order_item_order_id integer,order_item_product_id integer,order_item_quantity integer,order_item_subtotal float,order_item_product_price float"

    order_items = spark.read.format('csv').schema(order_items_schema).load('/Users/jafarshaik/Downloads/order_items.csv')
    return order_items

@test
def test_output(df, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert df is not None, 'The output is undefined'
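
The DDL string above is concise, but the same schema can also be expressed as an explicit StructType, which some teams prefer for readability and reuse. A minimal equivalent sketch, assuming the same column names and the local file path from the example:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

spark = SparkSession.builder.getOrCreate()

# Explicit schema equivalent to the DDL string used above
order_items_schema = StructType([
    StructField('order_item_id', IntegerType()),
    StructField('order_item_order_id', IntegerType()),
    StructField('order_item_product_id', IntegerType()),
    StructField('order_item_quantity', IntegerType()),
    StructField('order_item_subtotal', FloatType()),
    StructField('order_item_product_price', FloatType()),
])

order_items = (
    spark.read
    .format('csv')
    .schema(order_items_schema)
    .load('/Users/jafarshaik/Downloads/order_items.csv')
)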

  2. Data Transformer: The data transformer component processes the data that has been extracted from the source. It can perform a variety of operations on the data, such as cleaning, filtering, aggregation, and joining. The data transformer can also perform advanced data processing tasks, such as running machine learning models, performing natural language processing, and applying statistical algorithms. A small join-and-aggregation sketch is shown after the transformer code below.

Below are the steps to create a data transformer to transform the data that has been extracted from the different sources. Go to the Mage data transformer tab as shown below, click on Python, and then click on the generic option. As you can see in the screen below, Mage provides a lot of data-connector options.

Screenshot 2023-02-18 at 3.35.27 PM.png

Below is the code snippet from the use case for transforming all the tables extracted from the different sources. Here we are removing duplicate rows from all the tables.

from pandas import DataFrame
import math
from pyspark.sql import SparkSession
from mage_ai.data_preparation.variable_manager import get_variable
from pyspark.sql.functions import *

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

def transform_helper(df):
    # Drop duplicate rows from the incoming DataFrame
    return df.drop_duplicates()

# Names of the upstream data loader blocks whose outputs this block transforms
data_list = ['load_product_data', 'load_customer_data', 'load_categories_data', 'load_departments', 'load_orders_data', 'load_orders_list_data']
spark = SparkSession.builder.getOrCreate()

@transformer
def transform_df(df, *args, **kwargs):
    """
    Template code for a transformer block.

    Add more parameters to this function if this block has multiple parent blocks.
    There should be one parameter for each output variable from each parent block.

    Args:
        df (DataFrame): Data frame from parent block.

    Returns:
        DataFrame: Transformed data frame
    """

    
    # Specify your transformation logic here
    results = []
    for data in data_list:
        if data in ('load_departments','load_categories_data'):
            print(data)
            df = get_variable('retail_pipeline', data, 'output_0', variable_type='dataframe')
            df=spark.createDataFrame(df) 
        else:
            df = get_variable('retail_pipeline', data, 'output_0', variable_type='spark_dataframe', spark=spark)
        results.append(transform_helper(df))
    return results

@test
def test_output(df, *args, **kwargs) -> None:
    """
    Template code for testing the output of the block.
    """
    assert df is not None, 'The output is undefined'
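
The transformer description above also mentions joins and aggregations. As an illustration only, the de-duplicated orders and order_items DataFrames could be combined into revenue per order. This is a sketch in plain PySpark: the order_id column on the orders table is an assumption (it is not shown elsewhere in this tutorial), while the order_items column names come from the CSV loader schema above.

from pyspark.sql import DataFrame, functions as F

def revenue_per_order(orders_df: DataFrame, order_items_df: DataFrame) -> DataFrame:
    # Join each order with its line items and sum the item subtotals per order
    return (
        orders_df
        .join(order_items_df, orders_df['order_id'] == order_items_df['order_item_order_id'])
        .groupBy('order_id')
        .agg(F.sum('order_item_subtotal').alias('order_revenue'))
    )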

  3. Data Exporter: The data exporter component is responsible for loading the transformed data into the destination. It can export the data to various destinations, including databases, data warehouses, and data lakes. The data exporter can also perform various data loading operations, such as upserts, appends, and deletes.

Below are the steps to create a data exporter to load the data that has been extracted and transformed from the different sources. Go to the Mage data exporter tab as shown below, click on Python, and then click on the Snowflake option. As you can see in the screen below, Mage provides a lot of data-connector options.

Screenshot 2023-02-18 at 3.47.28 PM.png

from mage_ai.data_preparation.repo_manager import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake
from pandas import DataFrame
from os import path
from mage_ai.data_preparation.variable_manager import get_variable
from pyspark.sql import SparkSession

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

spark = SparkSession.builder.getOrCreate()

def fix_date_cols(df, tz='UTC'):
    # Localize any naive datetime64 columns to the given timezone before exporting to Snowflake
    cols = df.select_dtypes(include=['datetime64[ns]']).columns
    for col in cols:
        df[col] = df[col].dt.tz_localize(tz)
    return df

@data_exporter
def export_data_to_snowflake(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a Snowflake warehouse.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: <https://docs.mage.ai/design/data-loading>
    """
    # Target table names, in the same order as the transformer block's outputs
    data = ['products', 'customers', 'categories', 'departments', 'orders', 'order_items']

    database = 'DWBI_PROJECT'
    schema = 'TITANIC_CLEAN'
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    for i, table_name in enumerate(data):
        # Fetch the i-th output of the 'drop_duplicates' transformer block
        # (the pipeline and block names must match your project) and convert it to pandas
        df = get_variable('retail_pipeline', 'drop_duplicates', 'output_' + str(i), spark=spark)
        df = df.toPandas()
        if table_name == 'orders':
            # The orders table carries naive datetime columns; localize them before export
            df = fix_date_cols(df)

        with Snowflake.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
            loader.export(
                df,
                table_name,
                database,
                schema,
                if_exists='replace',  # Specify resolution policy if table already exists
            )
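
The export above replaces each target table on every run (if_exists='replace'). The Data Exporter description also mentions appends; for incremental loads the same call can be made with a different write policy. A minimal sketch, assuming the target table already exists with a compatible schema and that df, table_name, database and schema are set as in the block above:

with Snowflake.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
    loader.export(
        df,
        table_name,
        database,
        schema,
        if_exists='append',  # add new rows instead of replacing the whole table
    )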

After running the above code, you should see the log below confirming that your tables have been loaded into Snowflake.

Screenshot 2023-02-19 at 9.45.40 AM.png

Additional points on Mage:

  1. All of the database-related configuration can be stored either in the io_config.yaml file or in the secrets section. In this use case we read it from io_config.yaml (a minimal example is shown after this list).
  2. We can schedule pipelines to run daily and set up triggers so that one pipeline runs based on the result of another pipeline.
  3. We have other options to view the flow of the pipeline, run basic data exploration charts, and install packages on the fly. These options are in the right-hand section, as shown in the images below.
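
For reference, here is a minimal io_config.yaml sketch covering the PostgreSQL and Snowflake settings used by the loader and exporter blocks above. The key names follow Mage's io_config template, but check your Mage version; all of the values below are placeholders to be replaced with your own.

version: 0.1.1
default:
  POSTGRES_DBNAME: your_database
  POSTGRES_SCHEMA: public
  POSTGRES_USER: your_user
  POSTGRES_PASSWORD: your_password
  POSTGRES_HOST: localhost
  POSTGRES_PORT: 5432
  SNOWFLAKE_USER: your_user
  SNOWFLAKE_PASSWORD: your_password
  SNOWFLAKE_ACCOUNT: your_account_id
  SNOWFLAKE_DEFAULT_WH: your_warehouse
  SNOWFLAKE_DEFAULT_DB: your_database
  SNOWFLAKE_DEFAULT_SCHEMA: your_schema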

Screenshot 2023-02-18 at 3.52.34 PM.png

Screenshot 2023-02-18 at 3.53.08 PM.png

Screenshot 2023-02-18 at 3.53.56 PM.png

For more details about Mage, go through the official documentation: https://docs.mage.ai/introduction/overview