study/Machine Learning

NBC (Naive Bayes Classification 구현 (only numpy)

ys_cs17 2022. 7. 6. 16:28
반응형

CODE

(1) main.py

import numpy as np
import warnings

from dataloader import DataLoader
from model import get_GaussianNBC, predict, get_ACC

warnings.filterwarnings("ignore", category=RuntimeWarning)

if __name__ == "__main__":
    train_path = './data/train/'
    test_path = './data/test/'

    train_setting = DataLoader(train_path, 'train')  # 60000 * 28 * 28 * 3
    test_setting = DataLoader(test_path, 'test')  # 10000 * 28 * 28 * 3

    train_data, train_label = train_setting.set_data()
    test_data, test_label = test_setting.set_data()
    test_data = test_data.reshape(10000, -1)
    means_by_classes, stdev_by_classes = get_GaussianNBC(train_data, train_label, 'normal')

    means_by_classes = means_by_classes.reshape(10, -1)  # 10 * 784
    stdev_by_classes = stdev_by_classes.reshape(10, -1)  # 10 * 784

    pred_classes = predict(means_by_classes, stdev_by_classes, test_data, 'normal')

    pred_classes = np.array(pred_classes)
    normal_acc = get_ACC(pred_classes, test_label)

    # -------------------online_learning_part-------------------
    means_by_classes, stdev_by_classes = get_GaussianNBC(train_data, train_label, 'online')
    means_by_classes = means_by_classes.reshape(10, -1)  # 10 * 784
    stdev_by_classes = stdev_by_classes.reshape(10, -1)  # 10 * 784

    pred_classes = predict(means_by_classes, stdev_by_classes, test_data, 'online')

    pred_classes = np.array(pred_classes)
    online_acc = get_ACC(pred_classes, test_label)

    print("Normal version ACC: {:.3f}%".format(normal_acc))
    print("Online batch version ACC: {:.3f}%".format(online_acc))

메인에 대한 코드입니다.

각 train, test에 대한 데이터의 path를 저장한 후 Dataloader class를 통해 데이터에 대한 로더를 진행합니다. set_data() 메서드를 통해 min max normalization을 진행합니다.

test data의 경우, 연산의 용이를 위해 flatten()과 마찬가지로 shape을 변경하였습니다.

get_GaussianNBC를 통해 각 class 별 평균 및 표준 편차를 구합니다.

이에 대한 shape을 10 * 784로 바꾸고, predict에 넣어 줌으로써 예측을 진행합니다.

최종적으로 get_ACC를 통해 정확도를 구합니다.

일반적인 학습이 끝난 후 online 학습을 진행한 후 코드는 종료됩니다.

(2) Dataloader.py

def __init__(self, path, mode):
        self.file_list = os.listdir(path)
        self.N = len(self.file_list)
        self.file_numpy = np.zeros(self.N)
        self.mode = mode
        self.name_form = './data/{}/image_idx-{}_label-{}.jpg'

        for file in self.file_list:
            split_file = file.split('-')
            idx = int(split_file[1].split('_')[0])
            label = int(split_file[2].split('.')[0])
            self.file_numpy[idx] = label

Dataloader의 생성자입니다.

데이터의 경로, 데이터의 개수, 데이터의 모드 (train, test 구별), name form을 멤버 변수로 저장합니다.

file_numpy를 데이터 개수만큼 선언하여 데이터를 넘 파이로 변환합니다.

각 데이터들은 for 문을 통해 file_numpy에 인덱스 별로 저장됩니다.

idx 및 label은 데이터 이름을 통해 추출하여 분류합니다.

def set_data(self):
        data = []
        labels = []
        for i in range(self.N):
            image_name = self.name_form.format(self.mode, i, int(self.file_numpy[i]))
            x = cv.imread(image_name, 0)
            x = self.min_max_norm(x)
            data.append(x)
            labels.append(int(self.file_numpy[i]))

        data = np.array(data)
        labels = np.array(labels)

        return data, labels  # return data, label  N * 28 * 28 * 3, N * 1

데이터를 최종적으로 세팅을 하는 메서드입니다.

기존 생성자 코드를 통해 데이터에 대한 분류를 진행하였고, 해당 메서드를 통해서는 opencv에서 제공하는 imread를 통해 이미지 데이터를 읽고, min max norm을 진행한 후 넘 파이 변수에 저장합니다.

최종적으로 데이터와 label을 따로 return 합니다.

최종적인 데이터의 shape는 주석과 동일합니다.

def min_max_norm(self, data):
        max_val = np.max(data)
        min_val = np.min(data)

        data = (data - min_val) / (max_val - min_val)

        return data

데이터를 넘파이 배열에 담기 전 normalization을 진행하는 메서드입니다.

min max normalization의 수식은 다음과 같습니다.

$$ x_{normalized} = \frac{x - x_{min}}{x_{max}-x_{min}} $$

각 넘파이의 배열을 통해 한꺼번에 연산을 진행합니다.

(3) model.py

def get_GaussianNBC(train_samples, train_labels, mode):  # Data categorize by label
    class_0_samples = []
    class_1_samples = []
    class_2_samples = []
    class_3_samples = []
    class_4_samples = []
    class_5_samples = []
    class_6_samples = []
    class_7_samples = []
    class_8_samples = []
    class_9_samples = []

    for k in range(len(train_samples)):
        sample = train_samples[k]  # 1 * D
        label = train_labels[k]  # 1 * 1

        if label == 0:
            class_0_samples.append(sample)

        elif label == 1:
            class_1_samples.append(sample)

        elif label == 2:
            class_2_samples.append(sample)

        elif label == 3:
            class_3_samples.append(sample)

        elif label == 4:
            class_4_samples.append(sample)

        elif label == 5:
            class_5_samples.append(sample)

        elif label == 6:
            class_6_samples.append(sample)

        elif label == 7:
            class_7_samples.append(sample)

        elif label == 8:
            class_8_samples.append(sample)

        elif label == 9:
            class_9_samples.append(sample)

    samples_by_classes = [  # N * D
        class_0_samples,
        class_1_samples,
        class_2_samples,
        class_3_samples,
        class_4_samples,
        class_5_samples,
        class_6_samples,
        class_7_samples,
        class_8_samples,
        class_9_samples
    ]

    numOf_classes = 10
    means_by_classes = []
    stdev_by_classes = []
    np_samples_by_classes = np.array(samples_by_classes)  # 10 * 6000 * 28 * 28

get_GaussianNBC는 각 클래스의 평균 및 표준 편차를 계산하여 return 하는 코드입니다.

위 코드를 통해 총 10개의 class로 구성된 데이터를 나누어 sample_by_classes 리스트에 저장합니다.

이를 넘 파이를 사용하기 위해 np_samples_by_classes에 저장합니다.

이후 코드는 기존 학습과 online 학습으로 나뉘게 됩니다.

기존 학습 방법에 대해 먼저 살펴보겠습니다.

#  ------------------- normal learning -------------------
    else:
        for C in range(numOf_classes):
            means = []
            stdevs = []

            for features in zip(*samples_by_classes[C]):
                means.append(np.mean(features, axis=0, keepdims=True))
                stdevs.append(np.std(features, axis=0, keepdims=True))

            means_by_classes.append(means)
            stdev_by_classes.append(stdevs)

        return np.array(means_by_classes).squeeze(), np.array(stdev_by_classes).squeeze()  # 10 * D, 10 * D

기존 학습 방법은 for 문을 통해 각 클래스에 대해 평균과 표준 변 차를 구해 means_by_classes, stdev_by_classes에 저장하여 이들을 return 합니다.

Online learning의 경우에 대해 살펴보겠습니다.

#  ------------------- online learning -------------------
    if mode == 'online':
        for C in range(numOf_classes):
            K = 1

            count = 0
            for feature in np_samples_by_classes[C]:  # feature: 28 * 28, np_samples_by_classes[C]: 6000 * 28 * 28
                if count == 2:  # mini batch is 2

                    count = 0

                if K == 1:  # first turn
                    mean = feature
                    var = 0.0
                else:
                    pre_mean = mean.copy()
                    mean = mean + ((feature - mean) / K)
                    var = var + ((((feature - pre_mean) * (feature - mean)) - var) / K)

                K += 1
                count += 1

            means_by_classes.append(mean)
            stdev_by_classes.append(np.sqrt(var))

        return np.array(means_by_classes).squeeze(), np.array(stdev_by_classes).squeeze()  # 10 * D, 10 * D

온라인 학습의 경우, 우리의 미니 배치 사이즈는 2입니다. K를 통해 몇 번째 미니 배치인지 구분을 하고, count를 통해 2개의 데이터를 카운팅 합니다.

for문에서 dataset인 feature를 1개씩 뽑아 mean, variance를 구합니다.

만약 K가 1인 경우 mean은 feature의 값이 되고, var의 값은 0이 됩니다.

그 이후부터 아래와 같은 점화식으로 학습을 진행합니다.

$$ \mu_{k} = \mu_{k-1}+ \frac{(x_{k} - \mu_{k-1})}{k} \\ \sigma_{k}^{2} = \sigma_{k-1}^{2}+ \frac{((x_{k}-\mu_{k-1}) \times (x_{k}- \mu_{k})) - \sigma^{2}_{k-1}}{k} $$

해당 수식을 else 문 아래에 구현하였습니다.

만약 count가 2가 되면, K를 더해주고, 다시 count를 0으로 초기화합니다.

이 과정을 통해 학습을 진행한 후 종료되면 일반 학습 방법과 마찬가지로 평균 및 표준 편차 값을 return 합니다.

def predict(means, stdevs, test_samples, mode):
    pred_classes = []
    numOf_classes = 10
    numOf_test_samples = test_samples.shape[0]

    for i in range(numOf_test_samples):
        prob_by_classes = []

        for C in range(numOf_classes):
            mean = means[C]
            stdev = stdevs[C]
            x = test_samples[i]
            result = -(Gaussian_PDF(x, mean, stdev))
            prob_by_classes.append(result)

        bestProb = 999999999

        for C in range(numOf_classes):
            if prob_by_classes[C] < bestProb:
                bestProb = prob_by_classes[C]
                pred_Label = C
        pred_classes.append(pred_Label)

        if (i + 1) % 10 == 0:
            print("{} Iterator : {}/{}".format(mode, numOf_test_samples, i + 1))

    return pred_classes

다음은 학습을 마친 후 predict에 대한 함수입니다.

for문을 통해 진행하고, 두 번째 for문을 통해 각 클래스의 평균 및 표준 편차 값을 진행하고, Gaussian_PDF를 통한 결과 값을 NLL 방식으로 바꾸어 최종적인 결과를 prob_by_classes 리스트에 저장합니다.

그 후 각 클래스 별 result를 비교를 통해 가장 작은 값을 정답 label로 예측하여 pred_classes 리스트에 저장합니다.

아래 if 문은 단순히 predict 진행 확인용으로 만든 코드입니다.

최종적으로 예측한 클래스 값들의 리스트인 pred_classes를 return 합니다.

def Gaussian_PDF(x, mean, stdev):
    std_mask = stdev == 0.0
    x_mean_mask = x == mean
    stdev = np.where(std_mask, np.full(stdev.shape, 1.0), stdev)

    term1 = np.divide(1, np.sqrt(2 * np.pi) * stdev)
    term2 = -1 / 2 * np.power((x - mean) / stdev, 2)

    result = np.where(std_mask, np.where(x_mean_mask, np.full(x.shape, 1.0), np.full(x.shape, 0.0)),
                      np.log(term1) + term2)

    prod_result = np.sum(result)

    return prod_result

Gaussian PDF를 구하는 함수입니다.

각 mask를 통해 표준 편차가 0인 경우에 대한 예외 처리를 진행합니다.

만약 표준 편차가 0인 상황에서 해당 위치의 값과 mean의 값이 같으면 1로 바꾸고, 아니면 0으로 해당 위치의 feature 값을 바꿉니다.

term을 2개로 나누어 정규 분포 변화를 진행하였습니다.

기존 수식을 log를 씌어 NLL 방식으로 진행하였기 때문에 sum을 해준 결과를 return 하였습니다.

def get_ACC(pred_classes_of_testset, gt_of_testset):
    accuracy = np.equal(pred_classes_of_testset, gt_of_testset)
    return list(accuracy).count(True) / len(accuracy) * 100

정확도를 구하는 함수입니다.

실습 예제와 동일합니다.

Result

(1) Use min max normalization

최종 적인 결과는 기존 학습 방식은 63.56%이라는 정확도를 얻을 수 있었고, online learning의 경우에도 63.56%이라는 정확도를 얻을 수 있었습니다.

(2) Don’t use min max normalization

Min max normalization을 적용하지 않은 결과 약간의 정확도가 감소가 된 모습을 볼 수 있습니다.

 

반응형