From 7548e612427d7b4f05cbd3c679980339e59473c7 Mon Sep 17 00:00:00 2001
From: Chen Zhiling <chnzhlng@gmail.com>
Date: Tue, 28 May 2019 10:29:31 +0800
Subject: [PATCH] Fix pytests and make TS conversion conditional (#205)

* Add wait flag for jobs; fix Go proto path for dataset service

* Rebase onto master

* Fix tests, update requirements

* Update min version for pandas

* Update integration testutils to use pandas nullable ints

* Fix warehouse validate int32

* Distinguish pandas extension dtypes from numpy dtypes

* Add Int32 support for kafka producer

* Remove print
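
For context: the integration tests and SDK now represent integer features
with the pandas nullable integer extension dtypes ("Int32"/"Int64")
introduced in pandas 0.24, which is why the pandas pins below move to
0.24. A minimal sketch of the difference (values are illustrative):

    import numpy as np
    import pandas as pd

    # Plain numpy int64 columns cannot hold missing values; pandas
    # silently upcasts them to float64 as soon as a NaN appears.
    print(pd.Series([1, 2, np.nan]).dtype)  # float64

    # The nullable extension dtypes keep integer semantics while
    # still allowing missing values.
    print(pd.Series([1, 2, None], dtype="Int64").dtype)  # Int64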
---
 integration-tests/testutils/kafka_producer.py |  4 ++-
 integration-tests/testutils/requirements.txt  |  2 +-
 integration-tests/testutils/spec.py           |  4 +--
 .../testutils/validate_feature_values.py      |  6 ++++-
 sdk/python/feast/sdk/importer.py              |  3 ++-
 sdk/python/feast/sdk/utils/bq_util.py         |  2 +-
 sdk/python/feast/sdk/utils/types.py           |  4 +--
 sdk/python/setup.py                           |  2 +-
 sdk/python/test-requirements.txt              |  5 ++--
 sdk/python/tests/sdk/test_client.py           |  2 +-
 sdk/python/tests/sdk/test_importer.py         | 25 +++++++++++++++++--
 sdk/python/tests/sdk/utils/test_bq_utils.py   | 19 +-------------
 12 files changed, 45 insertions(+), 33 deletions(-)

diff --git a/integration-tests/testutils/kafka_producer.py b/integration-tests/testutils/kafka_producer.py
index f9c5435b81..6c6d99e274 100644
--- a/integration-tests/testutils/kafka_producer.py
+++ b/integration-tests/testutils/kafka_producer.py
@@ -38,8 +38,10 @@ def produce_feature_rows(
             feature.id = info["id"]
             feature_value = Value()
             feature_name = info["name"]
-            if info["dtype"] is np.int64:
+            if info["dtype"] == "Int64":
                 feature_value.int64Val = row[feature_name]
+            elif info["dtype"] == "Int32":
+                feature_value.int32Val = row[feature_name]
             elif info["dtype"] is np.float64:
                 feature_value.doubleVal = row[feature_name]
             else:
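
Note: the integer branches above compare against the string names of the
pandas extension dtypes ("Int32"/"Int64") rather than numpy type objects,
so string equality is required, not identity. A sketch of the dispatch
this hunk implements (the Value proto import path is assumed):

    import numpy as np
    from feast.types.Value_pb2 import Value  # assumed import path

    def to_value(dtype, raw):
        # Extension dtypes are identified by name; numpy dtypes are
        # still compared as type objects, as in the hunk above.
        value = Value()
        if dtype == "Int64":
            value.int64Val = int(raw)
        elif dtype == "Int32":
            value.int32Val = int(raw)
        elif dtype is np.float64:
            value.doubleVal = float(raw)
        else:
            raise ValueError("unsupported dtype: {}".format(dtype))
        return value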
diff --git a/integration-tests/testutils/requirements.txt b/integration-tests/testutils/requirements.txt
index e67b4f7196..a0b3f9c984 100644
--- a/integration-tests/testutils/requirements.txt
+++ b/integration-tests/testutils/requirements.txt
@@ -1,3 +1,3 @@
-pandas==0.23.*
+pandas==0.24.*
 numpy==1.15.*
 kafka-python==1.4.*
diff --git a/integration-tests/testutils/spec.py b/integration-tests/testutils/spec.py
index e5a06980c6..1b5faf296e 100644
--- a/integration-tests/testutils/spec.py
+++ b/integration-tests/testutils/spec.py
@@ -12,8 +12,8 @@ def get_entity_name(entity_spec_file):
 
 def get_feature_infos(feature_specs_files):
     value_type_to_dtype = {
-        "INT32": np.int64,
-        "INT64": np.int64,
+        "INT32": "Int32",
+        "INT64": "Int64",
         "DOUBLE": np.float64,
         "FLOAT": np.float64,
     }
diff --git a/integration-tests/testutils/validate_feature_values.py b/integration-tests/testutils/validate_feature_values.py
index e56b6426b3..d68e176d3b 100644
--- a/integration-tests/testutils/validate_feature_values.py
+++ b/integration-tests/testutils/validate_feature_values.py
@@ -27,6 +27,10 @@ def validate_warehouse(
         parse_dates=["event_timestamp"],
     )
 
+    dtypes = {"event_timestamp": "datetime64[ns]"}
+    for f in feature_infos:
+        dtypes[f["name"]] = f["dtype"]
+
     # TODO: Retrieve actual values via Feast Core rather than directly from BigQuery
     #       Need to change Python SDK so can retrieve values via Feast Core while
     #       "ensuring correct value types"
@@ -37,7 +41,7 @@ def validate_warehouse(
         )
         .sort_values(["id", "event_timestamp"])
         .reset_index(drop=True)
-        .astype({"event_timestamp": "datetime64[ns]"})
+        .astype(dtypes)
     )[expected.columns]
 
     pd.testing.assert_frame_equal(expected, actual)
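
Casting the values read back from BigQuery with a per-column dtype map is
needed because assert_frame_equal checks dtypes, and the expected frame now
carries nullable integer columns. A small illustration with a made-up
column name:

    import pandas as pd

    expected = pd.DataFrame({"feature_a": pd.Series([1, 2], dtype="Int64")})
    actual = pd.DataFrame({"feature_a": [1, 2]})  # plain int64 after the read

    # int64 vs Int64 would fail the comparison; the astype aligns them.
    pd.testing.assert_frame_equal(expected, actual.astype({"feature_a": "Int64"}))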
diff --git a/sdk/python/feast/sdk/importer.py b/sdk/python/feast/sdk/importer.py
index d8444be3ab..0a86f48120 100644
--- a/sdk/python/feast/sdk/importer.py
+++ b/sdk/python/feast/sdk/importer.py
@@ -267,7 +267,8 @@ def stage(self):
         if not self.require_staging:
             return
         ts_col = self.spec.schema.timestampColumn
-        _convert_timestamp(self.df, ts_col)
+        if ts_col != "":
+            _convert_timestamp(self.df, ts_col)
         df_to_gcs(self.df, self.remote_path)
 
     def describe(self):
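
The guard makes the timestamp conversion conditional: proto3 string fields
default to "", so a spec created without a timestamp column leaves ts_col
empty, and converting a non-existent column would raise. A minimal sketch
of the failure mode being avoided (assuming the conversion boils down to
pd.to_datetime):

    import pandas as pd

    df = pd.DataFrame({"driver_id": [1, 2]})  # no timestamp column
    ts_col = ""  # unset proto3 string field

    if ts_col != "":
        # Without the guard this would KeyError on df[""].
        df[ts_col] = pd.to_datetime(df[ts_col])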
diff --git a/sdk/python/feast/sdk/utils/bq_util.py b/sdk/python/feast/sdk/utils/bq_util.py
index b52dee92ca..ebb838502b 100644
--- a/sdk/python/feast/sdk/utils/bq_util.py
+++ b/sdk/python/feast/sdk/utils/bq_util.py
@@ -74,7 +74,7 @@ def get_table_name(feature_id, storage_spec):
     except KeyError:
         raise ValueError("storage spec has empty project or dataset option")
 
-    table_name = "_".join(feature_id.split(".")[:2])
+    table_name = feature_id.split(".")[0]
     return ".".join([project, dataset, table_name])
 
 
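
With feature ids of the form <entity>.<feature>, the warehouse table is now
named after the entity alone instead of joining the first two id components
with an underscore. Illustration (project and dataset values are made up):

    feature_id = "myentity.feature_one"
    project, dataset = "my_project", "my_dataset"

    table_name = feature_id.split(".")[0]  # "myentity"
    print(".".join([project, dataset, table_name]))
    # my_project.my_dataset.myentity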
diff --git a/sdk/python/feast/sdk/utils/types.py b/sdk/python/feast/sdk/utils/types.py
index 21752c8e24..2f79a2119d 100644
--- a/sdk/python/feast/sdk/utils/types.py
+++ b/sdk/python/feast/sdk/utils/types.py
@@ -39,8 +39,8 @@
 FEAST_VALUETYPE_TO_DTYPE = {
     "bytesVal": np.byte,
     "stringVal": np.object,
-    "int32Val": np.int64,
-    "int64Val": np.int64,
+    "int32Val": "Int32",  # Use pandas nullable int type
+    "int64Val": "Int64",  # Use pandas nullable int type
     "doubleVal": np.float64,
     "floatVal": np.float64,
     "boolVal": np.bool,
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index 8f87c3f8c6..a8b0c7cdc8 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -29,7 +29,7 @@
     "google-cloud-storage>=1.13.0",
     "googleapis-common-protos>=1.5.5",
     "grpcio>=1.16.1",
-    "pandas",
+    "pandas>=0.24.0",
     "protobuf>=3.0.0",
     "PyYAML",
     "fastavro>=0.21.19"
diff --git a/sdk/python/test-requirements.txt b/sdk/python/test-requirements.txt
index b89319fb9f..b97a8770cd 100644
--- a/sdk/python/test-requirements.txt
+++ b/sdk/python/test-requirements.txt
@@ -6,8 +6,9 @@ google-resumable-media==0.3.1
 googleapis-common-protos>=1.5.5
 grpcio>=1.16.1
 numpy
-pandas
+pandas>=0.24.0
 protobuf>=3.0.0
 pytest
 pytest-mock
-PyYAML
\ No newline at end of file
+PyYAML
+fastavro>=0.21.23
diff --git a/sdk/python/tests/sdk/test_client.py b/sdk/python/tests/sdk/test_client.py
index f0d789afd0..84db40bb63 100644
--- a/sdk/python/tests/sdk/test_client.py
+++ b/sdk/python/tests/sdk/test_client.py
@@ -325,7 +325,7 @@ def test_serving_response_to_df_with_missing_value(self, client):
             })
         expected_df = pd.DataFrame({'entity': ["1", "2"],
                                     'entity.feat1': [1, 3],
-                                    'entity.feat2': [np.NaN, 4]}) \
+                                    'entity.feat2': [np.nan, 4]}) \
             .reset_index(drop=True)
         df = client._response_to_df(FeatureSet("entity", ["entity.feat1",
                                                           "entity.feat2"]),
diff --git a/sdk/python/tests/sdk/test_importer.py b/sdk/python/tests/sdk/test_importer.py
index b1049831e7..6a0c9b0784 100644
--- a/sdk/python/tests/sdk/test_importer.py
+++ b/sdk/python/tests/sdk/test_importer.py
@@ -164,9 +164,8 @@ def test_from_df(self):
             assert feature.id == "driver." + feature.name
 
         import_spec = importer.spec
-        assert import_spec.type == "file"
+        assert import_spec.type == "file.csv"
         assert import_spec.sourceOptions == {
-            "format": "csv",
             "path": importer.remote_path
         }
         assert import_spec.entities == ["driver"]
@@ -182,6 +181,27 @@ def test_from_df(self):
             assert col == field.name
             if col in feature_columns:
                 assert field.featureId == "driver." + col
+
+    def test_stage_df_without_timestamp(self, mocker):
+        mocker.patch("feast.sdk.importer.df_to_gcs", return_value=True)
+        feature_columns = [
+            "avg_distance_completed", "avg_customer_distance_completed",
+            "avg_distance_cancelled"
+        ]
+        csv_path = "tests/data/driver_features.csv"
+        entity_name = "driver"
+        owner = "owner@feast.com"
+        staging_location = "gs://test-bucket"
+        id_column = "driver_id"
+        importer = Importer.from_csv(
+            path=csv_path,
+            entity=entity_name,
+            owner=owner,
+            staging_location=staging_location,
+            id_column=id_column,
+            feature_columns=feature_columns)
+
+        importer.stage()
 
     def _validate_csv_importer(self,
                                importer,
@@ -228,6 +248,7 @@ def _validate_csv_importer(self,
             if col in feature_columns:
                 assert field.featureId == '{}.{}'.format(entity_name,
                                                          col).lower()
+
 
 
 class TestHelpers:
diff --git a/sdk/python/tests/sdk/utils/test_bq_utils.py b/sdk/python/tests/sdk/utils/test_bq_utils.py
index bc5fd72f18..7fafb1af20 100644
--- a/sdk/python/tests/sdk/utils/test_bq_utils.py
+++ b/sdk/python/tests/sdk/utils/test_bq_utils.py
@@ -38,7 +38,7 @@ def test_get_table_name():
     )
     assert (
         get_table_name(feature_id, storage_spec)
-        == "my_project.my_dataset.myentity_none"
+        == "my_project.my_dataset.myentity"
     )
 
 
@@ -48,23 +48,6 @@ def test_get_table_name_not_bq():
     with pytest.raises(ValueError, match="storage spec is not BigQuery storage spec"):
         get_table_name(feature_id, storage_spec)
 
-
-@pytest.mark.skipif(
-    os.getenv("SKIP_BIGQUERY_TEST") is not None,
-    reason="SKIP_BIGQUERY_TEST is set in the environment",
-)
-def test_query_to_dataframe():
-    with open(
-        os.path.join(testdata_path, "austin_bikeshare.bikeshare_stations.avro"), "rb"
-    ) as expected_file:
-        avro_reader = fastavro.reader(expected_file)
-        expected = pd.DataFrame.from_records(avro_reader)
-
-    query = "SELECT * FROM `bigquery-public-data.austin_bikeshare.bikeshare_stations`"
-    actual = query_to_dataframe(query)
-    assert expected.equals(actual)
-
-
 @pytest.mark.skipif(
     os.getenv("SKIP_BIGQUERY_TEST") is not None,
     reason="SKIP_BIGQUERY_TEST is set in the environment",