Memperluas privasi diferensial

Dokumen ini memberikan contoh cara memperluas privasi diferensial untuk privasi diferensial BigQuery.

Ringkasan

BigQuery memungkinkan Anda memperluas privasi diferensial ke sumber data multi-cloud dan library privasi diferensial eksternal. Dokumen ini berisi contoh cara menerapkan privasi diferensial untuk sumber data multi-cloud seperti AWS S3 dengan BigQuery Omni, cara memanggil library privasi diferensial eksternal menggunakan fungsi jarak jauh, dan cara melakukan agregasi privasi diferensial dengan PipelineDP, library Python yang dapat dijalankan dengan Apache Spark dan Apache Beam.

Untuk informasi privasi diferensial selengkapnya, lihat Menggunakan privasi diferensial.

Privasi diferensial dengan BigQuery Omni

Privasi diferensial BigQuery mendukung panggilan ke sumber data multi-cloud seperti AWS S3. Contoh berikut meng-kueri sumber data eksternal, foo.wikidata, dan menerapkan privasi diferensial. Untuk informasi selengkapnya tentang sintaksis klausa privasi diferensial, lihat Klausa privasi diferensial.

SELECT
  WITH
    DIFFERENTIAL_PRIVACY
      OPTIONS (
        epsilon = 1,
        delta = 1e-5,
        privacy_unit_column = foo.wikidata.es_description)
      COUNT(*) AS results
FROM foo.wikidata;

Contoh ini menampilkan hasil yang mirip dengan berikut ini:

-- These results will change each time you run the query.
+----------+
| results  |
+----------+
| 3465     |
+----------+

Untuk mengetahui informasi selengkapnya tentang batasan BigQuery Omni, lihat Batasan.

Memanggil library privasi diferensial eksternal dengan fungsi jarak jauh

Anda dapat memanggil library privasi diferensial eksternal menggunakan fungsi jarak jauh. Link berikut menggunakan fungsi jarak jauh untuk memanggil library eksternal yang dihosting oleh Tumult Analytics untuk menggunakan privasi diferensial terkonsentrasi nol pada set data penjualan retail.

Untuk informasi tentang cara menggunakan Tumult Analytics, lihat postingan peluncuran Tumult Analytics {: .external}.

Agregasi privasi diferensial dengan PipelineDP

PipelineDP adalah library Python yang melakukan agregasi privasi diferensial dan dapat berjalan dengan Apache Spark dan Apache Beam. BigQuery dapat menjalankan prosedur tersimpan Apache Spark yang ditulis dalam Python. Untuk mengetahui informasi selengkapnya tentang cara menjalankan prosedur tersimpan Apache Spark, lihat Menggunakan prosedur tersimpan untuk Apache Spark.

Contoh berikut melakukan agregasi privasi diferensial menggunakan library PipelineDP. Metode ini menggunakan set data publik Perjalanan Taksi Chicago dan menghitung untuk setiap mobil taksi - jumlah perjalanan, serta jumlah dan rata-rata tips untuk perjalanan ini.

Sebelum memulai

Image Apache Spark standar tidak menyertakan PipelineDP. Anda harus membuat image Docker yang berisi semua dependensi yang diperlukan sebelum menjalankan prosedur tersimpan PipelineDP. Bagian ini menjelaskan cara membuat dan mengirim image Docker ke Google Cloud.

Sebelum memulai, pastikan Anda telah menginstal Docker di komputer lokal, dan menyiapkan autentikasi untuk mengirim image Docker ke gcr.io. Untuk mengetahui informasi selengkapnya tentang cara mengirim image Docker, lihat Mengirim dan mengambil image.

Membuat dan mengirim image Docker

Untuk membuat dan mengirim image Docker dengan dependensi yang diperlukan, ikuti langkah-langkah berikut:

  1. Buat folder lokal DIR.
  2. Download Penginstal Miniconda, dengan versi Python 3.9, ke DIR.
  3. Simpan teks berikut ke Dockerfile.

    
      # Debian 11 is recommended.
      FROM debian:11-slim
    
      # Suppress interactive prompts
      ENV DEBIAN_FRONTEND=noninteractive
    
      # (Required) Install utilities required by Spark scripts.
      RUN apt update && apt install -y procps tini libjemalloc2
    
      # Enable jemalloc2 as default memory allocator
      ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
    
      # Install and configure Miniconda3.
      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}
      COPY Miniconda3-py39_23.1.0-1-Linux-x86_64.sh .
      RUN bash Miniconda3-py39_23.1.0-1-Linux-x86_64.sh -b -p /opt/miniconda3 \
      && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
      && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
      && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
      && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
    
      # The following packages are installed in the default image, it is
      # strongly recommended to include all of them.
      #
      # Use mamba to install packages quickly.
      RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
      && ${CONDA_HOME}/bin/mamba install \
        conda \
        cython \
        fastavro \
        fastparquet \
        gcsfs \
        google-cloud-bigquery-storage \
        google-cloud-bigquery[pandas] \
        google-cloud-bigtable \
        google-cloud-container \
        google-cloud-datacatalog \
        google-cloud-dataproc \
        google-cloud-datastore \
        google-cloud-language \
        google-cloud-logging \
        google-cloud-monitoring \
        google-cloud-pubsub \
        google-cloud-redis \
        google-cloud-spanner \
        google-cloud-speech \
        google-cloud-storage \
        google-cloud-texttospeech \
        google-cloud-translate \
        google-cloud-vision \
        koalas \
        matplotlib \
        nltk \
        numba \
        numpy \
        openblas \
        orc \
        pandas \
        pyarrow \
        pysal \
        pytables \
        python \
        regex \
        requests \
        rtree \
        scikit-image \
        scikit-learn \
        scipy \
        seaborn \
        sqlalchemy \
        sympy \
        virtualenv
    
      RUN apt install -y python3-pip
      RUN pip install --no-input pipeline-dp==0.2.0
    
      # (Required) Create the 'spark' group/user.
      # The GID and UID must be 1099. Home directory is required.
      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark
    
    
  4. Jalankan perintah berikut.

    
    IMAGE=gcr.io/PROJECT_ID/DOCKER_IMAGE:0.0.1
    # Build and push the image.
    docker build -t "${IMAGE}"
    docker push "${IMAGE}"
    
    

    Ganti kode berikut:

    • PROJECT_ID: project tempat Anda ingin membuat image Docker.
    • DOCKER_IMAGE: nama image Docker.

    Image diupload.

Menjalankan prosedur tersimpan PipelineDP

  1. Untuk membuat prosedur tersimpan, gunakan pernyataan CREATE PROCEDURE.

    CREATE OR REPLACE
    PROCEDURE
      `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
      WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID`
    OPTIONS (
      engine = "SPARK",
      container_image= "gcr.io/PROJECT_ID/DOCKER_IMAGE")
    LANGUAGE PYTHON AS R"""
    from pyspark.sql import SparkSession
    import pipeline_dp
    
    def compute_dp_metrics(data, spark_context):
    budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=10,
                                                          total_delta=1e-6)
    backend = pipeline_dp.SparkRDDBackend(spark_context)
    
    # Create a DPEngine instance.
    dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)
    
    params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.LAPLACE,
        metrics=[
            pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM,
            pipeline_dp.Metrics.MEAN],
        max_partitions_contributed=1,
        max_contributions_per_partition=1,
        min_value=0,
        # Tips that are larger than 100 will be clipped to 100.
        max_value=100)
    # Specify how to extract privacy_id, partition_key and value from an
    # element of the taxi dataset.
    data_extractors = pipeline_dp.DataExtractors(
        partition_extractor=lambda x: x.taxi_id,
        privacy_id_extractor=lambda x: x.unique_key,
        value_extractor=lambda x: 0 if x.tips is None else x.tips)
    
    # Run aggregation.
    dp_result = dp_engine.aggregate(data, params, data_extractors)
    budget_accountant.compute_budgets()
    dp_result = backend.map_tuple(dp_result, lambda pk, result: (pk, result.count, result.sum, result.mean))
    return dp_result
    
    spark = SparkSession.builder.appName("spark-pipeline-dp-demo").getOrCreate()
    spark_context = spark.sparkContext
    
    # Load data from BigQuery.
    taxi_trips = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data:chicago_taxi_trips.taxi_trips") \
    .load().rdd
    dp_result = compute_dp_metrics(taxi_trips, spark_context).toDF(["pk", "count","sum", "mean"])
    # Saving the data to BigQuery
    dp_result.write.format("bigquery") \
    .option("writeMethod", "direct") \
    .save("DATASET_ID.TABLE_NAME")
    """;
    

    Ganti kode berikut:

    • PROJECT_ID: project tempat Anda ingin membuat prosedur tersimpan.
    • DATASET_ID: set data tempat Anda ingin membuat prosedur tersimpan.
    • REGION: region tempat project Anda berada.
    • DOCKER_IMAGE: nama image Docker.
    • CONNECTION_ID: nama koneksi.
    • TABLE_NAME: nama tabel.
  2. Gunakan pernyataan CALL untuk memanggil prosedur.

    CALL `PROJECT_ID.DATASET_ID.pipeline_dp_example_spark_proc`()
    

    Ganti kode berikut:

    • PROJECT_ID: project tempat Anda ingin membuat prosedur tersimpan.
    • DATASET_ID: set data tempat Anda ingin membuat prosedur tersimpan.

Langkah selanjutnya