On the journey of

[KoBERT] SKTBrain의 KoBERT 공부하기 본문

Experiences & Study

[KoBERT] SKTBrain의 KoBERT 공부하기

dlrpskdi 2023. 5. 29. 17:36

NLP에 있어서, BERT 모델이란 기본적으로 Unlabeled Data를 활용해 모델을 학습하고, 이를 기반으로 번역/문서 분류/ Q&A(질의응답) 등의 목적을 위해 신경망을 추가하는 전이학습 모델이다. 

출처 - 구글링...

: 그러나 위 그림에서 알 수 있듯, 대부분의 Bert 모델 및 관련 정보는 전부 영어로만 되어 있다. 뭐 코드나 해석 정보 같은 건 한글로 쳐도 나오는데.....영어 지분율이 90%는 되는 듯. 특히 데이터셋으로 넘어오게 되면 Kor 붙은 데이터가 없다.

이를 해결하기 위해 (공식적으로는 '구글 Bert Base multilingual cased'의 한국어 성능 한계) SKTBrain/KoBERT: Korean BERT pre-trained cased (KoBERT) (github.com) 가 나왔다! KoBERT는 이를 기반으로 한 NLP 코드를 의미한다.


그렇다면 naver_review_classifications_pytorch_kobert.ipynb - Colaboratory (google.com) : 공개된 코랩 KoBERT를 활용한 네이버 리뷰 분류(감정분석) 과정을 살펴보자 . 내가 짠 코드가 아닌, 공개해준 코드이다 :)

 


 

  • 나는 Colab에서 실행했지만, Vscode(Visual Studio) 등에서 실행하는 경우 깃헙 링크가 아닌 pywidgets가 필요!

 

import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import gluonnlp as nlp
import numpy as np
from tqdm.notebook import tqdm

필요한 패키지와 라이브러리를 불러왔다. 

  • torch: Pytorch(파이토치) 를 의미하며, 딥러닝 프레임워크로 GPU를 활용해 인공 신경망 모델을 만들고 학습시키는 데 쓰인다. 
  • Gluonnlp : NLP(자연어처리)에서, 신경망으로 공급되는 data(Input data) 입력은 가변적인 경우가 대부분이다. 때문에 신경망 모델은 학습 중 모양과 크기를 변경해야 하는 경우가 많은데, Gluonnlp 모델을 통해 그 변경을 용이하게 하려는 것이다. 
  • TQDM : 얜 그냥 상태 바(Progress Bar)를 보여주기 위한 라이브러리다. 
  • 혹시 uninstalled error가 발생한다면 아래와 같이 !pip install (라이브러리명) 붙여주고 실행하면 끝!
!pip install gluonnlp

이제 본격적으로 KoBERT 모델을 불러오자. 대신 CPU냐 GPU냐는 꼭 구분해줘야 함!

# CPU
ctx = mx.cpu()

# GPU
ctx = mx.gpu()

여기서 본인 환경에 맞게 설정(주석처리) 하면 됨!

bert_base, vocab = get_mxnet_kobert_model(use_decoder=False, use_classifier=False, ctx=ctx, cachedir=".cache")

 

tokenizer = get_tokenizer()
tok = nlp.data.BERTSPTokenizer(tokenizer, vocab, lower=False)

이제 KoBERT에서 제공하는 dataset를 가져와보자. 

!wget -O .cache/ratings_train.txt http://skt-lsl-nlp-model.s3.amazonaws.com/KoBERT/datasets/nsmc/ratings_train.txt
!wget -O .cache/ratings_test.txt http://skt-lsl-nlp-model.s3.amazonaws.com/KoBERT/datasets/nsmc/ratings_test.txt

이때 encoding에 주의해야 한다. 아니면 아래와 같이 깨져버린 dataset을 보게 될 것이다...

여하간 encoding 문제를 해결하면, data를 test용과 train용으로 구분지어야 하므로 dataset 제목에 train 있는 애를 dataset_train, test 있는 애를 dataset_test로 변경!

dataset_train = nlp.data.TSVDataset(".cache/ratings_train.txt", field_indices=[1,2], num_discard_samples=1)
dataset_test = nlp.data.TSVDataset(".cache/ratings_test.txt", field_indices=[1,2], num_discard_samples=1)

이제 파라미터를 설정해주고, 분류 개수 관련한 코드를 살펴보자. 먼저 파라미터다. 

#파라미터
## Setting parameters
max_len = 64
batch_size = 64
warmup_ratio = 0.1
num_epochs = 5
max_grad_norm = 1
log_interval = 200
learning_rate =  5e-5

최대 텍스트 데이터의 길이(max_len), 반복할 데이터셋 크기(Batch_size), 학습률(learning rate) 등의 파라미터를 설정할 수 있다. 이건 본인 취향(목적)에 맞게 하면 되고, 나는 별달리 변경하진 않았다 :)

class BERTDataset(mx.gluon.data.Dataset):
    def __init__(self, dataset, sent_idx, label_idx, bert_tokenizer, max_len,
                 pad, pair):
        transform = nlp.data.BERTSentenceTransform(
            bert_tokenizer, max_seq_length=max_len, pad=pad, pair=pair)
        sent_dataset = gluon.data.SimpleDataset([[
            i[sent_idx],
        ] for i in dataset])
        self.sentences = sent_dataset.transform(transform)
        self.labels = gluon.data.SimpleDataset(
            [np.array(np.int32(i[label_idx])) for i in dataset])

    def __getitem__(self, i):
        return (self.sentences[i] + (self.labels[i], ))

    def __len__(self):
        return (len(self.labels))

그럼 얘는 뭐 하는 함수냐 : 입력될 dataset을 정리하기 위한 코드인 것. dataset에서 뭘 가져와야 하는지를 def(정의)했다고 보면 됨! 그리고 데이터셋 정리와 parameter 값 설정이 끝났다면, 이제 아래 코드를 통해 모델을 (재)정의하면 된다. 이때도, 방법론에 따라 BertClassifier(상속받는 클래스) 정의 코드가 조금씩 다름을 알 수 있다! 나는 위 Block 코드 사용했다.

class BERTClassifier(nn.Block):
    def __init__(self,
                 bert,
                 num_classes=2,
                 dropout=None,
                 prefix=None,
                 params=None):
        super(BERTClassifier, self).__init__(prefix=prefix, params=params)
        self.bert = bert
        with self.name_scope():
            self.classifier = nn.HybridSequential(prefix=prefix)
            if dropout:
                self.classifier.add(nn.Dropout(rate=dropout))
            self.classifier.add(nn.Dense(units=num_classes))

    def forward(self, inputs, token_types, valid_length=None):
        _, pooler = self.bert(inputs, token_types, valid_length)
        return self.classifier(pooler)
class BERTClassifier(nn.Module): ## 클래스 상속
    def __init__(self,
                 bert,
                 hidden_size = 768,
                 num_classes=6,   ##클래스 수 조정
                 dr_rate=None,
                 params=None):
        super(BERTClassifier, self).__init__()
        self.bert = bert
        self.dr_rate = dr_rate
                 
        self.classifier = nn.Linear(hidden_size , num_classes)
        if dr_rate:
            self.dropout = nn.Dropout(p=dr_rate)
    
    def gen_attention_mask(self, token_ids, valid_length):
        attention_mask = torch.zeros_like(token_ids)
        for i, v in enumerate(valid_length):
            attention_mask[i][:v] = 1
        return attention_mask.float()

    def forward(self, token_ids, valid_length, segment_ids):
        attention_mask = self.gen_attention_mask(token_ids, valid_length)
        
        _, pooler = self.bert(input_ids = token_ids, token_type_ids = segment_ids.long(), attention_mask = attention_mask.float().to(token_ids.device))
        if self.dr_rate:
            out = self.dropout(pooler)
        return self.classifier(out)

기본적으로 Kobert은 '모델'이다. 이미 training을 거친 모델이라는 뜻이기에, 이를 상속받아 쓰기 위한 함수를 가져온 것이기에, class를 '상속'받으면서 시작해야 하는 것.


이제 모델을 불러와서 training을 하기 위한 준비를 마쳤다. 이제 본격적으로 train - evaluate(평가)를 진행해보자!

model = BERTClassifier(bert_base, num_classes=2, dropout=0.1)
# 분류 레이어만 초기화 한다. 
model.classifier.initialize(init=mx.init.Normal(0.02), ctx=ctx)
model.hybridize()

# softmax cross entropy loss for classification
loss_function = gluon.loss.SoftmaxCELoss()

metric = mx.metric.Accuracy()

기본적으로 Bert 모델은, 맨 처음 적어뒀듯 번역/문서 분류/ Q&A(질의응답) 등의 목적을 위해 신경망을 추가하는 모델이기에 활용은 다방면으로 가능하겠지만, 분류가 우선시될 것이다. 이 때문에 BertClassifier로 상속받은 거기도 하고! 

batch_size = 32
lr = 5e-5

train_dataloader = mx.gluon.data.DataLoader(data_train, batch_size=batch_size, num_workers=5)
test_dataloader = mx.gluon.data.DataLoader(data_test, batch_size=int(batch_size/2), num_workers=5)

gluon 모델을 통해 말하자면 원래 dataset이라는 재료를, 요리할 수 있게끔 손질하는 과정을 위 코드로 진행하였다. 요리하려면 어떤 (train, test) 데이터를 어느 정도 크기로 썰어서(batch_size) 어떻게 조리할지 등을 정해야 하듯!

trainer = gluon.Trainer(model.collect_params(), 'bertadam',
                        {'learning_rate': lr, 'epsilon': 1e-9, 'wd':0.01})

log_interval = 4
num_epochs = 5

 

그리고 어떻게 조리할지(trainer), 얼마나 오래 조리할지(num_epochs) 정해야 하는데, 이 옵션은 위 코드의 parameter 설정을 통해 진행했다.

# LayerNorm과 Bias에는 Weight Decay를 적용하지 않는다. 
for _, v in model.collect_params('.*beta|.*gamma|.*bias').items():
    v.wd_mult = 0.0
params = [
    p for p in model.collect_params().values() if p.grad_req != 'null'
]

참고로 내가 실행한 gluon_bert 코드의 경우에는 위와 같은 코드가 있다. Weight Decay가 뭐길래 적용하지 않는 거냐 한다면 모델은 데이터를 학습하면서 점점 복잡해지고, 모델다운(?) 형태를 띠게 된다. 하지만 너무 복잡해지면 과적합의 우려가 생기기에, 복잡도를 줄이고자 학습 도중 가중치(weight)가 너무 큰 값을 갖지 않도록 loss function에 weight가 커질 경우를 대비한 penalty를 반영하고, 이게 바로 weight decay다. 

def evaluate_accuracy(model, data_iter, ctx=ctx):
    acc = mx.metric.Accuracy()
    i = 0
    for i, (t,v,s, label) in enumerate(data_iter):
        token_ids = t.as_in_context(ctx)
        valid_length = v.as_in_context(ctx)
        segment_ids = s.as_in_context(ctx)
        label = label.as_in_context(ctx)
        output = model(token_ids, segment_ids, valid_length.astype('float32'))
        acc.update(preds=output, labels=label)
        if i > 1000:
            break
        i += 1
    return(acc.get()[1])

evaluate_accuracy: 이름만 봐도 기능을 알 수 있을 것 같다. 모델을 학습했으면 test를 진행해야 한다. 그 성능을 평가하기 위한 함수를 정의한 코드다. 

#learning rate warmup을 위한 준비 
accumulate = 4
step_size = batch_size * accumulate if accumulate else batch_size
num_train_examples = len(data_train)
num_train_steps = int(num_train_examples / step_size * num_epochs)
warmup_ratio = 0.1
num_warmup_steps = int(num_train_steps * warmup_ratio)
step_num = 0
all_model_params = model.collect_params()

학습률을 조정하는 코드이고, 아래는 Gradient Accumulation(옵션이다)을 조정하는 코드다. Colab 상에서 코드를 실행할 때, 가장 중요해지는 건 무료라는 전제 하에  어디까지 돌려볼 수 있는가? 가 아닐까 싶다. 이를 조정하기 위한 방법이 gradient accumulation이다. 구글링 결과 나온 이미지로 설명해 보자면 아래 그림과 같다. G.G(Gradient accumulation) 단계를 추가하여, 우선 각 미니 배치 훈련 후 한번에 업데이트시킨다는 차이가 있다. 

# Set grad_req if gradient accumulation is required
if accumulate and accumulate > 1:
    for p in params:
        p.grad_req = 'add'

때문에 required: 필요하다고 판단되면 설정하라는 것!


마지막으로 epoch 돌리면서 test accuracy를 출력하는 코드이다. 

for epoch_id in range(num_epochs):
    metric.reset()
    step_loss = 0
    for batch_id, (token_ids, valid_length, segment_ids, label) in tqdm(enumerate(train_dataloader), total=len(train_dataloader)):
        if step_num < num_warmup_steps:
            new_lr = lr * step_num / num_warmup_steps
        else:
            non_warmup_steps = step_num - num_warmup_steps
            offset = non_warmup_steps / (num_train_steps - num_warmup_steps)
            new_lr = lr - offset * lr
        trainer.set_learning_rate(new_lr)
        with mx.autograd.record():
            # load data to GPU
            token_ids = token_ids.as_in_context(ctx)
            valid_length = valid_length.as_in_context(ctx)
            segment_ids = segment_ids.as_in_context(ctx)
            label = label.as_in_context(ctx)

            # forward computation
            out = model(token_ids, segment_ids, valid_length.astype('float32'))
            ls = loss_function(out, label).mean()

        # backward computation
        ls.backward()
        if not accumulate or (batch_id + 1) % accumulate == 0:
          trainer.allreduce_grads()
          nlp.utils.clip_grad_global_norm(params, 1)
          trainer.update(accumulate if accumulate else 1)
          step_num += 1
          if accumulate and accumulate > 1:
              # set grad to zero for gradient accumulation
              all_model_params.zero_grad()

        step_loss += ls.asscalar()
        metric.update([label], [out])
        if (batch_id + 1) % (50) == 0:
            print('[Epoch {} Batch {}/{}] loss={:.4f}, lr={:.10f}, acc={:.3f}'
                         .format(epoch_id + 1, batch_id + 1, len(train_dataloader),
                                 step_loss / log_interval,
                                 trainer.learning_rate, metric.get()[1]))
            step_loss = 0
    test_acc = evaluate_accuracy(model, test_dataloader, ctx)
    print('Test Acc : {}'.format(test_acc))

정확하지는 않을지도 모른다 ... 나는 감자 학부생인 데다 공부하면서 적은 거기 때문에....

혹시 잘못되었거나, 이미지들은 구글링하여 나온 이미지를 활용했는데 혹시 저작권 문제가 있다면 댓글로 알려주세요 :)