Demo - Queries for distribution of topics over time

Important: for each query, we read the data from files into memory, run the query, and then collect the results.



And unzipped it later.

  • How to generate nls_total_demo.txt
 find /mnt/lustre/at003/at003/rfilguei2/nls-data-encyclopaediaBritannica -maxdepth 1 -type d >& nls_total_demo.txt

(And delete the first row: '/mnt/lustre/at003/at003/rfilguei2/nls-data-encyclopaediaBritannica')
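
The same listing can be produced without the manual "delete the first row" step. The following is a minimal sketch, not part of defoe: the helper names list_volume_dirs and write_data_file are hypothetical, and it assumes that every immediate subdirectory of the data directory should go into the list.

```python
import os


def list_volume_dirs(root):
    """Return the sorted paths of the immediate subdirectories of root.

    The root directory itself is never included, so there is no first
    row to delete afterwards. Symlinks are skipped, like `find -type d`.
    """
    paths = []
    for name in os.listdir(root):
        path = os.path.join(root, name)
        if os.path.isdir(path) and not os.path.islink(path):
            paths.append(path)
    return sorted(paths)


def write_data_file(root, out_path):
    """Write one subdirectory path per line to out_path (hypothetical helper)."""
    dirs = list_volume_dirs(root)
    with open(out_path, "w") as out:
        for path in dirs:
            out.write(path + "\n")
    return dirs
```

For example, write_data_file("/mnt/lustre/at003/at003/rfilguei2/nls-data-encyclopaediaBritannica", "nls_total_demo.txt") would write the list used by the queries in this demo.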

  • Install Spark and Java 8
 sudo apt install openjdk-8-jdk
 tar xvf spark-2.4.2-bin-hadoop2.7.tgz
  • Install defoe
 conda create -n mypy27 python=2.7 anaconda
 conda activate mypy27
 conda update -n base -c defaults conda
 pip install Pillow==4.0.0
 >> import nltk
  • Zip defoe code:
   cd defoe
   zip -r defoe

Individual Queries [defoe/]

Format: spark-submit --py-files defoe/ <DATA_FILE> <MODEL_NAME> <QUERY_NAME> <QUERY_CONFIG> -r <RESULTS_FILE> -n <NUM_CORES>

Note: Every time we run a query (e.g. defoe.nls.queries.total_documents or defoe.nls.queries.normalize), defoe first reads the data from files into memory and then runs the query. So the data is read, ingested, and queried on every run.
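
Because every query follows the same command layout, the argument list can be assembled programmatically. This is a minimal sketch under stated assumptions: build_submit_args is a hypothetical helper, not part of defoe, and it reproduces the argument order of the commands in this demo, with the query configuration file treated as optional.

```python
def build_submit_args(data_file, model, query, results_file, num_cores, config=None):
    """Assemble the spark-submit argument list for one defoe query.

    The order mirrors the commands in this demo: data file, model name,
    query name, optional query configuration, then -r and -n.
    """
    args = ["spark-submit", "--py-files", "defoe/", data_file, model, query]
    if config is not None:
        # Queries such as keysearch_by_year take a YAML configuration file.
        args.append(config)
    args += ["-r", results_file, "-n", str(num_cores)]
    return args
```

The returned list can be passed to subprocess.call, which avoids shell quoting problems when file names contain spaces.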

  • Total_documents
  spark-submit --py-files defoe/ nls_total_demo.txt nls defoe.nls.queries.total_documents  -r results_total_documents -n 324 
  • Normalize query - It gets the total number of documents, pages, and words, grouped by year
  spark-submit --py-files defoe/ nls_total_demo.txt nls defoe.nls.queries.normalize  -r results_norm -n 324  
  • Keysearch by topics [sport, philosophers, cities, animals] - group by year

    • Sports - normalize preprocessing (check queries/sport.yml to see the preprocessing treatments)
      spark-submit --py-files defoe/ nls_total_demo.txt nls defoe.nls.queries.keysearch_by_year queries/sport.yml -r results_ks_sports -n 324  
    • Scottish Philosophers - normalization and lemmatization preprocessing (normalization is always applied first if lemmatization or stemming is selected; check queries/sc_philosophers.yml to see the preprocessing treatment)
      spark-submit --py-files defoe/ nls_total_demo.txt nls defoe.nls.queries.keysearch_by_year queries/sc_philosophers.yml -r results_ks_philosophers -n 324  
    • Cities - normalization and lemmatization (check queries/sc_cities.yml)
      spark-submit --py-files defoe/ nls_total_demo.txt nls defoe.nls.queries.keysearch_by_year queries/sc_cities.yml -r results_ks_cities -n 324 > log.txt
    • Animals - normalization and lemmatization (check queries/animal.yml)
      spark-submit --py-files defoe/ nls_total_demo.txt nls defoe.nls.queries.keysearch_by_year queries/animal.yml -r results_ks_animal -n 324 > log.txt
  • Getting the inventory per year [title and edition]

  spark-submit --py-files defoe/ nls_total_demo.txt nls defoe.nls.queries.inventory_per_year -r results_inventory_per_year -n 324 
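
To make the idea behind the keysearch queries grouped by year concrete, here is a plain-Python sketch that needs no Spark. It is an illustration only, not defoe's implementation: it counts keyword occurrences per year over (year, text) pairs, whereas the real query may count differently (for example, matching pages rather than occurrences). The function name and the tokenisation are assumptions.

```python
import re
from collections import Counter, defaultdict


def keysearch_by_year(pages, keywords):
    """Count keyword occurrences per year.

    pages is an iterable of (year, text) pairs; the result maps each year
    that has at least one match to a Counter of keyword -> occurrences.
    """
    wanted = set(k.lower() for k in keywords)
    results = defaultdict(Counter)
    for year, text in pages:
        # Simplified normalization: lowercase and keep runs of a-z only.
        for word in re.findall(r"[a-z]+", text.lower()):
            if word in wanted:
                results[year][word] += 1
    return dict(results)
```

Years without any match are absent from the result, so a caller that wants a complete time series has to fill in zeros itself.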

Writing and Reading data to/from HDFS

Writing pages to an HDFS CSV file using dataframes loads all the pages and their metadata into memory, applies every type of preprocessing treatment to the pages, creates a dataframe, stores the data in the dataframe, and finally saves the dataframe to HDFS as a CSV file.


The information stored per page is the following: "title", "edition", "year", "place", "archive_filename", "source_text_filename", "text_unit", "text_unit_id", "num_text_unit", "type_archive", "model", "source_text_raw", "source_text_clean", "source_text_norm", "source_text_lemmatize", "source_text_stem", "num_words".

In “source_text_clean”, I store the result of applying two modifications to the raw text (source_text_raw): 1) handling hyphenated words and 2) fixing the long-s. The preprocessing treatments (normalize, stem and lemmatize) are applied to the text stored in this field, not to the raw text. Both stem and lemmatize also include normalization.
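
The relation between the raw, clean and normalized fields can be sketched as follows. This is a simplified stand-in written for Python 3, not defoe's code: defoe's long-s fix relies on the defoe_path and os_type settings described below, while this sketch only replaces the "ſ" character, and the exact hyphenation and normalization rules are assumptions.

```python
import re


def clean_text(raw):
    """Approximate source_text_clean from source_text_raw.

    1) Rejoin words hyphenated across a line break ("philo-\\nsophy").
    2) Replace the long-s character with a plain "s" (stand-in for the
       real long-s fix, which is more involved).
    """
    joined = re.sub(r"(\w)-\s*\n\s*(\w)", r"\1\2", raw)
    return joined.replace("\u017f", "s")


def normalize(word):
    """Assumed normalization: lowercase and drop everything except a-z."""
    return re.sub(r"[^a-z]", "", word.lower())
```

Here normalize is applied to words taken from the cleaned text, which matches the statement above that the treatments start from source_text_clean rather than from the raw text.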

We have to indicate the HDFS FILE inside (e.g. "nls_demo.csv").

Notice that defoe_path and os_type properties (needed for the long-S fix) are indicated in the configuration file, such as queries/writehdfs.yml:

defoe_path: /home/rosa_filgueira_vicente/defoe/
os_type: linux
 nohup spark-submit --py-files defoe/ nls_tiny.txt nls defoe.nls.queries.write_pages_df_hdfs queries/writehdfs.yml -r results -n 324 > log.txt &

Important --> nls_tiny.txt is:

xxx/nls-data-encyclopaediaBritannica/191253839
xxx/nls-data-encyclopaediaBritannica/144133902
xxx/nls-data-encyclopaediaBritannica/144850368
xxx/nls-data-encyclopaediaBritannica/190273291
xxx/nls-data-encyclopaediaBritannica/191253819
xxx/nls-data-encyclopaediaBritannica/191678900
xxx/nls-data-encyclopaediaBritannica/192984259
xxx/nls-data-encyclopaediaBritannica/193819047
xxx/nls-data-encyclopaediaBritannica/191678897
xxx/nls-data-encyclopaediaBritannica/192547788
xxx/nls-data-encyclopaediaBritannica/193916150
  • Checking results from HDFS file
 hdfs dfs -getmerge /user/at003/rosa/nls_demo.csv nls_demo.csv

Read pages (preprocessed or just clean) as dataframes from the HDFS CSV file, and do a keysearch grouping results by year.

In hdfs_data.txt we have to indicate the HDFS file that we want to read from (e.g. hdfs:///user/at003/rosa/nls_demo.csv)

In the configuration file (e.g. queries/sport.yml) we have to indicate which preprocessing treatment (e.g. none, normalize, etc.) we want to use in the query, so that the matching dataframe column (e.g. source_text_clean, source_text_norm, etc.) is selected.

preprocess: normalize
data: sport.txt
defoe_path: /lustre/home/sc048/rosaf4/defoe/
os_type: linux

Note that the defoe_path and os_type parameters are not needed in this query.
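
The selection of the dataframe column from the preprocess setting can be pictured as a simple lookup. This is a minimal sketch, not defoe's code: the column names come from the list of stored fields above, while the setting values "lemmatize" and "stem" and the helper name column_for are assumptions.

```python
# Assumed mapping from the "preprocess" setting to the stored column.
PREPROCESS_COLUMN = {
    "none": "source_text_clean",
    "normalize": "source_text_norm",
    "lemmatize": "source_text_lemmatize",
    "stem": "source_text_stem",
}


def column_for(preprocess):
    """Return the dataframe column to query for a preprocess setting."""
    try:
        return PREPROCESS_COLUMN[preprocess]
    except KeyError:
        raise ValueError("unknown preprocess treatment: %r" % (preprocess,))
```

With the configuration shown above (preprocess: normalize), the query would read the source_text_norm column.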

  spark-submit --py-files defoe/ hdfs_data.txt hdfs defoe.hdfs.queries.keysearch_by_year queries/sport.yml  -r results_ks_sports_tiny -n 324 

Writing and Reading data to/from PostgreSQL database

Writing pages to a PostgreSQL database using dataframes loads all the pages and their metadata into memory, applies every type of preprocessing treatment to the pages, creates a dataframe, stores the data in the dataframe, and finally saves the dataframe to a database table. The properties of the database can be specified in a config file (e.g. queries/db_properties.yml).

The information stored per page is the following: "title", "edition", "year", "place", "archive_filename", "source_text_filename", "text_unit", "text_unit_id", "num_text_unit", "type_archive", "model", "source_text_raw", "source_text_clean", "source_text_norm", "source_text_lemmatize", "source_text_stem", "num_words".

In “source_text_clean”, I store the result of applying two modifications to the raw text (source_text_raw): 1) handling hyphenated words and 2) fixing the long-s. The preprocessing treatments (normalize, stem and lemmatize) are applied to the text stored in this field, not to the raw text. Both stem and lemmatize also include normalization.

spark-submit --driver-class-path $HOME/postgresql-42.2.8.jar --jars $HOME/postgresql-42.2.8.jar --py-files defoe/ nls_tiny.txt nls defoe.nls.queries.write_pages_df_psql queries/db_properties.yml  -r results -n 324 

Notice that the properties of the database to use are indicated in a configuration file, such as queries/db_properties.yml, along with defoe_path and os_type properties (needed for the long-S fix):

host: ati-nid00006
port: 55555
database: defoe_db
table: publication_page
user: rfilguei2
defoe_path: /home/rosa_filgueira_vicente/defoe/
os_type: linux


  • You need the PostgreSQL JDBC driver: download it if necessary and pass it to the spark-submit command (see the previous command).

  • The PostgreSQL database must already exist (see the extended notes). The table, however, is created automatically.

psql -d defoe_db 

                          List of relations
 Schema |       Name       | Type  |   Owner   |  Size  | Description 
 public | publication_page | table | rfilguei2 | 138 MB | 
(1 row)

defoe_db=# \d+ publication_page
                                     Table "public.publication_page"
        Column         |  Type  | Collation | Nullable | Default | Storage  | Stats target | Description 
 title                 | text   |           |          |         | extended |              | 
 edition               | text   |           |          |         | extended |              | 
 year                  | bigint |           |          |         | plain    |              | 
 place                 | text   |           |          |         | extended |              | 
 archive_filename      | text   |           |          |         | extended |              | 
 source_text_filename  | text   |           |          |         | extended |              | 
 text_unit             | text   |           |          |         | extended |              | 
 text_unit_id          | text   |           |          |         | extended |              | 
 num_text_unit         | bigint |           |          |         | plain    |              | 
 type_archive          | text   |           |          |         | extended |              | 
 model                 | text   |           |          |         | extended |              | 
 source_text_raw       | text   |           |          |         | extended |              | 
 source_text_clean     | text   |           |          |         | extended |              | 
 source_text_norm      | text   |           |          |         | extended |              | 
 source_text_lemmatize | text   |           |          |         | extended |              | 
 source_text_stem      | text   |           |          |         | extended |              | 
 num_words             | bigint |           |          |         | plain    |              | 

Read pages (preprocessed or just clean) as dataframes from the PostgreSQL database, and do a keysearch grouping results by year.

In the configuration file (e.g. queries/sport.yml) we have to indicate which preprocessing treatment (e.g. none, normalize, etc.) we want to use in the query, so that the matching dataframe column (e.g. source_text_clean, source_text_norm, etc.) is selected.

preprocess: normalize
data: sport.txt
defoe_path: /lustre/home/sc048/rosaf4/defoe/
os_type: linux

Note that the defoe_path and os_type parameters are not needed in this query.

spark-submit --driver-class-path $HOME/postgresql-42.2.8.jar --jars $HOME/postgresql-42.2.8.jar --py-files defoe/ db_data.txt psql defoe.psql.queries.keysearch_by_year queries/sport.yml  -r results_ks_sports_tiny -n 324

Important: A file with the database properties has to be specified (e.g. db_data.txt). It must contain the following fields, in this order, separated by commas:

#host,port,db_name,user,driver,table_name
ati-nid00006,55555,defoe_db,rfilguei2,org.postgresql.Driver,publication_page
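
The following sketch shows one way such a properties file could be parsed and turned into a JDBC URL. It is an illustration, not defoe's parser: the helper names are hypothetical, and it assumes the file holds an optional "#" header line followed by a single comma-separated data line.

```python
DB_FIELDS = ["host", "port", "db_name", "user", "driver", "table_name"]


def parse_db_properties(text):
    """Return the first data line of a db_data.txt-style file as a dict."""
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and the header comment
        values = [value.strip() for value in line.split(",")]
        if len(values) != len(DB_FIELDS):
            raise ValueError("expected %d fields, got %d" % (len(DB_FIELDS), len(values)))
        return dict(zip(DB_FIELDS, values))
    raise ValueError("no data line found")


def jdbc_url(props):
    """Build the PostgreSQL JDBC URL from the parsed properties."""
    return "jdbc:postgresql://%s:%s/%s" % (props["host"], props["port"], props["db_name"])
```

For the example values above, jdbc_url gives the same URL that the PySpark shell session later in this document builds by hand.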

Writing and Reading data to/from ElasticSearch (ES)

Writing pages to ES using dataframes loads all the pages and their metadata into memory, applies every type of preprocessing treatment to the pages, creates a dataframe, stores the data in the dataframe, and finally saves the dataframe to ES.

The information stored per page is the following: "title", "edition", "year", "place", "archive_filename", "source_text_filename", "text_unit", "text_unit_id", "num_text_unit", "type_archive", "model", "source_text_raw", "source_text_clean", "source_text_norm", "source_text_lemmatize", "source_text_stem", "num_words".

In “source_text_clean”, I store the result of applying two modifications to the raw text (source_text_raw): 1) handling hyphenated words and 2) fixing the long-s. The preprocessing treatments (normalize, stem and lemmatize) are applied to the text stored in this field, not to the raw text. Both stem and lemmatize also include normalization.

spark-submit --driver-class-path elasticsearch-hadoop-7.5.0/dist/elasticsearch-hadoop-7.5.0.jar --jars elasticsearch-hadoop-7.5.0/dist/elasticsearch-hadoop-7.5.0.jar  --py-files defoe/ nls-data.txt nls defoe.nls.queries.write_pages_df_es queries/es_properties_edina_eb-with-mapping.yml -r results -n 324

Notice that the properties of index and type name for ES are indicated in a configuration file, such as queries/es_properties_edina_eb-with-mapping.yml, along with defoe_path and os_type properties (needed for the long-S fix):

index: eb-with-mapping
port: 9200
defoe_path: /home/rosa_filgueira_vicente/defoe/
os_type: linux


  • You need the elasticsearch-hadoop connector: download it if necessary and pass it to the spark-submit command (see the previous command).

Read pages (preprocessed or just clean) as dataframes from ES, and do a keysearch grouping results by year.

In the configuration file (e.g. queries/sport.yml) we have to indicate which preprocessing treatment (e.g. none, normalize, etc.) we want to use in the query, so that the matching dataframe column (e.g. source_text_clean, source_text_norm, etc.) is selected.

preprocess: normalize
data: sport.txt
defoe_path: /lustre/home/sc048/rosaf4/defoe/
os_type: linux

Note that the defoe_path and os_type parameters are not needed in this query.

spark-submit --driver-class-path elasticsearch-hadoop-7.5.0/dist/elasticsearch-hadoop-7.5.0.jar --jars elasticsearch-hadoop-7.5.0/dist/elasticsearch-hadoop-7.5.0.jar --py-files defoe/ es_data.txt es queries/sport.yml  -r results_ks_sports -n 324

Important: A file with the ES properties has to be specified (e.g. es_data.txt). It must contain the following fields, in this order, separated by commas:

#index,host,port
eb-with-mapping,,9200
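
In the example values the host field is empty. The sketch below shows one way to read such a file and derive reader options from it. It is an assumption-laden illustration, not defoe's code: the helper names are hypothetical, falling back to "localhost" for an empty host is a guess, and es.resource, es.nodes and es.port are used here as the elasticsearch-hadoop setting names.

```python
def parse_es_properties(text, default_host="localhost"):
    """Parse an es_data.txt-style file: optional '#' header, one data line."""
    data_lines = [
        line.strip()
        for line in text.splitlines()
        if line.strip() and not line.strip().startswith("#")
    ]
    if not data_lines:
        raise ValueError("no data line found")
    index, host, port = [value.strip() for value in data_lines[0].split(",")]
    # An empty host field falls back to default_host (an assumption).
    return {"index": index, "host": host or default_host, "port": port}


def reader_options(props):
    """Map the parsed properties to elasticsearch-hadoop option names."""
    return {
        "es.resource": props["index"],
        "es.nodes": props["host"],
        "es.port": props["port"],
    }
```

Each key and value of the returned dictionary could then be passed to the Spark reader through its option method.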

Spark in a SHELL - Pyspark

Reading dataframes from HDFS:

>> from pyspark.sql.functions import when, col
>> df = sqlContext.read.csv("hdfs:///user/at003/rosa/nls_demo.csv", header="true")
>> def blank_as_null(x):
...     # Turn empty strings in column x into nulls
...     return when(col(x) != "", col(x)).otherwise(None)
>> fdf = df.withColumn("source_text_norm", blank_as_null("source_text_norm"))
>> newdf = fdf.filter(fdf.source_text_raw.isNotNull()).filter(fdf["model"] == "nls").select(fdf.year, fdf.source_text_raw)
>> nls_sample = newdf.take(8)
>> entry = nls_sample[7]  # last of the 8 rows taken (indices 0-7)
>> year = entry[0]
>> page_as_string = entry[1]

Reading dataframes from PostgreSQL:

pyspark --driver-class-path postgresql-42.2.8.jar --jars postgresql-42.2.8.jar

>> from pyspark.sql import DataFrameReader
>> from pyspark.sql.functions import when, col
>> url = 'postgresql://ati-nid00006:55555/defoe_db'
>> properties = {'user': 'rfilguei2', 'driver': 'org.postgresql.Driver'}
>> df = DataFrameReader(sqlContext).jdbc(url='jdbc:%s' % url, table='publication_page', properties=properties)
>> def blank_as_null(x):
...     # Turn empty strings in column x into nulls
...     return when(col(x) != "", col(x)).otherwise(None)
>> fdf = df.withColumn("source_text_norm", blank_as_null("source_text_norm"))
>> newdf = fdf.filter(fdf.source_text_clean.isNotNull()).filter(fdf["model"] == "nls").select(fdf.year, fdf.source_text_clean)
>> nls_sample = newdf.take(7)
>> entry = nls_sample[6]  # last of the 7 rows taken (indices 0-6)
>> year = entry[0]
>> page_as_string = entry[1]

Reading dataframes from ES:

pyspark --jars elasticsearch-hadoop-7.5.0/dist/elasticsearch-hadoop-7.5.0.jar

>> from pyspark.sql.functions import when, col
>> reader = sqlContext.read.format("org.elasticsearch.spark.sql").option("", "true").option("es.nodes.wan.only","true").option("es.port","9200").option("","false").option("es.nodes", "http://localhost")
>> df = reader.load("nls/Encyclopaedia_Britannica")
>> def blank_as_null(x):
...     # Turn empty strings in column x into nulls
...     return when(col(x) != "", col(x)).otherwise(None)
>> fdf = df.withColumn("source_text_norm", blank_as_null("source_text_norm"))
>> newdf = fdf.filter(fdf.source_text_clean.isNotNull()).filter(fdf["model"] == "nls").select(fdf.year, fdf.source_text_clean)
>> nls_sample = newdf.take(7)
>> entry = nls_sample[6]  # last of the 7 rows taken (indices 0-6)
>> year = entry[0]
>> page_as_string = entry[1]

Reading rdds:

>> nls_data = sc.textFile("hdfs:///user/at003/rosa/<NAME OF THE HDFS FILE>.txt")
>> nls_sample = nls_data.take(10)
>> entry=nls_sample[8][1:-1].split("\',")
>> clean_entry=[item.split("\'")[1] for item in entry]
>> year = int(clean_entry[2])
>> preprocess_type = clean_entry[10]
>> page_as_string = clean_entry[11]
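
The string splitting above breaks if a page text itself contains the sequence "'," (a quote followed by a comma). A more robust alternative is sketched below, assuming each line of the text file is the printed form of a Python tuple of strings; parse_page_line is a hypothetical helper, and the field positions (2, 10, 11) are taken from the session above.

```python
import ast


def parse_page_line(line):
    """Parse one line holding the printed form of a tuple of strings.

    ast.literal_eval only evaluates literals, so it copes with quotes and
    commas inside the page text without executing arbitrary code.
    """
    fields = ast.literal_eval(line)
    return {
        "year": int(fields[2]),
        "preprocess_type": fields[10],
        "page_as_string": fields[11],
    }
```

In the shell this could be applied to a sampled line, for example parse_page_line(nls_sample[8]).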

Writing to a YML file

Writing pages to a YML file using dataframes loads all the pages and their metadata into memory, applies every type of preprocessing treatment to the pages, creates a dataframe, stores the data in the dataframe, and finally saves the dataframe to a YML file.

The information stored per page is the following: "title", "edition", "year", "place", "archive_filename", "source_text_filename", "text_unit", "text_unit_id", "num_text_unit", "type_archive", "model", "source_text_raw", "source_text_clean", "source_text_norm", "source_text_lemmatize", "source_text_stem", "num_words".

In “source_text_clean”, I store the result of applying two modifications to the raw text (source_text_raw): 1) handling hyphenated words and 2) fixing the long-s. The preprocessing treatments (normalize, stem and lemmatize) are applied to the text stored in this field, not to the raw text. Both stem and lemmatize also include normalization.

spark-submit --driver-class-path elasticsearch-hadoop-7.5.0/dist/elasticsearch-hadoop-7.5.0.jar --jars elasticsearch-hadoop-7.5.0/dist/elasticsearch-hadoop-7.5.0.jar  --py-files defoe/ nls-data.txt nls defoe.nls.queries.write_pages_df_es queries/write_to_yml.yml -r results -n 324

Notice that defoe_path and os_type properties (needed for the long-S fix) are indicated in the configuration file queries/write_to_yml.yml

defoe_path: /home/rosa_filgueira_vicente/defoe/
os_type: linux