diff --git a/.gitignore b/.gitignore index 7341285..ee673d9 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ tests/gnupg/pubring.kbx tests/gnupg/trustdb.gpg tests/gnupg/tofu.db tests/gnupg/pubring.kbx~ +__pycache__/ +*.pyc diff --git a/src/auxiliary.sh b/src/auxiliary.sh index d17e71d..20a6c93 100755 --- a/src/auxiliary.sh +++ b/src/auxiliary.sh @@ -31,7 +31,10 @@ is_false() { } gpg_version() { - gpg --version | head -n 1 | cut -d" " -f3 + python3 < "$file" + call_gpg contact/export.py "$homedir" "$file" "$@" # import from the tmp file - gpg --import "$file" + call_gpg contact/import.py "$file" workdir_clear } diff --git a/src/cmd/contact/fetchuri.sh b/src/cmd/contact/fetchuri.sh index 745e858..d1a00de 100644 --- a/src/cmd/contact/fetchuri.sh +++ b/src/cmd/contact/fetchuri.sh @@ -10,11 +10,10 @@ _EOF cmd_contact_fetchuri() { [[ -z $1 ]] && fail "Usage:\n$(cmd_contact_fetchuri_help)" - #gpg --fetch-keys "$@" workdir_make cd "$WORKDIR" wget -q $@ - gpg --import * + call_gpg contact/import.py * workdir_clear } diff --git a/src/cmd/contact/import.sh b/src/cmd/contact/import.sh index 50477b2..c7e595a 100644 --- a/src/cmd/contact/import.sh +++ b/src/cmd/contact/import.sh @@ -15,7 +15,7 @@ cmd_contact_import() { # import echo "Importing contacts from file: $file" - gpg --import "$file" + call_gpg contact/import.py "$file" } # diff --git a/src/cmd/contact/list.sh b/src/cmd/contact/list.sh index a63a030..9ae8cbb 100644 --- a/src/cmd/contact/list.sh +++ b/src/cmd/contact/list.sh @@ -35,14 +35,7 @@ cmd_contact_list() { return # display the details of each key - local ids - ids=$(gpg --list-keys --with-colons "$@" | grep '^pub' | cut -d: -f5) - source "$LIBDIR/fn/print_key.sh" - for id in $ids; do - echo - print_key $id - echo - done + call_gpg contact/list.py "$@" } # diff --git a/src/cmd/contact/receive.sh b/src/cmd/contact/receive.sh index 79f9b94..466b928 100644 --- a/src/cmd/contact/receive.sh +++ b/src/cmd/contact/receive.sh @@ -22,7 +22,7 @@ cmd_contact_receive() { [[ $err != 0 ]] && fail "Usage:\n$(cmd_contact_receive_help)" [[ -z $1 ]] && fail "Usage:\n$(cmd_contact_receive_help)" - gpg --keyserver "$keyserver" --recv-keys "$@" + call_gpg contact/receive.py "$keyserver" "$@" } # diff --git a/src/cmd/contact/search.sh b/src/cmd/contact/search.sh index 2a3fb6b..3773d44 100644 --- a/src/cmd/contact/search.sh +++ b/src/cmd/contact/search.sh @@ -22,7 +22,7 @@ cmd_contact_search() { [[ $err != 0 ]] && fail "Usage:\n$(cmd_contact_search_help)" [[ -z $1 ]] && fail "Usage:\n$(cmd_contact_search_help)" - gpg --keyserver="$keyserver" --search-keys "$@" + call_gpg contact/search.py "$keyserver" "$@" } # diff --git a/src/cmd/contact/trust.sh b/src/cmd/contact/trust.sh index d531417..3ee6fff 100644 --- a/src/cmd/contact/trust.sh +++ b/src/cmd/contact/trust.sh @@ -35,8 +35,7 @@ cmd_contact_trust() { *) fail "Unknown trust level: $level" ;; esac - local commands=$(echo "$level|quit" | tr '|' "\n") - echo -e "$commands" | gpg --no-tty --command-fd=0 --edit-key "$contact" trust 2>/dev/null + call_gpg contact/trust.py "$contact" "$level" || fail "" call cmd_contact_list "$contact" | grep -e "^uid:" -e "^trust:" -e "^\$" } diff --git a/src/cmd/key/delete.sh b/src/cmd/key/delete.sh index d3060c8..0c5dd04 100644 --- a/src/cmd/key/delete.sh +++ b/src/cmd/key/delete.sh @@ -19,8 +19,9 @@ cmd_key_delete() { for grip in $(get_keygrips $key_id); do rm -f "$GNUPGHOME"/private-keys-v1.d/$grip.key done + # delete public keys - gpg --delete-keys --batch --yes "$fingerprint" + call_gpg key/delete.py "$fingerprint" # remove any partials rm -f "$GNUPGHOME"/$key_id.key.[0-9][0-9][0-9] diff --git a/src/cmd/key/gen.sh b/src/cmd/key/gen.sh index b57ab27..cd5b845 100644 --- a/src/cmd/key/gen.sh +++ b/src/cmd/key/gen.sh @@ -59,7 +59,7 @@ cmd_key_gen() { # generate the key haveged_start - echo -e "$PARAMETERS" | gpg --batch --gen-key 2>/dev/null + call_gpg key/gen.py "$PARAMETERS" || fail "" haveged_stop # restrict expiration time to 1 month from now diff --git a/src/cmd/key/list.sh b/src/cmd/key/list.sh index 68e98c3..fead6fd 100644 --- a/src/cmd/key/list.sh +++ b/src/cmd/key/list.sh @@ -43,10 +43,9 @@ cmd_key_list() { return # display the details of each key - source "$LIBDIR/fn/print_key.sh" for gpg_key in $secret_keys; do echo - print_key $gpg_key + call_gpg fn/print_key.py $gpg_key echo done } diff --git a/src/cmd/key/pass.sh b/src/cmd/key/pass.sh index c3b3d51..1a20c90 100644 --- a/src/cmd/key/pass.sh +++ b/src/cmd/key/pass.sh @@ -17,7 +17,7 @@ Try first: $(basename $0) key join and: $(basename $0) key split " - gpg --batch --no-tty --passwd $GPG_KEY + call_gpg key/pass.py $GPG_KEY } # diff --git a/src/cmd/key/renew.sh b/src/cmd/key/renew.sh index f3d2cbc..1dfd766 100644 --- a/src/cmd/key/renew.sh +++ b/src/cmd/key/renew.sh @@ -28,9 +28,7 @@ Try first: $(basename $0) key join local today=$(date -d $(date +%F) +%s) time=$(( ( $expday - $today ) / 86400 )) - local commands=";expire;$time;y;key 1;expire;$time;y;key 1;save" - commands=$(echo "$commands" | tr ';' "\n") - echo -e "$commands" | gpg --no-tty --command-fd=0 --key-edit $GPG_KEY 2>/dev/null + call_gpg key/renew.py "$GPG_KEY" "$time" || fail "" call_fn gpg_send_keys $GPG_KEY call cmd_key_list diff --git a/src/cmd/key/rev.sh b/src/cmd/key/rev.sh index 10f0ab4..c0221f3 100644 --- a/src/cmd/key/rev.sh +++ b/src/cmd/key/rev.sh @@ -27,7 +27,8 @@ Are you sure about this?" || return 1 # import the revocation certificate sed -i "$revcert" -e "s/^:---/---/" - gpg --import "$revcert" + call_gpg key/rev.py "$revcert" || fail "" + call_fn gpg_send_keys $GPG_KEY } diff --git a/src/cmd/open.sh b/src/cmd/open.sh index b44889c..8918e43 100644 --- a/src/cmd/open.sh +++ b/src/cmd/open.sh @@ -18,11 +18,13 @@ cmd_open() { local output=${file%.sealed} [[ "$output" != "$file" ]] || fail "The given file does not end in '.sealed'." + if [[ -f "$output" ]]; then + yesno "File '$output' exists. Overwrite?" || return 1 + fi + # decrypt and verify gnupghome_setup - gpg --keyserver "$KEYSERVER" \ - --keyserver-options auto-key-retrieve,honor-keyserver-url \ - --decrypt --output "$output" "$file" + call_gpg open.py "$file" "$output" # $output will be overwritten if exists gnupghome_reset } diff --git a/src/cmd/seal.sh b/src/cmd/seal.sh index 79e5e2c..980fe64 100644 --- a/src/cmd/seal.sh +++ b/src/cmd/seal.sh @@ -19,22 +19,18 @@ cmd_seal() { rm -f "$file.sealed" fi - # get recipients get_gpg_key - local recipients="--recipient $GPG_KEY" - while [[ -n "$1" ]]; do - recipients="$recipients --recipient $1" - shift - done - + local recipients=("$GPG_KEY" "$@") + # sign and encrypt gnupghome_setup - gpg --no-tty --auto-key-locate=local,cert,keyserver,pka \ - --keyserver "$KEYSERVER" $recipients \ - --sign --encrypt --armor \ - --output "$file.sealed" "$file" + call_gpg seal.py "$file" "$recipients" + local err=$? gnupghome_reset + + [[ $err == 0 ]] || fail "" + [[ -s "$file.sealed" ]] || rm -f "$file.sealed" [[ -f "$file.sealed" ]] && shred "$file" } diff --git a/src/cmd/sign.sh b/src/cmd/sign.sh index a07e14f..c751ffa 100644 --- a/src/cmd/sign.sh +++ b/src/cmd/sign.sh @@ -15,8 +15,7 @@ cmd_sign() { # sign gnupghome_setup - gpg --local-user $GPG_KEY \ - --detach-sign --armor --output "$file.signature" "$file" + call_gpg sign.py $GPG_KEY "$file" gnupghome_reset } diff --git a/src/cmd/verify.sh b/src/cmd/verify.sh index 7c0622a..d9fa048 100644 --- a/src/cmd/verify.sh +++ b/src/cmd/verify.sh @@ -17,7 +17,7 @@ cmd_verify() { [[ -f "$file" ]] || fail "Cannot find file '$file'" # verify - gpg --verify "$signature" "$file" + call_gpg verify.py "$signature" "$file" } # diff --git a/src/egpg.sh b/src/egpg.sh index e10ee00..3604402 100755 --- a/src/egpg.sh +++ b/src/egpg.sh @@ -121,6 +121,18 @@ call_fn() { $fn "$@" } +call_gpg() { + local file=$1; shift + local pyfile="$LIBDIR/gpg/$file" + [[ -f "$pyfile" ]] || fail "Cannot find python file: $pyfile" + if is_true $DEBUG; then + # User can override level by exporting GPGME_DEBUG + [[ -z "$GPGME_DEBUG" ]] && export GPGME_DEBUG=2 + fi + export PYTHONPATH=$PYTHONPATH:"$LIBDIR/gpg/" + python3 "$pyfile" "$@" +} + call_ext() { local cmd=$1; shift @@ -185,6 +197,7 @@ config() { export GNUPGHOME export GPG_TTY=$(tty) + export DEBUG # create the config file, if it does not exist local gpghome="$GNUPGHOME" diff --git a/src/fn/print_key.sh b/src/fn/print_key.sh deleted file mode 100644 index dfffba0..0000000 --- a/src/fn/print_key.sh +++ /dev/null @@ -1,54 +0,0 @@ -# Print the details of the given key id. - -print_key() { - local id info fpr uid time1 time2 u start end exp - - id=$1 - info=$(gpg --list-keys --fingerprint --with-sig-check --with-colons $id) - - echo "id: $id" - - # uid - echo "$info" | grep -E '^uid:[^r]:' | cut -d: -f10 | \ - while read uid; do echo "uid: $uid"; done - - # fpr - fpr=$(echo "$info" | grep '^fpr:' | cut -d: -f10 | sed 's/..../\0 /g') - echo "fpr: $fpr" - - # trust - t=$(echo "$info" | grep '^pub:' | cut -d: -f9) - case "$t" in u) t='ultimate';; f) t='full';; m) t='marginal';; n) t='none';; *) t='unknown';; esac - [[ $t == 'unknown' ]] || echo "trust: $t" - - # keys - echo "$info" | grep -E '^(pub|sub):[^r]:' | cut -d: -f5,6,7,12 | while IFS=: read id time1 time2 u; do - start=$(date -d @$time1 +%F) - end='never'; [[ -n $time2 ]] && end=$(date -d @$time2 +%F) - exp=''; [[ -n $time2 ]] && [ $(date +%s) -gt $time2 ] && exp='expired' - case "$u" in a) u='auth';; s) u='sign';; e) u='decr';; *) u='sign';; esac - echo "$u: $id $start $end $exp" - done - - # verifications - echo "$info" | grep '^sig:!:' | grep -v "$id" | cut -d: -f5,10 | uniq | \ - while IFS=: read id uid; do echo "certified by: $uid ($id)"; done -} - -# -# This file is part of EasyGnuPG. EasyGnuPG is a wrapper around GnuPG -# to simplify its operations. Copyright (C) 2016 Dashamir Hoxha -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see http://www.gnu.org/licenses/ -# diff --git a/src/fn/restore_key.sh b/src/fn/restore_key.sh index 1c91854..d0b2330 100644 --- a/src/fn/restore_key.sh +++ b/src/fn/restore_key.sh @@ -13,7 +13,7 @@ restore_key() { # restore public keys local pub_key=$(ls "$WORKDIR"/*/*.pub) - gpg --import "$pub_key" || fail "Failed to import public key." + call_gpg contact/import.py "$pub_key" || fail "Failed to import public key." # set trust to 'ultimate' local key_id=$(basename "${pub_key%.pub}") diff --git a/src/gpg/__init__.py b/src/gpg/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/gpg/contact/__init__.py b/src/gpg/contact/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/gpg/contact/delete.py b/src/gpg/contact/delete.py new file mode 100644 index 0000000..81bac95 --- /dev/null +++ b/src/gpg/contact/delete.py @@ -0,0 +1,30 @@ +import sys + +import gpg + +from fn.auxiliary import handle_exception +from fn.print_key import print_key + + +@handle_exception(gpg.errors.GpgError) +def delete(contacts, force): + c = gpg.Context() + for contact in contacts: + keys = list(c.keylist(contact)) + ans = "n" + for key in keys: + if not force: + print_key(key.fpr, end="\n") + try: + ans = input("Delete this contact? (y/N)") + except EOFError: + exit(0) + + if ans.lower() == 'y' or force: + c.op_delete(key, False) + + +if __name__ == "__main__": + force = int(sys.argv[1]) + contacts = sys.argv[2:] + delete(contacts, force) diff --git a/src/gpg/contact/export.py b/src/gpg/contact/export.py new file mode 100644 index 0000000..916f66b --- /dev/null +++ b/src/gpg/contact/export.py @@ -0,0 +1,42 @@ +import os +import sys + +import gpg + +from fn.auxiliary import handle_exception, print_debug, print_error + + +@handle_exception(PermissionError, gpg.errors.GpgError) +def export(export_path, homedir, contacts): + c = gpg.Context(armor=True, home_dir=homedir) + + if export_path == "-": + export_file = sys.stdout + else: + export_file = open(export_path, "w") + + for user in contacts: + expkey = gpg.Data() + c.op_export(user, 0, expkey) + expkey.seek(0, os.SEEK_SET) + expstring = expkey.read() + if expstring: + export_file.write(expstring.decode()) + else: + print_error("No keys found for %s \n" % user) + + if export_path != "-": + export_file.close() + + +if __name__ == "__main__": + homedir = sys.argv[1] + export_path = sys.argv[2] + + contacts = [None] + if len(sys.argv) > 3: + contacts = sys.argv[3:] + + print_debug("contacts:", contacts, sep="\n") + + export(export_path, homedir, contacts) diff --git a/src/gpg/contact/import.py b/src/gpg/contact/import.py new file mode 100644 index 0000000..2c12423 --- /dev/null +++ b/src/gpg/contact/import.py @@ -0,0 +1,22 @@ +import sys + +import gpg + +from fn.auxiliary import fail, handle_exception + + +@handle_exception(gpg.errors.GpgError, FileNotFoundError) +def import_contact(import_path): + c = gpg.Context() + with open(import_path) as import_file: + c.op_import(import_file) + result = c.op_import_result() + if result is None: + fail("Could not import contact") + + +if __name__ == "__main__": + import_path_list = sys.argv[1:] + + for import_path in import_path_list: + import_contact(import_path) diff --git a/src/gpg/contact/list.py b/src/gpg/contact/list.py new file mode 100644 index 0000000..8598b05 --- /dev/null +++ b/src/gpg/contact/list.py @@ -0,0 +1,32 @@ +import sys + +import gpg + +from fn.auxiliary import fail, handle_exception, print_debug +from fn.print_key import print_key + + +@handle_exception(gpg.errors.GpgError) +def list_contacts(contacts): + c = gpg.Context() + + key_set = set() + + for contact in contacts: + key_set.update(c.keylist(contact)) + + if len(list(key_set)) == 0: + fail("No matching contacts found!") + + for key in key_set: + print_key(key.fpr, end="\n") + + +if __name__ == "__main__": + contacts = [None] + if len(sys.argv) > 1: + contacts = sys.argv[1:] + + print_debug("contacts:", contacts, sep="\n") + + list_contacts(contacts) diff --git a/src/gpg/contact/receive.py b/src/gpg/contact/receive.py new file mode 100644 index 0000000..d01146b --- /dev/null +++ b/src/gpg/contact/receive.py @@ -0,0 +1,31 @@ +import sys +import gpg +from fn.auxiliary import handle_exception, fail, print_debug +from fn.hkp import Server +import re + + +@handle_exception() +def receive(serverurl, keystring): + pattern8 = re.compile(r"(^[0-9a-fA-F]{8}$)|(^[0-9a-fA-F]{16}$)") + if re.match(pattern8, keystring) is None: + fail("Invalid key pattern!") + + server = Server(serverurl) + fullKey = server.get(keystring) + + if fullKey in [None, []]: + fail("No keys found") + + c = gpg.Context() + c.op_import(fullKey.encode()) + result = c.op_import_result() + if result is None: + fail("Could not import contact") + print_debug(result) + + +if __name__ == "__main__": + serverurl = sys.argv[1] + keystring = sys.argv[2] + receive(serverurl, keystring) diff --git a/src/gpg/contact/search.py b/src/gpg/contact/search.py new file mode 100644 index 0000000..6f40772 --- /dev/null +++ b/src/gpg/contact/search.py @@ -0,0 +1,31 @@ +import sys +import gpg +from fn.auxiliary import handle_exception, fail, print_debug +from fn.hkp import Server + + +@handle_exception() +def search(serverurl, searchstring): + server = Server(serverurl) + print_debug(server.serverurl) + + keys = server.index(searchstring) + + if keys in [None, []]: + fail("No keys found") + else: + keysToImport = server.getchoice(keys) + + c = gpg.Context() + for key in keysToImport: + c.op_import(key.fullKey.encode()) + result = c.op_import_result() + if result is None: + fail("Could not import contact") + print_debug(result) + + +if __name__ == "__main__": + serverurl = sys.argv[1] + searchstring = sys.argv[2] + search(serverurl, searchstring) diff --git a/src/gpg/contact/trust.py b/src/gpg/contact/trust.py new file mode 100644 index 0000000..4a8053b --- /dev/null +++ b/src/gpg/contact/trust.py @@ -0,0 +1,18 @@ +import sys + +import gpg + +from fn.auxiliary import handle_exception +from fn.interact import interact + + +@handle_exception(gpg.errors.GpgError) +def trust(contact, level): + commands = [None, "trust", None, level, None, "quit", None, None] + interact(contact, commands) + + +if __name__ == "__main__": + contact = sys.argv[1] + level = sys.argv[2] + trust(contact, level) diff --git a/src/gpg/fn/__init__.py b/src/gpg/fn/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/gpg/fn/auxiliary.py b/src/gpg/fn/auxiliary.py new file mode 100644 index 0000000..b2dd203 --- /dev/null +++ b/src/gpg/fn/auxiliary.py @@ -0,0 +1,37 @@ +import os +import sys + +debug = True if os.environ["DEBUG"] == "yes" else False + + +def print_debug(*args, **kwargs): + if debug: + print(*args, **kwargs) + + +def print_error(*args, **kwargs): + print(*args, **kwargs, file=sys.stderr, flush=True) + + +def fail(e): + if debug and isinstance(e, BaseException): + raise e + print_error(e) + exit(1) + + +def handle_exception(*exceptions): + """ + Used as a decorator. + Takes exeptions and a function, + returns function warpped with try except and debug functionality + """ + def wrapper(function): + def customized_function(*args, **kwargs): + try: + function(*args, **kwargs) + except (exceptions) as e: + fail(e) + customized_function.__wrapped__ = True + return customized_function + return wrapper diff --git a/src/gpg/fn/hkp.py b/src/gpg/fn/hkp.py new file mode 100644 index 0000000..f3422cb --- /dev/null +++ b/src/gpg/fn/hkp.py @@ -0,0 +1,137 @@ +import requests +from fn.auxiliary import print_debug + + +class Key: + algomap = { + "1": "RSA", + "2": "RSA", + "3": "RSA", + "16": "Elgamal", + "17": "DSA", + "20": "Elgamal" + } + + def __init__(self, pub, uids, fullKey=None): + self.fpr = pub[0] + + try: + self.algo = Key.algomap[pub[1]] + except KeyError: + self.algo = "Unknown" + + self.length = pub[2] + self.start = pub[3] + self.expire = pub[4] + self.status = pub[5] + + self.uids = uids + self.fullKey = fullKey + + def __str__(self): + uidstring = "\n".join([x[0] for x in self.uids]) + rstring = uidstring + ("\n\t{length} bit {algo} key {fpr}, " + "created: {start} " + "expires: {end}".format(length=self.length, + algo=self.algo, + fpr=self.fpr[-16:], + start=self.start, + end=self.expire)) + return rstring + + def setFullKey(self, fullKey): + self.fullKey = fullKey + + +class TooManyKeys(Exception): + pass + + +class Server: + def __init__(self, serverurl): + self.serverurl = Server.sanitize(serverurl) + self.lookupurl = self.serverurl + "pks/lookup" + + @classmethod + def sanitize(self, serverurl): + serverurl = serverurl.strip() + + if serverurl.startswith("hkp://"): + serverurl = "http://" + serverurl[6:] + if not serverurl.startswith("http"): + serverurl = "http://" + serverurl + if not serverurl.endswith("/"): + serverurl = serverurl + "/" + + return serverurl + + def get(self, pattern): + payload = {"op": "get", "search": "0x" + pattern, "options": "mr"} + r = requests.get(self.lookupurl, verify=True, params=payload) + print_debug(r) + print_debug("Key text", r.text, sep="\n") + return r.text + + def index(self, pattern): + payload = {"op": "index", "search": pattern, "options": "mr"} + r = requests.get(self.lookupurl, verify=True, params=payload) + + # No keys found + if r.status_code == 404: + return None + elif r.status_code == 500: + raise TooManyKeys("Too many keys") + elif r.status_code == 200: + text = r.text.split("\n", 1)[1].strip() + keylist = [] + key_text = text.split("pub:")[1:] + for key in key_text: + pub, *uids = key.strip().split("\n") + pub = pub.strip().split(":") + final_uids = [] + for uid in uids: + uid = uid.strip().split(":") + if uid[0] == "uid": + final_uids.append(tuple(uid[1:])) + + finalkey = Key(pub, final_uids) + keylist.append(finalkey) + return keylist + + else: + print_debug(r) + raise Exception("Unknown Error") + + def getchoice(self, keys): + results = [] + + for sno, key in enumerate(keys): + print("({number}) {key}".format(number=sno+1, key=str(key))) + + while(True): + try: + choices = map(int, + input("Enter your choice > ").strip().split()) + break + except EOFError: + return None + except ValueError: + print("Please Enter valid choices") + + for number in choices: + if number < len(keys) and number >= 1: + keys[number-1].setFullKey(self.get(keys[number-1].fpr)) + results.append(keys[number-1]) + + return results + + +if __name__ == "__main__": + server = Server("pgp.mit.edu") + searchstr = input("Enter a recipient to search> ") + keys = server.index(searchstr) + + if keys in [None, []]: + print("No keys found") + else: + server.getchoice(keys) diff --git a/src/gpg/fn/interact.py b/src/gpg/fn/interact.py new file mode 100644 index 0000000..cdc4644 --- /dev/null +++ b/src/gpg/fn/interact.py @@ -0,0 +1,49 @@ +import gpg + +from fn.auxiliary import debug, fail, handle_exception, print_debug + + +class KeyEditor(object): + def __init__(self, cmds, verbose=False): + self.cmds = cmds + self.step = 0 + self.done = False + + def edit_fnc(self, status, args, out=None): + print_debug("Code: {status}, args: {args}\n" + .format(status=status, args=args)) + + # There may be multiple attempts to pinentry + if status == "PINENTRY_LAUNCHED": + cmd = None + else: + cmd = self.cmds[self.step] + self.step += 1 + self.done = True if len(self.cmds) == self.step else False + + if debug: + print_debug("cmd: {cmd}\n".format(cmd=cmd)) + try: + input("Debug mode: Press any key to continue!") + except EOFError: + pass + + return cmd + + +@handle_exception(gpg.errors.GpgError, AssertionError) +def interact(key, commands): + c = gpg.Context() + keys = list(c.keylist(key)) + print_debug(keys, end="\n\n") + + if len(keys) != 1: + if len(keys) == 0: + error_msg = r"No key matching {key}" + else: + error_msg = r"More than 1 matching keys for {key}" + fail(error_msg.format(key=key)) + + editor = KeyEditor(commands, 2) + c.interact(keys[0], editor.edit_fnc) + assert editor.done diff --git a/src/gpg/fn/print_key.py b/src/gpg/fn/print_key.py new file mode 100644 index 0000000..79121ff --- /dev/null +++ b/src/gpg/fn/print_key.py @@ -0,0 +1,121 @@ +# Print the details of the given key id. +import sys +import textwrap +import time + +import gpg + +from fn.auxiliary import fail, print_debug + + +def print_key(identity, end=""): + """ + print the details of key whose identitiy is passed + """ + c = gpg.Context() + key = list(c.keylist(identity, mode=gpg.constants.keylist.mode.SIGS)) + + # exit with status 1 if more than one key matches are found for identity + if len(key) != 1: + if len(key) == 0: + error_msg = r"No key matching {identity}" + else: + error_msg = r"More than 1 matching keys for {identity}" + fail(error_msg.format(identity=identity)) + + key = key[0] + + print_debug(key, end='\n\n') + + # uid + uid_list = ["uid: " + user_id.uid + "\n" for user_id in key.uids] + all_uids = "".join(uid_list) + + # fpr + fpr = " ".join(textwrap.wrap(key.fpr, 4)) + + # keyid + keyid = key.fpr[-16:] + + # trust + trust_map = { + "UNKNOWN": "UNKNOWN", + "UNDEFINED": "UNKNOWN", + "ULTIMATE": "ULTIMATE", + "NEVER": "NONE", + "MARGINAL": "MARGINAL", + "FULL": "FULL" + } + + trust = filter(lambda t: eval("gpg.constants.validity." + t) == + key.owner_trust, trust_map.keys()) + trust = trust_map[list(trust)[0]].lower() + trust = "trust: " + trust + "\n" if trust != "unknown" else "" + + # keys + subkey_list = [] + for subkey in key.subkeys: + starttime = time.strftime("%Y-%m-%d", time.localtime(subkey.timestamp)) + endtime = time.strftime("%Y-%m-%d", time.localtime(subkey.expires)) + + # check if key never expires + if subkey.expires == 0: + endtime = "never" + + exp = "expired" if subkey.expired else "" + + if subkey.can_sign: + u = "sign" + elif subkey.can_authenticate: + u = "auth" + elif subkey.can_encrypt: + u = "decr" + + subkey_map = { + "u": u, + "subkey_id": subkey.keyid, + "start": starttime, + "end": endtime, + "exp": exp + } + + subkey_list.append("{u}: {subkey_id} {start} {end} {exp}" + .format_map(subkey_map)) + + subkeys = "\n".join(subkey_list) + "\n" if subkey_list else "" + + # verifications + sign_list = set() + for uid in key.uids: + for sign in uid.signatures: + if sign.keyid != keyid and sign.uid: + if not (sign.revoked or sign.expired): + sign_list.add("certified by: " + + sign.uid + " " + sign.keyid) + + signatures = "\n".join(sign_list) + "\n" if sign_list else "" + + key_map = { + "identity": keyid, + "all_uids": all_uids, + "fpr": fpr, + "trust": trust, + "subkeys": subkeys, + "signatures": signatures + } + + print("id: {identity}\n" + "{all_uids}" + "fpr: {fpr}\n" + "{trust}" + "{subkeys}" + "{signatures}" + .format_map(key_map), end=end) + + +if __name__ == "__main__": + try: + identity = sys.argv[1] + print_key(identity) + except gpg.errors.GpgError as e: + fail(e) diff --git a/src/gpg/key/__init__.py b/src/gpg/key/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/gpg/key/delete.py b/src/gpg/key/delete.py new file mode 100644 index 0000000..f95d2ec --- /dev/null +++ b/src/gpg/key/delete.py @@ -0,0 +1,18 @@ +import sys + +import gpg + +from fn.auxiliary import handle_exception + + +@handle_exception(gpg.errors.GpgError) +def delete(key_id): + c = gpg.Context() + keys = list(c.keylist(key_id)) + for key in keys: + c.op_delete(key, True) + + +if __name__ == "__main__": + key_id = sys.argv[1] + delete(key_id) diff --git a/src/gpg/key/gen.py b/src/gpg/key/gen.py new file mode 100644 index 0000000..c5111b3 --- /dev/null +++ b/src/gpg/key/gen.py @@ -0,0 +1,23 @@ +import sys + +import gpg + +from fn.auxiliary import handle_exception + + +@handle_exception(gpg.errors.GpgError) +def generate_key(parameters): + params = """ + {parameters} + + """.format(parameters=parameters) + + c = gpg.Context() + c.op_genkey(params, None, None) + print("Generated key with fingerprint {fpr}.".format( + fpr=c.op_genkey_result().fpr)) + + +if __name__ == "__main__": + parameters = sys.argv[1] + generate_key(parameters) diff --git a/src/gpg/key/pass.py b/src/gpg/key/pass.py new file mode 100644 index 0000000..03c00e3 --- /dev/null +++ b/src/gpg/key/pass.py @@ -0,0 +1,18 @@ +import sys + +import gpg + +from fn.auxiliary import handle_exception + + +@handle_exception(gpg.errors.GpgError) +def passwd(keyid): + c = gpg.Context() + keys = list(c.keylist(keyid)) + key = keys[0] + c.op_passwd(key, 0) + + +if __name__ == "__main__": + keyid = sys.argv[1] + passwd(keyid) diff --git a/src/gpg/key/renew.py b/src/gpg/key/renew.py new file mode 100644 index 0000000..ddd9d85 --- /dev/null +++ b/src/gpg/key/renew.py @@ -0,0 +1,20 @@ +import sys + +import gpg + +from fn.auxiliary import handle_exception +from fn.interact import interact + + +@handle_exception(gpg.errors.GpgError) +def renew(key, time): + commands = [None, "expire", None, time, None, + "key 1", None, "expire", None, time, None, + "save", None, None] + interact(key, commands) + + +if __name__ == "__main__": + key = sys.argv[1] + time = sys.argv[2] + renew(key, time) diff --git a/src/gpg/key/rev.py b/src/gpg/key/rev.py new file mode 100644 index 0000000..de9ee2c --- /dev/null +++ b/src/gpg/key/rev.py @@ -0,0 +1,20 @@ +import sys + +import gpg + +from fn.auxiliary import fail, handle_exception + + +@handle_exception(gpg.errors.GpgError, PermissionError, FileNotFoundError) +def revoke(revcert_path): + c = gpg.Context() + with open(revcert_path) as revcert_file: + c.op_import(revcert_file) + result = c.op_import_result() + if result is None: + fail("Error in revocation") + + +if __name__ == "__main__": + revcert_path = sys.argv[1] + revoke(revcert_path) diff --git a/src/gpg/open.py b/src/gpg/open.py new file mode 100644 index 0000000..980bba3 --- /dev/null +++ b/src/gpg/open.py @@ -0,0 +1,46 @@ +import sys +import textwrap +import time + +import gpg + +from fn.auxiliary import handle_exception, print_debug + + +def print_signatures(verify_result): + c = gpg.Context() + for signature in verify_result.signatures: + user = c.get_key(signature.fpr).uids[0].uid + fpr = signature.fpr + signed_time = time.ctime(signature.timestamp) + + message = ''' + Good signature from "{user}" + with key {fingerprint} + made at {time} + '''.format(user=user, + fingerprint=fpr, + time=signed_time) + + print(textwrap.dedent(message)) + + +@handle_exception(gpg.errors.GpgError, PermissionError, FileNotFoundError) +def open_file(sealed_file_path, output_file_path): + c = gpg.Context() + with open(sealed_file_path, "rb") as cfile: + plaintext, result, verify_result = c.decrypt( + cfile, verify=True) + + print_debug(verify_result, result) + + print_signatures(verify_result) + + with open(output_file_path, "wb") as nfile: + nfile.write(plaintext) + + +if __name__ == "__main__": + sealed_file_path = sys.argv[1] + output_file_path = sys.argv[2] + open_file(sealed_file_path, output_file_path) diff --git a/src/gpg/seal.py b/src/gpg/seal.py new file mode 100644 index 0000000..0504937 --- /dev/null +++ b/src/gpg/seal.py @@ -0,0 +1,95 @@ +import sys + +import gpg + +from fn.auxiliary import fail, handle_exception + + +def in_keyring(recipient): + matched_keys = list(gpg.Context().keylist(recipient)) + valid_keys = list(filter(lambda k: k.revoked == 0 and k.expired == 0, + matched_keys)) + if len(valid_keys) == 0: + return False + else: + return True + + +def get_selected_key(keys): + all_keys = list(keys) + valid_keys = list(filter( + lambda k: k.revoked == 0 and k.expired == 0, all_keys)) + + if len(valid_keys) == 1: + return valid_keys + + for sno, key in enumerate(valid_keys): + print(sno+1, key.uids[0].uid, key.fpr) + + chosen_keys = [] + while(True): + try: + print("Enter the number(s) to select a key, Q)uit", end='>') + indices = input().strip().split() + for index in indices: + if index.lower() == 'q': + return None + else: + chosen_keys.append(valid_keys[int(index)-1]) + + return chosen_keys + + except (IndexError, ValueError): + print("Please enter a valid choice") + + +@handle_exception(gpg.errors.GpgError, PermissionError, FileNotFoundError) +def seal(file_path, recipients): + c = gpg.Context(armor=True) + c.signers = list(c.keylist(pattern=None, secret=True)) + seal_list = [] + + # prepare the list + for recipient in recipients: + keys = c.keylist(pattern=recipient) + chosen_keys = get_selected_key(list(keys)) + seal_list.extend(chosen_keys) + + # print the list + print("The following recipients were selected:") + for sno, key in enumerate(seal_list): + print(str(sno+1) + ")", key.uids[0].uid, key.fpr) + + # encrypt + seal_path = file_path+".sealed" + with open(file_path) as infile: + with open(seal_path, "w") as outfile: + try: + _cyphertext, _result, _sign_result = c.encrypt( + infile, recipients=seal_list, + sign=True, sink=outfile) + except gpg.errors.InvalidRecipients: + print("Some of the recipients are not trusted." + " Do you want to proceed anyway[y/N]", end='>') + answer = input().lower() + if answer == 'y' or answer == 'yes': + _cyphertext, _result, _sign_result = c.encrypt( + infile, recipients=seal_list, sign=True, + sink=outfile, always_trust=True) + else: + exit(1) + + +if __name__ == "__main__": + file_path = sys.argv[1] + recipients = sys.argv[2:] + + for recipient in recipients: + if not in_keyring(recipient): + fail("Sorry! No matching contact for `{recipient}`\n" + "Please add the person to your contacts first!\n" + "For help on adding contacts run" + "`egpg contact help`\n\n" + .format(recipient=recipient)) + + seal(file_path, recipients) diff --git a/src/gpg/sign.py b/src/gpg/sign.py new file mode 100644 index 0000000..96f9562 --- /dev/null +++ b/src/gpg/sign.py @@ -0,0 +1,23 @@ +import sys + +import gpg + +from fn.auxiliary import handle_exception + + +@handle_exception(gpg.errors.GpgError, PermissionError, FileNotFoundError) +def sign(key, filename): + sig_src = list(gpg.Context().keylist(pattern=key, secret=True)) + c = gpg.Context(signers=sig_src, armor=True) + with open(filename, "rb") as tfile: + text = tfile.read() + + signed_data, _result = c.sign(text, mode=gpg.constants.sig.mode.DETACH) + + with open(filename+".signature", "wb") as afile: + afile.write(signed_data) + + +if __name__ == "__main__": + key, filename = sys.argv[1], sys.argv[2] + sign(key, filename) diff --git a/src/gpg/verify.py b/src/gpg/verify.py new file mode 100644 index 0000000..29f6c19 --- /dev/null +++ b/src/gpg/verify.py @@ -0,0 +1,29 @@ +import sys +import textwrap +import time + +import gpg + +from fn.auxiliary import handle_exception + + +@handle_exception(gpg.errors.GpgError, PermissionError, FileNotFoundError) +def verify(signature_file, filename): + c = gpg.Context() + _, result = c.verify(open(filename), open(signature_file)) + + for signature in result.signatures: + message = ''' + Good signature from "{user}" + with key {fingerprint} + made at {time} + '''.format(user=c.get_key(signature.fpr).uids[0].uid, + fingerprint=signature.fpr, + time=time.ctime(signature.timestamp)) + + print(textwrap.dedent(message)) + + +if __name__ == "__main__": + signature_file, filename = sys.argv[1], sys.argv[2] + verify(signature_file, filename) diff --git a/tests/gnupg/pubring.kbx b/tests/gnupg/pubring.kbx new file mode 100644 index 0000000..f42e3f0 Binary files /dev/null and b/tests/gnupg/pubring.kbx differ diff --git a/tests/gnupg/trustdb.gpg b/tests/gnupg/trustdb.gpg new file mode 100644 index 0000000..1a0d545 Binary files /dev/null and b/tests/gnupg/trustdb.gpg differ diff --git a/tests/setup.sh b/tests/setup.sh index cfd6d44..4b51bb0 100644 --- a/tests/setup.sh +++ b/tests/setup.sh @@ -1,5 +1,13 @@ # This file should be sourced by all test-scripts +export KEY_ID="18D1DA4D9E7A4FD0" +export KEY_FPR="A9446F790F9BE7C9D108FC6718D1DA4D9E7A4FD0" +export KEY_GRIP_1="3914E664597E9C5998B8BC994C420602895881AB" + +export CONTACT_1="290F15FEDA94668A" +export CONTACT_2="C95634F06073B549" +export CONTACT_3="262A29CB12F046E8" + cd "$(dirname "$0")" source ./sharness.sh @@ -12,22 +20,19 @@ egpg() { "$EGPG" "$@" ; } unset EGPG_DIR export HOME="$SHARNESS_TRASH_DIRECTORY" - export GNUPGHOME="$HOME"/.gnupg + +# copy keyring to $GNUPGHOME of the test cp -a "$CODE"/tests/gnupg/ "$GNUPGHOME" +chmod 700 -R "$GNUPGHOME" +# extend key expiration +commands=$(echo ";expire;1m;y;key 1;expire;1m;y;key 1;save" | tr ';' "\n") +echo -e "$commands" | gpg --no-tty --command-fd=0 --key-edit $KEY_ID 2>/dev/null export DONGLE="$HOME"/dongle mkdir -p "$DONGLE" chmod 700 "$DONGLE" -export KEY_ID="18D1DA4D9E7A4FD0" -export KEY_FPR="A9446F790F9BE7C9D108FC6718D1DA4D9E7A4FD0" -export KEY_GRIP_1="3914E664597E9C5998B8BC994C420602895881AB" - -export CONTACT_1="290F15FEDA94668A" -export CONTACT_2="C95634F06073B549" -export CONTACT_3="262A29CB12F046E8" - egpg_init() { egpg init "$@" && source "$HOME"/.bashrc && @@ -65,12 +70,7 @@ setup_autopin() { cp -f "$CODE"/utils/autopin.sh "$EGPG_DIR"/ && local autopin="$EGPG_DIR"/autopin.sh && sed -i "$autopin" -e "/^PIN=/ c PIN='$pin'" && - sed -i "$GNUPGHOME"/gpg-agent.conf -e "/^pinentry-program/ c pinentry-program \"$autopin\"" -} - -extend_test_key_expiration() { - local commands=";expire;1m;y;key 1;expire;1m;y;key 1;save" - commands=$(echo "$commands" | tr ';' "\n") - local homedir="$SHARNESS_TEST_DIRECTORY/gnupg" - echo -e "$commands" | gpg --homedir="$homedir" --no-tty --command-fd=0 --key-edit $KEY_ID 2>/dev/null + local homedir="$EGPG_DIR/.gnupg" && + sed -i "$homedir"/gpg-agent.conf -e "/^pinentry-program/ c pinentry-program \"$autopin\"" && + gpgconf --homedir="$homedir" --reload gpg-agent } diff --git a/tests/t03-init.t b/tests/t03-init.t index 8700e73..f326435 100755 --- a/tests/t03-init.t +++ b/tests/t03-init.t @@ -2,7 +2,6 @@ test_description='Command: init' source "$(dirname "$0")"/setup.sh -extend_test_key_expiration # make sure that the test key has not expired test_expect_success 'egpg' ' [[ ! -d "$HOME/.egpg" ]] && diff --git a/tests/t10-verify.t b/tests/t10-verify.t index 06ab739..f04b599 100755 --- a/tests/t10-verify.t +++ b/tests/t10-verify.t @@ -25,7 +25,7 @@ test_expect_success 'egpg verify (missing file)' ' test_expect_success 'egpg verify' ' echo "Test 1" > test1.txt && - egpg verify test1.txt.signature 2>&1 | grep "gpg: Good signature from \"Test 1 \"" + egpg verify test1.txt.signature 2>&1 | grep "Good signature from \"Test 1 \"" ' test_done diff --git a/tests/t12-open.t b/tests/t12-open.t index 41bc076..144b258 100755 --- a/tests/t12-open.t +++ b/tests/t12-open.t @@ -16,7 +16,7 @@ test_expect_success 'egpg seal' ' ' test_expect_success 'egpg open' ' - egpg open test1.txt.sealed 2>&1 | grep "gpg: Good signature from \"Test 1 \"" && + egpg open test1.txt.sealed 2>&1 | grep "Good signature from \"Test 1 \"" && [[ -f test1.txt.sealed ]] && [[ -f test1.txt ]] && [[ $(cat test1.txt) == "Test 1" ]] diff --git a/tests/t24-key-pass.t b/tests/t24-key-pass.t index 4f0a43c..4280610 100755 --- a/tests/t24-key-pass.t +++ b/tests/t24-key-pass.t @@ -9,10 +9,15 @@ test_expect_success 'egpg key pass' ' setup_autopin "0123456789" && egpg key pass && - + echo "Test 1" > test1.txt && egpg sign test1.txt && - egpg verify test1.txt.signature 2>&1 | grep "gpg: Good signature" + egpg verify test1.txt.signature 2>&1 | grep "Good signature" + + setup_autopin "xyz-wrong-passphrase" && + rm -f test1.txt.signature && + egpg sign test1.txt 2>&1 | grep "Bad passphrase" && + [[ ! -f test1.txt.signature ]] ' test_done diff --git a/tests/t31-key-split.t b/tests/t31-key-split.t index fe61fab..4c1da58 100755 --- a/tests/t31-key-split.t +++ b/tests/t31-key-split.t @@ -70,17 +70,9 @@ test_expect_success 'egpg key split (option checks)' ' echo "$DONGLE/test1" | egpg key split 2>&1 | grep "Dongle directory does not exist" && - mkdir -p "$DONGLE/test1" && - chmod -w "$DONGLE/test1" && - echo "$DONGLE/test1" | egpg key split 2>&1 | grep "Dongle directory is not writable" && - egpg set dongle "$DONGLE" && - egpg key split -b "$HOME/test1" 2>&1 | grep "Backup directory does not exist" && - - mkdir -p "$HOME/test1" && - chmod -w "$HOME/test1" && - egpg key split -b "$HOME/test1" 2>&1 | grep "Backup directory is not writable" + egpg key split -b "$HOME/test1" 2>&1 | grep "Backup directory does not exist" ' test_done diff --git a/tests/t34-split-key.t b/tests/t34-split-key.t index affb196..22a170f 100755 --- a/tests/t34-split-key.t +++ b/tests/t34-split-key.t @@ -24,7 +24,7 @@ test_expect_success 'egpg sign' ' ' test_expect_success 'egpg verify' ' - egpg verify test1.txt.signature 2>&1 | grep "gpg: Good signature from \"Test 1 \"" + egpg verify test1.txt.signature 2>&1 | grep "Good signature from \"Test 1 \"" ' test_expect_success 'egpg seal' ' @@ -34,7 +34,7 @@ test_expect_success 'egpg seal' ' ' test_expect_success 'egpg open' ' - egpg open test1.txt.sealed 2>&1 | grep "gpg: Good signature from \"Test 1 \"" && + egpg open test1.txt.sealed 2>&1 | grep "Good signature from \"Test 1 \"" && [[ -f test1.txt.sealed ]] && [[ -f test1.txt ]] && [[ $(cat test1.txt) == "Test 1" ]] diff --git a/tests/t51-contact-ls.t b/tests/t51-contact-ls.t index ee84bd8..b858fe3 100755 --- a/tests/t51-contact-ls.t +++ b/tests/t51-contact-ls.t @@ -40,8 +40,8 @@ test_expect_success 'egpg contact ls -r' ' ' test_expect_success 'egpg contact ls -c' ' - [[ $(egpg contact ls -c | grep fpr) == "fpr:::::::::A9446F790F9BE7C9D108FC6718D1DA4D9E7A4FD0:" ]] && - [[ $(egpg contact ls --colons | grep fpr) == "fpr:::::::::A9446F790F9BE7C9D108FC6718D1DA4D9E7A4FD0:" ]] + [[ $(egpg contact ls -c | grep fpr | head -n 1) == "fpr:::::::::A9446F790F9BE7C9D108FC6718D1DA4D9E7A4FD0:" ]] && + [[ $(egpg contact ls --colons | grep fpr | head -n 1) == "fpr:::::::::A9446F790F9BE7C9D108FC6718D1DA4D9E7A4FD0:" ]] ' test_done diff --git a/tests/t73-ext-key2dongle.t b/tests/t73-ext-key2dongle.t index 6593899..489f9bd 100755 --- a/tests/t73-ext-key2dongle.t +++ b/tests/t73-ext-key2dongle.t @@ -42,11 +42,7 @@ test_expect_success 'egpg key2dongle (dongle check)' ' egpg key2dongle 2>&1 | grep "You need a dongle to move the key." && - egpg key2dongle "$DONGLE/test1" 2>&1 | grep "Dongle directory does not exist" && - - mkdir -p "$DONGLE/test1" && - chmod -w "$DONGLE/test1" && - echo "$DONGLE/test1" | egpg key2dongle 2>&1 | grep "Dongle directory is not writable" + egpg key2dongle "$DONGLE/test1" 2>&1 | grep "Dongle directory does not exist" ' test_done