From 4090a25471e6e2af20d42cba6107cbdc716ffc4f Mon Sep 17 00:00:00 2001 From: thematdev Date: Thu, 10 Nov 2022 22:23:31 +0300 Subject: [PATCH] First commit --- codeforces_scraper/__init__.py | 2 + codeforces_scraper/assets/__init__.py | 1 + .../assets/all_language_compilers.json | 182 ++++++++++++++ codeforces_scraper/languages.py | 50 ++++ codeforces_scraper/models.py | 234 ++++++++++++++++++ codeforces_scraper/scraper.py | 214 ++++++++++++++++ codeforces_scraper/utils.py | 32 +++ fetch_compilers_info/fetch.py | 28 +++ pyproject.toml | 3 + setup.py | 21 ++ tests/__init__.py | 0 tests/test_main.py | 60 +++++ 12 files changed, 827 insertions(+) create mode 100644 codeforces_scraper/__init__.py create mode 100644 codeforces_scraper/assets/__init__.py create mode 100644 codeforces_scraper/assets/all_language_compilers.json create mode 100644 codeforces_scraper/languages.py create mode 100644 codeforces_scraper/models.py create mode 100644 codeforces_scraper/scraper.py create mode 100644 codeforces_scraper/utils.py create mode 100644 fetch_compilers_info/fetch.py create mode 100644 pyproject.toml create mode 100644 setup.py create mode 100644 tests/__init__.py create mode 100644 tests/test_main.py diff --git a/codeforces_scraper/__init__.py b/codeforces_scraper/__init__.py new file mode 100644 index 0000000..e6ac1ad --- /dev/null +++ b/codeforces_scraper/__init__.py @@ -0,0 +1,2 @@ +from codeforces_scraper.models import * +from codeforces_scraper.scraper import * diff --git a/codeforces_scraper/assets/__init__.py b/codeforces_scraper/assets/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/codeforces_scraper/assets/__init__.py @@ -0,0 +1 @@ + diff --git a/codeforces_scraper/assets/all_language_compilers.json b/codeforces_scraper/assets/all_language_compilers.json new file mode 100644 index 0000000..58815d3 --- /dev/null +++ b/codeforces_scraper/assets/all_language_compilers.json @@ -0,0 +1,182 @@ +[ + { + "id": 43, + "name": "GNU GCC C11 5.1.0", + "extensions": [".c"] + }, + { + "id": 80, + "name": "Clang++20 Diagnostics", + "extensions": [".cpp"] + }, + { + "id": 52, + "name": "Clang++17 Diagnostics", + "extensions": [".cpp"] + }, + { + "id": 50, + "name": "GNU G++14 6.4.0", + "extensions": [".cpp"] + }, + { + "id": 54, + "name": "GNU G++17 7.3.0", + "extensions": [".cpp"] + }, + { + "id": 73, + "name": "GNU G++20 11.2.0 (64 bit, winlibs)", + "extensions": [".cpp"] + }, + { + "id": 59, + "name": "Microsoft Visual C++ 2017", + "extensions": [".cpp"] + }, + { + "id": 61, + "name": "GNU G++17 9.2.0 (64 bit, msys 2)", + "extensions": [".cpp"] + }, + { + "id": 65, + "name": "C# 8, .NET Core 3.1", + "extensions": [".cs"] + }, + { + "id": 79, + "name": "C# 10, .NET SDK 6.0", + "extensions": [".cs"] + }, + { + "id": 9, + "name": "C# Mono 6.8", + "extensions": [".cs"] + }, + { + "id": 28, + "name": "D DMD32 v2.091.0", + "extensions": [".d"] + }, + { + "id": 32, + "name": "Go 1.19", + "extensions": [".go"] + }, + { + "id": 12, + "name": "Haskell GHC 8.10.1", + "extensions": [".hs"] + }, + { + "id": 60, + "name": "Java 11.0.6", + "extensions": [".java"] + }, + { + "id": 74, + "name": "Java 17 64bit", + "extensions": [".java"] + }, + { + "id": 36, + "name": "Java 1.8.0_241", + "extensions": [".java"] + }, + { + "id": 48, + "name": "Kotlin 1.4.31", + "extensions": [".kt"] + }, + { + "id": 72, + "name": "Kotlin 1.5.31", + "extensions": [".kt"] + }, + { + "id": 77, + "name": "Kotlin 1.6.10", + "extensions": [".kt"] + }, + { + "id": 19, + "name": "OCaml 4.02.1", + "extensions": [] + 
}, + { + "id": 3, + "name": "Delphi 7", + "extensions": [".pas"] + }, + { + "id": 4, + "name": "Free Pascal 3.0.2", + "extensions": [".pas"] + }, + { + "id": 51, + "name": "PascalABC.NET 3.4.2", + "extensions": [".pas"] + }, + { + "id": 13, + "name": "Perl 5.20.1", + "extensions": [".pl"] + }, + { + "id": 6, + "name": "PHP 8.1.7", + "extensions": [".php"] + }, + { + "id": 7, + "name": "Python 2.7.18", + "extensions": [".py"] + }, + { + "id": 31, + "name": "Python 3.8.10", + "extensions": [".py"] + }, + { + "id": 40, + "name": "PyPy 2.7.13 (7.3.0)", + "extensions": [".py"] + }, + { + "id": 41, + "name": "PyPy 3.6.9 (7.3.0)", + "extensions": [".py"] + }, + { + "id": 70, + "name": "PyPy 3.9.10 (7.3.9, 64bit)", + "extensions": [".py"] + }, + { + "id": 67, + "name": "Ruby 3.0.0", + "extensions": [".rb"] + }, + { + "id": 75, + "name": "Rust 1.64.0 (2021)", + "extensions": [".rs"] + }, + { + "id": 20, + "name": "Scala 2.12.8", + "extensions": [] + }, + { + "id": 34, + "name": "JavaScript V8 4.8.0", + "extensions": [".js"] + }, + { + "id": 55, + "name": "Node.js 12.16.3", + "extensions": [".js"] + } +] diff --git a/codeforces_scraper/languages.py b/codeforces_scraper/languages.py new file mode 100644 index 0000000..cb897bd --- /dev/null +++ b/codeforces_scraper/languages.py @@ -0,0 +1,50 @@ +try: + import importlib.resources as pkg_resources +except ImportError: + # Try backported to PY<37 `importlib_resources`. + import importlib_resources as pkg_resources +from pydantic import BaseModel, parse_obj_as +from typing import List, Iterable, Optional +from . import assets +import json + + +class LanguageCompiler(BaseModel): + """Model containing information about compiler""" + id: int + name: str + extensions: List[str] + + +ALL_LANGUAGE_COMPILERS = parse_obj_as( + List[LanguageCompiler], + json.loads(pkg_resources.read_text(assets, 'all_language_compilers.json')) +) + + +def compiler_by_id(id: int) -> LanguageCompiler: + """Return compiler model by id""" + for comp in ALL_LANGUAGE_COMPILERS: + if comp.id == id: + return comp + + +def all_compilers_by_ext(extension: str) -> Iterable[LanguageCompiler]: + """Returns ALL compiler supporting given extension + """ + return filter(lambda comp: extension in comp.extensions, + ALL_LANGUAGE_COMPILERS) + + +def some_compiler_by_ext(extension: str) -> Optional[LanguageCompiler]: + """Returns some compiler for extension, or None if not supported + """ + if extension == '.cpp': + return compiler_by_id(73) + if extension == '.py': + return compiler_by_id(70) + if extension == '.c': + return compiler_by_id(43) + if extension == '.hs': + return compiler_by_id(12) + return None diff --git a/codeforces_scraper/models.py b/codeforces_scraper/models.py new file mode 100644 index 0000000..665b2ab --- /dev/null +++ b/codeforces_scraper/models.py @@ -0,0 +1,234 @@ +from pydantic import BaseModel +from enum import Enum +from typing import List, Optional + + +def de_eblanify(string: str) -> str: + if string == '__root__': + return string + # if string == 'friendof_count': + # return 'friendOfCount' + result: str = ''.join(word.capitalize() for word in string.split('_')) + if len(result) > 0: + result = result[0].lower() + result[1:] + return result + + +class Verdict(str, Enum): + FAILED = "FAILED" + OK = "OK" + PT = "PARTIAL" + CE = "COMPILATION_ERROR" + RE = "RUNTIME_ERROR" + WA = "WRONG_ANSWER" + PE = "PRESENTATION_ERROR" + TL = "TIME_LIMIT_EXCEEDED" + ML = "MEMORY_LIMIT_EXCEEDED" + IL = "IDLENESS_LIMIT_EXCEEDED" + SV = "SECURITY_VIOLATED" + CRASHED = "CRASHED" + 
INPUT_PREPARATION_CRASHED = "INPUT_PREPARATION_CRASHED" + CHALLENGED = "CHALLENGED" + SK = "SKIPPED" + TESTING = "TESTING" + RJ = "REJECTED" + + +class HackVerdict(str, Enum): + HACK_SUCCESSFUL = "HACK_SUCCESSFUL" + HACK_UNSUCCESSFUL = "HACK_UNSUCCESSFUL" + INVALID_INPUT = "INVALID_INPUT" + GENERATOR_INCOMPILABLE = "GENERATOR_INCOMPILABLE" + GENERATOR_CRASHED = "GENERATOR_CRASHED" + IGNORED = "IGNORED" + TESTING = "TESTING" + OTHER = "OTHER" + + +class ContestType(str, Enum): + CF = "CF" + IOI = "IOI" + ICPC = "ICPC" + + +class ContestPhase(str, Enum): + BEFORE = "BEFORE" + CODING = "CODING" + PENDING_SYSTEM_TEST = "PENDING_SYSTEM_TEST" + SYSTEM_TEST = "SYSTEM_TEST" + FINISHED = "FINISHED" + + +class ProblemResultType(str, Enum): + PRELIMINARY = "PRELIMINARY" + FINAL = "FINAL" + + +class APIModel(BaseModel): + class Config: + alias_generator = de_eblanify + + +class JudgeProtocol(APIModel): + manual: bool + protocol: Optional[str] + verdict: Optional[str] + + +class BlogEntry(APIModel): + id: int + original_locale: str + creation_time_seconds: int + author_handle: str + title: str + content: Optional[str] + locale: str + modification_time_seconds: int + allow_view_history: bool + tags: List[str] + rating: int + + +class Comment(APIModel): + id: int + creation_time_seconds: int + commentator_handle: str + locale: str + text: str + parent_comment_id: Optional[int] + rating: int + + +class RecentAction(APIModel): + time_seconds: int + blog_entry: BlogEntry + comment: Comment + + +class ProblemStatistics(APIModel): + contest_id: int + index: str + solved_count: int + + +class RatingChange(APIModel): + contest_id: int + contest_name: str + handle: str + rank: int + rating_update_time_seconds: int + old_rating: int + new_rating: int + + +class Member(APIModel): + handle: str + + +class Problem(APIModel): + contest_id: Optional[int] + problem_set_name: Optional[str] + index: str + name: str + type: str + points: Optional[float] + rating: Optional[int] + tags: List[str] + + +class User(APIModel): + handle: str + email: Optional[str] + vk_id: Optional[str] + open_id: Optional[str] + first_name: Optional[str] + last_name: Optional[str] + country: Optional[str] + city: Optional[str] + organization: Optional[str] + contribution: int + rank: str + rating: int + max_rank: str + max_rating: int + last_online_time_seconds: int + registration_time_seconds: int + friendof_count: int + avatar: str + title_photo: str + + +class Party(APIModel): + contest_id: int + members: List[Member] + participant_type: str + team_id: Optional[int] + team_name: Optional[str] + ghost: bool + room: Optional[int] + start_time_seconds: Optional[int] + + +class Submission(APIModel): + id: int + contest_id: int + creation_time_seconds: int + relative_time_seconds: int + problem: Problem + author: Party + programming_language: str + verdict: Optional[Verdict] + testset: str + passed_test_count: int + time_consumed_millis: int + memory_consumed_bytes: int + points: Optional[float] + + +class Contest(APIModel): + id: int + name: str + type: ContestType + phase: ContestPhase + frozen: bool + duration_seconds: bool + start_time_seconds: Optional[int] + relative_time_seconds: Optional[int] + prepared_by: Optional[str] + website_url: Optional[str] + description: Optional[str] + difficulty: Optional[int] + kind: Optional[str] + icpc_region: Optional[str] + country: Optional[str] + city: Optional[str] + season: Optional[str] + + +class Hack(APIModel): + id: int + creation_time_seconds: int + hacker: Party + defender: Party + problem: 
Problem + test: Optional[str] + judge_protocol = JudgeProtocol + + +class ProblemResult(APIModel): + points: float + penalty: int + rejected_attempt_count: int + type: ProblemResultType + best_submission_time_seconds: int + + +class RanklistRow(APIModel): + party: Party + rank: int + points: float + penalty: int + successful_hack_count: int + unsuccessful_hack_count: int + problem_result: List[ProblemResult] + last_submission_time_seconds: int diff --git a/codeforces_scraper/scraper.py b/codeforces_scraper/scraper.py new file mode 100644 index 0000000..4a6607b --- /dev/null +++ b/codeforces_scraper/scraper.py @@ -0,0 +1,214 @@ +import requests + +from requests import Session +from bs4 import BeautifulSoup as bs + +from codeforces_scraper.utils import get_token, get_messages, create_jar +from codeforces_scraper.models import Submission, Problem +from typing import List + +from functools import reduce + +BASE_URL = 'https://codeforces.com' + + +class ScraperError(Exception): + pass + + +class MessagedScrapError(ScraperError): + def __init__(self, codeforces_message: str): + self.codeforces_message = codeforces_message + + def __str__(self): + f'Codeforces returned message, which is not considered as good: {self.codeforces_message}' + + +class CodeforcesAPIException(ScraperError): + def __init__(self, comment: str): + self.comment = comment + + def __str__(self): + return f'Request to Codeforces API failed. Comment: {self.comment}' + + +class Scraper: + def __init__(self, create_session=True, base_url=BASE_URL): + """Initialize scraper + If ``create_session`` is True(default), will create session, + ``base_url`` (default 'codeforces.com') describes URL + to which all requests will be sent + """ + self.session = Session() if create_session else None + self.base_url = base_url + self.current_user = None + + def close(self): + """Close scraper(closes session it is not None)""" + if self.session is not None: + self.session.close() + + def logout(self): + """Logout from codeforces + Does nothing if you're not logged in + """ + if self.current_user is None: + return + soup = bs(self.get().text, 'lxml') + refs = soup.find(class_='lang-chooser').find_all('a') + for ref in refs: + if 'logout' in ref['href']: + self.get(ref['href']) + self.update_current_user() + if self.current_user is not None: + raise ScraperError('Failed to logout!') + return + + def get_csrf_token(self): + """Get csrf token, which is needed + to make requests by hand + """ + return get_token(self.get()) + + def fetch_current_user(self): + """Fetch current user by querying codeforces""" + soup = bs(self.get().text, 'lxml') + avatar_element = soup.find(class_='avatar') + if avatar_element is None: + return None + return avatar_element.find('div').find('a').text + + def update_current_user(self): + """Update cached ``current_user`` variable""" + self.current_user = self.fetch_current_user() + + # Tries to login with given credentials, will relogin, if logged under another user + def login(self, username: str, password: str): + """Login to codeforces by ``username`` and ``password``""" + if self.current_user == username: + return + if self.current_user is not None: + self.logout() + token = get_token(self.get('enter')) + payload = { + 'csrf_token': token, + 'action': 'enter', + 'handleOrEmail': username, + 'password': password, + 'remember': 'on' + } + self.post('enter', data=payload) + self.update_current_user() + if self.current_user != username: + # TODO: Parse response and raise different errors(if they can be) + raise 
ScraperError('Failed to login!') + + def set_cookies_from_header(self, str_cookie: str): + self.session.cookies = create_jar(str_cookie) + + def submit(self, contest_id: int, problem_index, source_code: str, lang: int) -> None: + """Submit code in problem ``BASE_URL/contest_id/problem_index`` with source + ``source_code`` and language code ``lang``. + Get your language code using Language class + """ + if self.current_user is None: + raise ScraperError('Submitting while not logged in') + url = f'contest/{contest_id}/submit' + submit_page_response = self.get(url) + for message in get_messages(submit_page_response): + raise MessagedScrapError(message) + token = get_token(submit_page_response) + payload = { + 'csrf_token': token, + 'source': source_code, + 'submittedProblemIndex': problem_index, + 'action': 'submitSolutionFormSubmitted', + 'programTypeId': lang + } + post_response = self.post(url, data=payload) + if len(get_messages(post_response)) == 0: + raise ScraperError("Failed to submit. No success message found") + + def make_manual_hack(self, submission_id: int, test_data: str) -> None: + """Make manual hack(explicit test) of submission with id + ``submission_id`` and test ``test_data`` + """ + if self.current_user is None: + raise ScraperError('Hacking while not logged in') + url = 'data/challenge' + payload = { + 'csrf_token': self.get_csrf_token(), + 'action': 'challengeFormSubmitted', + 'submissionId': submission_id, + 'inputType': 'manual', + 'testcase': test_data + } + self.post(url, data=payload) + + def scrap_submissions(self, contest_id: int) -> List[Submission]: + if self.current_user is None: + raise ScraperError('Submitting while not logged in') + url = f'contest/{contest_id}/my' + page_response = self.get(url) + soup = bs(page_response.text, 'lxml') + tables = soup.find_all('table', attrs={'class': 'status-frame-datatable'}) + tbodys = [table.find('tbody') for table in tables] + rows = [tbody.find_all('tr', attrs={'class': 'highlighted-row'}) for tbody in tbodys] + rows = reduce(lambda x, y: x + y, rows) + return rows + + def get_submission_source(self, contest_id: int, submission_id: int) -> str: + """Get source code of submission by ``contest_id`` and ``submission_id``""" + url = f'contest/{contest_id}/submission/{submission_id}' + page_response = self.get(url) + soup = bs(page_response.text, 'lxml') + srcs = soup.find_all('pre', attrs={'id': 'program-source-text'}) + try: + return srcs[0].contents[0] + except IndexError: + raise ScraperError("Submission not found!") + + def get_submissions(self, contest_id: int, username: str) -> List[Submission]: + """Get all submissions in contest ``contest_id`` + of user with handle ``username``, if None returns all submissions + in this contest + """ + if username is not None: + params = { + 'contestId': contest_id, + 'handle': username + } + else: + params = {'contestId': contest_id} + return [Submission.parse_obj(x) for x in self.api_request('contest.status', params)] + + def get_contest_tasks(self, contest_id: int) -> List[Problem]: + """Get all tasks in contest with id ``contest_id``""" + params = { + 'from': 1, + 'count': 1 + } + return self.api_request('contest.standings', params)['problems'] + + def get(self, sub_url='', **kwargs): + """Make a GET request to BASE_URL""" + url = self.base_url + '/' + sub_url + if self.session is not None: + return self.session.get(url, **kwargs) + else: + return requests.get(url, **kwargs) + + def post(self, sub_url='', **kwargs): + """Make a POST request to BASE_URL""" + url = self.base_url 
+ '/' + sub_url + if self.session is not None: + return self.session.post(url, **kwargs) + else: + return requests.post(url, **kwargs) + + def api_request(self, method: str, params): + """Make a request to Codeforces API with ``params``""" + response = self.get(f'api/{method}', params=params).json() + if response['status'] == 'FAILED': + raise CodeforcesAPIException(response['comment']) + return response['result'] diff --git a/codeforces_scraper/utils.py b/codeforces_scraper/utils.py new file mode 100644 index 0000000..6300fdd --- /dev/null +++ b/codeforces_scraper/utils.py @@ -0,0 +1,32 @@ +import requests +import re +from requests import Response +from bs4 import BeautifulSoup as bs +from typing import List + +MESSAGE_GREP_STRING = r'Codeforces\.showMessage\(' +# TODO: Grep for Codeforces.showMessage(" to find message, that has been sent + + +def create_jar(str_cookie: str): + cookies = str_cookie.split(';') + d = {} + for c in cookies: + i = c.find('=') + k = c[:i] + v = c[i+1:] + d[k] = v + cj = requests.cookies.merge_cookies(requests.cookies.RequestsCookieJar(), d) + return cj + + +def get_token(response: Response) -> str: + text = response.text + soup = bs(text, 'lxml') + token = soup.find(class_='csrf-token')['data-csrf'] + return token + + +def get_messages(response: Response) -> List[str]: + text = response.text + return re.findall(fr'{MESSAGE_GREP_STRING}\"(.+?)\"', text) diff --git a/fetch_compilers_info/fetch.py b/fetch_compilers_info/fetch.py new file mode 100644 index 0000000..f77e51a --- /dev/null +++ b/fetch_compilers_info/fetch.py @@ -0,0 +1,28 @@ +"""Script to fetch all language compilers in JSON +It is used only to 'build' distribution, do not use it +""" + +from bs4 import BeautifulSoup +import sys +sys.path.append('..') +from codeforces_scraper import Scraper +from getpass import getpass +from pydantic import BaseModel +from typing import List + +class LanguageCompiler(BaseModel): + id: int + name: str + extensions: List[str] + + +class LanguageList(BaseModel): + __root__: List[LanguageCompiler] + + +scraper = Scraper() +scraper.login(input(), getpass()) +soup = BeautifulSoup(scraper.get('problemset/problem/4/A').text, 'lxml') +subject_options = [i.findAll('option') for i in soup.findAll('select', attrs={'name': 'programTypeId'})][0] +models = LanguageList(__root__=[LanguageCompiler(id=i['value'], name=i.text, extensions=[]) for i in subject_options]) +print(models.json()) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..b0471b7 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta:__legacy__" \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..68acfdc --- /dev/null +++ b/setup.py @@ -0,0 +1,21 @@ +import setuptools + +setuptools.setup( + name='codeforces-scraper', + version='0.1.0', + author='thematdev', + author_email='thematdev@thematdev.org', + description='Utility to do actions on codeforces', + packages=setuptools.find_packages(), + install_requires=[ + 'bs4', + 'lxml', + 'pydantic', + 'requests', + ], + python_requires='>=3.8', + zip_safe=True, + package_data={ + 'codeforces_scraper.assets': ['*'] + } +) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_main.py b/tests/test_main.py new file mode 100644 index 0000000..4bbeb84 --- /dev/null +++ b/tests/test_main.py @@ -0,0 +1,60 @@ +from codeforces_scraper import Scraper, 
ScraperError
+from getpass import getpass
+import unittest
+from random import randint
+
+CPP_LANG = 54
+
+CONTEST_ID = 4
+PROBLEM_INDEX = 'A'
+
+SOURCE = \
+    """#include <iostream>
+
+using namespace std;
+
+int main() {
+    int w; cin >> w;
+    if (w % 2 == 0 && w >= 4) {
+        cout << "YES" << endl;
+    } else {
+        cout << "NO" << endl;
+    }
+}
+"""
+
+
+class LoginTest(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls.username = input('username: ')
+        cls.password = getpass(f'codeforces password for {cls.username}: ')
+
+    def setUp(self):
+        self.scraper = Scraper()
+
+    def tearDown(self):
+        self.scraper.close()
+
+    def test_simple_login_logout(self):
+        self.assertEqual(self.scraper.fetch_current_user(), None)
+        self.scraper.login(self.username, self.password)
+        self.assertEqual(self.scraper.fetch_current_user(), self.username)
+        self.scraper.logout()
+        self.assertEqual(self.scraper.fetch_current_user(), None)
+
+    def test_same_submission(self):
+        self.scraper.login(self.username, self.password)
+        self.assertRaises(ScraperError, self.scraper.submit, CONTEST_ID, PROBLEM_INDEX, SOURCE, CPP_LANG)
+
+    def test_unique_submission(self):
+        salt = f'// Salt: {hex(randint(1, 1337666228))}\n'
+        source = salt + SOURCE
+        self.scraper.login(self.username, self.password)
+        self.scraper.submit(CONTEST_ID, PROBLEM_INDEX, source, CPP_LANG)
+
+    def test_submit_while_not_logged_in(self):
+        self.assertRaises(ScraperError, self.scraper.submit, CONTEST_ID, PROBLEM_INDEX, SOURCE, CPP_LANG)
+
+    def test_get_submissions(self):
+        self.scraper.get_submissions(CONTEST_ID, self.username)
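
Usage sketch: a minimal way to drive the scraper introduced above end to end against the live site, assuming valid Codeforces credentials. The handle, password, and solution.cpp path below are placeholders, and contest 4 / problem A simply mirrors the constants used in the tests.

    from codeforces_scraper import Scraper
    from codeforces_scraper.languages import some_compiler_by_ext

    scraper = Scraper()
    try:
        # Log in with real credentials (placeholders here).
        scraper.login('my_handle', 'my_password')

        # Pick a compiler for the .cpp extension and submit problem 4A.
        compiler = some_compiler_by_ext('.cpp')
        with open('solution.cpp') as f:
            scraper.submit(4, 'A', f.read(), compiler.id)

        # List this user's submissions in the contest via the official API.
        for submission in scraper.get_submissions(4, 'my_handle'):
            print(submission.id, submission.verdict)
    finally:
        scraper.logout()
        scraper.close()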