| import os
|
| import json
|
| import numpy as np
|
| from PIL import Image
|
| import pandas as pd
|
| from IPython.display import Image
|
| from ultralytics import YOLO
|
| import torch
|
| from transformers import GPT2LMHeadModel, GPT2Tokenizer, Trainer, TrainingArguments
|
| from datasets import load_dataset
|
| import cv2
|
| import pytesseract
|
| from PIL import Image, ImageEnhance
|
| import numpy as np
|
|
|
|
|
| pytesseract.pytesseract.tesseract_cmd = r'C:/Program Files/Tesseract-OCR/tesseract.exe'
|
|
|
| def ocr_core(image):
|
|
|
| data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)
|
| df = pd.DataFrame(data)
|
| df = df[df['conf'] != -1]
|
| df['left_diff'] = df.groupby('block_num')['left'].diff().fillna(0).astype(int)
|
| df['prev_width'] = df['width'].shift(1).fillna(0).astype(int)
|
| df['spacing'] = (df['left_diff'] - df['prev_width']).fillna(0).astype(int)
|
| df['text'] = df.apply(lambda x: '\n' + x['text'] if (x['word_num'] == 1) & (x['block_num'] != 1) else x['text'], axis=1)
|
| df['text'] = df.apply(lambda x: ',' + x['text'] if x['spacing'] > 100 else x['text'], axis=1)
|
| ocr_text = ""
|
| for text in df['text']:
|
| ocr_text += text + ' '
|
| return ocr_text
|
|
|
| def improve_ocr_accuracy(img):
|
|
|
| img =Image.open(img)
|
|
|
|
|
| img = img.resize((img.width * 4, img.height * 4))
|
|
|
|
|
| enhancer = ImageEnhance.Contrast(img)
|
| img = enhancer.enhance(2)
|
|
|
| _, thresh = cv2.threshold(np.array(img), 127, 255, cv2.THRESH_BINARY_INV)
|
|
|
| return thresh
|
|
|
|
|
| def create_ocr_outputs():
|
| directory_path = os.getcwd() + '/data/processed/hand_labeled_tables/hand_labeled_tables'
|
|
|
| for root, dirs, files in os.walk(directory_path):
|
|
|
| print(f"Current directory: {root}")
|
|
|
|
|
| print("Subdirectories:")
|
| for dir in dirs:
|
| print(f"- {dir}")
|
|
|
|
|
| print("Files:")
|
| for image_path in files:
|
| print(f"- {image_path}")
|
| full_path = os.path.join(root, image_path)
|
|
|
| preprocessed_image = improve_ocr_accuracy(full_path)
|
|
|
| ocr_text = ocr_core(preprocessed_image)
|
| with open(os.getcwd() + f"/data/processed/annotations/{image_path.split('.')[0]}.txt", 'wb') as f:
|
| f.write(ocr_text.encode('utf-8'))
|
|
|
| print("\n")
|
|
|
|
|
| def prepare_dataset(ocr_dir, csv_dir, output_file):
|
| with open(output_file, 'w', encoding='utf-8') as jsonl_file:
|
| for filename in os.listdir(ocr_dir):
|
| if filename.endswith('.txt'):
|
| ocr_path = os.path.join(ocr_dir, filename)
|
| csv_path = os.path.join(csv_dir, filename)
|
| print(csv_path)
|
|
|
|
|
|
|
|
|
| with open(ocr_path, 'r', encoding='utf-8') as ocr_file:
|
| ocr_text = ocr_file.read()
|
|
|
| with open(csv_path, 'r', encoding='utf-8') as csv_file:
|
| csv_text = csv_file.read()
|
|
|
| json_object = {
|
| "prompt": ocr_text,
|
| "completion": csv_text
|
| }
|
| jsonl_file.write(json.dumps(json_object) + '\n')
|
|
|
| def tokenize_function(examples):
|
|
|
| inputs = tokenizer(examples['prompt'], truncation=True, padding='max_length', max_length=1012)
|
|
|
|
|
| inputs['labels'] = inputs['input_ids'].copy()
|
| return inputs
|
|
|
|
|
| if __name__ == '__name__':
|
|
|
|
|
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
| print(f"Using device: {device}")
|
|
|
|
|
| model = YOLO('yolov8l.pt')
|
|
|
|
|
| results = model.train(
|
| data='config.yaml',
|
| epochs=10,
|
| imgsz=640,
|
| batch=8,
|
| name='yolov8l_custom',
|
| device=device
|
| )
|
|
|
|
|
| metrics = model.val()
|
| print(metrics.box.map)
|
| torch.save(model, os.getcwd() + '/models/trained_yolov8.pt')
|
|
|
| create_ocr_outputs()
|
|
|
|
|
| ocr_dir = os.getcwd() + '/data/processed/annotations'
|
| csv_dir = os.getcwd() + '/data/processed/hand_labeled_tables'
|
| output_file = 'dataset.jsonl'
|
| prepare_dataset(ocr_dir, csv_dir, output_file)
|
|
|
|
|
|
|
| dataset = load_dataset('json', data_files={'train': 'dataset.jsonl'})
|
| dataset = dataset['train'].train_test_split(test_size=0.1)
|
|
|
|
|
| model_name = 'gpt2'
|
| tokenizer = GPT2Tokenizer.from_pretrained(model_name)
|
|
|
|
|
| tokenizer.add_special_tokens({'pad_token': '[PAD]'})
|
|
|
| tokenized_dataset = dataset.map(tokenize_function, batched=True)
|
|
|
|
|
| model = GPT2LMHeadModel.from_pretrained(model_name)
|
|
|
|
|
| model.resize_token_embeddings(len(tokenizer))
|
|
|
| training_args = TrainingArguments(
|
| output_dir='./results',
|
| num_train_epochs=3,
|
| per_device_train_batch_size=2,
|
| per_device_eval_batch_size=2,
|
| warmup_steps=500,
|
| weight_decay=0.01,
|
| logging_dir='./logs',
|
| logging_steps=10,
|
| evaluation_strategy="epoch",
|
| save_strategy="epoch",
|
| load_best_model_at_end=True,
|
| metric_for_best_model="eval_loss",
|
| )
|
|
|
|
|
| trainer = Trainer(
|
| model=model,
|
| args=training_args,
|
| train_dataset=tokenized_dataset['train'],
|
| eval_dataset=tokenized_dataset['test'],
|
| )
|
|
|
|
|
| trainer.train()
|
|
|
|
|
| eval_results = trainer.evaluate()
|
| print(f"Evaluation results: {eval_results}")
|
|
|
|
|
| model.save_pretrained(os.getcwd() + '/models/gpt')
|
| tokenizer.save_pretrained(os.getcwd() + '/models/gpt') |