#!/usr/bin/env python2.7 # -*- coding: utf-8 -*- # # Copyright (c) 2014 Microsoft Corporation. All rights reserved. # Released under Apache 2.0 license as described in the file LICENSE. # # Author: Soonho Kong # import argparse import fnmatch import glob import os import platform import shutil import stat import subprocess import sys import tempfile import threading import urllib g_lean_path = "USE DEFAULT" g_ltags_path = "USE DEFAULT" g_ninja_path = "USE DEFAULT" g_working_dir = os.getcwd() g_lean_files = [] g_linja_path = os.path.realpath(__file__) g_lean_bin_dir = os.path.dirname(g_linja_path) g_lean_options = [] def which(program): """ Lookup program in a path """ import os def is_exe(fpath): return os.path.isfile(fpath) and os.access(fpath, os.X_OK) fpath, fname = os.path.split(program) if fpath: if is_exe(program): return program else: for path in os.environ["PATH"].split(os.pathsep): path = path.strip('"') exe_file = os.path.join(path, program) if is_exe(exe_file): return exe_file return None def log(msg): print >> sys.stderr, msg def log_nonewline(msg): print >> sys.stderr, ("\r%s" % msg), sys.stderr.flush() def error(msg): log("Error: %s" % msg) exit(1) class LinjaException(Exception): """Custom Exception""" def __init__(self, msg): self.msg = msg def __str__(self): return self.msg def give_exec_permission(filename): "chmod +x filename" st = os.stat(filename) os.chmod(filename, st.st_mode | stat.S_IEXEC) def show_download_progress(*data): file_size = int(data[2]/1000) total_packets = data[2]/data[1] downloaded_packets = data[0] log_nonewline("Download: size\t= %i kb, packet: %i/%i" % (file_size, downloaded_packets, total_packets+1)) def get_ninja_url(): prefix = "https://leanprover.github.io/bin/" if platform.architecture()[0] == "64bit": if platform.system() == "Linux": return prefix + "ninja-1.5.1-linux-x86_64" elif platform.system() == "Windows": return prefix + "ninja-1.5.1-win.exe" elif platform.system() == "Darwin": return prefix + "ninja-1.5.1-osx" if platform.architecture()[0] == "32bit": if platform.system() == "Linux": return prefix + "ninja-1.5.1-linux-i386" elif platform.system() == "Windows": pass # TODO(soonhok): add support elif platform.system() == "Darwin": pass # TODO(soonhok): add support error("we do not have ninja executable for this platform: %s" % platform.platform()) def build_ninja_and_save_at(ninja_path, platform): saved_current_dir = os.getcwd() tempdir = tempfile.mkdtemp() build_dir = os.path.join(tempdir, "ninja") built_ninja_path = os.path.join(build_dir, "ninja") cmd_clone_ninja = ["git", "clone", "git://github.com/martine/ninja.git"] cmd_checkout_release = ["git", "checkout", "release"] cmd_bootstrap = [os.path.join(build_dir, "bootstrap.py"), "--platform", platform] try: os.chdir(tempdir) if subprocess.call(cmd_clone_ninja): raise LinjaException("Failed to clone ninja repository") os.chdir(os.path.join(build_dir)) if subprocess.call(cmd_checkout_release): raise LinjaException("Failed to checkout release branch of ninja") if subprocess.call(cmd_bootstrap): raise LinjaException("Failed to build ninja") shutil.copy2(built_ninja_path, ninja_path) except IOError as e: error(e) except LinjaException as e: error(e) finally: os.chdir(saved_current_dir) shutil.rmtree(tempdir) return ninja_path def download_ninja_and_save_at(ninja_path): if platform.system().startswith("CYGWIN"): return build_ninja_and_save_at(ninja_path, "linux") else: url = get_ninja_url() log("Downloading ninja: %s ===> %s\n" % (url, ninja_path)) urllib.urlretrieve(url, ninja_path, show_download_progress) log("\n") if not os.path.isfile(ninja_path): error("failed to download ninja executable from %s" % url) give_exec_permission(ninja_path) return ninja_path def check_required_packages(): global g_lean_path, g_ltags_path, g_ninja_path lean_exec_name = "lean.exe" if platform.system() == "Windows" else "lean" ltags_exec_name = "ltags.exe" if platform.system() == "Windows" else "ltags" ninja_exec_name = "ninja.exe" if platform.system() == "Windows" else "ninja" if g_lean_path == "USE DEFAULT": g_lean_path = which(lean_exec_name) or os.path.join(g_lean_bin_dir, lean_exec_name) if g_ltags_path == "USE DEFAULT": g_ltags_path = which(ltags_exec_name) or os.path.join(g_lean_bin_dir, ltags_exec_name) if g_ninja_path == "USE DEFAULT": g_ninja_path = which(ninja_exec_name) or os.path.join(g_lean_bin_dir, ninja_exec_name) if not os.path.isfile(g_lean_path): error("cannot find lean executable at " + g_lean_path) if not os.path.isfile(g_ltags_path): error("cannot find ltags executable at " + g_ltags_path) if not os.path.isfile(g_ninja_path): g_ninja_path = download_ninja_and_save_at(g_ninja_path) def make_deps(lean_file, dlean_file, olean_file): with open(dlean_file, "w") as f: deps = [] proc = subprocess.Popen([g_lean_path, "--deps", lean_file], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) output = proc.communicate()[0] print >> f, olean_file + ": \\" for olean_file in output.strip().split("\n"): deps.append(olean_file) deps_str = " " + (" \\\n ".join(deps)) print >> f, deps_str def get_lean_names(lean_file, args): basename, ext = os.path.splitext(lean_file) item = {} item['base'] = basename; item['lean'] = basename + ".lean"; item['d'] = basename + ".d"; item['olean'] = basename + ".olean"; item['ilean'] = basename + ".ilean" if args.cache and len(args.targets) == 1 and item['lean'] == args.targets[0]: item['clean'] = args.cache else: item['clean'] = basename + ".clean"; return item def make_deps_all_files(directory, args): lean_files = find_lean_files(directory, args) threads = [] i, num_of_files = 1, len(lean_files) for item in lean_files: lean_file = item['lean'] dlean_file = item['d'] olean_file = item['olean'] if not os.path.isfile(dlean_file) or os.path.getmtime(dlean_file) < os.path.getmtime(lean_file): thread = threading.Thread(target=make_deps, args = [lean_file, dlean_file, olean_file]) sys.stderr.write("[%i/%i] generating dep... % -80s\r" % (i, num_of_files, dlean_file)) sys.stderr.flush() thread.start() threads.append(thread) i += 1 for thread in threads: thread.join() if threads != []: sys.stderr.write("\n") def rule_clean(): return """rule CLEAN command = "%s" -t clean description = Cleaning all built files...""" % g_ninja_path def rule_lean(): return """rule LEAN depfile = ${DLEAN_FILE} command = "%s" %s "$in" -o "${OLEAN_FILE}" -c "${CLEAN_FILE}" -i "${ILEAN_FILE}" """ \ % (g_lean_path, " ".join(g_lean_options)) def rule_tags(): return """rule LTAGS command = "%s" $in """ % (g_ltags_path) def build_all(lean_files): str = "build all: phony" for item in lean_files: str = str + " " + item['olean'] return str def build_tags(lean_files): tags_file = os.path.join(g_working_dir, "TAGS") str = "build tags: phony " + tags_file + "\n" str += "build " + tags_file + ": LTAGS" for item in lean_files: str = str + " " + item['ilean'] return str def build_clean(): return """build clean: CLEAN""" def build_olean(lean, olean, clean, dlean, ilean, base): if clean.startswith(base): str = """build %s %s %s: LEAN %s | %s\n""" % (olean, ilean, clean, lean, dlean) else: str = """build %s %s: LEAN %s | %s\n""" % (olean, ilean, lean, dlean) str += " DLEAN_FILE=%s\n" % dlean str += " OLEAN_FILE=%s\n" % olean str += " CLEAN_FILE=%s\n" % clean str += " ILEAN_FILE=%s\n" % ilean return str def ninja_default(): return """default all""" def make_build_ninja(directory, args): with open(os.path.join(directory, "build.ninja"), "w") as f: lean_files = find_lean_files(directory, args) print >> f, rule_clean() print >> f, rule_lean() print >> f, rule_tags() print >> f, build_all(lean_files) print >> f, build_tags(lean_files) print >> f, build_clean() for item in lean_files: print >> f, build_olean(item['lean'], item['olean'], item['clean'], item['d'], item['ilean'], item['base']) print >> f, ninja_default() def find_project_upward(path): project_file = os.path.join(path, ".project") if os.path.isfile(project_file): return path up = os.path.dirname(path) if up != path: return find_project_upward(up) return None def handle_failure_for_flycheck(out, err, args): if len(args.targets) == 0: error("handle_failure_for_flycheck is called without targets") target = args.targets[0] failed = set() for line in out.split("\n"): if line.startswith("FAILED:"): for lean_file in g_lean_files: lean = lean_file['lean'] if lean in line and lean != target: failed.add(lean) for failed_file in failed: print "FLYCHECK_BEGIN ERROR" print "%s:1:0: error: failed to compile %s" % (target, failed_file) print "FLYCHECK_END" if err: print "FLYCHECK_BEGIN ERROR" print "%s:1:0: error: %s" % (target, err.strip()) print "FLYCHECK_END" if failed: call_lean(target, args) def print_flycheck_output_upto_n(target, out, n): if target.endswith(".olean"): target = target[:-5] + "lean" if not target.endswith(".lean") or n == None: print out return inside_of_flycheck = False reach_limit = False flycheck_header = "FLYCHECK_BEGIN" flycheck_footer = "FLYCHECK_END" lines = out.split("\n") lines_len = len(lines) count = 0 i = 0 current_file = "" while i < lines_len: line = lines[i] if line.startswith(flycheck_header): inside_of_flycheck = True current_file = lines[i+1].split(":")[0] if inside_of_flycheck and not reach_limit: print line if line.startswith(flycheck_footer): if current_file == target: count += 1 inside_of_flycheck = False if count >= n: reach_limit = True i += 1 if reach_limit: print "FLYCHECK_BEGIN ERROR" print "%s:1:0: error: For performance, we only display %d errors/warnings out of %d. (lean-flycheck-max-messages-to-display)" % (target, n, count) print "FLYCHECK_END" def call_ninja(directory, args): targets = [] for item in args.targets: if item.endswith(".lean"): targets.append(item[:-4] + "olean") else: targets.append(item) proc_out = proc_err = None if args.flycheck: proc_out = subprocess.PIPE proc_err = subprocess.PIPE proc = subprocess.Popen([g_ninja_path] + targets, stdout=proc_out, stderr=proc_err) proc.wait() if args.flycheck: (out, err) = proc.communicate() if len(args.targets) == 1 and args.targets[0].endswith(".lean"): print_flycheck_output_upto_n(targets[0], out, args.flycheck_max_messages) handle_failure_for_flycheck(out, err, args) else: print out + err return proc.returncode def call_lean(filename, args): proc = subprocess.Popen([g_lean_path] + g_lean_options + [filename], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) out = proc.communicate()[0] print_flycheck_output_upto_n(filename, out, args.flycheck_max_messages) return proc.returncode def find_files(directory, pattern): if "/" in pattern: return glob.glob(os.path.join(directory, pattern)) matches = [] for root, dirnames, filenames in os.walk(directory): for filename in fnmatch.filter(filenames, pattern): matches.append(os.path.join(root, filename)) return matches def find_lean_files(project_dir, args): if g_lean_files != []: return g_lean_files files = set() include_patterns, exclude_patterns = [], [] with open(os.path.join(project_dir, ".project"), 'r') as f: for line in f: c = line[0] if c == '+': include_patterns.append(line[1:].strip()) elif c == '-': exclude_patterns.append(line[1:].strip()) elif c == '#': pass # Comment elif c == 'T': pass # TAG elif c == 'O': pass # Lean Option for pattern in include_patterns: files |= set(find_files(project_dir, pattern)) for pattern in exclude_patterns: files -= set(find_files(project_dir, pattern)) for file in args.targets: file = os.path.abspath(file) if file.endswith(".lean"): files.add(file) elif file.endswith(".olean"): file.add(file[:-5] + "lean") for f in files: g_lean_files.append(get_lean_names(f, args)) return g_lean_files def expand_target_to_fullname(target): if target in ["all", "clean", "tags", "clear-cache"]: return target elif os.path.isfile(target): return os.path.abspath(target) else: return os.path.join(g_working_dir, target) def parse_arg(argv): global g_working_dir parser = argparse.ArgumentParser(description='linja: ninja build wrapper for Lean theorem prover.') parser.add_argument('--flycheck', '-F', action='store', type=int, default=None, const=120, nargs='?', help="Use --flycheck option for Lean.") parser.add_argument('--flycheck-max-messages', action='store', type=int, default=None, const=999999, nargs='?', help="Number of maximum flycheck messages to display.") parser.add_argument('--cache', action='store', help="Use specified cache (clean) file.") parser.add_argument('--directory', '-C', action='store', help="change to DIR before doing anything else.") parser.add_argument('targets', nargs='*') args = parser.parse_args(argv) if (args.flycheck == None and args.flycheck_max_messages != None): error("Please use --flycheck option with --flycheck-max-messages option.") if args.cache: args.cache = expand_target_to_fullname(args.cache) if len(args.targets) != 1: error("--cache option can only be used with one target") if not args.cache.endswith(".lean"): error("cache argument has to be ends with .lean") args.cache = args.cache[:-4] + "clean" args.targets = map(expand_target_to_fullname, args.targets) if args.directory: os.chdir(args.directory) g_working_dir = args.directory return args def get_lean_options(args): options = [] if args.flycheck: options.append("--flycheck") options.append("-D pp.width=%d" % args.flycheck) return options def clear_cache(project_dir, args): for item in find_lean_files(project_dir, args): if os.path.isfile(item['clean']): os.remove(item['clean']) def main(argv=None): global g_lean_options if argv is None: argv = sys.argv[1:] check_required_packages() args = parse_arg(argv) project_dir = find_project_upward(g_working_dir) g_lean_options += get_lean_options(args) if not project_dir and args.targets in [[], ["all"], ["clean"], ["tags"], ["clear-cache"]]: error("cannot find project directory. Make sure that you have .project file at the project root.") if project_dir: os.chdir(project_dir) if args.targets == ["clear-cache"]: args.targets = [] clear_cache(project_dir, args) return 0 if not "clean" in args.targets: make_deps_all_files(project_dir, args) make_build_ninja(project_dir, args) return call_ninja(project_dir, args) else: returncode = 0 for filename in args.targets: if os.path.isfile(filename) and filename.endswith(".lean"): returncode |= call_lean(filename, args) return returncode if __name__ == "__main__": sys.exit(main())