import base64
import cgi
import cgitb
import datetime
import json
import os
import re
import shutil
import socket
import sys
import time
import urllib.parse
from tempfile import TemporaryFile

import flask
from flask import Blueprint, Flask, g, request, session
import pycurl

scalyfm = Blueprint("fm", __name__)

# Default configuration (consumed by FM_Config).
CONFIG = {
    "lang": "en",
    "error_reporting": False,
    "show_hidden": True,
    "hide_Cols": False,
    "calc_folder": False
}

# TODO Artifact from PHP port.
GLOBALS = {}

# TFM version
VERSION = '2.3.8'

# Application title
APP_TITLE = 'Tiny File Manager'

# Auth with login/password (set True/False to enable/disable it).
# Independent from IP white- and blacklisting.
use_auth = True

# Users: {'Username': 'Password hash', 'Username2': 'Password hash', ...}
# Generate secure password hash - https://tinyfilemanager.github.io/docs/pwd.html
auth_users = {
    'admin': '$2y$10$/K.hjNr84lLNDt8fTXjoI.DBp6PpeyoJ.mGwrrLuCZfAwfSAGqhOW',  # admin@123
    'user': '$2y$10$Fg6Dz8oH9fPoZ2jJan5tZuv6Z4Kp7avtQ9bDfrdRntXtPeiMAZyGO'  # 12345
}

# Read-only users (list of usernames)
readonly_users = [
    'user'
]

# Possible rules are 'OFF', 'AND' or 'OR'
# OFF => Don't check connection IP, defaults to OFF
# AND => Connection must be on the whitelist, and not on the blacklist
# OR => Connection must be on the whitelist, or not on the blacklist
ip_ruleset = 'OFF'

# Should users be notified of their block?
ip_silent = True

# IP addresses, both ipv4 and ipv6
ip_whitelist = [
    '127.0.0.1',  # local ipv4
    '::1'  # local ipv6
]

# IP addresses, both ipv4 and ipv6
ip_blacklist = [
    '0.0.0.0',  # non-routable meta ipv4
    '::'  # non-routable meta ipv6
]

# User specific directories
# {'Username': 'Directory path', 'Username2': 'Directory path', ...}
directories_users = {}

# Enable highlight.js (https://highlightjs.org/) on the view page
use_highlightjs = True

# highlight.js style
highlightjs_style = 'vs'

# Enable ace.js (https://ace.c9.io/) on the view page
edit_files = True

# Default timezone for date() and time() - http://php.net/manual/en/timezones.php
default_timezone = 'Etc/UTC'  # UTC

# Root path for file manager.
# Use the absolute path of a directory, i.e. '/var/www/folder'
# (PHP original: $_SERVER['DOCUMENT_ROOT'].'/folder').
# TODO still not sure this is right... Doesn't this allow the fm to see the
# script, meaning that someone could reclaim the source code and thus the
# auth_users dict?
root_path = os.path.dirname(os.path.abspath(__file__))  # $_SERVER['DOCUMENT_ROOT'];

# Root url for links in file manager. Relative to http_host.
# Variants: '', 'path/to/subfolder'
# Will not work if root_path is outside of the server document root.
root_url = ''

# Server hostname. Can be set manually if wrong.
http_host = socket.getfqdn()  # $_SERVER['HTTP_HOST'];

# Input encoding for iconv
iconv_input_encoding = 'UTF-8'

# strftime() format for the file modification date
datetime_format = "%d.%m.%y %H.%M"

# Allowed file extensions for upload and rename
# e.g. 'gif,png,jpg'
allowed_extensions = ''

# Favicon path. This can be either a full url to a .PNG image, or a path based on the document root.
# full path, e.g http://example.com/favicon.png
# local path, e.g images/icons/favicon.png
favicon_path = '?img=favicon'

# List of files and folders excluded from listing
# e.g. ['myfile.html', 'personal-folder']
GLOBALS['exclude_items'] = []

# Online office Docs Viewer
# Available rules are 'google', 'microsoft' or False
# google => View documents using Google Docs Viewer
# microsoft => View documents using Microsoft Web Apps Viewer
# False => disable online doc viewer
GLOBALS['online_viewer'] = 'google'

# Sticky Nav bar
# True => enable sticky header
# False => disable sticky header
sticky_navbar = True

# max upload file size
MAX_UPLOAD_SIZE = '2048'

# available languages
lang_list = {
    'en': 'English'
}

# Request-scoped stand-in for PHP's $_SERVER; refilled by scaly_root().
_SERVER = {}


# TODO Artifact from port; is this needed?
class StdClass:
    """Bare record with ``name`` and ``type`` attributes, both initially None."""

    def __init__(self):
        self.name = None
        self.type = None


class FM_Config:
    """Load and save configuration data.

    ``self.data`` starts from built-in defaults and is replaced by the
    module-level ``CONFIG`` when that can be read as a mapping.
    """

    def __init__(self):
        # PHP_SELF is only populated while a request is served, so it may be
        # absent when this constructor runs at import time.
        fm_url = root_url + _SERVER.get("PHP_SELF", "")
        self.data = {
            "lang": "en",
            "error_reporting": True,
            "show_hidden": False
        }
        data = None
        try:
            # CONFIG is a dict in this port; a JSON string (as in the PHP
            # original) is still accepted.
            if isinstance(CONFIG, dict):
                data = dict(CONFIG)
            else:
                data = json.loads(CONFIG)
            if not isinstance(data, dict):
                data = None
        except (IOError, TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass.
            msg = "Tiny File Manager\nErrror: Cannot load configuration"
            if fm_url.endswith("/"):
                fm_url = fm_url.rstrip("/")
                msg += "\n"
                msg += "\nSeems like you have a trailing slash on the URL."
                msg += "\nTry this link: {0}".format(fm_url)
            # flash() needs a request context, which is absent at import time.
            if flask.has_request_context():
                flask.flash(msg)
        if data is not None:
            self.data = data
        else:
            self.save()

    def save(self):
        """Merge ``self.data`` into the JSON file at ``root_url + PHP_SELF``.

        NOTE(review): the PHP original rewrote its own source file here; the
        target path used by this port looks like a URL path rather than a
        config file -- confirm the intended location.
        """
        fm_file = root_url + _SERVER.get("PHP_SELF", "")
        with open(fm_file) as fobj:
            current_config = json.load(fobj)
        current_config.update(self.data)
        with open(fm_file, "w") as fobj:
            json.dump(current_config, fobj)


# Configuration
cfg = FM_Config()  # TODO

# Default language
lang = cfg.data.get("lang", "en")

# Show or hide files and folders that start with a dot
show_hidden_files = cfg.data.get("show_hidden", True)

# Error reporting - False = turns off errors, True = turns on errors
report_errors = cfg.data.get('error_reporting', True)

# Hide Permissions and Owner cols in file-listing
hide_Cols = cfg.data.get('hide_Cols', True)

# Show dir size: True, or speed up output: False
calc_folder = cfg.data.get('calc_folder', True)


# **** HELPER FUNCTIONS ***************************************************************

# Not ported yet: the placeholders below accept any arguments and return None.
def fm_enc(*args): pass


def fm_rdelete(*args): pass


def fm_set_msg(*args): pass


def fm_clean_path(path, trim=True):
    """Ensure that the given path is valid and free from extra slashes.

    Leading/trailing slashes (and, with ``trim``, surrounding whitespace) are
    removed, parent-directory hops are dropped and backslashes are turned
    into forward slashes. ``None`` is treated as an empty path.
    """
    if path is None:
        return ""
    if trim:
        path = path.strip()
    path = path.strip("\\/")  # TODO
    # Repeat until stable: a single pass would let "....//" collapse back
    # into "../" and slip a parent hop through.
    previous = None
    while previous != path:
        previous = path
        path = path.replace("../", "").replace("..\\", "")  # TODO
    if path == "..":
        path = ""
    return path.replace("\\", "/")


def fm_redirect(url, code=302):
    """Abort the current request with an HTTP redirect to ``url``.

    Like the PHP original this never returns to the caller: the redirect
    response is raised through ``flask.abort`` instead of printing CGI
    headers and exiting the process.
    """
    flask.abort(flask.redirect(url, code=code))


def fm_get_images():
    """Get base64-encoded images."""
    return {
        'favicon': (
            "Qk04AgAAAAAAADYAAAAoAAAAEAAAABAAAAABABAAAAAAAAICAAASCwAAEgsAAAAAAAAAAAAAIQQhBCEEIQQhBCEEIQQhBCEEIQ "
            "QhBCEEIQQhBCEEIQQhBCEEIQQhBHNO3n/ef95/vXetNSEEIQQhBCEEIQQhBCEEIQQhBCEEc07ef95/3n/ef95/1lohBCEEIQQhBCEEIQQhBCEEIQ "
            "RzTt5/3n8hBDFG3n/efyEEIQQhBCEEIQQhBCEEIQQhBHNO3n/efyEEMUbef95/IQQhBCEEIQQhBCEEIQQhBCEErTVzTnNOIQQxRt5/3n8hBCEEIQ "
            "QhBCEEIQQhBCEEIQQhBCEEIQQhBDFG3n/efyEEIQQhBCEEIQQhBCEEIQQhBCEEIQQxRt5/3n+cc2stIQQhBCEEIQQhBCEEIQQhBCEEIQQIIZxz3n "
            "/ef5xzay0hBCEEIQQhBCEEIQQhBCEEIQQhBCEEIQQhBDFG3n/efyEEIQQhBCEEIQQhBCEEIQQhBK01c05zTiEEMUbef95/IQQhBCEEIQQhBCEEIQ "
            "QhBCEEc07ef95/IQQxRt5/3n8hBCEEIQQhBCEEIQQhBCEEIQRzTt5/3n8hBDFG3n/efyEEIQQhBCEEIQQhBCEEIQQhBKUUOWfef95/3n/ef95/IQ "
            "QhBCEEIQQhBCEEIQQhBCEEIQQhBJRW3n/ef95/3n8hBCEEIQQhBCEEIQQhBCEEIQQhBCEEIQQhBCEEIQQhBCEEIQQhBCEEIQQAAA=="
        )
    }


def fm_show_images(img_name):
    """Build the HTTP response for one of the embedded images.

    Unknown (or missing) names fall back to a built-in default image. The
    response is stamped as modified at today's UTC midnight and expiring at
    the following midnight.

    Returns:
        A ``flask.Response`` carrying the decoded image bytes.
    """
    http_date = "%a, %d %b %Y %H:%M:%S GMT"
    now = datetime.datetime.now(datetime.timezone.utc)
    midnight = datetime.datetime(now.year, now.month, now.day, 0, 0, 0)
    mtime = midnight.strftime(http_date)
    etime = (midnight + datetime.timedelta(days=1)).strftime(http_date)

    img_name = (img_name or "").strip()
    images = fm_get_images()
    default_image = 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAAEElEQVR42mL4//8/A0CAAQAI/AL+26JNFgAAAABJRU5ErkJggg=='
    # b64decode() skips the whitespace embedded in the stored image data.
    image = base64.b64decode(images.get(img_name, default_image))

    # Content-Length is derived from the body by the response object.
    response = flask.Response(image, mimetype="image/png")
    response.headers["Cache-Control"] = ""
    response.headers["Pragma"] = ""
    response.headers["Last-Modified"] = mtime
    response.headers["Expires"] = etime
    return response


# Not ported yet: the placeholders below accept any arguments and return None.
def password_verify(*args): pass  # TODO


def password_hash(*args): pass


def fm_show_footer_login(*args): pass


def fm_show_header_login(*args): pass


def fm_show_message(*args): pass


def fm_get_translations(*args): pass


def fm_set_message(*args): pass


def trigger_error(*args): pass


def fm_rcopy(*args): pass


def fm_rename(*args): pass


def _fm_path_in_root(root, candidate):
    """Return True when ``candidate`` resolves to ``root`` or a path below it."""
    root_real = os.path.realpath(root)
    candidate_real = os.path.realpath(candidate)
    try:
        return os.path.commonpath([root_real, candidate_real]) == root_real
    except ValueError:
        # commonpath() refuses paths on different drives.
        return False


# **** ROUTES ***************************************************************

@scalyfm.route("/", methods=["GET", "POST"])
def scaly_root():
    """Single entry point of the file manager (port of the PHP front controller).

    Per request it: records request data in ``_SERVER``; handles logout and
    embedded-image requests; enforces the IP rules and the login; resolves
    the root directory of the logged-in user; and then runs the requested
    action (AJAX save / backup / settings / password hash / upload-by-URL,
    delete, copy / move).

    Every handled path returns a Flask response (or aborts). Redirects are
    returned rather than discarded so no action runs after them.
    """
    global _SERVER, lang, report_errors, show_hidden_files
    global hide_Cols, calc_folder

    # Local imports: the module header only imports urllib.parse and
    # tempfile.TemporaryFile.
    import urllib.request
    from tempfile import NamedTemporaryFile

    # NOTE: Kludgy, but makes the port easier.
    # See: https://flask.palletsprojects.com/en/1.1.x/api/#flask.Request.url_root
    _SERVER = {
        "DOCUMENT_ROOT": os.path.dirname(os.path.abspath(__file__)),
        "PHP_SELF": request.script_root + request.path,
        "REMOTE_ADDR": request.remote_addr
    }

    # TODO Wtf is this
    E_USER_WARNING = 0

    # TODO
    # if report_errors:
    #     ini_set('error_reporting', E_ALL);
    #     ini_set('display_errors', 1);
    # else:
    #     @ini_set('error_reporting', E_ALL);
    #     @ini_set('display_errors', 0);

    # TODO if fm included
    # if FM_EMBED:
    #     use_auth = False;
    #     sticky_navbar = False;
    # else:
    #     @set_time_limit(600);
    #     date_default_timezone_set($default_timezone);
    #     ini_set('default_charset', 'UTF-8');
    #     if (version_compare(PHP_VERSION, '5.6.0', '<') && function_exists('mb_internal_encoding')) {
    #         mb_internal_encoding('UTF-8');
    #     }
    #     if (function_exists('mb_regex_encoding')) {
    #         mb_regex_encoding('UTF-8');
    #     }
    #     session_cache_limiter('');
    #     session_name(FM_SESSION_ID );
    #     @session_start();

    # Auth is on only when it is enabled AND at least one user is configured.
    auth_enabled = bool(use_auth and auth_users)
    scheme = "https" if request.is_secure else "http"
    host = request.host or http_host

    # update root url based on user specific directories
    # TODO may not be needed with routing
    url_root = root_url
    logged = session.get("logged")
    if logged is not None and directories_users.get(logged):
        wd = fm_clean_path(os.path.dirname(_SERVER["PHP_SELF"]))
        url_root = url_root + wd + "/" + directories_users[logged]

    # clean root url
    url_root = fm_clean_path(url_root)

    # abs path for site
    FM_ROOT_URL = "{}://{}{}".format(scheme, host, "/" + url_root if url_root else "")
    FM_SELF_URL = "{}://{}{}".format(scheme, host, _SERVER["PHP_SELF"])

    # logout
    if "logout" in request.args:
        session.pop("logged", None)
        return flask.redirect(FM_SELF_URL)

    # Show image here
    if "img" in request.args:
        return fm_show_images(request.args.get("img"))

    # Validate connection IP
    if ip_ruleset != "OFF":
        client_ip = _SERVER["REMOTE_ADDR"]
        whitelisted = client_ip in ip_whitelist
        blacklisted = client_ip in ip_blacklist
        if ip_ruleset == "AND":
            proceed = whitelisted and not blacklisted
        elif ip_ruleset == "OR":
            proceed = whitelisted or not blacklisted
        else:
            proceed = False
        if not proceed:
            trigger_error(f'User connection denied from: {client_ip}', E_USER_WARNING)
            if not ip_silent:
                fm_set_msg('Access denied. IP restriction applicable', 'error')
                fm_show_header_login()
                fm_show_message()
            # Stop here: a denied client must never reach the actions below.
            flask.abort(403)

    # Auth
    if auth_enabled:
        uname = request.form.get("fm_usr")
        pwd = request.form.get("fm_pwd")
        if "logged" in session and session["logged"] in auth_users:  # TODO
            # Logged in, don't do anything
            pass
        elif uname is not None and pwd is not None:
            # Logging in
            time.sleep(1)  # presumably slows down brute-force attempts -- TODO confirm
            if uname in auth_users and password_verify(pwd, auth_users[uname]):
                session["logged"] = uname
                fm_set_msg("You are logged in!")
                return flask.redirect(FM_SELF_URL + "?p=")
            session.pop("logged", None)
            fm_set_msg("Login failed; Invalid username or password", "error")
            return flask.redirect(FM_SELF_URL)
        else:
            # Form
            session.pop("logged", None)  # TODO ??
            fm_show_header_login()
            fm_show_message()
            print("\n\n")  # TODO login form markup is not ported yet
            fm_show_footer_login()
            # Stop here so unauthenticated requests cannot reach the actions.
            return "", 401

    # update root path
    base_path = root_path
    if auth_enabled and "logged" in session:
        base_path = directories_users.get(session["logged"], root_path)

    # clean and check root path
    base_path = base_path.rstrip("\\/").replace('\\', '/')
    if not os.path.isdir(base_path):
        return f"\n\nRoot path \"{base_path}\" not found!\n\n", 500

    # Stand-ins for the PHP FM_* constants; some are not read by the actions
    # ported so far.
    FM_SHOW_HIDDEN = show_hidden_files
    FM_ROOT_PATH = base_path
    FM_LANG = lang
    FM_EXTENSION = allowed_extensions
    FM_READONLY = bool(
        auth_enabled
        and readonly_users
        and "logged" in session
        and session["logged"] in readonly_users
    )
    FM_IS_WIN = (os.name == "nt")

    # always use ?p= (test for presence: an empty "p" is the root listing)
    if "p" not in request.args and not request.files:
        return flask.redirect(FM_SELF_URL + "?p=")

    # get and clean path
    FM_PATH = fm_clean_path(request.args.get("p", ""))

    # for ajax request - save
    # TODO
    # $input = file_get_contents('php://input');
    # $_POST = (strpos($input, 'ajax') != FALSE && strpos($input, 'save') != FALSE) ? json_decode($input, true) : $_POST;

    FM_USE_AUTH = auth_enabled
    FM_EDIT_FILE = edit_files
    FM_ICONV_INPUT_ENC = iconv_input_encoding
    FM_USE_HIGHLIGHTJS = use_highlightjs
    FM_HIGHLIGHTJS_STYLE = highlightjs_style
    FM_DATETIME_FORMAT = datetime_format

    # PHP urlencode() equivalent of the current path, used in redirects.
    path_query = urllib.parse.quote_plus(FM_PATH)

    # *************************** ACTIONS ***************************

    # AJAX Request
    if request.form.get("ajax") is not None and not FM_READONLY:
        action = request.form.get("type")

        # save
        if action == "save":
            # get current path
            path = FM_ROOT_PATH
            if FM_PATH != '':
                path += '/' + FM_PATH
            # check path
            if not os.path.isdir(path):
                return flask.redirect(FM_SELF_URL + '?p=')
            filename = fm_clean_path(request.args.get("edit", "")).replace("/", "")
            if filename == '' or not os.path.isfile(path + '/' + filename):
                flask.flash('File not found', 'error')
                return flask.redirect(FM_SELF_URL + '?p=' + path_query)
            file_path = path + '/' + filename
            with open(file_path, "w") as fobj:
                fobj.write(request.form.get("content", ""))
            flask.flash('successful save!', 'alert')
            # PHP: die(true), which emits "1".
            response = flask.make_response("1")
            response.headers["X-XSS-Protection"] = "0"
            return response

        # backup files
        if action == "backup":
            filename = fm_clean_path(request.form.get("file", "")).replace("/", "")
            folder = request.form.get("path", "")
            # Both values come from the client: keep the copy inside the root.
            if not filename or not _fm_path_in_root(FM_ROOT_PATH, folder):
                flask.abort(403)
            date = datetime.datetime.now().strftime("%d%b%y-%H%M%S")
            new_file = f"{filename}-{date}.bak"
            try:
                shutil.copy(folder + os.sep + filename, folder + os.sep + new_file)
            except OSError:
                return "Unable to backup", 500
            return f"Backup {new_file} created!"

        # Save Config
        if action == "settings":
            new_lang = request.form.get("js-language")
            fm_get_translations([])  # TODO
            if new_lang not in lang_list:
                new_lang = "en"
            erp = bool(request.form.get("js-error-report"))
            shf = bool(request.form.get("js-show-hidden"))
            hco = bool(request.form.get('js-hide-cols'))
            caf = bool(request.form.get('js-calc-folder'))
            # TODO Wtf is the point?
            cfg.data.update({
                "lang": new_lang,
                "error_reporting": erp,
                "show_hidden": shf,
                "hide_Cols": hco,
                "calc_folder": caf,
            })
            # TODO Why?
            lang = new_lang
            report_errors = erp
            show_hidden_files = shf
            hide_Cols = hco
            calc_folder = caf
            return "1"

        # new password hash
        if action == "pwdhash":
            password = request.form.get("inputPassword2")
            hashed = password_hash(password) if password else ""  # TODO
            return hashed or ""

        # upload using url
        # TODO
        if action == "upload" and request.form.get("uploadurl") is not None:
            path = FM_ROOT_PATH
            if FM_PATH != '':
                path += "/" + FM_PATH

            upload_url = request.form.get("uploadurl").rstrip("/")
            url = upload_url if re.match(r"^http(s)?://.+$", upload_url) else None

            fileinfo = {"type": None, "name": None}
            err = None
            success = False

            if url:
                # PHP trim(basename($url), ".\x00..\x20"): strip dots and
                # every character in the range 0x00-0x20.
                trim_chars = "." + "".join(map(chr, range(0x21)))
                fileinfo["name"] = os.path.basename(url).strip(trim_chars)

            # The PHP original's cURL branch was permanently disabled
            # (use_curl = False), so only the plain download is ported.
            if url and fileinfo["name"]:
                dest = path + "/" + os.path.basename(fileinfo["name"])
                tmp_name = None
                try:
                    with urllib.request.urlopen(url) as resp:
                        if resp.status != 200:
                            raise RuntimeError(f"HTTP request failed with error code {resp.status}")
                        fileinfo["type"] = resp.headers.get("Content-Type")
                        # Download to a temp file first so a failed transfer
                        # never leaves a partial file at the destination.
                        with NamedTemporaryFile(prefix="upload-", delete=False) as tmp:
                            tmp_name = tmp.name
                            shutil.copyfileobj(resp, tmp)
                    shutil.move(tmp_name, dest)
                    tmp_name = None
                    success = True
                except (OSError, RuntimeError) as exc:
                    # urllib's URLError/HTTPError are OSError subclasses.
                    err = {"message": str(exc)}
                finally:
                    if tmp_name and os.path.exists(tmp_name):
                        os.remove(tmp_name)

            if success:
                return json.dumps({"done": fileinfo})
            if not err:
                err = {"message": "Invalid url parameter"}
            return json.dumps({"fail": err})

        # Unknown AJAX type: nothing more to do for this request.
        return ""

    # Delete file / folder
    if "del" in request.args and not FM_READONLY:
        _del = fm_clean_path(request.args.get("del", "")).replace("/", "")
        # An empty name must be rejected: it would address the folder itself.
        if _del not in ("", "..", "."):
            path = FM_ROOT_PATH
            if FM_PATH != "":
                path += "/" + FM_PATH
            target = path + "/" + _del
            is_dir = os.path.isdir(target)
            if fm_rdelete(target):
                msg = 'Folder {} deleted' if is_dir else 'File {} deleted'
                fm_set_msg(msg.format(fm_enc(_del)))
            else:
                msg = 'Folder {} not deleted' if is_dir else 'File {} not deleted'
                fm_set_msg(msg.format(fm_enc(_del)), 'error')
        else:
            fm_set_msg('Wrong file or folder name', 'error')
        return flask.redirect(FM_SELF_URL + '?p=' + path_query)

    # Copy folder / file (only once the destination has been confirmed)
    if "copy" in request.args and "finish" in request.args and not FM_READONLY:
        # FROM
        copy = fm_clean_path(request.args.get("copy", ""))
        # Check that it's not empty
        if not copy:
            fm_set_msg("Source path is not defined", "error")
            return flask.redirect(FM_SELF_URL + "?p=" + path_query)
        # Get source absolute path
        _from = FM_ROOT_PATH + "/" + copy
        # Get dest absolute path
        dest = FM_ROOT_PATH
        if FM_PATH != "":
            dest += "/" + FM_PATH
        dest += "/" + os.path.basename(_from)
        # Check if we can move
        move = "move" in request.args
        # Copy or move the file
        if _from != dest:
            msg_from = (FM_PATH + "/" + os.path.basename(_from)).strip("/")
            if move:
                # fm_rename: truthy = moved, None = target exists, else error.
                rename = fm_rename(_from, dest)
                if rename:
                    fm_set_msg('Moved from {} to {}'.format(fm_enc(copy), fm_enc(msg_from)))
                elif rename is None:
                    fm_set_msg('File or folder with this path already exists', 'alert')
                else:
                    fm_set_msg('Error while moving from {} to {}'.format(fm_enc(copy), fm_enc(msg_from)), 'error')
            else:
                if fm_rcopy(_from, dest):
                    fm_set_msg('Copied from {} to {}'.format(fm_enc(copy), fm_enc(msg_from)))
                else:
                    fm_set_msg("Error while copying from {} to {}".format(fm_enc(copy), fm_enc(msg_from)), "error")
        else:
            fm_set_msg('Paths must be not equal', 'alert')
        return flask.redirect(FM_SELF_URL + "?p=" + path_query)