# 노트북에서 예측 모델 학습 파이프라인 생성하기

In [1]:
import os
import requests
import pandas as pd
from kakaocloud_kbm import KbmPipelineClient
import kfp
from kfp import components
import kfp.dsl as dsl
import kfp.compiler as compiler
from kubernetes.client import (
    V1Affinity, V1NodeAffinity, V1NodeSelector, V1NodeSelectorTerm, V1NodeSelectorRequirement
)
from typing import NamedTuple

In [2]:
# KBM Kubeflow SDK connection settings. Replace the {{ ... }} placeholders
# with real values before running this cell.
# NOTE(review): KbmPipelineClient() is constructed with no arguments, so it
# presumably reads these values from the environment — confirm with the SDK.
os.environ.update({
    "KUBEFLOW_HOST": "https://{{ KUBEFLOW 도메인 또는 사설 IP}}",
    "KUBEFLOW_USERNAME": "{{ KUBEFLOW 계정 이메일 }}",
    "KUBEFLOW_PASSWORD": "{{ KUBEFLOW 계정 비밀번호 }}",
})

client = KbmPipelineClient()

In [3]:
# Variables
# Local directory that holds the generated component spec files.
COMPONENT_PATH = "components"
# Sub-directory used for this CPU training pipeline's component files.
TRAIN_PATH = os.path.join(COMPONENT_PATH, "nyc_taxi_cpu_train")
# Container image used by the training and evaluation components.
TRAIN_CR_IMAGE = (
    "bigdata-150.kr-central-2.kcr.dev/kc-kubeflow/kmlp-pytorch:1.0.0.py36.cpu"
)

## 파이프라인 컴포넌트 빌드하기

In [4]:
%%bash -s "{TRAIN_PATH}"

# Create the component directory passed as the cell argument ($1 = TRAIN_PATH)
# and echo it. Quoted so a path with spaces or glob characters stays one word.
mkdir -p "$1"
echo "$1"

components/nyc_taxi_cpu_train


### 데이터 수집 컴포넌트

In [5]:
%%writefile {TRAIN_PATH}/nyc_taxi_dataset_component.yaml
name: NYC Taxi Fare Dataset
description: |
  NYC Taxi Fare dataset: https://www.kaggle.com/competitions/new-york-city-taxi-fare-prediction/data
  Downloads the CSV at kc_kbm_os_train_url and stores it as train.csv inside
  the dataset output directory.
metadata:
  annotations:
    author: KakaoCloud Bigdata <bigdata.platform@kakaoenterprise.com>
inputs:
- {name: kc_kbm_os_train_url, type: String, description: 'HTTP(S) URL of the training CSV to download.', default: 'https://objectstorage.kr-central-1.kakaoi.io/v1/c745e6650f0341a68bb73fa222e88e9b/kbm-files/guide_docs%2Fhands_on%2Fnyc_taxi_fare%2Fdata%2Ftrain.csv'}
outputs:
# The output is a directory; the CSV is written inside it as train.csv.
- {name: dataset, description: 'Directory containing the downloaded CSV as train.csv.'}
implementation:
  container:
    image: curlimages/curl
    command:
    - sh
    - -c
    - |
      set -e -x -o pipefail
      output_path="$0"
      mkdir -p "$output_path"
      # --fail: exit non-zero on HTTP errors instead of saving the error body
      # as train.csv; --location: follow redirects; --silent/--show-error:
      # hide the progress meter but still print errors.
      curl --fail --location --silent --show-error "$1" --output "$output_path/train.csv"
    - {outputPath: dataset}
    - {inputValue: kc_kbm_os_train_url}


Overwriting components/nyc_taxi_cpu_train/nyc_taxi_dataset_component.yaml


### 예측 모델 학습 컴포넌트

In [6]:
def train_Pytorch_Tabular_Model(
    training_data_path: components.InputPath(str),
    model_path: components.OutputPath(str),
    mlpipeline_ui_metadata: components.OutputPath("UI_Metadata"),
    train_data_fnm: str,
    epoch_num: str
):
    """Train a PyTorch tabular regression model on the NYC taxi fare CSV.

    This function is turned into a Kubeflow component with
    ``components.create_component_from_func``, which ships only this
    function's source to the container, so every import and helper lives
    inside the body.

    Args:
        training_data_path: Directory produced by the dataset component. The
            CSV named ``train_data_fnm`` is read from it.
        model_path: Output directory. Receives ``model.pt`` (a CPU state
            dict), ``model_config.json`` (the ``TabularModel`` constructor
            arguments) and the held-out split ``cat_test.pt``,
            ``con_test.pt`` and ``y_test.pt``.
        mlpipeline_ui_metadata: Output file for the KFP UI metadata JSON; the
            training-loss curve is embedded as an inline PNG web-app.
        train_data_fnm: File name of the training CSV inside
            ``training_data_path``.
        epoch_num: Number of full-batch training epochs (string, parsed with
            ``int``).
    """
    import base64
    import json
    import os
    import pathlib
    import time
    from io import BytesIO

    import matplotlib.pyplot as plt
    import numpy as np
    import pandas as pd
    import torch
    import torch.nn as nn

    pd.options.display.max_columns = None

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    if device.type == 'cuda':
        print("Train on GPU.")
    else:
        print("No cuda available")

    print(training_data_path)
    print(os.listdir(training_data_path))

    pathlib.Path(model_path).mkdir(parents=True, exist_ok=True)
    pathlib.Path(mlpipeline_ui_metadata).parent.mkdir(parents=True, exist_ok=True)

    df = pd.read_csv(os.path.join(training_data_path, train_data_fnm))
    print(df)

    def haversine_distance(df, lat1, long1, lat2, long2):
        """Return the great-circle distance in km between two lat/long column pairs of ``df``."""
        r = 6371  # average radius of the Earth in kilometers

        phi1 = np.radians(df[lat1])
        phi2 = np.radians(df[lat2])
        delta_phi = np.radians(df[lat2] - df[lat1])
        delta_lambda = np.radians(df[long2] - df[long1])

        a = np.sin(delta_phi / 2) ** 2 + np.cos(phi1) * np.cos(phi2) * np.sin(delta_lambda / 2) ** 2
        c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
        return r * c

    # --- Feature engineering ---------------------------------------------
    df['distance_km'] = haversine_distance(
        df, 'pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude'
    )
    df.drop(columns=['key'], inplace=True, errors='ignore')

    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])
    # Fixed 4-hour shift of the pickup time (presumably UTC -> US Eastern
    # daylight time); daylight-saving changes are not accounted for.
    df['edtdate'] = df['pickup_datetime'] - pd.Timedelta(hours=4)
    df['Hour'] = df['edtdate'].dt.hour
    df['am_or_pm'] = np.where(df['Hour'] < 12, 'am', 'pm')
    df['weekday'] = df['edtdate'].dt.strftime("%a")

    print(df.columns)
    df.info()  # prints its own report; its return value is None

    cat_cols = ['Hour', 'am_or_pm', 'weekday']
    cont_cols = ['pickup_longitude', 'pickup_latitude', 'dropoff_longitude',
                 'dropoff_latitude', 'passenger_count', 'distance_km']
    y_col = ['fare_amount']

    for cat in cat_cols:
        df[cat] = df[cat].astype('category')

    # Integer category codes, one column per categorical feature (in cat_cols order).
    cats = torch.tensor(
        np.stack([df[col].cat.codes.values for col in cat_cols], axis=1), dtype=torch.int64
    )
    conts = torch.tensor(
        np.stack([df[col].values for col in cont_cols], axis=1), dtype=torch.float
    )
    print(conts.shape)

    y = torch.tensor(df[y_col].values, dtype=torch.float).reshape(-1, 1)
    print(y.shape)

    # Embedding width heuristic: half the cardinality (rounded up), capped at 50.
    cat_sizes = [len(df[col].cat.categories) for col in cat_cols]
    emb_sizes = [(size, min(50, (size + 1) // 2)) for size in cat_sizes]

    class TabularModel(nn.Module):
        """Categorical embeddings + batch-normalised continuous features fed to an MLP."""

        def __init__(self, emb_sizes, n_cont, out_szs, layers, p=0.5):
            super().__init__()

            self.embeds = nn.ModuleList([nn.Embedding(ni, nf) for ni, nf in emb_sizes])
            self.emb_drop = nn.Dropout(p)
            self.bn_cont = nn.BatchNorm1d(n_cont)

            layer_list = []
            n_in = sum(nf for _, nf in emb_sizes) + n_cont
            for width in layers:
                layer_list += [nn.Linear(n_in, width), nn.ReLU(inplace=True),
                               nn.BatchNorm1d(width), nn.Dropout(p)]
                n_in = width
            # n_in is now the last hidden width (or the input width when
            # `layers` is empty), so an empty `layers` no longer fails.
            layer_list.append(nn.Linear(n_in, out_szs))

            self.layers = nn.Sequential(*layer_list)

        def forward(self, x_cat, x_cont):
            embeddings = [e(x_cat[:, i]) for i, e in enumerate(self.embeds)]
            x = self.emb_drop(torch.cat(embeddings, 1))
            x_cont = self.bn_cont(x_cont)
            x = torch.cat([x, x_cont], 1)
            return self.layers(x)

    # Constructor arguments, saved so the evaluation step can rebuild the
    # exact architecture (the embedding sizes depend on the data).
    model_config = {"emb_sizes": emb_sizes, "n_cont": conts.shape[1], "layers": [200, 100], "p": 0.4}

    torch.manual_seed(33)
    model = TabularModel(
        model_config["emb_sizes"], model_config["n_cont"], 1, model_config["layers"], p=model_config["p"]
    ).to(device)

    print(model)

    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

    # Deterministic split over the first `batch_size` rows: the first 80% are
    # used for training and the remaining 20% are held out for evaluation.
    # Clamped to the dataset size so small inputs still get a test split.
    batch_size = min(60000, len(df))
    test_size = int(batch_size * 0.2)
    train_end = batch_size - test_size

    cat_train, cat_test = cats[:train_end], cats[train_end:batch_size]
    con_train, con_test = conts[:train_end], conts[train_end:batch_size]
    y_train, y_test = y[:train_end], y[train_end:batch_size]

    # Slices are views; torch.save would write the whole underlying storage,
    # so clone to persist only the held-out rows.
    torch.save(con_test.clone(), os.path.join(model_path, 'con_test.pt'))
    torch.save(cat_test.clone(), os.path.join(model_path, 'cat_test.pt'))
    torch.save(y_test.clone(), os.path.join(model_path, 'y_test.pt'))
    with open(os.path.join(model_path, 'model_config.json'), 'w') as config_writer:
        json.dump(model_config, config_writer)

    cat_train = cat_train.to(device)
    con_train = con_train.to(device)
    y_train = y_train.to(device)

    model.train()
    start_time = time.time()
    final_losses = []

    for epoch in range(int(epoch_num)):
        optimizer.zero_grad()
        y_pred = model(cat_train, con_train)
        loss = torch.sqrt(criterion(y_pred, y_train))  # RMSE
        # Keep a plain float: storing the tensor would retain each epoch's
        # autograd graph and cannot be converted by matplotlib.
        final_losses.append(loss.item())
        loss.backward()
        optimizer.step()
        print(f"Epoch {epoch+1}, loss: {final_losses[-1]}")

    duration = time.time() - start_time
    print(f"Training took {duration/60} minutes")

    # Save CPU weights so the checkpoint loads on nodes without a GPU.
    model.cpu()
    torch.save(model.state_dict(), os.path.join(model_path, 'model.pt'))

    # Loss curve for the KFP UI, embedded inline as a base64-encoded PNG.
    fig, ax = plt.subplots()
    ax.plot(range(1, len(final_losses) + 1), final_losses)
    ax.set_xlabel("epoch")
    ax.set_ylabel("RMSE loss")
    tmpfile = BytesIO()
    fig.savefig(tmpfile, format="png")
    plt.close(fig)
    encoded = base64.b64encode(tmpfile.getvalue()).decode("utf-8")

    html = f"<img src='data:image/png;base64,{encoded}'>"
    metadata = {
        "outputs": [
            {
                "type": "web-app",
                "storage": "inline",
                "source": html,
            },
        ],
    }

    with open(mlpipeline_ui_metadata, "w") as html_writer:
        json.dump(metadata, html_writer)
        

### 모델 검증 컴포넌트

In [7]:
def evaluate_Pytorch_Tabular_Model(
    model_data_path: components.InputPath(str),
    mlpipeline_ui_metadata: components.OutputPath("UI_Metadata")
):
    """Evaluate the trained taxi-fare model on the held-out test split.

    This function is turned into a Kubeflow component with
    ``components.create_component_from_func``, so all imports and helpers
    live inside the body.

    Args:
        model_data_path: Directory written by the training component. It must
            contain ``model.pt``, ``cat_test.pt``, ``con_test.pt`` and
            ``y_test.pt``. An optional ``model_config.json`` overrides the
            default model constructor arguments.
        mlpipeline_ui_metadata: Output file for the KFP UI metadata JSON; a
            table comparing the first (up to) 20 predictions with the true
            values is written there.
    """
    import json
    import os
    import pathlib

    import pandas as pd
    import torch
    import torch.nn as nn

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    if device.type == 'cuda':
        print("Train on GPU.")
    else:
        print("No cuda available")

    print(model_data_path)
    print(os.listdir(model_data_path))

    class TabularModel(nn.Module):
        """Categorical embeddings + batch-normalised continuous features fed to an MLP.

        Must stay structurally identical to the training component's class so
        the saved state dict loads.
        """

        def __init__(self, emb_sizes, n_cont, out_szs, layers, p=0.5):
            super().__init__()

            self.embeds = nn.ModuleList([nn.Embedding(ni, nf) for ni, nf in emb_sizes])
            self.emb_drop = nn.Dropout(p)
            self.bn_cont = nn.BatchNorm1d(n_cont)

            layer_list = []
            n_in = sum(nf for _, nf in emb_sizes) + n_cont
            for width in layers:
                layer_list += [nn.Linear(n_in, width), nn.ReLU(inplace=True),
                               nn.BatchNorm1d(width), nn.Dropout(p)]
                n_in = width
            layer_list.append(nn.Linear(n_in, out_szs))

            self.layers = nn.Sequential(*layer_list)

        def forward(self, x_cat, x_cont):
            embeddings = [e(x_cat[:, i]) for i, e in enumerate(self.embeds)]
            x = self.emb_drop(torch.cat(embeddings, 1))
            x_cont = self.bn_cont(x_cont)
            x = torch.cat([x, x_cont], 1)
            return self.layers(x)

    # Default constructor arguments. The embedding sizes assume every hour
    # (24), am/pm (2) and weekday (7) category occurred in the training data.
    # A model_config.json next to the model overrides these defaults.
    model_config = {"emb_sizes": [(24, 12), (2, 1), (7, 4)], "n_cont": 6, "layers": [200, 100], "p": 0.4}
    config_file = os.path.join(model_data_path, 'model_config.json')
    if os.path.exists(config_file):
        with open(config_file) as config_reader:
            model_config.update(json.load(config_reader))

    model = TabularModel(
        model_config["emb_sizes"], model_config["n_cont"], 1, model_config["layers"], p=model_config["p"]
    )
    # map_location lets a checkpoint saved on a GPU load on a CPU-only node.
    model.load_state_dict(torch.load(os.path.join(model_data_path, 'model.pt'), map_location=device))
    model.to(device)

    def load_tensor(file_name):
        """Load a tensor saved by the training step onto the evaluation device."""
        return torch.load(os.path.join(model_data_path, file_name), map_location=device)

    con_test = load_tensor('con_test.pt')
    print("con_test", con_test)
    cat_test = load_tensor('cat_test.pt')
    print("cat_test", cat_test)
    y_test = load_tensor('y_test.pt')
    print("y_test", y_test)

    criterion = nn.MSELoss()
    model.eval()  # disable dropout and use running BatchNorm statistics
    with torch.no_grad():
        y_val = model(cat_test, con_test)
        loss = torch.sqrt(criterion(y_val, y_test))
    print(f"Test RMSE: {loss.item():.4f}")

    # Show at most 20 rows; the test split may be smaller.
    header = ["PREDICTED VALUES", "TRUE VALUES", "DIFF"]
    rows = []
    for i in range(min(20, len(y_test))):
        predicted, actual = y_val[i].item(), y_test[i].item()
        diff = abs(predicted - actual)
        print(f'PREDICTED VALUES : {predicted:8.4f} TRUE VALUES : {actual:8.2f} DIFF : {diff:8.2f}')
        rows.append([predicted, actual, diff])

    metadata = {
        "outputs": [
            {
                "type": "table",
                "storage": "inline",
                'format': 'csv',
                'header': header,
                "source": pd.DataFrame(rows, columns=header).to_csv(header=False, index=False),
            },
        ],
    }

    pathlib.Path(mlpipeline_ui_metadata).parent.mkdir(parents=True, exist_ok=True)
    with open(mlpipeline_ui_metadata, "w") as html_writer:
        json.dump(metadata, html_writer)
        

## 파이프라인 생성

In [8]:
# Load the YAML-defined download component written earlier in this notebook.
nyc_taxi_dataset_op = components.load_component_from_file(
    f"{TRAIN_PATH}/nyc_taxi_dataset_component.yaml"
)


def _lightweight_op(func, spec_file_name):
    """Wrap ``func`` as a component on TRAIN_CR_IMAGE and write its spec under TRAIN_PATH."""
    return components.create_component_from_func(
        func,
        output_component_file=f"{TRAIN_PATH}/{spec_file_name}",
        base_image=TRAIN_CR_IMAGE,
    )


# NOTE(review): no packages_to_install is passed, so TRAIN_CR_IMAGE is assumed
# to already provide torch, pandas, numpy and matplotlib used by the functions.
torch_train_on_csv_op = _lightweight_op(train_Pytorch_Tabular_Model, "test_train_component.yaml")
evaluate_eval_op = _lightweight_op(evaluate_Pytorch_Tabular_Model, "test_evaluate_component.yaml")

In [9]:
@dsl.pipeline(
    name='NYC Taxi Pytorch ML pipeline',
    description='An example pipeline.',
)
def nyc_taxi_pytorch_pipeline_w_cpu(
    kc_kbm_os_train_url: str = 'https://objectstorage.kr-central-1.kakaoi.io/v1/c745e6650f0341a68bb73fa222e88e9b/kbm-files/guide_docs%2Fhands_on%2Fnyc_taxi_fare%2Fdata%2Ftrain.csv',
    train_data_fnm: str = "train.csv",
    epoch_num: str = "100",
):
    # Three sequential steps wired through their outputs:
    # download CSV -> train the model on it -> evaluate the trained model.

    # 1) Fetch the raw CSV into the dataset output directory.
    download_task = nyc_taxi_dataset_op(kc_kbm_os_train_url=kc_kbm_os_train_url)
    download_task.set_cpu_request(cpu="1").set_memory_request(memory="2G")

    # 2) Train on the downloaded data (positional: data dir, CSV name, epochs).
    train_task = torch_train_on_csv_op(download_task.output, train_data_fnm, epoch_num)
    train_task.set_cpu_request(cpu="2").set_memory_request(memory="8G")

    # 3) Evaluate using the training step's output.
    eval_task = evaluate_eval_op(train_task.output)
    eval_task.set_cpu_request(cpu="2").set_memory_request(memory="8G")

In [10]:
# Submit one run of the pipeline; the experiment and run names are derived
# from the pipeline function's name.
pipeline_name = nyc_taxi_pytorch_pipeline_w_cpu.__name__
experiment_name = f"{pipeline_name} test experiment"
run_name = f"{pipeline_name} run"

# Only epoch_num is passed explicitly (same value as the pipeline default).
arguments = {"epoch_num": "100"}

client.create_run_from_pipeline_func(
    nyc_taxi_pytorch_pipeline_w_cpu,
    experiment_name=experiment_name,
    run_name=run_name,
    arguments=arguments,
)