As mentioned in the architecture flow, Mage does the heavy lifting of orchestrating the ETL process. In Mage we work with three major components: data loaders, transformers, and data exporters.
Below are the steps to create a data loader to load data from PostgreSQL. Go to the Mage data loader tab as shown below, click on Python, and you will find the PostgreSQL option; click on it.
Below is the code snippet from the use case for loading one of the tables from PostgreSQL.
from mage_ai.data_preparation.repo_manager import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from os import path

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_postgres(*args, **kwargs):
    """
    Template for loading data from a PostgreSQL database.
    Specify your configuration settings in 'io_config.yaml'.
    Docs: https://docs.mage.ai/design/data-loading#postgresql
    """
    query = 'SELECT * FROM public.departments'  # Specify your SQL query here
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        return loader.load(query)


@test
def test_output(df, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert df is not None, 'The output is undefined'
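The other Postgres-backed loader blocks in this pipeline (for example load_customer_data, which appears in the transformer's data_list later) follow the same pattern; only the query changes. A minimal sketch, assuming the customer data lives in a public.customers table (the table name is an assumption, not taken from the original code):

from mage_ai.data_preparation.repo_manager import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from os import path

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_customer_data(*args, **kwargs):
    # Hypothetical table name; point the query at whichever table this block should load.
    query = 'SELECT * FROM public.customers'
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        return loader.load(query)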
Below are the steps to create a data loader to load data from a CSV file. Go to the Mage data loader tab as shown below, click on Python, and you will find the generic option; click on it.
Below is the code snippet from the use case for loading one of the tables from a local CSV file.
import io
import pandas as pd
import requests
from pandas import DataFrame
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType
from pyspark.sql import SparkSession

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

spark = SparkSession.builder.getOrCreate()


@data_loader
def load_data(*args, **kwargs):
    """
    Template code for loading data from any source.

    Returns:
        Anything
    """
    # Specify your data loading logic here
    order_items_schema = 'order_item_id integer,order_item_order_id integer,order_item_product_id integer,order_item_quantity integer,order_item_subtotal float,order_item_product_price float'
    order_items = spark.read.format('csv') \
        .schema(order_items_schema) \
        .load('/Users/jafarshaik/Downloads/order_items.csv')
    return order_items


@test
def test_output(df, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert df is not None, 'The output is undefined'
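The schema here is declared as a DDL-formatted string, which is one of the two forms spark.read.schema() accepts. The StructType/StructField imports at the top of the block can express the same schema explicitly; below is a minimal sketch of the equivalent definition, with column names and types taken from the DDL string above:

from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

# Equivalent explicit schema for the order_items CSV.
order_items_schema = StructType([
    StructField('order_item_id', IntegerType()),
    StructField('order_item_order_id', IntegerType()),
    StructField('order_item_product_id', IntegerType()),
    StructField('order_item_quantity', IntegerType()),
    StructField('order_item_subtotal', FloatType()),
    StructField('order_item_product_price', FloatType()),
])

Either form can be passed to .schema() when reading the CSV.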
Below are the steps to create a transformer to transform the data that has been extracted from the different sources. Go to the Mage transformer tab as shown below, click on Python, and you will find the generic option; click on it. As you can see in the screen below, Mage provides a lot of data-connector options.
Below is the code snippet from the use case for transforming all the tables extracted from the different sources. Here we remove duplicate rows from every table.
from pandas import DataFrame
import math
from pyspark.sql import SparkSession
from mage_ai.data_preparation.variable_manager import get_variable
from pyspark.sql.functions import *

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


def transform_helper(df):
    return df.drop_duplicates()


data_list = [
    'load_product_data',
    'load_customer_data',
    'load_categories_data',
    'load_departments',
    'load_orders_data',
    'load_orders_list_data',
]

spark = SparkSession.builder.getOrCreate()


@transformer
def transform_df(df, *args, **kwargs):
    """
    Template code for a transformer block.

    Add more parameters to this function if this block has multiple parent blocks.
    There should be one parameter for each output variable from each parent block.

    Args:
        df (DataFrame): Data frame from parent block.

    Returns:
        DataFrame: Transformed data frame
    """
    # Specify your transformation logic here
    results = []
    for data in data_list:
        if data in ('load_departments', 'load_categories_data'):
            print(data)
            df = get_variable('retail_pipeline', data, 'output_0', variable_type='dataframe')
            df = spark.createDataFrame(df)
        else:
            df = get_variable('retail_pipeline', data, 'output_0', variable_type='spark_dataframe', spark=spark)
        results.append(transform_helper(df))
    return results


@test
def test_output(df, *args, **kwargs) -> None:
    """
    Template code for testing the output of the block.
    """
    assert df is not None, 'The output is undefined'
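If you want the test to check more than non-emptiness, a stricter (hypothetical) assertion is that dropping duplicates a second time leaves the row counts unchanged, since the transformer returns a list of already-deduplicated Spark DataFrames. A minimal sketch:

if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@test
def test_no_duplicates(output, *args, **kwargs) -> None:
    """
    Sketch of a stricter test: each transformed DataFrame should already be duplicate-free.
    """
    assert output is not None, 'The output is undefined'
    for df in output:
        assert df.count() == df.drop_duplicates().count(), 'Found duplicate rows'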
Below are the steps to create a data exporter to load the extracted and transformed data into Snowflake. Go to the Mage data exporter tab as shown below, click on Python, and you will find the Snowflake option; click on it. As you can see in the screen below, Mage provides a lot of data-connector options.
from mage_ai.data_preparation.repo_manager import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake
from pandas import DataFrame
from os import path
from mage_ai.data_preparation.variable_manager import get_variable
from pyspark.sql import SparkSession

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

spark = SparkSession.builder.getOrCreate()


def fix_date_cols(df, tz='UTC'):
    # Localize naive datetime columns to the given timezone.
    cols = df.select_dtypes(include=['datetime64[ns]']).columns
    for col in cols:
        df[col] = df[col].dt.tz_localize(tz)
    return df


@data_exporter
def export_data_to_snowflake(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a Snowflake warehouse.
    Specify your configuration settings in 'io_config.yaml'.
    Docs: https://docs.mage.ai/design/data-loading
    """
    data = ['products', 'customers', 'categories', 'departments', 'orders', 'order_items']
    for i in range(0, 6):
        table_name = data[i]
        df = get_variable('example_pipeline', 'drop_duplicates', 'output_' + str(i), spark=spark)
        df = df.toPandas()
        if table_name == 'orders':
            print(table_name)
            df = fix_date_cols(df)
        database = 'DWBI_PROJECT'
        schema = 'TITANIC_CLEAN'
        config_path = path.join(get_repo_path(), 'io_config.yaml')
        config_profile = 'default'

        with Snowflake.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
            loader.export(
                df,
                table_name,
                database,
                schema,
                if_exists='replace',  # Specify resolution policy if table already exists
            )
After running the above code, you should see the log below confirming that your tables have been loaded into Snowflake.
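Beyond the run log, a quick sanity check is to query the row counts back from Snowflake using the same connection settings, for example from a scratch block. A minimal sketch, assuming the DWBI_PROJECT database and TITANIC_CLEAN schema used by the exporter above:

from mage_ai.data_preparation.repo_manager import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake
from os import path

tables = ['products', 'customers', 'categories', 'departments', 'orders', 'order_items']
config_path = path.join(get_repo_path(), 'io_config.yaml')

with Snowflake.with_config(ConfigFileLoader(config_path, 'default')) as loader:
    for table_name in tables:
        # Count the rows landed in each exported table.
        counts = loader.load(f'SELECT COUNT(*) FROM DWBI_PROJECT.TITANIC_CLEAN.{table_name}')
        print(table_name, counts.iloc[0, 0])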
Additional Points on Mage:
For more details about Mage, go through the official documentation: https://docs.mage.ai/introduction/overview