#!/usr/bin/env python # -*- coding: utf-8 -*- # # Copyright (c) 2014 Microsoft Corporation. All rights reserved. # Released under Apache 2.0 license as described in the file LICENSE. # # Author: Soonho Kong # import argparse import fnmatch import glob import logging import logging.handlers import os import platform import shutil import stat import subprocess import sys import tempfile import threading import urllib # Enforce subprocesses to use 'utf-8' reload(sys) sys.setdefaultencoding("utf-8") # Fixate the path separator as '\' on Windows Platform # even if users are on CYGWIN/MSYS2 environment if platform.system() == "Windows": os.path.sep = "\\" # Reset MSYSTEM environment variable to enforce native-WINDOWS # behavior to subprocesses. if "MSYSTEM" in os.environ: del os.environ["MSYSTEM"] if platform.system().startswith("MSYS"): # In MSYS platform, realpath has a strange behavior. # os.path.realpath("c:\a\b\c") => \:\a\b\c g_linja_path = os.path.abspath(os.path.normpath(__file__)) else: g_linja_path = os.path.abspath(os.path.realpath(__file__)) g_lean_bin_dir = os.path.dirname(g_linja_path) g_phony_targets = ["all", "clean", "tags", "clear-cache"] g_project_filename = ".project" g_lean_path = "USE DEFAULT" # System will search automatically g_ltags_path = "USE DEFAULT" # System will search automatically g_ninja_path = "USE DEFAULT" # System will search automatically g_flycheck_header = "FLYCHECK_BEGIN" g_flycheck_footer = "FLYCHECK_END" g_logger = logging.getLogger('linja_logger') g_debug_mode = False class FlycheckItem: def __init__(self, filename, lineno, colno, ty, msg): self.filename = filename self.lineno = lineno self.colno = colno self.ty = ty self.msg = msg pass def __str__(self): return unicode(self).encode('utf-8') def __unicode__(self): ret = g_flycheck_header + " " + self.ty.upper() + "\n" ret += "%s:%d:%d: %s: %s" % (self.filename, self.lineno, self.colno, self.ty, self.msg) + "\n" ret += g_flycheck_footer return ret def __lt__(self, other): return (self.filename, self.ty, self.lineno, self.colno) < (other.filename, other.ty, other.lineno, other.colno) def loc(self): return (self.filename, self.ty, self.lineno, self.colno) @classmethod def fromString(cls, text): # Throw the first and last lines (header/footer) lines = text.splitlines() lines = lines[1:-1] firstLine = lines[0] items = [item.strip() for item in firstLine.split(":")] filename = items[0] lineno = int(items[1]) colno = int(items[2]) ty = items[3] msg = ":".join(items[4:]) + "\n" + "\n".join(lines[1:]) msg = msg.strip() return cls(filename, lineno, colno, ty, msg) class FlycheckItemList: def __init__(self, items): self.items = items def __str__(self): return unicode(self).encode('utf-8') def __unicode__(self): return "\n".join([unicode(item) for item in self.items]) def __getitem__(self, i): return self.items[i] def __len__(self): return len(self.items) def sort(self): self.items = sorted(self.items) def filter(self, pred): self.items = filter(pred, self.items) def append(self, item): self.items.append(item) def truncate(self, n): del self.items[n:] def removeExtraItemsStartswith(self, text): self.sort() newItems = self.items[:1] i = 1 while i < len(self.items): prev_item = self.items[i-1] cur_item = self.items[i] if not cur_item.msg.startswith(text) or prev_item.loc() != cur_item.loc(): newItems.append(cur_item) i += 1 self.items = newItems @classmethod def fromString(cls, text): items = [] tmpBuffer = "" ignore = True for line in text.splitlines(): if line.startswith(g_flycheck_header): tmpBuffer = tmpBuffer + line + "\n" ignore = False elif line.startswith(g_flycheck_footer): tmpBuffer = tmpBuffer + line + "\n" items.append(FlycheckItem.fromString(tmpBuffer.strip())) tmpBuffer = "" ignore = True elif not ignore: tmpBuffer = tmpBuffer + line + "\n" return cls(items) def init_logger(): formatter = logging.Formatter('[%(levelname)s] %(asctime)s %(message)s') streamHandler = logging.StreamHandler() streamHandler.setFormatter(formatter) g_logger.addHandler(streamHandler) if g_debug_mode == True: fileHandler = logging.FileHandler('./linja.log') fileHandler.setFormatter(formatter) g_logger.addHandler(fileHandler) def log(msg): print >> sys.stderr, msg def log_nonewline(msg): print >> sys.stderr, ("\r%s" % msg), sys.stderr.flush() def error(msg): log("Error: %s" % msg) exit(1) class LinjaException(Exception): """Custom Exception""" def __init__(self, msg): self.msg = msg def __str__(self): return self.msg def give_exec_permission(filename): """chmod +x filename""" st = os.stat(filename) os.chmod(filename, st.st_mode | stat.S_IEXEC) def show_download_progress(*data): file_size = int(data[2]/1000) total_packets = data[2]/data[1] downloaded_packets = data[0] log_nonewline("Download: size\t= %i kb, packet: %i/%i" % (file_size, downloaded_packets, total_packets+1)) def get_ninja_url(): prefix = "https://leanprover.github.io/bin/" if platform.architecture()[0] == "64bit": if platform.system() == "Linux": return prefix + "ninja-1.5.1-linux-x86_64" elif platform.system() == "Windows": return prefix + "ninja-1.5.1-win.exe" elif platform.system() == "Darwin": return prefix + "ninja-1.5.1-osx" if platform.architecture()[0] == "32bit": if platform.system() == "Linux": return prefix + "ninja-1.5.1-linux-i386" elif platform.system() == "Windows": pass # TODO(soonhok): add support elif platform.system() == "Darwin": pass # TODO(soonhok): add support error("we do not have ninja executable for this platform: %s" % platform.platform()) def build_ninja_and_save_at(ninja_path, platform): saved_current_dir = os.getcwd() tempdir = tempfile.mkdtemp() build_dir = os.path.join(tempdir, "ninja") built_ninja_path = os.path.join(build_dir, "ninja") cmd_clone_ninja = ["git", "clone", "git://github.com/martine/ninja.git"] cmd_checkout_release = ["git", "checkout", "release"] cmd_bootstrap = [os.path.join(build_dir, "bootstrap.py"), "--platform", platform] try: os.chdir(tempdir) if subprocess.call(cmd_clone_ninja): raise LinjaException("Failed to clone ninja repository") os.chdir(build_dir) if subprocess.call(cmd_checkout_release): raise LinjaException("Failed to checkout release branch of ninja") if subprocess.call(cmd_bootstrap): raise LinjaException("Failed to build ninja") shutil.copy2(built_ninja_path, ninja_path) except IOError as e: error(e) except LinjaException as e: error(e) finally: os.chdir(saved_current_dir) shutil.rmtree(tempdir) return ninja_path def download_ninja_and_save_at(ninja_path): if platform.system().startswith("CYGWIN"): return build_ninja_and_save_at(ninja_path, "linux") else: url = get_ninja_url() log("Downloading ninja: %s ===> %s\n" % (url, ninja_path)) urllib.urlretrieve(url, ninja_path, show_download_progress) log("\n") if not os.path.isfile(ninja_path): error("failed to download ninja executable from %s" % url) give_exec_permission(ninja_path) return ninja_path def find_file_upward(name, path = os.getcwd()): project_file = os.path.join(path, name) if os.path.isfile(project_file): return path up = os.path.dirname(path) if up != path: return find_file_upward(name, up) return None def escape_ninja_char(name): return name.replace(":", "$:") def normalize_drive_name(name): if platform.system() == "Windows": drive, path = os.path.splitdrive(name) if drive == None: return name else: # Leo: return drive.lower() + path return path else: return name def process_target(target): if target in g_phony_targets: return target return normalize_drive_name(os.path.abspath(target)) def which(program): """ Lookup program in a path """ import os def is_exe(fpath): return os.path.isfile(fpath) and os.access(fpath, os.X_OK) fpath, fname = os.path.split(program) if fpath: if is_exe(program): return program else: for path in os.environ["PATH"].split(os.pathsep): path = path.strip('"') exe_file = os.path.join(path, program) if is_exe(exe_file): return exe_file return None def find_location(exec_name): pathname = which(exec_name) or os.path.join(g_lean_bin_dir, exec_name) pathname = os.path.abspath(pathname) return pathname def check_requirements(): global g_lean_path, g_ltags_path, g_ninja_path ltags_exec_name = "ltags" lean_exec_name = "lean" ninja_exec_name = "ninja" if platform.system() == "Windows" or platform.system().startswith("MSYS"): lean_exec_name = "lean.exe" ninja_exec_name = "ninja.exe" if g_lean_path == "USE DEFAULT": g_lean_path = find_location(lean_exec_name) if g_ltags_path == "USE DEFAULT": g_ltags_path = find_location(ltags_exec_name) if g_ninja_path == "USE DEFAULT": g_ninja_path = find_location(ninja_exec_name) if not os.path.isfile(g_lean_path): error("cannot find lean executable at " + os.path.abspath(g_lean_path)) if not os.path.isfile(g_ltags_path): error("cannot find ltags executable at " + os.path.abspath(g_ltags_path)) if not os.path.isfile(g_ninja_path): g_ninja_path = download_ninja_and_save_at(g_ninja_path) def process_args(args): if (args.flycheck == False and args.flycheck_max_messages != None): error("Please use --flycheck option with --flycheck-max-messages option.") args.targets = map(process_target, args.targets) if args.directory: os.chdir(args.directory) args.project_dir = find_file_upward(g_project_filename) if args.project_dir: os.chdir(args.project_dir) if args.cache: args.cache = process_target(args.cache) if len(args.targets) != 1: error("--cache option can only be used with one target") if not args.cache.endswith(".lean"): error("cache argument has to be ends with .lean") args.cache = args.cache[:-4] + "clean" args.phony_targets = list(set(g_phony_targets) & set(args.targets)) if args.verbose: g_logger.setLevel(logging.INFO) return args def get_lean_options(args): args.lean_options = [] if args.flycheck: args.lean_options.append("--flycheck") if args.discard: args.lean_options.append("--discard") if args.to_axiom: args.lean_options.append("--to_axiom") if args.cache: args.lean_options += ["-c", args.cache] if args.lean_config_option: for item in args.lean_config_option: args.lean_options.append("-D" + item) return args def parse_arg(argv): parser = argparse.ArgumentParser(description='linja: ninja build wrapper for Lean theorem prover.') parser.add_argument('--flycheck', '-F', action='store_true', default=False, help="Use --flycheck option for Lean.") parser.add_argument('--flycheck-max-messages', action='store', type=int, default=None, const=999999, nargs='?', help="Number of maximum flycheck messages to display.") parser.add_argument('--cache', action='store', help="Use specified cache (clean) file.") parser.add_argument('--directory', '-C', action='store', help="change to DIR before doing anything else.") parser.add_argument('--lean-config-option', '-D', action='append', help="set a Lean configuration option (name=value)") parser.add_argument('--verbose', '-v', action='store_true', help="turn on verbose option") parser.add_argument('--keep-going', '-k', action='store', default=None, const=1, nargs='?', help="keep going until N jobs fail [default=1]") parser.add_argument('--discard', '-T', action='store_true', default=False, help="discard the proof of imported theorems after checking") parser.add_argument('--to_axiom', '-X', action='store_true', default=False, help="discard proofs of all theorems after checking them, i.e., theorems become axioms after checking") parser.add_argument('targets', nargs='*') args = parser.parse_args(argv) check_requirements() args = process_args(args) args = get_lean_options(args) return args def debug_status(args): print "Working Directory =", os.getcwd() print "" for key, val in vars(args).iteritems(): print "Option[" + key + "] =", val print "" print "linja path =", g_linja_path print "lean/bin dir =", g_lean_bin_dir print "phony targets =", g_phony_targets print "lean path =", g_lean_path print "ltags path =", g_ltags_path print "ninja path =", g_ninja_path def handle_flycheck_failure(out, err, args): if len(args.targets) == 0: error("handle_flycheck_failure is called without targets") target = args.targets[0] failed = set() for line in out.splitlines(): if line.startswith("FAILED:"): for lean_file in find_lean_files(args): lean = lean_file['lean'] if lean in line and lean != target: failed.add(lean) for failed_file in failed: print g_flycheck_header, "ERROR" print "%s:1:0: error: failed to compile %s" % (target, failed_file) print g_flycheck_footer if err: print g_flycheck_header, "ERROR" print "%s:1:0: error: %s" % (target, err.strip()) print g_flycheck_footer if failed: call_lean(target, args) def process_lean_output(target, out, args): n = args.flycheck_max_messages if target.endswith(".olean"): target = target[:-5] + "lean" if not target.endswith(".lean"): print out return # Parse, filter, and remove extra items flycheckItemList = FlycheckItemList.fromString(out) flycheckItemList.filter(lambda item: item.filename == target) flycheckItemList.removeExtraItemsStartswith("failed to add declaration") # Only keep n items in the list. # Add tooManyItemsError at the end if we truncated the list if n and len(flycheckItemList) > n: count = len(flycheckItemList) flycheckItemList.truncate(n) tooManyItemsError = FlycheckItem(target, 1, 0, "error", "For performance, we only display %d errors/warnings out of %d. (lean-flycheck-max-messages-to-display)" % (n, count)) flycheckItemList.append(tooManyItemsError) print flycheckItemList def call_ninja(args): targets = [] for item in args.targets: if item.endswith(".lean"): targets.append(item[:-4] + "olean") else: targets.append(item) proc_out = proc_err = None if args.flycheck: proc_out = subprocess.PIPE proc_err = subprocess.PIPE ninja_option = [] if args.keep_going: ninja_option += ["-k", args.keep_going] proc = subprocess.Popen([g_ninja_path] + ninja_option + targets, stdout=proc_out, stderr=proc_err) (out, err) = proc.communicate() if args.flycheck: if len(args.targets) == 1 and args.targets[0].endswith(".lean"): process_lean_output(targets[0], out, args) handle_flycheck_failure(out, err, args) else: print out + err return proc.returncode def call_lean(filename, args): proc = subprocess.Popen([g_lean_path] + args.lean_options + [filename], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) out = proc.communicate()[0] process_lean_output(filename, out, args) return proc.returncode def get_lean_names(lean_file, args): lean_file = os.path.abspath(lean_file) basename, ext = os.path.splitext(lean_file) basename = normalize_drive_name(basename) lean_name_exts = ["lean", "olean", "clean", "ilean", "d"] item = {"base" : basename} for ext in lean_name_exts: item[ext] = basename + "." + ext if args.cache and len(args.targets) == 1 and item['lean'] == args.targets[0]: item['clean'] = args.cache return item def find_files(directory, pattern): if "/" in pattern: return glob.glob(os.path.join(directory, pattern)) matches = [] for root, dirnames, filenames in os.walk(directory): for filename in fnmatch.filter(filenames, pattern): matches.append(normalize_drive_name(os.path.join(root, filename))) return matches def find_lean_files(args): """Find lean files under project directory. Include and exclude files based on patterns in .project file. Use static cache to compute only once and reuse it""" project_dir = args.project_dir if find_lean_files.cached_list != []: return find_lean_files.cached_list files = set() include_patterns, exclude_patterns = [], [] with open(os.path.join(project_dir, g_project_filename), 'r') as f: for line in f: c = line[0] if c == '+': include_patterns.append(line[1:].strip()) elif c == '-': exclude_patterns.append(line[1:].strip()) elif c == '#': pass # Comment elif c == 'T': pass # TAG elif c == 'O': pass # Lean Option for pattern in include_patterns: files |= set(find_files(project_dir, pattern)) for pattern in exclude_patterns: files -= set(find_files(project_dir, pattern)) for file in args.targets: if file.endswith(".lean"): files.add(file) elif file.endswith(".olean"): file.add(file[:-5] + "lean") for f in files: find_lean_files.cached_list.append(get_lean_names(f, args)) return find_lean_files.cached_list # Initialize static variable find_lean_files.cached_list = [] def clear_cache(args): files = find_lean_files(args) files = find_lean_files(args) files = find_lean_files(args) files = find_lean_files(args) num_of_files = len(files) i = 0 for item in files: i += 1 clean_file = item['clean'] if os.path.isfile(clean_file): sys.stderr.write("[%i/%i] clear cache... % -80s\r" % (i, num_of_files, clean_file)) sys.stderr.flush() os.remove(clean_file) if num_of_files > 0: sys.stderr.write("\n") def build_olean(lean, olean, clean, dlean, ilean, base): (lean, olean, clean, dlean, ilean, base) = map(escape_ninja_char, (lean, olean, clean, dlean, ilean, base)) if clean.startswith(base): str = """build %s %s %s: LEAN %s | %s\n""" % (olean, ilean, clean, lean, dlean) else: str = """build %s %s: LEAN %s | %s\n""" % (olean, ilean, lean, dlean) str += " DLEAN_FILE=%s\n" % dlean str += " OLEAN_FILE=%s\n" % olean str += " CLEAN_FILE=%s\n" % clean str += " ILEAN_FILE=%s\n" % ilean return str def make_build_ninja(args): with open(os.path.join(args.project_dir, "build.ninja"), "w") as f: lean_files = find_lean_files(args) print >> f, """rule CLEAN""" print >> f, """ command = """, print >> f, """"%s" -t clean""" % g_ninja_path print >> f, """ description = Cleaning all built files...""" print >> f, """rule LEAN""" print >> f, """ depfile = ${DLEAN_FILE}""" print >> f, """ command = "%s" %s "$in" -o "${OLEAN_FILE}" -c "${CLEAN_FILE}" -i "${ILEAN_FILE}" """ \ % (g_lean_path, " ".join(args.lean_options)) print >> f, """rule LTAGS""" print >> f, """ command = """, if platform.system() == "Windows": print >> f, "python ", print >> f, """"%s" --relative -- $in """ % (g_ltags_path) print >> f, "build all: phony", for item in lean_files: print >> f, " " + escape_ninja_char(item['olean']), print >> f, "" tags_file = "TAGS" print >> f, "build tags: phony " + tags_file print >> f, "build " + tags_file + ": LTAGS", for item in lean_files: print >> f, " " + escape_ninja_char(item['ilean']), print >> f, "" print >> f, """build clean: CLEAN""" for item in lean_files: print >> f, build_olean(item['lean'], item['olean'], item['clean'], item['d'], item['ilean'], item['base']) print >> f, """default all""" def make_deps(lean_file, dlean_file, olean_file): with open(dlean_file, "w") as f: deps = [] proc = subprocess.Popen([g_lean_path, "--deps", lean_file], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) output = proc.communicate()[0] print >> f, olean_file + ": \\" for olean_file in output.strip().splitlines(): if olean_file: deps.append(normalize_drive_name(os.path.abspath(olean_file))) deps_str = " " + (" \\\n ".join(deps)) print >> f, deps_str def make_deps_all_files(args): lean_files = find_lean_files(args) threads = [] i, num_of_files = 1, len(lean_files) for item in lean_files: lean_file = item['lean'] dlean_file = item['d'] olean_file = item['olean'] if not os.path.isfile(dlean_file) or os.path.getmtime(dlean_file) < os.path.getmtime(lean_file): thread = threading.Thread(target=make_deps, args = [lean_file, dlean_file, olean_file]) sys.stderr.write("[%i/%i] generating dep... % -80s" % (i, num_of_files, dlean_file)) if args.flycheck == True: sys.stderr.write("\n") else: sys.stderr.write("\r") sys.stderr.flush() thread.start() threads.append(thread) i += 1 for thread in threads: thread.join() if threads != []: sys.stderr.write("\n") def main(argv=None): init_logger() if argv is None: argv = sys.argv[1:] args = parse_arg(argv) if args.project_dir: if args.targets == ["clear-cache"]: args.targets = [] clear_cache(args) return 0 if not "clean" in args.targets: make_deps_all_files(args) make_build_ninja(args) return call_ninja(args) else: # NO Project Directory Found if args.phony_targets: error("cannot find project directory. Make sure that you have " \ + g_project_filename + " file at the project root.") returncode = 0 for filename in args.targets: if os.path.isfile(filename) and filename.endswith(".lean"): returncode |= call_lean(filename, args) return returncode return 0 if __name__ == "__main__": sys.exit(main())