-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathppedt_server.py
261 lines (214 loc) · 9.23 KB
/
ppedt_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
"""
PGFPlotsEdt LaTeX backend server
"""
# Copyright (c) Log Creative 2020--2024.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# Method: Open locally
# Usage:
# 1. Meet system requirements: TeX Distribution (TeX Live/MiKTeX/MacTeX) and Python 3.6+ with flask package.
# 2. Run this script by `python ppedt_server.py`
# 3. Open your browser and visit `http://127.0.0.1:5678/`. Terminate the server by `Ctrl+C`.
# See the documentation https://github.com/LogCreative/PGFPlotsEdt/blob/master/docs/README.md#open-locally for details.
import os
import shutil
import subprocess
import time
import platform
import glob
import re
from res.version_updater import write_version_info
from flask import Flask, send_from_directory, render_template_string, Response, request
# Absolute path of the directory that contains this script; index.html is served from it.
rootdir = os.path.dirname(os.path.abspath(__file__))
# Scratch directory (<rootdir>/tmp) in which the .tex sources are written and compiled.
tmpdir = os.path.join(rootdir, 'tmp')
# Flask application. The static and template folders are both "." and the
# empty static_url_path exposes static files at the URL root.
app = Flask(__name__, static_url_path='', static_folder=".", template_folder=".")
def run_cmd(cmd: str):
    """Run the shell command ``cmd`` with ``tmpdir`` as its working directory.

    The working directory is handed to ``subprocess`` through ``cwd`` instead
    of being spliced into the command line as ``cd <tmpdir> && ...``: the
    spliced, unquoted form fails as soon as the path contains a space or
    another shell metacharacter.

    :param cmd: Command line, interpreted by the system shell.
    """
    # The exit status is ignored on purpose: callers decide whether the run
    # succeeded by checking for the expected output file.
    subprocess.call(cmd, shell=True, cwd=tmpdir)
def get_header_name(sessid: str):
    """Return the job name of the header (preamble) file of session ``sessid``."""
    return f"{sessid}_header"
def get_body_name(sessid: str):
    """Return the job name of the body file of session ``sessid`` (the id itself)."""
    return f"{sessid}"
# Matches a \usepackage{ctex} command, with or without an optional [...] argument.
ctex_re = re.compile(r"\\usepackage(\[[^\]]*\])?\{ctex\}")
def get_header_body(tex: str, compiler: str, sessid: str):
    """Split a LaTeX source into a header (preamble) source and a body source.

    :param tex: Complete LaTeX document.
    :param compiler: Compiler name; written into a ``% !TEX program`` line at
        the top of both parts.
    :param sessid: Session id; the body's first line refers to the header name
        derived from it.
    :return: ``(header, body)``, or ``(None, None)`` when ``tex`` contains no
        ``\\begin{document}``.
    """
    # Normalize CRLF
    tex = tex.replace("\r\n", "\n")
    # Add magic command to make the file different for different compilers
    program_magic_cmd = "% !TEX program = {}\n".format(compiler)
    header_additional = program_magic_cmd
    body_additional = program_magic_cmd
    # Extra lines that are only added for compilers other than pdflatex
    if not compiler == "pdflatex":
        header_additional += "\\RequirePackage[OT1]{fontenc}\n"
        body_additional += "\\endofdump\n"
    # Handle CJK: on Windows the gbsn font family is replaced by song
    if platform.system() == "Windows":
        tex = tex.replace("\\begin{CJK}{UTF8}{gbsn}", "\\begin{CJK}{UTF8}{song}")
    # Handle CTeX: take \usepackage{ctex} out of the source and re-add it to the body
    ctex_match = ctex_re.search(tex)
    if ctex_match is not None:
        tex = tex.replace(ctex_match.group(0), "")
        body_additional += ctex_match.group(0) + '\n'
    # Find the header end
    header_end = tex.find("\\begin{document}")
    if header_end == -1:
        return None, None
    # Header: the preamble followed by an empty document.
    # Body: a "%&<header name>" first line, the extra body lines, then the
    # source from \begin{document} onwards.
    return header_additional + tex[:header_end] + "\\begin{document}\n\\end{document}\n", \
        "%&{}\n".format(get_header_name(sessid)) + body_additional + tex[header_end:]
def same_or_write(filename: str, cur_content: str):
    """Store ``cur_content`` as ``<tmpdir>/<filename>.tex`` unless it is already there.

    :return: False when the file exists with identical content (nothing is
        touched). Otherwise any ``<filename>.pdf`` is deleted, the ``.tex``
        file is (re)written and True is returned.
    """
    tex_file = os.path.join(tmpdir, f"{filename}.tex")
    if os.path.isfile(tex_file):
        with open(tex_file, 'r', encoding='utf-8') as src:
            unchanged = src.read() == cur_content
        if unchanged:
            return False  # the same as before

    # The content changed: the PDF of the previous version is stale.
    stale_pdf = os.path.join(tmpdir, f"{filename}.pdf")
    if os.path.isfile(stale_pdf):
        os.remove(stale_pdf)
    with open(tex_file, 'w', encoding='utf-8') as dst:
        dst.write(cur_content)
    return True
def clean_log(filename: str):
    """Delete ``<tmpdir>/<filename>.log`` if that file exists."""
    stale_log = os.path.join(tmpdir, f"{filename}.log")
    if not os.path.isfile(stale_log):
        return
    os.remove(stale_log)
def header_cmd(jobname: str, compiler: str):
    """Build the shell command that processes ``<jobname>.tex`` in ``-ini`` mode.

    The engine is ``xetex`` for the ``xelatex`` compiler and ``etex`` for any
    other one; ``compiler`` itself is passed as the ``&<compiler>`` argument.
    """
    if compiler == "xelatex":
        engine = "xetex"
    else:
        engine = "etex"
    return (
        f'{engine} -ini -interaction=nonstopmode -halt-on-error'
        f' -jobname={jobname} "&{compiler}" mylatexformat.ltx """{jobname}.tex"""'
    )
def compile_header(cur_header: str, compiler: str, sessid: str):
    """Recompile the header of session ``sessid``, but only if its text changed."""
    jobname = get_header_name(sessid)
    if not same_or_write(jobname, cur_header):
        return
    # The logs of the previous header and body runs are removed before rerunning.
    for stale in (jobname, get_body_name(sessid)):
        clean_log(stale)
    run_cmd(header_cmd(jobname, compiler))
def body_cmd(jobname: str, compiler: str):
    """Build the shell command that compiles ``<jobname>.tex`` with ``compiler``."""
    return f'{compiler} -interaction=nonstopmode -halt-on-error {jobname}.tex'
def compile_body(cur_body: str, compiler: str, sessid: str):
    """Recompile the body of session ``sessid``, but only if its text changed."""
    jobname = get_body_name(sessid)
    if not same_or_write(jobname, cur_body):
        return
    clean_log(jobname)
    run_cmd(body_cmd(jobname, compiler))
def clean_files(sessid: str, pdf: bool):
    """Delete the compilation by-products of session ``sessid`` in ``tmpdir``.

    ``.tex``, ``.log`` and ``.fmt`` files are always kept. ``.pdf`` files are
    kept when ``pdf`` is False and deleted when it is True. Every other file
    of the session is deleted.

    Only files whose name is ``sessid`` followed by a non-alphanumeric
    character (``12.aux``, ``12_header.pdf``, ...) belong to the session. A
    bare ``<sessid>*`` match would also hit the files of every session whose
    id merely starts with the same characters (cleaning session ``1`` would
    delete ``10.aux`` or ``12.pdf``).

    :param sessid: Session id used as the file name prefix.
    :param pdf: Whether ``.pdf`` files are deleted as well.
    """
    kept_suffixes = (".tex", ".log", ".fmt")
    # Escape both parts so that glob metacharacters in them are taken literally.
    pattern = os.path.join(glob.escape(tmpdir), glob.escape(sessid) + "*")
    for filepath in glob.glob(pattern):
        remainder = os.path.basename(filepath)[len(sessid):]
        if remainder[:1].isalnum():
            continue  # file of another session with a longer id
        if filepath.endswith(kept_suffixes):
            continue
        if filepath.endswith(".pdf") and not pdf:
            continue
        os.remove(filepath)
def tex_length_limit_hook(tex: str):
    """Hook called with the submitted TeX source before it is compiled.

    This default implementation does nothing and returns None.
    """
    return None
def compile_tex(tex: str, compiler: str, sessid: str):
    """Compile ``tex`` for session ``sessid`` and return the resulting PDF.

    :return: The bytes of the produced PDF, or None when no PDF was produced
        (no ``\\begin{document}``, no format file, or no PDF file).
    :raises Exception: ``"Compilation timeout."`` when a
        ``subprocess.TimeoutExpired`` occurs during compilation,
        ``"Compilation Error."`` for any other exception raised there.
    """
    tex_length_limit_hook(tex)
    header_src, body_src = get_header_body(tex, compiler, sessid)
    try:
        if header_src is not None:
            compile_header(header_src, compiler, sessid)
            fmt_file = os.path.join(tmpdir, f"{get_header_name(sessid)}.fmt")
            # The body is only compiled once the header's .fmt file exists.
            if os.path.isfile(fmt_file):
                compile_body(body_src, compiler, sessid)
                pdf_file = os.path.join(tmpdir, f"{get_body_name(sessid)}.pdf")
                if os.path.isfile(pdf_file):
                    with open(pdf_file, 'rb') as stream:
                        pdf_bytes = stream.read()
                    clean_files(sessid, pdf=False)
                    return pdf_bytes
    except subprocess.TimeoutExpired as err:
        clean_files(sessid, pdf=True)
        app.logger.warning(f"Compilation timeout for session {sessid}: {err}")
        raise Exception("Compilation timeout.")
    except Exception as err:
        clean_files(sessid, pdf=True)
        app.logger.warning(f"Compilation error for session {sessid}: {err}")
        raise Exception("Compilation Error.")
    # No exception was raised, but no PDF was produced either.
    clean_files(sessid, pdf=True)
    app.logger.warning(f"Compilation error for session {sessid}")
    return None
def get_log(sessid: str):
    """Return the text of the compilation log of session ``sessid``.

    The body log is preferred; the header log is used when there is no body
    log. When neither file exists, ``"Compilation Failure"`` is returned.

    :param sessid: Session id whose logs are looked up in ``tmpdir``.
    :return: The log text, or the fallback message.
    """
    candidates = (
        os.path.join(tmpdir, "{}.log".format(get_body_name(sessid))),
        os.path.join(tmpdir, "{}.log".format(get_header_name(sessid))),
    )
    for logpath in candidates:
        if os.path.isfile(logpath):
            # TeX logs are not guaranteed to be valid UTF-8. Undecodable bytes
            # are replaced rather than raising UnicodeDecodeError, which the
            # /compile view does not handle.
            with open(logpath, "r", encoding='utf-8', errors='replace') as f:
                return f.read()
    return "Compilation Failure"
# Ids of the sessions that currently have a compilation in progress. The
# /compile view adds an id (with the value 1) before compiling and pops it
# afterwards.
compiling_sessions = dict()
def reqid_hook(reqid: str):
    """Hook applied to a validated request id; this default returns it unchanged."""
    session_id = reqid
    return session_id
def get_reqid(request):
    """Validate the form of a /compile request and return its session id.

    The form must contain ``requestid`` (an integer >= 0), ``texdata`` and
    ``compiler`` (``pdflatex`` or ``xelatex``).

    The checks raise explicitly instead of using ``assert``: assertions are
    removed under ``python -O``, which would let an arbitrary ``compiler``
    value through, and that value ends up in a shell command line.

    :param request: Flask request carrying the submitted form.
    :return: ``reqid_hook`` applied to the normalized id string, or None when
        the request is invalid (a warning is logged).
    """
    try:
        reqid = request.form['requestid']
        request.form['texdata']  # looked up only to check that the field is present
        reqcompiler = request.form['compiler']
        reqid = int(reqid)
        if reqid < 0:
            raise ValueError("reqid must be a non-negative integer.")
        if reqcompiler not in ("pdflatex", "xelatex"):
            raise ValueError("Invalid compiler.")
        # Normalized decimal form of the id
        reqid = str(reqid)
    except Exception as e:
        app.logger.warning("Invalid request: {}".format(e))
        return None
    return reqid_hook(reqid)
def avail_hook():
    """Hook telling whether the server accepts compilations; this default always says yes."""
    available = True
    return available
@app.route('/', methods=['GET'])
def index():
    """Serve ``index.html`` from the directory of this script."""
    page = "index.html"
    return send_from_directory(rootdir, page)
@app.route('/compile', methods=['GET', 'POST'])
def compile():
    """Compile the LaTeX source posted by a client.

    GET returns a short usage text. POST expects the form fields
    ``requestid``, ``texdata`` and ``compiler`` and answers with:

    * the PDF (``application/pdf``) when compilation succeeds;
    * the compilation log (``text/plain``) when no PDF was produced;
    * a short text when the request is invalid, when the same session is
      still compiling, or when compilation raised;
    * status 503 when ``avail_hook`` reports the server as unavailable.

    The session marker in ``compiling_sessions`` is released in a ``finally``
    clause. Otherwise an error raised while reading the log or building the
    response would leave the id registered, and every later request of that
    session would be rejected as "not finished".
    """
    if not avail_hook():
        app.logger.warning("Server is currently downgraded, number of occupied workers: {}".format(len(compiling_sessions.keys())))
        return "PGFPlotsEdt Server is currently downgraded.", 503
    if request.method != 'POST':
        return render_template_string("PGFPlotsEdt LaTeX Server: POST a LaTeX request (texdata, compiler, requestid) to render.\n")

    reqid = get_reqid(request)
    if reqid is None:
        return render_template_string("Invalid request.")
    if reqid in compiling_sessions:
        app.logger.warning(
            "Previous run of Session {} has not been finished. The request is discarded.".format(reqid))
        return render_template_string("Previous run has not been finished.")

    compiling_sessions[reqid] = 1
    try:
        try:
            pdf = compile_tex(request.form['texdata'], request.form['compiler'], reqid)
        except Exception as e:
            # Report the failure message together with a timestamp.
            return render_template_string("{} {}".format(e, time.strftime("%Y-%m-%d %H:%M:%S")))
        if pdf is not None:
            return Response(pdf, mimetype="application/pdf")
        # No PDF: send the log back so the client can display the errors.
        return Response(get_log(reqid), mimetype="text/plain")
    finally:
        compiling_sessions.pop(reqid, None)
def llm_hook(code, prompt):
    """Hook producing the /llm response body; this default returns ``code`` unchanged."""
    unchanged = code
    return unchanged
def llm_test():
    """Return the ``(text, status)`` pair sent in reply to ``GET /llm``.

    This default reports that no LLM server is available (status 503).
    """
    message = "PGFPlotsEdt LLM Server is not available."
    status = 503
    return message, status
@app.route('/llm', methods=['GET', 'POST'])
def code_llm():
    """Endpoint of the LLM helper.

    GET returns the text and status given by ``llm_test``. POST expects a JSON
    object with the keys ``code`` and ``prompt`` and returns the output of
    ``llm_hook`` for them. A POST without such a JSON object is answered with
    status 400 instead of failing with an unhandled ``TypeError``/``KeyError``.
    """
    if request.method != 'POST':
        llm_string, llm_status = llm_test()
        return Response(llm_string, status=llm_status, mimetype="text/plain")

    # silent=True: an absent or malformed JSON body yields None instead of raising.
    llm_input = request.get_json(silent=True)
    if not isinstance(llm_input, dict) or 'code' not in llm_input or 'prompt' not in llm_input:
        return Response(
            "Invalid request: a JSON object with 'code' and 'prompt' is required.",
            status=400,
            mimetype="text/plain"
        )
    return Response(
        llm_hook(llm_input['code'], llm_input['prompt']),  # could be a function with a generator
        mimetype="application/octet-stream"  # make chrome happy, update to change
    )
if __name__ == '__main__':
    # Start from an empty scratch directory: drop whatever a previous run
    # left behind, then create the directory again.
    if os.path.isdir(tmpdir):
        shutil.rmtree(tmpdir)
    os.mkdir(tmpdir)

    # Write the version information into res/ and show the version on startup.
    ver = write_version_info(os.path.join(rootdir, "res"))
    print(f"PGFPlotsEdt {ver}")

    # Listen on the loopback interface only.
    app.run(host="127.0.0.1", port=5678)