First commit

This commit is contained in:
thematdev 2022-11-10 22:23:31 +03:00
commit 4090a25471
12 changed files with 827 additions and 0 deletions

View File

@ -0,0 +1,2 @@
from codeforces_scraper.models import *
from codeforces_scraper.scraper import *

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,182 @@
[
{
"id": 43,
"name": "GNU GCC C11 5.1.0",
"extensions": [".c"]
},
{
"id": 80,
"name": "Clang++20 Diagnostics",
"extensions": [".cpp"]
},
{
"id": 52,
"name": "Clang++17 Diagnostics",
"extensions": [".cpp"]
},
{
"id": 50,
"name": "GNU G++14 6.4.0",
"extensions": [".cpp"]
},
{
"id": 54,
"name": "GNU G++17 7.3.0",
"extensions": [".cpp"]
},
{
"id": 73,
"name": "GNU G++20 11.2.0 (64 bit, winlibs)",
"extensions": [".cpp"]
},
{
"id": 59,
"name": "Microsoft Visual C++ 2017",
"extensions": [".cpp"]
},
{
"id": 61,
"name": "GNU G++17 9.2.0 (64 bit, msys 2)",
"extensions": [".cpp"]
},
{
"id": 65,
"name": "C# 8, .NET Core 3.1",
"extensions": [".cs"]
},
{
"id": 79,
"name": "C# 10, .NET SDK 6.0",
"extensions": [".cs"]
},
{
"id": 9,
"name": "C# Mono 6.8",
"extensions": [".cs"]
},
{
"id": 28,
"name": "D DMD32 v2.091.0",
"extensions": [".d"]
},
{
"id": 32,
"name": "Go 1.19",
"extensions": [".go"]
},
{
"id": 12,
"name": "Haskell GHC 8.10.1",
"extensions": [".hs"]
},
{
"id": 60,
"name": "Java 11.0.6",
"extensions": [".java"]
},
{
"id": 74,
"name": "Java 17 64bit",
"extensions": [".java"]
},
{
"id": 36,
"name": "Java 1.8.0_241",
"extensions": [".java"]
},
{
"id": 48,
"name": "Kotlin 1.4.31",
"extensions": [".kt"]
},
{
"id": 72,
"name": "Kotlin 1.5.31",
"extensions": [".kt"]
},
{
"id": 77,
"name": "Kotlin 1.6.10",
"extensions": [".kt"]
},
{
"id": 19,
"name": "OCaml 4.02.1",
"extensions": []
},
{
"id": 3,
"name": "Delphi 7",
"extensions": [".pas"]
},
{
"id": 4,
"name": "Free Pascal 3.0.2",
"extensions": [".pas"]
},
{
"id": 51,
"name": "PascalABC.NET 3.4.2",
"extensions": [".pas"]
},
{
"id": 13,
"name": "Perl 5.20.1",
"extensions": [".pl"]
},
{
"id": 6,
"name": "PHP 8.1.7",
"extensions": [".php"]
},
{
"id": 7,
"name": "Python 2.7.18",
"extensions": [".py"]
},
{
"id": 31,
"name": "Python 3.8.10",
"extensions": [".py"]
},
{
"id": 40,
"name": "PyPy 2.7.13 (7.3.0)",
"extensions": [".py"]
},
{
"id": 41,
"name": "PyPy 3.6.9 (7.3.0)",
"extensions": [".py"]
},
{
"id": 70,
"name": "PyPy 3.9.10 (7.3.9, 64bit)",
"extensions": [".py"]
},
{
"id": 67,
"name": "Ruby 3.0.0",
"extensions": [".rb"]
},
{
"id": 75,
"name": "Rust 1.64.0 (2021)",
"extensions": [".rs"]
},
{
"id": 20,
"name": "Scala 2.12.8",
"extensions": []
},
{
"id": 34,
"name": "JavaScript V8 4.8.0",
"extensions": [".js"]
},
{
"id": 55,
"name": "Node.js 12.16.3",
"extensions": [".js"]
}
]

View File

@ -0,0 +1,50 @@
try:
import importlib.resources as pkg_resources
except ImportError:
# Try backported to PY<37 `importlib_resources`.
import importlib_resources as pkg_resources
from pydantic import BaseModel, parse_obj_as
from typing import List, Iterable, Optional
from . import assets
import json
class LanguageCompiler(BaseModel):
    """One compiler/interpreter entry from the bundled compiler list."""

    # Numeric id of the compiler.
    id: int
    # Human-readable compiler name, e.g. "GNU G++17 7.3.0".
    name: str
    # Source-file extensions (with leading dot) handled by this compiler.
    extensions: List[str]
# Every known compiler, parsed once at import time from the
# 'all_language_compilers.json' resource of the ``assets`` package.
ALL_LANGUAGE_COMPILERS = parse_obj_as(
    List[LanguageCompiler],
    json.loads(pkg_resources.read_text(assets, 'all_language_compilers.json'))
)
def compiler_by_id(id: int) -> Optional[LanguageCompiler]:
    """Return the compiler model whose ``id`` matches, or None if unknown.

    The lookup scans ``ALL_LANGUAGE_COMPILERS`` and returns the first match.
    The parameter keeps its original name ``id`` (even though it shadows the
    builtin) so that keyword callers keep working.
    """
    return next(
        (comp for comp in ALL_LANGUAGE_COMPILERS if comp.id == id),
        None,  # explicit "not found" result instead of an implicit fall-through
    )
def all_compilers_by_ext(extension: str) -> Iterable[LanguageCompiler]:
    """Lazily yield every compiler that lists ``extension`` among its extensions.

    The result is a one-shot iterator over ``ALL_LANGUAGE_COMPILERS``;
    wrap it in ``list(...)`` if it has to be traversed more than once.
    """
    return (
        candidate
        for candidate in ALL_LANGUAGE_COMPILERS
        if extension in candidate.extensions
    )
def some_compiler_by_ext(extension: str) -> Optional[LanguageCompiler]:
    """Return the preferred compiler for ``extension``, or None if unsupported.

    Only a hand-picked subset of extensions has a preferred compiler; the ids
    refer to entries of the bundled compiler list.
    """
    preferred_ids = {
        '.cpp': 73,  # GNU G++20 11.2.0 (64 bit, winlibs)
        '.py': 70,   # PyPy 3.9.10 (7.3.9, 64bit)
        '.c': 43,    # GNU GCC C11 5.1.0
        '.hs': 12,   # Haskell GHC 8.10.1
    }
    chosen_id = preferred_ids.get(extension)
    if chosen_id is None:
        return None
    return compiler_by_id(chosen_id)

View File

@ -0,0 +1,234 @@
from pydantic import BaseModel
from enum import Enum
from typing import List, Optional
def de_eblanify(string: str) -> str:
    """Convert a snake_case field name into the camelCase key used by the API.

    Used as the pydantic alias generator for the API models.

    * ``'__root__'`` (pydantic's custom-root marker) is returned untouched.
    * A few field names whose API key is not the plain camelCase form of the
      Python name are mapped explicitly.
    * Everything else is split on ``'_'``, each part is capitalized, and the
      first character of the joined result is lowercased.
    """
    if string == '__root__':
        return string
    # API keys that the generic rule below would get wrong
    # (it would produce 'friendofCount', 'problemSetName', 'problemResult').
    irregular_aliases = {
        'friendof_count': 'friendOfCount',
        'problem_set_name': 'problemsetName',
        'problem_result': 'problemResults',
    }
    if string in irregular_aliases:
        return irregular_aliases[string]
    result: str = ''.join(word.capitalize() for word in string.split('_'))
    if result:
        result = result[0].lower() + result[1:]
    return result
class Verdict(str, Enum):
    """Verdict values a submission can carry.

    Members are ``str`` subclasses, so they compare equal to the raw
    strings they wrap.
    """

    FAILED = 'FAILED'
    OK = 'OK'
    PT = 'PARTIAL'
    CE = 'COMPILATION_ERROR'
    RE = 'RUNTIME_ERROR'
    WA = 'WRONG_ANSWER'
    PE = 'PRESENTATION_ERROR'
    TL = 'TIME_LIMIT_EXCEEDED'
    ML = 'MEMORY_LIMIT_EXCEEDED'
    IL = 'IDLENESS_LIMIT_EXCEEDED'
    SV = 'SECURITY_VIOLATED'
    CRASHED = 'CRASHED'
    INPUT_PREPARATION_CRASHED = 'INPUT_PREPARATION_CRASHED'
    CHALLENGED = 'CHALLENGED'
    SK = 'SKIPPED'
    TESTING = 'TESTING'
    RJ = 'REJECTED'
class HackVerdict(str, Enum):
    """String-valued enumeration of hack verdicts."""

    HACK_SUCCESSFUL = 'HACK_SUCCESSFUL'
    HACK_UNSUCCESSFUL = 'HACK_UNSUCCESSFUL'
    INVALID_INPUT = 'INVALID_INPUT'
    GENERATOR_INCOMPILABLE = 'GENERATOR_INCOMPILABLE'
    GENERATOR_CRASHED = 'GENERATOR_CRASHED'
    IGNORED = 'IGNORED'
    TESTING = 'TESTING'
    OTHER = 'OTHER'
class ContestType(str, Enum):
    """Scoring system of a contest (value of ``Contest.type``)."""

    CF = 'CF'
    IOI = 'IOI'
    ICPC = 'ICPC'
class ContestPhase(str, Enum):
    """Lifecycle phase of a contest (value of ``Contest.phase``)."""

    BEFORE = 'BEFORE'
    CODING = 'CODING'
    PENDING_SYSTEM_TEST = 'PENDING_SYSTEM_TEST'
    SYSTEM_TEST = 'SYSTEM_TEST'
    FINISHED = 'FINISHED'
class ProblemResultType(str, Enum):
    """Kind of a problem result (value of ``ProblemResult.type``)."""

    PRELIMINARY = 'PRELIMINARY'
    FINAL = 'FINAL'
class APIModel(BaseModel):
    """Common base for all API models.

    Fields are declared in snake_case; ``de_eblanify`` derives the alias
    under which each field is looked up in the incoming data.
    """

    class Config:
        # Alias generator applied to every field name of every subclass.
        alias_generator = de_eblanify
class JudgeProtocol(APIModel):
    """Judging protocol attached to a hack."""

    manual: bool
    # Explicit ``None`` defaults keep these fields optional regardless of
    # how the installed pydantic version treats bare ``Optional[...]``.
    protocol: Optional[str] = None
    verdict: Optional[str] = None
class BlogEntry(APIModel):
    """A blog entry."""

    id: int
    original_locale: str
    creation_time_seconds: int
    author_handle: str
    title: str
    # Explicit default so the field stays optional on every pydantic version.
    content: Optional[str] = None
    locale: str
    modification_time_seconds: int
    allow_view_history: bool
    tags: List[str]
    rating: int
class Comment(APIModel):
    """A comment."""

    id: int
    creation_time_seconds: int
    commentator_handle: str
    locale: str
    text: str
    # Explicit default so the field stays optional on every pydantic version.
    parent_comment_id: Optional[int] = None
    rating: int
class RecentAction(APIModel):
    """A recent action: a blog entry and/or a comment with its timestamp."""

    time_seconds: int
    # The Codeforces API documents both objects as "can be absent", so neither
    # may be required here; a missing one is represented as None.
    blog_entry: Optional[BlogEntry] = None
    comment: Optional[Comment] = None
class ProblemStatistics(APIModel):
    """Solve counter for one problem, identified by contest id and index."""

    contest_id: int
    index: str
    solved_count: int
class RatingChange(APIModel):
    """Rating change of one handle in one contest."""

    contest_id: int
    contest_name: str
    handle: str
    rank: int
    rating_update_time_seconds: int
    # Rating before and after the update.
    old_rating: int
    new_rating: int
class Member(APIModel):
    """A single member of a ``Party``, identified by handle."""

    handle: str
class Problem(APIModel):
    """A problem.

    Optional fields carry an explicit ``None`` default so they stay optional
    on every pydantic version.
    """

    contest_id: Optional[int] = None
    # NOTE(review): the alias is derived from this field name by the alias
    # generator — confirm it matches the key the API actually sends.
    problem_set_name: Optional[str] = None
    index: str
    name: str
    type: str
    points: Optional[float] = None
    rating: Optional[int] = None
    tags: List[str]
class User(APIModel):
    """A user profile.

    Optional fields carry an explicit ``None`` default so they stay optional
    on every pydantic version.
    """

    handle: str
    email: Optional[str] = None
    vk_id: Optional[str] = None
    open_id: Optional[str] = None
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    country: Optional[str] = None
    city: Optional[str] = None
    organization: Optional[str] = None
    contribution: int
    # NOTE(review): rank/rating fields are required here; verify that the API
    # always sends them (e.g. for users who never took part in a rated contest).
    rank: str
    rating: int
    max_rank: str
    max_rating: int
    last_online_time_seconds: int
    registration_time_seconds: int
    # NOTE(review): the alias is derived from this field name by the alias
    # generator — confirm it matches the key the API actually sends.
    friendof_count: int
    avatar: str
    title_photo: str
class Party(APIModel):
    """A party (participant or team) taking part in a contest.

    Optional fields carry an explicit ``None`` default so they stay optional
    on every pydantic version.
    """

    # The Codeforces API documents contestId as "can be absent" for a party,
    # so it must not be required here.
    contest_id: Optional[int] = None
    members: List[Member]
    participant_type: str
    team_id: Optional[int] = None
    team_name: Optional[str] = None
    ghost: bool
    room: Optional[int] = None
    start_time_seconds: Optional[int] = None
class Submission(APIModel):
    """A submission, as parsed from ``contest.status`` results.

    Optional fields carry an explicit ``None`` default so they stay optional
    on every pydantic version.
    """

    id: int
    contest_id: int
    creation_time_seconds: int
    relative_time_seconds: int
    problem: Problem
    author: Party
    programming_language: str
    # None while no verdict has been assigned.
    verdict: Optional[Verdict] = None
    testset: str
    passed_test_count: int
    time_consumed_millis: int
    memory_consumed_bytes: int
    points: Optional[float] = None
class Contest(APIModel):
    """A contest.

    Optional fields carry an explicit ``None`` default so they stay optional
    on every pydantic version.
    """

    id: int
    name: str
    type: ContestType
    phase: ContestPhase
    frozen: bool
    # A duration in seconds is a number, not a flag: typed ``bool`` it either
    # fails validation or collapses to True/False.
    duration_seconds: int
    start_time_seconds: Optional[int] = None
    relative_time_seconds: Optional[int] = None
    prepared_by: Optional[str] = None
    website_url: Optional[str] = None
    description: Optional[str] = None
    difficulty: Optional[int] = None
    kind: Optional[str] = None
    icpc_region: Optional[str] = None
    country: Optional[str] = None
    city: Optional[str] = None
    season: Optional[str] = None
class Hack(APIModel):
    """A hack: one party challenging another party's solution to a problem."""

    id: int
    creation_time_seconds: int
    hacker: Party
    defender: Party
    # The Codeforces API documents verdict, test and judgeProtocol as
    # "can be absent", hence optional with an explicit None default.
    verdict: Optional[HackVerdict] = None
    problem: Problem
    test: Optional[str] = None
    # Must be an annotation: ``judge_protocol = JudgeProtocol`` assigned the
    # class object itself as a value instead of declaring a field of that type.
    judge_protocol: Optional[JudgeProtocol] = None
class ProblemResult(APIModel):
    """Result of one party on one problem."""

    points: float
    # The Codeforces API documents penalty and bestSubmissionTimeSeconds as
    # "can be absent", so they are optional here.
    penalty: Optional[int] = None
    rejected_attempt_count: int
    type: ProblemResultType
    best_submission_time_seconds: Optional[int] = None
class RanklistRow(APIModel):
    """One row of contest standings."""

    party: Party
    rank: int
    points: float
    penalty: int
    successful_hack_count: int
    unsuccessful_hack_count: int
    # NOTE(review): the alias is derived from this field name by the alias
    # generator — confirm it matches the key the API actually sends.
    problem_result: List[ProblemResult]
    # The Codeforces API documents lastSubmissionTimeSeconds as
    # "can be absent", so it is optional here.
    last_submission_time_seconds: Optional[int] = None

View File

@ -0,0 +1,214 @@
import requests
from requests import Session
from bs4 import BeautifulSoup as bs
from codeforces_scraper.utils import get_token, get_messages, create_jar
from codeforces_scraper.models import Submission, Problem
from typing import List
from functools import reduce
# Default site root; Scraper prepends it (plus '/') to every request path.
BASE_URL = 'https://codeforces.com'
class ScraperError(Exception):
    """Base class for every error raised by the scraper."""


class MessagedScrapError(ScraperError):
    """Raised when a page carries a message that indicates a failure.

    The raw message text is available as ``codeforces_message``.
    """

    def __init__(self, codeforces_message: str):
        # Populate ``args`` as well so repr() and pickling behave normally.
        super().__init__(codeforces_message)
        self.codeforces_message = codeforces_message

    def __str__(self):
        # The ``return`` was missing: __str__ yielded None and str(exc)
        # itself raised TypeError.
        return f'Codeforces returned message, which is not considered as good: {self.codeforces_message}'


class CodeforcesAPIException(ScraperError):
    """Raised when a Codeforces API call reports failure.

    The comment returned with the failure is available as ``comment``.
    """

    def __init__(self, comment: str):
        super().__init__(comment)
        self.comment = comment

    def __str__(self):
        return f'Request to Codeforces API failed. Comment: {self.comment}'
class Scraper:
    """Client for the Codeforces website and its JSON API.

    Wraps an optional ``requests.Session`` and caches the handle of the
    currently logged-in user in ``current_user``.  Can be used as a context
    manager; the session is closed on exit.
    """

    def __init__(self, create_session=True, base_url=BASE_URL):
        """Initialize scraper

        If ``create_session`` is True (default), a session is created and
        reused for every request; otherwise requests are sent without one.
        ``base_url`` (default ``BASE_URL``) is the URL
        to which all requests will be sent.
        """
        self.session = Session() if create_session else None
        self.base_url = base_url
        self.current_user = None

    def __enter__(self):
        """Return the scraper itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the scraper when leaving a ``with`` block."""
        self.close()

    def close(self):
        """Close scraper (closes the session if it is not None)"""
        if self.session is not None:
            self.session.close()

    def logout(self):
        """Logout from codeforces

        Does nothing if you're not logged in.
        Raises ``ScraperError`` if a user is still logged in afterwards.
        """
        if self.current_user is None:
            return
        soup = bs(self.get().text, 'lxml')
        chooser = soup.find(class_='lang-chooser')
        # A missing chooser simply means there is no logout link to follow;
        # the check below then reports the failure.
        refs = chooser.find_all('a') if chooser is not None else []
        for ref in refs:
            href = ref.get('href', '')
            if 'logout' in href:
                self.get(href)
                break
        self.update_current_user()
        if self.current_user is not None:
            raise ScraperError('Failed to logout!')

    def get_csrf_token(self):
        """Get csrf token, which is needed
        to make requests by hand
        """
        return get_token(self.get())

    def fetch_current_user(self):
        """Fetch current user by querying codeforces

        Returns the handle shown on the main page, or None when the page
        has no element with class ``avatar``.
        """
        soup = bs(self.get().text, 'lxml')
        avatar_element = soup.find(class_='avatar')
        if avatar_element is None:
            return None
        return avatar_element.find('div').find('a').text

    def update_current_user(self):
        """Update cached ``current_user`` variable"""
        self.current_user = self.fetch_current_user()

    def login(self, username: str, password: str):
        """Login to codeforces by ``username`` and ``password``

        Does nothing if ``username`` is already the cached current user;
        logs out first if another user is logged in.
        Raises ``ScraperError`` if the login did not take effect.
        """
        if self.current_user == username:
            return
        if self.current_user is not None:
            self.logout()
        token = get_token(self.get('enter'))
        payload = {
            'csrf_token': token,
            'action': 'enter',
            'handleOrEmail': username,
            'password': password,
            'remember': 'on'
        }
        self.post('enter', data=payload)
        self.update_current_user()
        if self.current_user != username:
            # TODO: Parse response and raise different errors (if they can be)
            raise ScraperError('Failed to login!')

    def set_cookies_from_header(self, str_cookie: str):
        """Replace the session cookies with those parsed from ``str_cookie``.

        Raises ``ScraperError`` if the scraper was created without a session.
        """
        if self.session is None:
            raise ScraperError('Cannot set cookies: scraper has no session')
        self.session.cookies = create_jar(str_cookie)

    def submit(self, contest_id: int, problem_index, source_code: str, lang: int) -> None:
        """Submit code in problem ``BASE_URL/contest_id/problem_index`` with source
        ``source_code`` and language code ``lang``.

        Raises ``ScraperError`` when not logged in or when no message is found
        in the response to the submission, and ``MessagedScrapError`` when the
        submit page itself already carries a message.
        """
        if self.current_user is None:
            raise ScraperError('Submitting while not logged in')
        url = f'contest/{contest_id}/submit'
        submit_page_response = self.get(url)
        page_messages = get_messages(submit_page_response)
        if page_messages:
            raise MessagedScrapError(page_messages[0])
        token = get_token(submit_page_response)
        payload = {
            'csrf_token': token,
            'source': source_code,
            'submittedProblemIndex': problem_index,
            'action': 'submitSolutionFormSubmitted',
            'programTypeId': lang
        }
        post_response = self.post(url, data=payload)
        if not get_messages(post_response):
            raise ScraperError("Failed to submit. No success message found")

    def make_manual_hack(self, submission_id: int, test_data: str) -> None:
        """Make manual hack (explicit test) of submission with id
        ``submission_id`` and test ``test_data``

        Raises ``ScraperError`` when not logged in.  The response to the
        request is not inspected.
        """
        if self.current_user is None:
            raise ScraperError('Hacking while not logged in')
        url = 'data/challenge'
        payload = {
            'csrf_token': self.get_csrf_token(),
            'action': 'challengeFormSubmitted',
            'submissionId': submission_id,
            'inputType': 'manual',
            'testcase': test_data
        }
        self.post(url, data=payload)

    def scrap_submissions(self, contest_id: int) -> list:
        """Collect the highlighted rows of the logged-in user's submission tables.

        Returns the raw ``<tr>`` elements (not ``Submission`` models) found on
        ``contest/<contest_id>/my``; the list is empty when nothing matches.
        Raises ``ScraperError`` when not logged in.
        """
        if self.current_user is None:
            raise ScraperError('Scraping submissions while not logged in')
        url = f'contest/{contest_id}/my'
        page_response = self.get(url)
        soup = bs(page_response.text, 'lxml')
        rows = []
        for table in soup.find_all('table', attrs={'class': 'status-frame-datatable'}):
            tbody = table.find('tbody')
            # Search the table itself when the parsed markup has no <tbody>.
            container = table if tbody is None else tbody
            rows.extend(container.find_all('tr', attrs={'class': 'highlighted-row'}))
        return rows

    def get_submission_source(self, contest_id: int, submission_id: int) -> str:
        """Get source code of submission by ``contest_id`` and ``submission_id``

        Raises ``ScraperError`` if the page holds no source element.
        """
        url = f'contest/{contest_id}/submission/{submission_id}'
        page_response = self.get(url)
        soup = bs(page_response.text, 'lxml')
        source_tag = soup.find('pre', attrs={'id': 'program-source-text'})
        if source_tag is None or not source_tag.contents:
            raise ScraperError("Submission not found!")
        return source_tag.contents[0]

    def get_submissions(self, contest_id: int, username: str) -> List[Submission]:
        """Get all submissions in contest ``contest_id``
        of user with handle ``username``, if None returns all submissions
        in this contest
        """
        params = {'contestId': contest_id}
        if username is not None:
            params['handle'] = username
        return [Submission.parse_obj(x) for x in self.api_request('contest.status', params)]

    def get_contest_tasks(self, contest_id: int) -> List[Problem]:
        """Get all tasks in contest with id ``contest_id``"""
        params = {
            # The contest id was previously never sent with the request.
            'contestId': contest_id,
            # A single standings row is enough: only 'problems' is used.
            'from': 1,
            'count': 1
        }
        problems = self.api_request('contest.standings', params)['problems']
        return [Problem.parse_obj(x) for x in problems]

    def _build_url(self, sub_url: str) -> str:
        """Join ``base_url`` and ``sub_url`` with exactly one slash between them."""
        # hrefs scraped from pages (e.g. the logout link) may already start with '/'
        return self.base_url.rstrip('/') + '/' + sub_url.lstrip('/')

    def get(self, sub_url='', **kwargs):
        """Make a GET request to BASE_URL

        ``kwargs`` are passed through to ``requests``.
        """
        url = self._build_url(sub_url)
        if self.session is not None:
            return self.session.get(url, **kwargs)
        return requests.get(url, **kwargs)

    def post(self, sub_url='', **kwargs):
        """Make a POST request to BASE_URL

        ``kwargs`` are passed through to ``requests``.
        """
        url = self._build_url(sub_url)
        if self.session is not None:
            return self.session.post(url, **kwargs)
        return requests.post(url, **kwargs)

    def api_request(self, method: str, params):
        """Make a request to Codeforces API with ``params``

        Returns the ``result`` member of the JSON response; raises
        ``CodeforcesAPIException`` when the response status is ``FAILED``.
        """
        response = self.get(f'api/{method}', params=params).json()
        if response['status'] == 'FAILED':
            raise CodeforcesAPIException(response['comment'])
        return response['result']

View File

@ -0,0 +1,32 @@
import requests
import re
from requests import Response
from bs4 import BeautifulSoup as bs
from typing import List
# Regex prefix (escaped) of the JavaScript call whose first string argument
# get_messages() extracts from a page.
MESSAGE_GREP_STRING = r'Codeforces\.showMessage\('
# TODO: Grep for `Codeforces.showMessage("` to find the message that has been sent
def create_jar(str_cookie: str):
    """Build a ``RequestsCookieJar`` from a ``Cookie`` header value.

    ``str_cookie`` is a ``;``-separated list of ``name=value`` pairs.
    Whitespace around names and values is stripped, and segments that are
    empty or carry no ``=`` are skipped instead of producing bogus entries.
    """
    parsed = {}
    for segment in str_cookie.split(';'):
        # partition() never returns -1-style sentinels, unlike find():
        # a segment without '=' simply yields an empty separator.
        name, separator, value = segment.strip().partition('=')
        name = name.strip()
        if not separator or not name:
            continue
        parsed[name] = value.strip()
    return requests.cookies.merge_cookies(requests.cookies.RequestsCookieJar(), parsed)
def get_token(response: Response) -> str:
    """Extract the CSRF token from a page.

    Reads the ``data-csrf`` attribute of the first element with class
    ``csrf-token`` in ``response``'s body.
    """
    page = bs(response.text, 'lxml')
    token_holder = page.find(class_='csrf-token')
    return token_holder['data-csrf']
def get_messages(response: Response) -> List[str]:
    """Return every message passed to ``Codeforces.showMessage("...")`` in the page.

    Matching is non-greedy and stops at the first double quote after the
    opening one.
    """
    pattern = fr'{MESSAGE_GREP_STRING}\"(.+?)\"'
    return re.findall(pattern, response.text)

View File

@ -0,0 +1,28 @@
"""Script to fetch all language compilers in JSON
It is used only to 'build' distribution, do not use it
"""
from bs4 import BeautifulSoup
import sys
sys.path.append('..')
from codeforces_scraper import Scraper
from getpass import getpass
from pydantic import BaseModel
from typing import List
class LanguageCompiler(BaseModel):
    """One compiler entry, in the shape of the JSON printed by this script."""

    # Value of the <option> element.
    id: int
    # Text of the <option> element.
    name: str
    # Always emitted empty by this script.
    extensions: List[str]
class LanguageList(BaseModel):
    """Custom-root model: serializes as a bare JSON list of compilers."""

    __root__: List[LanguageCompiler]
# Log in with a handle read from stdin (no prompt, so stdout stays pure JSON)
# and a password read via getpass.
scraper = Scraper()
scraper.login(input(), getpass())
# Fetch a problem page and take the options of the first 'programTypeId' <select>.
soup = BeautifulSoup(scraper.get('problemset/problem/4/A').text, 'lxml')
subject_options = [i.findAll('option') for i in soup.findAll('select', attrs={'name': 'programTypeId'})][0]
# Extensions are left empty here.
models = LanguageList(__root__=[LanguageCompiler(id=i['value'], name=i.text, extensions=[]) for i in subject_options])
print(models.json())

3
pyproject.toml Normal file
View File

@ -0,0 +1,3 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta:__legacy__"

21
setup.py Normal file
View File

@ -0,0 +1,21 @@
import setuptools
# Package metadata and build configuration for codeforces-scraper.
setuptools.setup(
    name='codeforces-scraper',
    version='0.1.0',
    author='thematdev',
    author_email='thematdev@thematdev.org',
    description='Utility to do actions on codeforces',
    packages=setuptools.find_packages(),
    # Runtime dependencies (unpinned).
    install_requires=[
        'bs4',
        'lxml',
        'pydantic',
        'requests',
    ],
    python_requires='>=3.8',
    zip_safe=True,
    # Ship every file of the assets package (the compiler list JSON is read
    # from it at import time).
    package_data={
        'codeforces_scraper.assets': ['*']
    }
)

0
tests/__init__.py Normal file
View File

60
tests/test_main.py Normal file
View File

@ -0,0 +1,60 @@
from codeforces_scraper import Scraper, ScraperError
from getpass import getpass
import unittest
from random import randint
# Compiler id used for test submissions (54 is "GNU G++17 7.3.0" in the
# bundled compiler list).
CPP_LANG = 54
# Contest and problem index that the tests submit to.
CONTEST_ID = 4
PROBLEM_INDEX = 'A'
SOURCE = \
"""#include <iostream>
using namespace std;
int main() {
int w; cin >> w;
if (w % 2 == 0 && w >= 4) {
cout << "YES" << endl;
} else {
cout << "NO" << endl;
}
}
"""
class LoginTest(unittest.TestCase):
    """Live tests against Codeforces.

    Credentials are asked for interactively once per class; every test gets
    a fresh ``Scraper`` that is closed afterwards.
    """

    @classmethod
    def setUpClass(cls):
        cls.username = input('username: ')
        cls.password = getpass(f'codeforces password for {cls.username}: ')

    def setUp(self):
        self.scraper = Scraper()

    def tearDown(self):
        self.scraper.close()

    def _login(self):
        """Log the per-test scraper in with the class credentials."""
        self.scraper.login(self.username, self.password)

    def test_simple_login_logout(self):
        scraper = self.scraper
        self.assertIsNone(scraper.fetch_current_user())
        self._login()
        self.assertEqual(scraper.fetch_current_user(), self.username)
        scraper.logout()
        self.assertIsNone(scraper.fetch_current_user())

    def test_same_submission(self):
        # Submitting the unsalted SOURCE is expected to fail.
        self._login()
        with self.assertRaises(ScraperError):
            self.scraper.submit(CONTEST_ID, PROBLEM_INDEX, SOURCE, CPP_LANG)

    def test_unique_submission(self):
        # A random leading comment makes the submitted source unique.
        salt = f'// Salt: {hex(randint(1, 1337666228))}\n'
        salted_source = salt + SOURCE
        self._login()
        self.scraper.submit(CONTEST_ID, PROBLEM_INDEX, salted_source, CPP_LANG)

    def test_submit_while_not_logged_in(self):
        with self.assertRaises(ScraperError):
            self.scraper.submit(CONTEST_ID, PROBLEM_INDEX, SOURCE, CPP_LANG)

    def test_get_submissions(self):
        self.scraper.get_submissions(CONTEST_ID, self.username)