Actualizado el 29/11/2022.
Versión: 2.x y 3.x.
Descargas: dupsearch.zip.
Un simple pero eficaz script para encontrar todos los archivos dobles o multiplicados en el disco. Permite especificar una ruta al comenzar (C:/
, por ejemplo) y luego recorre el árbol entero de directorios con la función os.walk()
. Compara los archivos luego de obtener el hash de cada uno de ellos con el algoritmo MD5 vía el módulo estándar hashlib. Esto posibilita tratar con archivos de todos los tamaños, sin hacer diferencia, en cadenas hexadecimales de 32 caracteres. Además, permite que el programa calcule con gran precisión: dos archivos con el mismo nombre y tamaño pueden ser completamente diferentes. Al finalizar, los resultados son desplegados en un archivo de texto output.txt
. Si el proceso es detenido por el usuario (CTRL
+ C
en Windows o CTRL
+ D
en Linux), se muestran los resultados de los archivos procesados hasta el momento. Para directorios que incluyan gran cantidad de archivos y carpetas el proceso puede ser considerablemente duradero.
Para iniciar el programa debe especificarse una ruta de raíz de la siguiente forma:
python dupsearch.py ruta
Algunos ejemplos:
python dupsearch.py "C:/Users/Pc Usuario/Documents"
python dupsearch.py /home
Actualización 02/11/2015: optimización del código e implementación de múltiples hilos de acuerdo a la cantidad de núcleos del procesador para un mejor rendimiento.
Fuente
#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import print_function from hashlib import md5 from multiprocessing import cpu_count from os import walk from os.path import exists, getsize try: from Queue import Queue except ImportError: from queue import Queue from sys import argv from threading import current_thread, Lock, Thread import logging logging.basicConfig(filename="search.log", level=logging.DEBUG) # A partir de este tamaño crítico los archivos se leen # por partes. CRITIC_SIZE = 100000000 # 100 MB. # Archivos máx grandes serán ignorados. MAX_SIZE = 120000000 # 120 MB. def safe_print(lock, *args): """ Prevents multiple threads writing at the same time. """ lock.acquire() print(*args) lock.release() def get_file_md5(f, size): h = md5() split = size > CRITIC_SIZE if split: logging.info( "{0} will be splitted ({1} bytes).".format(f.name, size)) while True: # Leer por partes a partir de un tamaño crítico. token = f.read(CRITIC_SIZE if split else -1) if token: h.update(token) else: break return h.hexdigest() def worker(q, lock, files): while True: path, filename = q.get() if path is not None: process_file(path, filename, files, lock) q.task_done() if path is None: break safe_print(lock, current_thread().name, "exited.") def process_file(path, filename, files, lock): filepath = "{0}/{1}".format(path, filename) try: size = getsize(filepath) except OSError as e: logging.error( "getsize() failed for {0}. {1}.".format(filepath, e)) # Omit. size = MAX_SIZE + 1 # Ignorar archivos de gran tamaño. if size > MAX_SIZE: safe_print(lock, filepath, "omitted.") return 0 f = None try: f = open(filepath, "rb") except IOError as e: logging.error("open() failed for {0}. {1}.".format(filepath, e)) else: # Obtener el hash MD5 del contenido y # almacenarlo en el diccionario. h = get_file_md5(f, size) files.append((filepath, h)) finally: if f is not None: f.close() def main(): if len(argv) < 2: return begin_path = argv[1] if not exists(begin_path): print("Invalid path.") return files = [] # Archivos procesados. q = Queue() # Para evitar la superposición en la impresión # de textos. lock = Lock() cores = cpu_count() threads = [] logging.info("{} cores available.".format(cores)) # Lanzar tantos hilos como núcleos disponibles. for i in range(cores): t = Thread(target=worker, args=(q, lock, files)) t.start() threads.append(t) try: # Recorrer el árbol de directorios. for path, dirnames, filenames in walk(begin_path): print(path) for filename in filenames: q.put((path, filename)) except KeyboardInterrupt: # Detener el proceso. logging.info("Manually stopped.") # Terminar hilos. for i in range(cores): q.put((None, None)) # Esperar a que finalice el procesamiento de cada hilo. print("Waiting for threads...") for t in threads: t.join() scanned = len(files) print(scanned, "processed files.") filepath = [None, None] filehash = [None, None] output = open("output.txt", "a") # Cantidad de archivos duplicados o multiplicados. count = 0 print("Writing results...") while files: filepath[0], filehash[0] = files[0] # Nombre del archivo sin la ruta. filename = filepath[0][filepath[0].rfind("/") + 1:] matches = 0 matches_path = [] limit = len(files) i = 0 while i < limit: filepath[1], filehash[1] = files[i] # Determinar si coincide el nombre. name_match = filepath[1].endswith(filename) # Verificación del hash. if filehash[0] == filehash[1] and name_match: matches += 1 matches_path.append(filepath[1]) del files[i] limit -= 1 else: i += 1 if matches > 1: # Escribir resultados a output.txt. output.write(u"{0} found {1} times {2}.\n".format( filename, matches, matches_path)) count += 1 output.close() print(count, "multiplied files.") if __name__ == "__main__": main()
Curso online 👨💻
¡Ya lanzamos el curso oficial de Recursos Python en Udemy!
Un curso moderno para aprender Python desde cero con programación orientada a objetos, SQL y tkinter
en 2024.
Consultoría 💡
Ofrecemos servicios profesionales de desarrollo y capacitación en Python a personas y empresas. Consultanos por tu proyecto.
Juan says:
Gracias, funcionó perfecto en una ruta de Red.
Sólo lo modifiqué para buscar archivos con una extensión especifica.
[17] import os
[123] ext = os.path.splitext(filename)[-1].lower()
[124] if ext == ‘.cnc’: # Buscar sólo este tipo de archivo
[125] q.put((path, filename))
Andres Reyes says:
Buenos dias,
Antes que nada agradezco por compartir y creo que en mi pensar me parce un funcion muy util. Desafortunadamente no la he podido probar puesto que cuando corro el progroma en mi python 3.5 no me marca ningun error pero tampoco hace nada.
Muchas gracias por su atencion, Saludos!
Recursos Python says:
Hola Andres, me alegro que te sirva. Probablemente no estés especificando ninguna ruta de raíz para que el script comience a trabajar, tenés que pasarla como parámetro al momento de ejecutarlo. Ya actualicé el artículo con algunos ejemplos y arreglé varios errores en el código.
Cualquier otro problema no dudes en avisar.
Un saludo.