diff --git a/python/ql/lib/semmle/python/security/dataflow/TarSlipCustomizations.qll b/python/ql/lib/semmle/python/security/dataflow/TarSlipCustomizations.qll index 2dbe2c542aee..fb28100f8e94 100644 --- a/python/ql/lib/semmle/python/security/dataflow/TarSlipCustomizations.qll +++ b/python/ql/lib/semmle/python/security/dataflow/TarSlipCustomizations.qll @@ -132,6 +132,52 @@ module TarSlip { } } + /** + * A call to `shutil.unpack_archive`, considered as a flow sink. + * + * The first argument (the archive filename) is the sink. + */ + class ShutilUnpackArchiveSink extends Sink { + ShutilUnpackArchiveSink() { + this = API::moduleImport("shutil").getMember("unpack_archive").getACall().getArg(0) + } + } + + /** + * A call to `subprocess` functions that invokes `tar` for archive extraction, + * considered as a flow sink. + * + * The sink is the non-literal element in the command list that corresponds + * to the archive filename (e.g. the `unsafe_filename` in + * `subprocess.run(["tar", "-xf", unsafe_filename])`). + */ + class SubprocessTarExtractionSink extends Sink { + SubprocessTarExtractionSink() { + exists(SequenceNode cmdList, DataFlow::CallCfgNode call, int i | + call = + API::moduleImport("subprocess") + .getMember(["run", "call", "check_call", "check_output", "Popen"]) + .getACall() and + cmdList = call.getArg(0).asCfgNode() and + // Command must be "tar" exactly or a path ending in "/tar" (e.g. "/usr/bin/tar") + exists(string cmd | + cmd = cmdList.getElement(0).getNode().(StringLiteral).getText() and + (cmd = "tar" or cmd.matches("%/tar")) + ) and + // At least one extraction-related flag must be present: + // single-dash flags containing 'x' (like -x, -xf, -xvf) or the long option --extract + exists(string flag | + flag = cmdList.getElement(_).getNode().(StringLiteral).getText() and + (flag.regexpMatch("-[a-zA-Z]*x[a-zA-Z]*") or flag = "--extract") + ) and + // The sink is the specific non-literal argument (the archive filename) + i > 0 and + not cmdList.getElement(i).getNode() instanceof StringLiteral and + this.asCfgNode() = cmdList.getElement(i) + ) + } + } + /** * Holds if `g` clears taint for `tarInfo`. * diff --git a/python/ql/test/query-tests/Security/CWE-022-TarSlip/tarslip.py b/python/ql/test/query-tests/Security/CWE-022-TarSlip/tarslip.py index 2c06d01adfd5..bf8e2d0e2642 100644 --- a/python/ql/test/query-tests/Security/CWE-022-TarSlip/tarslip.py +++ b/python/ql/test/query-tests/Security/CWE-022-TarSlip/tarslip.py @@ -114,3 +114,16 @@ def safemembers(members): tar = tarfile.open(unsafe_filename_tar) tar.extractall(members=safemembers(tar), filter=extraction_filter) # safe -- we assume `safemembers` makes up for the unsafe filter + +import shutil +import subprocess + +# shutil.unpack_archive +shutil.unpack_archive(unsafe_filename_tar, "out") # unsafe +shutil.unpack_archive("safe.tar", "out") # safe + +# subprocess tar extraction +subprocess.run(["tar", "-xf", unsafe_filename_tar]) # unsafe +subprocess.check_call(["tar", "-xf", unsafe_filename_tar]) # unsafe +subprocess.run(["tar", "-xf", "safe.tar"]) # safe +subprocess.run(["echo", unsafe_filename_tar]) # safe - not a tar extraction