Making a PySpark File for BigQuery Samples











I am at a loss as to how to modify some code from an assignment to do what I want. The code was given to us to run on Google Cloud Platform to perform a word count on the Shakespeare sample dataset. I managed to do that, but the second part of the assignment asks us to modify the code to count something else from any other dataset in BigQuery's samples section. My idea is to use the natality dataset to count the number of males born in each year compared to females. This is what I need help with:



I am not sure how to model the table I need to create in Google Cloud. I created an "is_male" BOOLEAN field and a "source_year" INTEGER field, matching the ones in the natality dataset, but I am not sure how to modify the PySpark code to do what I want. The code I was given is attached below (after my rough table sketch); it performs the Shakespeare word count, and in this example my 'A4' table contains a 'word' STRING field and a 'word_count' INTEGER field.
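Here is a rough sketch of how I think the output table could be created, reusing the same subprocess/bq pattern the given code already uses for loading. The table name 'A4_natality' and the 'birth_count' column are just names I made up, so I am not sure this is the right model:

import subprocess

# My guess at the output table: one row per (source_year, is_male) pair
# plus a count of births. 'A4_natality' and 'birth_count' are names I
# made up; 'CS410' is the dataset I already use in the code below.
subprocess.check_call(
    'bq mk --table CS410.A4_natality '
    'source_year:INTEGER,is_male:BOOLEAN,birth_count:INTEGER'.split())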



import json
import pprint
import subprocess
import pyspark

sc = pyspark.SparkContext()

# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
#input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
input_directory = 'gs://dataproc-0c5cbd37-40a2-4bb1-9451-03581fed1fb9-us-west1/hadoop/tmp/bigquery/pyspark_input'

conf = {
    # Input Parameters
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'bigquery-public-data',
    'mapred.bq.input.dataset.id': 'samples',
    'mapred.bq.input.table.id': 'shakespeare',
}

# Output Parameters
#output_dataset = 'wordcount_dataset'
#output_table = 'wordcount_table'
output_dataset = 'CS410'
output_table = 'A4'

# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

# Perform word count.
word_counts = (
    table_data
    .map(lambda (_, record): json.loads(record))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))

# Display 10 results.
pprint.pprint(word_counts.take(10))

# Stage data formatted as newline-delimited JSON in Google Cloud Storage.
output_directory = 'gs://dataproc-0c5cbd37-40a2-4bb1-9451-03581fed1fb9-us-west1/hadoop/tmp/bigquery/pyspark_output'
partitions = range(word_counts.getNumPartitions())
output_files = [output_directory + '/part-{:05}'.format(i) for i in partitions]

(word_counts
    .map(lambda (w, c): json.dumps({'word': w, 'word_count': c}))
    .saveAsTextFile(output_directory))

# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
    'bq load --source_format NEWLINE_DELIMITED_JSON '
    '--schema word:STRING,word_count:INTEGER '
    '{dataset}.{table} {files}'.format(
        dataset=output_dataset, table=output_table, files=','.join(output_files)
    ).split())

# Manually clean up the staging directories, otherwise the exported BigQuery
# files will remain in Cloud Storage indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(output_path, True)


I particularly do not understand what this piece of code does, and I assume it is the part I need to modify to implement my idea (besides pointing the input configuration at the natality dataset and the output at my new table).



# Perform word count.
word_counts = (
    table_data
    .map(lambda (_, record): json.loads(record))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))
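My rough guess at how that aggregation would change for the natality table is the untested sketch below. I am assuming the input table id becomes 'natality' (still in the bigquery-public-data 'samples' dataset) and that the exported JSON rows have 'source_year' and 'is_male' fields; I also wrote the lambdas without tuple unpacking so they do not depend on Python 2:

# Count births per (source_year, is_male) pair instead of word occurrences.
# NOTE: this is my guess; the is_male value may arrive from the JSON export
# as a string such as "true" rather than a real boolean, so it may need
# converting before being written back out.
birth_counts = (
    table_data
    .map(lambda pair: json.loads(pair[1]))
    .map(lambda x: ((int(x['source_year']), x['is_male']), 1))
    .reduceByKey(lambda x, y: x + y))

# The output stage would then need to match the new schema, e.g.
# .map(lambda pair: json.dumps({'source_year': pair[0][0],
#                               'is_male': pair[0][1],
#                               'birth_count': pair[1]}))
# and the bq load flag would become
# --schema source_year:INTEGER,is_male:BOOLEAN,birth_count:INTEGER

Is something along these lines what the snippet above is doing, and would this be the right way to adapt it?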


Any help is extremely appreciated!










Tags: pyspark google-cloud-platform google-cloud-datastore





