Deduplication globally within the entire directory #372

@ql1235

Hello,

I have a directory with the following structure:

/data/
   /sub-folder1
       file1
       file2
   /sub-folder2
       file3 ..
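
(My understanding is that ParquetReader walks the data folder recursively by default, so both sub-folders should be picked up in one run; a quick sketch, with the glob pattern spelled out only for clarity:)

from datatrove.pipeline.readers import ParquetReader

# Read every parquet file under /data, across all sub-folders.
reader = ParquetReader("/data", glob_pattern="**/*.parquet")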

I want to perform deduplication globally across the whole directory by running the job below. After running it, I noticed that it does not deduplicate across different files: I generated rows with the same text value in different files, and the output still contains both rows. Can someone point me to the right configuration for global deduplication?
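
For reference, a rough sketch of how I generated the test rows (pandas with pyarrow assumed; the paths and the text column name are just illustrative):

import pandas as pd

# The same sentence is written into files in two different sub-folders.
duplicate = "this exact sentence appears in both files"

pd.DataFrame({"text": [duplicate, "row unique to file1"]}).to_parquet(
    "/data/sub-folder1/file1.parquet"
)
pd.DataFrame({"text": [duplicate, "row unique to file3"]}).to_parquet(
    "/data/sub-folder2/file3.parquet"
)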

Another question: I read in the FineWeb paper that deduplicating within a "dump" gives better results than deduplicating globally. What exactly is a "dump" here? It looks like a folder of data, but I just want to confirm.

Thank you!

from datatrove.executor import LocalPipelineExecutor
from datatrove.pipeline.dedup import MinhashDedupSignature
from datatrove.pipeline.dedup.minhash import (
    MinhashConfig,
    MinhashDedupBuckets,
    MinhashDedupCluster,
    MinhashDedupFilter,
)
from datatrove.pipeline.readers import ParquetReader
from datatrove.pipeline.writers.jsonl import JsonlWriter
from datatrove.utils.hashing import HashConfig

# PARQUET_DIR, MINHASH_DIR and LANGUAGE are defined elsewhere in my script.
minhash_config = MinhashConfig(
    hash_config=HashConfig(hash_fc="xxhash", precision=64),
    num_buckets=14,
    hashes_per_bucket=8,
    n_grams=5,
)
TOTAL_TASKS = 132

PARQUET_READER = ParquetReader(PARQUET_DIR)

# 1. Minhash deduplication
dedup_step1 = LocalPipelineExecutor(
    pipeline=[
        PARQUET_READER,
        MinhashDedupSignature(
            output_folder=f"{MINHASH_DIR}/minhash_signatures",
            config=minhash_config,
            language=LANGUAGE
        ),
    ],
    tasks=TOTAL_TASKS,
)

dedup_step2 = LocalPipelineExecutor(
    pipeline=[
        MinhashDedupBuckets(
            input_folder=f"{MINHASH_DIR}/minhash_signatures",
            output_folder=f"{MINHASH_DIR}/minhash_buckets",
            index_folder=f"{MINHASH_DIR}/minhash_indices",
            create_index_name="my_dataset_index",
            config=minhash_config,
            only_dedup_in_index=False,
        ),
    ],
    tasks=minhash_config.num_buckets,  # one task per bucket
    depends=dedup_step1,
)

dedup_step3 = LocalPipelineExecutor(
    pipeline=[
        MinhashDedupCluster(
            input_folder=f"{MINHASH_DIR}/minhash_buckets",
            output_folder=f"{MINHASH_DIR}/remove_ids",
            config=minhash_config,
            save_cluster_size=True
        ),
    ],
    tasks=1,  # clustering runs as a single global task
    depends=dedup_step2,
)

dedup_step4 = LocalPipelineExecutor(
    pipeline=[
        PARQUET_READER,
        MinhashDedupFilter(
            input_folder=f"{MINHASH_DIR}/remove_ids",
            exclusion_writer=JsonlWriter(f"{MINHASH_DIR}/removed"),
        ),
        JsonlWriter(output_folder=f"{MINHASH_DIR}/deduplicated_output"),
    ],
    # must match step 1's task count so each rank's documents
    # line up with its remove_ids file
    tasks=TOTAL_TASKS,
    depends=dedup_step3,
)
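
For completeness, since each stage declares its predecessor via depends, running the last executor should kick off the whole chain:

# Running step 4 runs steps 1-3 first through the `depends` links.
dedup_step4.run()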
