|
|
|
from flask import Flask, request, jsonify, render_template, stream_with_context, Response |
|
import os |
|
import subprocess |
|
import tempfile |
|
import shutil |
|
import sys |
|
import logging |
|
|
|
app = Flask(__name__) |
|
logging.basicConfig(level=logging.DEBUG) |
|
|
|
|
|
temp_dir = tempfile.mkdtemp() |
|
os.chmod(temp_dir, 0o777) |
|
|
|
@app.route("/") |
|
def index(): |
|
return render_template("index.html") |
|
|
|
def execute_shell_command(command): |
|
"""Executes a shell command and streams output with improved error handling.""" |
|
try: |
|
|
|
if command.startswith('git clone'): |
|
|
|
process = subprocess.Popen( |
|
command, |
|
shell=True, |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
text=True, |
|
cwd=temp_dir, |
|
env={'GIT_TERMINAL_PROMPT': '0'} |
|
) |
|
else: |
|
process = subprocess.Popen( |
|
command, |
|
shell=True, |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
text=True, |
|
cwd=temp_dir |
|
) |
|
|
|
|
|
while True: |
|
output = process.stdout.readline() |
|
error = process.stderr.readline() |
|
|
|
if output: |
|
yield f"{output}<br>" |
|
if error: |
|
yield f"<span style='color: red'>Error: {error}</span><br>" |
|
|
|
|
|
if output == '' and error == '' and process.poll() is not None: |
|
break |
|
|
|
|
|
return_code = process.poll() |
|
if return_code != 0: |
|
yield f"<span style='color: red'>Command failed with return code {return_code}</span><br>" |
|
else: |
|
yield "Command completed successfully.<br>" |
|
|
|
except Exception as e: |
|
yield f"<span style='color: red'>Error executing command: {str(e)}</span><br>" |
|
finally: |
|
if process: |
|
process.stdout.close() |
|
process.stderr.close() |
|
|
|
@app.route("/execute", methods=["POST"]) |
|
def execute_code(): |
|
command = request.json.get("code", "").strip() |
|
if not command: |
|
return jsonify({"result": "Error: No command provided."}) |
|
|
|
try: |
|
if command.startswith("!"): |
|
shell_command = command[1:] |
|
|
|
app.logger.debug(f"Executing shell command: {shell_command}") |
|
return Response( |
|
stream_with_context(execute_shell_command(shell_command)), |
|
content_type='text/html' |
|
) |
|
else: |
|
|
|
process = subprocess.run( |
|
[sys.executable, "-c", command], |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
text=True, |
|
cwd=temp_dir |
|
) |
|
return jsonify({"result": process.stdout + process.stderr}) |
|
except Exception as e: |
|
app.logger.error(f"Error executing command: {str(e)}") |
|
return jsonify({"result": f"Error: {str(e)}"}) |
|
|
|
@app.route("/cleanup", methods=["POST"]) |
|
def cleanup(): |
|
global temp_dir |
|
try: |
|
if os.path.exists(temp_dir): |
|
shutil.rmtree(temp_dir, ignore_errors=True) |
|
temp_dir = tempfile.mkdtemp() |
|
os.chmod(temp_dir, 0o777) |
|
return jsonify({"result": "Temporary files cleaned up."}) |
|
except Exception as e: |
|
return jsonify({"result": f"Error during cleanup: {str(e)}"}) |
|
|
|
if __name__ == "__main__": |
|
|
|
os.chmod(temp_dir, 0o777) |
|
app.run(host="0.0.0.0", port=7860, debug=True) |