#!/usr/bin/env python # -*- coding: utf-8 -*- # # Copyright (c) 2014 Microsoft Corporation. All rights reserved. # Released under Apache 2.0 license as described in the file LICENSE. # # Author: Soonho Kong # # Python 2/3 compatibility from __future__ import print_function import python_lib.six as six from python_lib.six.moves import filter, map import argparse import fnmatch import glob import logging import logging.handlers import os import platform import shutil import stat import subprocess import sys import tempfile import threading from python_lib.six.moves import urllib # Enforce subprocesses to use 'utf-8' in Python 2 if six.PY2: reload(sys) sys.setdefaultencoding("utf-8") # Fixate the path separator as '\' on Windows Platform # even if users are on CYGWIN/MSYS2 environment if platform.system() == "Windows": os.path.sep = "\\" # Reset MSYSTEM environment variable to enforce native-WINDOWS # behavior to subprocesses. if "MSYSTEM" in os.environ: del os.environ["MSYSTEM"] if platform.system().startswith("MSYS"): # In MSYS platform, realpath has a strange behavior. # os.path.realpath("c:\a\b\c") => \:\a\b\c g_linja_path = os.path.abspath(os.path.normpath(__file__)) else: g_linja_path = os.path.abspath(os.path.realpath(__file__)) g_lean_bin_dir = os.path.dirname(g_linja_path) g_phony_targets = ["all", "clean", "tags", "clear-cache"] g_project_filename = ".project" g_lean_path = "USE DEFAULT" # System will search automatically g_leantags_path = "USE DEFAULT" # System will search automatically g_ninja_path = "USE DEFAULT" # System will search automatically g_flycheck_header = "FLYCHECK_BEGIN" g_flycheck_footer = "FLYCHECK_END" g_logger = logging.getLogger('linja_logger') g_debug_mode = False @six.python_2_unicode_compatible class FlycheckItem: def __init__(self, filename, lineno, colno, ty, msg): self.filename = filename self.lineno = lineno self.colno = colno self.ty = ty self.msg = msg pass def __str__(self): ret = g_flycheck_header + " " + self.ty.upper() + "\n" ret += "%s:%d:%d: %s: %s" % (self.filename, self.lineno, self.colno, self.ty, self.msg) + "\n" ret += g_flycheck_footer return ret def __lt__(self, other): return (self.filename, self.ty, self.lineno, self.colno) < (other.filename, other.ty, other.lineno, other.colno) def loc(self): return (self.filename, self.ty, self.lineno, self.colno) @classmethod def fromString(cls, text): # Throw the first and last lines (header/footer) lines = text.splitlines() lines = lines[1:-1] firstLine = lines[0] items = [item.strip() for item in firstLine.split(":")] filename = items[0] lineno = int(items[1]) colno = int(items[2]) ty = items[3] msg = ":".join(items[4:]) + "\n" + "\n".join(lines[1:]) msg = msg.strip() return cls(filename, lineno, colno, ty, msg) @six.python_2_unicode_compatible class FlycheckItemList: def __init__(self, items): self.items = items def __str__(self): return "\n".join([six.text_type(item) for item in self.items]) def __getitem__(self, i): return self.items[i] def __len__(self): return len(self.items) def sort(self): self.items = sorted(self.items) def filter(self, pred): self.items = list(filter(pred, self.items)) def append(self, item): self.items.append(item) def truncate(self, n): del self.items[n:] def removeExtraItemsStartswith(self, text): self.sort() newItems = self.items[:1] i = 1 while i < len(self.items): prev_item = self.items[i-1] cur_item = self.items[i] if not cur_item.msg.startswith(text) or prev_item.loc() != cur_item.loc(): newItems.append(cur_item) i += 1 self.items = newItems @classmethod def fromString(cls, text): items = [] tmpBuffer = "" ignore = True for line in text.splitlines(): if line.startswith(g_flycheck_header): tmpBuffer = tmpBuffer + line + "\n" ignore = False elif line.startswith(g_flycheck_footer): tmpBuffer = tmpBuffer + line + "\n" items.append(FlycheckItem.fromString(tmpBuffer.strip())) tmpBuffer = "" ignore = True elif not ignore: tmpBuffer = tmpBuffer + line + "\n" return cls(items) def init_logger(): formatter = logging.Formatter('[%(levelname)s] %(asctime)s %(message)s') streamHandler = logging.StreamHandler() streamHandler.setFormatter(formatter) g_logger.addHandler(streamHandler) if g_debug_mode == True: fileHandler = logging.FileHandler('./linja.log') fileHandler.setFormatter(formatter) g_logger.addHandler(fileHandler) def log(msg): print(msg, file=sys.stderr) def log_nonewline(msg): print(("\r%s" % msg), end=' ', file=sys.stderr) sys.stderr.flush() def error(msg): log("Error: %s" % msg) exit(1) class LinjaException(Exception): """Custom Exception""" def __init__(self, msg): self.msg = msg def __str__(self): return self.msg def give_exec_permission(filename): """chmod +x filename""" st = os.stat(filename) os.chmod(filename, st.st_mode | stat.S_IEXEC) def show_download_progress(*data): file_size = int(data[2]/1000) total_packets = data[2]/data[1] downloaded_packets = data[0] log_nonewline("Download: size\t= %i kb, packet: %i/%i" % (file_size, downloaded_packets, total_packets+1)) def get_ninja_url(): prefix = "https://leanprover.github.io/bin/" if platform.architecture()[0] == "64bit": if platform.system() == "Linux": return prefix + "ninja-1.5.1-linux-x86_64" elif platform.system() == "Windows": return prefix + "ninja-1.5.1-win.exe" elif platform.system() == "Darwin": return prefix + "ninja-1.5.1-osx" if platform.architecture()[0] == "32bit": if platform.system() == "Linux": return prefix + "ninja-1.5.1-linux-i386" elif platform.system() == "Windows": pass # TODO(soonhok): add support elif platform.system() == "Darwin": pass # TODO(soonhok): add support error("we do not have ninja executable for this platform: %s" % platform.platform()) def build_ninja_and_save_at(ninja_path, platform): saved_current_dir = os.getcwd() tempdir = tempfile.mkdtemp() build_dir = os.path.join(tempdir, "ninja") built_ninja_path = os.path.join(build_dir, "ninja") cmd_clone_ninja = ["git", "clone", "git://github.com/martine/ninja.git"] cmd_checkout_release = ["git", "checkout", "release"] cmd_bootstrap = [os.path.join(build_dir, "bootstrap.py"), "--platform", platform] try: os.chdir(tempdir) if subprocess.call(cmd_clone_ninja): raise LinjaException("Failed to clone ninja repository") os.chdir(build_dir) if subprocess.call(cmd_checkout_release): raise LinjaException("Failed to checkout release branch of ninja") if subprocess.call(cmd_bootstrap): raise LinjaException("Failed to build ninja") shutil.copy2(built_ninja_path, ninja_path) except IOError as e: error(e) except LinjaException as e: error(e) finally: os.chdir(saved_current_dir) shutil.rmtree(tempdir) return ninja_path def download_ninja_and_save_at(ninja_path): if platform.system().startswith("CYGWIN"): return build_ninja_and_save_at(ninja_path, "linux") else: url = get_ninja_url() log("Downloading ninja: %s ===> %s\n" % (url, ninja_path)) urllib.request.urlretrieve(url, ninja_path, show_download_progress) log("\n") if not os.path.isfile(ninja_path): error("failed to download ninja executable from %s" % url) give_exec_permission(ninja_path) return ninja_path def find_file_upward(name, path = os.getcwd()): project_file = os.path.join(path, name) if os.path.isfile(project_file): return path up = os.path.dirname(path) if up != path: return find_file_upward(name, up) return None def escape_ninja_char(name): return name.replace(":", "$:") def normalize_drive_name(name): if platform.system() == "Windows": drive, path = os.path.splitdrive(name) if drive == None: return name else: # Leo: return drive.lower() + path return path else: return name def process_target(target): if target in g_phony_targets: return target return normalize_drive_name(os.path.abspath(target)) def which(program): """ Lookup program in a path """ import os def is_exe(fpath): return os.path.isfile(fpath) and os.access(fpath, os.X_OK) fpath, fname = os.path.split(program) if fpath: if is_exe(program): return program else: for path in os.environ["PATH"].split(os.pathsep): path = path.strip('"') exe_file = os.path.join(path, program) if is_exe(exe_file): return exe_file return None def find_location(exec_name): pathname = which(exec_name) or os.path.join(g_lean_bin_dir, exec_name) pathname = os.path.abspath(pathname) return pathname def check_requirements(): global g_lean_path, g_leantags_path, g_ninja_path leantags_exec_name = "leantags" lean_exec_name = "lean" ninja_exec_name = "ninja" if platform.system() == "Windows" or platform.system().startswith("MSYS"): lean_exec_name = "lean.exe" ninja_exec_name = "ninja.exe" if g_lean_path == "USE DEFAULT": g_lean_path = find_location(lean_exec_name) if g_leantags_path == "USE DEFAULT": g_leantags_path = find_location(leantags_exec_name) if g_ninja_path == "USE DEFAULT": g_ninja_path = find_location(ninja_exec_name) if not os.path.isfile(g_lean_path): error("cannot find lean executable at " + os.path.abspath(g_lean_path)) if not os.path.isfile(g_leantags_path): error("cannot find leantags executable at " + os.path.abspath(g_leantags_path)) if not os.path.isfile(g_ninja_path): g_ninja_path = download_ninja_and_save_at(g_ninja_path) def process_args(args): if (args.flycheck == False and args.flycheck_max_messages != None): error("Please use --flycheck option with --flycheck-max-messages option.") args.targets = list(map(process_target, args.targets)) if args.directory: os.chdir(args.directory) args.project_dir = find_file_upward(g_project_filename) if args.project_dir: os.chdir(args.project_dir) if args.cache: args.cache = process_target(args.cache) if len(args.targets) != 1: error("--cache option can only be used with one target") if not args.cache.endswith(".lean") and not args.cache.endswith(".hlean"): error("cache argument has to end with .lean or .hlean") if args.cache.endswith(".lean"): args.cache = args.cache[:-4] + "clean" else: args.cache = args.cache[:-5] + "clean" args.phony_targets = list(set(g_phony_targets) & set(args.targets)) if args.verbose: g_logger.setLevel(logging.INFO) return args def get_lean_options(args): args.lean_options = [] if args.flycheck: args.lean_options.append("--flycheck") if args.discard: args.lean_options.append("--discard") if args.memory: args.lean_options += ["-M", args.memory] if args.Trust: args.lean_options.append("--Trust") if args.to_axiom: args.lean_options.append("--to_axiom") if args.cache: args.lean_options += ["-c", args.cache] if args.lean_config_option: for item in args.lean_config_option: args.lean_options.append("-D" + item) return args def parse_arg(argv): parser = argparse.ArgumentParser(description='linja: ninja build wrapper for Lean theorem prover.') parser.add_argument('--flycheck', '-F', action='store_true', default=False, help="Use --flycheck option for Lean.") parser.add_argument('--flycheck-max-messages', action='store', type=int, default=None, const=999999, nargs='?', help="Number of maximum flycheck messages to display.") parser.add_argument('--cache', action='store', help="Use specified cache (clean) file.") parser.add_argument('--directory', '-C', action='store', help="change to DIR before doing anything else.") parser.add_argument('--lean-config-option', '-D', action='append', help="set a Lean configuration option (name=value)") parser.add_argument('--verbose', '-v', action='store_true', help="turn on verbose option") parser.add_argument('--memory', '-M', action='store', default=None, const=1, nargs='?', help="maximum amount of memory that can be used by Lean [default=unbounded]") parser.add_argument('--keep-going', '-k', action='store', default=None, const=1, nargs='?', help="keep going until N jobs fail [default=1]") parser.add_argument('--discard', '-r', action='store_true', default=False, help="discard the proof of imported theorems after checking") parser.add_argument('--to_axiom', '-X', action='store_true', default=False, help="discard proofs of all theorems after checking them, i.e., theorems become axioms after checking") parser.add_argument('--Trust', '-T', action='store_true', default=False, help="short-cut for maximum trust level (e.g., Lean will not type check imported files)") parser.add_argument('targets', nargs='*') args = parser.parse_args(argv) check_requirements() args = process_args(args) args = get_lean_options(args) return args def debug_status(args): print("Working Directory =", os.getcwd()) print("") for key, val in six.iteritems(vars(args)): print("Option[" + key + "] =", val) print("") print("linja path =", g_linja_path) print("lean/bin dir =", g_lean_bin_dir) print("phony targets =", g_phony_targets) print("lean path =", g_lean_path) print("leantags path =", g_leantags_path) print("ninja path =", g_ninja_path) def handle_flycheck_failure(out, err, args): if len(args.targets) == 0: error("handle_flycheck_failure is called without targets") target = args.targets[0] failed = set() for line in out.splitlines(): if line.startswith("FAILED:"): for lean_file in find_lean_files(args): lean = lean_file['lean'] if lean in line and lean != target: failed.add(lean) for failed_file in failed: print(g_flycheck_header, "ERROR") print("%s:1:0: error: failed to compile %s" % (target, failed_file)) print(g_flycheck_footer) if err: print(g_flycheck_header, "ERROR") print("%s:1:0: error: %s" % (target, err.strip())) print(g_flycheck_footer) if failed: call_lean(target, args) def process_lean_output(target, out, args, using_hlean): n = args.flycheck_max_messages if target.endswith(".olean"): if using_hlean: target = target[:-5] + "hlean" else: target = target[:-5] + "lean" if (not target.endswith(".lean") and not using_hlean) or (not target.endswith(".hlean") and using_hlean): print(out) return # Parse, filter, and remove extra items flycheckItemList = FlycheckItemList.fromString(out) flycheckItemList.filter(lambda item: item.filename == target) flycheckItemList.removeExtraItemsStartswith("failed to add declaration") # Only keep n items in the list. # Add tooManyItemsError at the end if we truncated the list if n and len(flycheckItemList) > n: count = len(flycheckItemList) flycheckItemList.truncate(n) tooManyItemsError = FlycheckItem(target, 1, 0, "error", "For performance, we only display %d errors/warnings out of %d. (lean-flycheck-max-messages-to-display)" % (n, count)) flycheckItemList.append(tooManyItemsError) print(flycheckItemList) def call_ninja(args): targets = [] for item in args.targets: if item.endswith(".lean"): targets.append(item[:-4] + "olean") elif item.endswith(".hlean"): targets.append(item[:-5] + "olean") else: targets.append(item) proc_out = proc_err = None if args.flycheck: proc_out = subprocess.PIPE proc_err = subprocess.PIPE ninja_option = [] if args.keep_going: ninja_option += ["-k", args.keep_going] proc = subprocess.Popen([g_ninja_path] + ninja_option + targets, stdout=proc_out, stderr=proc_err) (out, err) = proc.communicate() if args.flycheck: if len(args.targets) == 1 and (args.targets[0].endswith(".lean") or args.targets[0].endswith(".hlean")): process_lean_output(targets[0], out, args, args.targets[0].endswith(".hlean")) handle_flycheck_failure(out, err, args) else: print(out + err) return proc.returncode def call_lean(filename, args): proc = subprocess.Popen([g_lean_path] + args.lean_options + [filename], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) out = proc.communicate()[0] process_lean_output(filename, out, args, filename.endswith(".hlean")) return proc.returncode def get_lean_names(lean_file, args, using_hlean): lean_file = os.path.abspath(lean_file) basename, ext = os.path.splitext(lean_file) basename = normalize_drive_name(basename) item = {"base" : basename} lean_name_exts = ["lean", "olean", "clean", "ilean", "d"] for ext in lean_name_exts: if ext == "lean" and using_hlean: item[ext] = basename + ".hlean" else: item[ext] = basename + "." + ext if args.cache and len(args.targets) == 1 and item['lean'] == args.targets[0]: item['clean'] = args.cache return item def find_files(directory, pattern): if "/" in pattern: return glob.glob(os.path.join(directory, pattern)) matches = [] for root, dirnames, filenames in os.walk(directory): for filename in fnmatch.filter(filenames, pattern): matches.append(normalize_drive_name(os.path.join(root, filename))) return matches def find_lean_files(args): """Find lean files under project directory. Include and exclude files based on patterns in .project file. Use static cache to compute only once and reuse it""" project_dir = args.project_dir if find_lean_files.cached_list != []: return find_lean_files.cached_list files = set() include_patterns, exclude_patterns = [], [] with open(os.path.join(project_dir, g_project_filename), 'r') as f: for line in f: c = line[0] if c == '+': include_patterns.append(line[1:].strip()) elif c == '-': exclude_patterns.append(line[1:].strip()) elif c == '#': pass # Comment elif c == 'T': pass # TAG elif c == 'O': pass # Lean Option for pattern in include_patterns: files |= set(find_files(project_dir, pattern)) for pattern in exclude_patterns: files -= set(find_files(project_dir, pattern)) has_lean = False has_hlean = False for file in files: if file.endswith(".lean"): has_lean = True if file.endswith(".hlean"): has_hlean = True if has_lean and has_hlean: error("project cannot mix .lean and .hlean files") for file in args.targets: if file.endswith(".lean") or file.endswith(".hlean"): files.add(file) elif file.endswith(".olean"): if has_hlean: file.add(file[:-5] + "hlean") else: file.add(file[:-5] + "lean") for f in files: find_lean_files.cached_list.append(get_lean_names(f, args, has_hlean)) return find_lean_files.cached_list # Initialize static variable find_lean_files.cached_list = [] def clear_cache(args): files = find_lean_files(args) files = find_lean_files(args) files = find_lean_files(args) files = find_lean_files(args) num_of_files = len(files) i = 0 for item in files: i += 1 clean_file = item['clean'] if os.path.isfile(clean_file): sys.stderr.write("[%i/%i] clear cache... % -80s\r" % (i, num_of_files, clean_file)) sys.stderr.flush() os.remove(clean_file) if num_of_files > 0: sys.stderr.write("\n") def build_olean(lean, olean, clean, dlean, ilean, base): (lean, olean, clean, dlean, ilean, base) = list(map(escape_ninja_char, (lean, olean, clean, dlean, ilean, base))) if clean.startswith(base): str = """build %s %s %s: LEAN %s | %s\n""" % (olean, ilean, clean, lean, dlean) else: str = """build %s %s: LEAN %s | %s\n""" % (olean, ilean, lean, dlean) str += " DLEAN_FILE=%s\n" % dlean str += " OLEAN_FILE=%s\n" % olean str += " CLEAN_FILE=%s\n" % clean str += " ILEAN_FILE=%s\n" % ilean return str def make_build_ninja(args): with open(os.path.join(args.project_dir, "build.ninja"), "w") as f: lean_files = find_lean_files(args) print("""rule CLEAN""", file=f) print(""" command = """, end=' ', file=f) print(""""%s" -t clean""" % g_ninja_path, file=f) print(""" description = Cleaning all built files...""", file=f) print("""rule LEAN""", file=f) print(""" depfile = ${DLEAN_FILE}""", file=f) print(""" command = "%s" %s "$in" -o "${OLEAN_FILE}" -c "${CLEAN_FILE}" -i "${ILEAN_FILE}" """ \ % (g_lean_path, " ".join(args.lean_options)), file=f) print("""rule LEANTAGS""", file=f) print(""" command = """, end=' ', file=f) if platform.system() == "Windows": print("python ", end=' ', file=f) print(""""%s" --relative -- $in """ % (g_leantags_path), file=f) print("build all: phony", end=' ', file=f) for item in lean_files: print(" " + escape_ninja_char(item['olean']), end=' ', file=f) print("", file=f) tags_file = "TAGS" print("build tags: phony " + tags_file, file=f) print("build " + tags_file + ": LEANTAGS", end=' ', file=f) for item in lean_files: print(" " + escape_ninja_char(item['ilean']), end=' ', file=f) print("", file=f) print("""build clean: CLEAN""", file=f) for item in lean_files: print(build_olean(item['lean'], item['olean'], item['clean'], item['d'], item['ilean'], item['base']), file=f) print("""default all""", file=f) def make_deps(lean_file, dlean_file, olean_file): with open(dlean_file, "w") as f: deps = [] proc = subprocess.Popen([g_lean_path, "--deps", lean_file], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) output = proc.communicate()[0] print(olean_file + ": \\", file=f) for olean_file in output.strip().splitlines(): if olean_file: deps.append(normalize_drive_name(os.path.abspath(olean_file))) deps_str = " " + (" \\\n ".join(deps)) print(deps_str, file=f) def make_deps_all_files(args): lean_files = find_lean_files(args) threads = [] i, num_of_files = 1, len(lean_files) for item in lean_files: lean_file = item['lean'] dlean_file = item['d'] olean_file = item['olean'] if not os.path.isfile(dlean_file) or os.path.getmtime(dlean_file) < os.path.getmtime(lean_file): thread = threading.Thread(target=make_deps, args = [lean_file, dlean_file, olean_file]) sys.stderr.write("[%i/%i] generating dep... % -80s" % (i, num_of_files, dlean_file)) if args.flycheck == True: sys.stderr.write("\n") else: sys.stderr.write("\r") sys.stderr.flush() thread.start() threads.append(thread) i += 1 for thread in threads: thread.join() if threads != []: sys.stderr.write("\n") def main(argv=None): init_logger() if argv is None: argv = sys.argv[1:] args = parse_arg(argv) if args.project_dir: if args.targets == ["clear-cache"]: args.targets = [] clear_cache(args) return 0 if not "clean" in args.targets: make_deps_all_files(args) make_build_ninja(args) return call_ninja(args) else: # NO Project Directory Found if args.phony_targets: error("cannot find project directory. Make sure that you have " \ + g_project_filename + " file at the project root.") returncode = 0 for filename in args.targets: if os.path.isfile(filename) and (filename.endswith(".lean") or filename.endswith(".hlean")): returncode |= call_lean(filename, args) return returncode return 0 if __name__ == "__main__": sys.exit(main())