Skip to content

Commit

Permalink
[AIRFLOW-3055] add get_dataset and get_datasets_list to bigquery_hook (
Browse files Browse the repository at this point in the history
…#3894)

* [AIRFLOW-3055] add get_dataset and get_datasets_list to bigquery_hook
  • Loading branch information
xnuinside authored and kaxil committed Dec 29, 2018
1 parent 0844211 commit 5eb3cd0
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 0 deletions.
80 changes: 80 additions & 0 deletions airflow/contrib/hooks/bigquery_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -1441,6 +1441,86 @@ def delete_dataset(self, project_id, dataset_id):
'BigQuery job failed. Error was: {}'.format(err.content)
)

def get_dataset(self, dataset_id, project_id=None):
    """
    Fetch the dataset resource for ``dataset_id``.

    The underlying API call fails with a 404 if the dataset does not
    exist; that error is surfaced as an :class:`AirflowException`.

    :param dataset_id: The BigQuery Dataset ID
    :type dataset_id: str
    :param project_id: The GCP Project ID; when omitted, the hook's
        default ``self.project_id`` is used.
    :type project_id: str
    :return: dataset_resource
    :raises ValueError: if ``dataset_id`` is empty or not a ``str``
    :raises AirflowException: if the BigQuery API request fails

    .. seealso::
        For more information, see Dataset Resource content:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
    """

    # Validate eagerly so a bad argument fails before any API traffic.
    if not dataset_id or not isinstance(dataset_id, str):
        raise ValueError("dataset_id argument must be provided and has "
                         "a type 'str'. You provided: {}".format(dataset_id))

    dataset_project_id = project_id if project_id else self.project_id

    try:
        dataset_resource = self.service.datasets().get(
            datasetId=dataset_id, projectId=dataset_project_id).execute()
        # Lazy %-args: the message is only rendered when INFO is enabled.
        self.log.info("Dataset Resource: %s", dataset_resource)
    except HttpError as err:
        raise AirflowException(
            'BigQuery job failed. Error was: {}'.format(err.content))

    return dataset_resource

def get_datasets_list(self, project_id=None):
    """
    Return the full list of BigQuery datasets in the given project.

    .. seealso::
        For more information, see:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list

    :param project_id: Google Cloud Project for which you
        try to get all datasets; when omitted, the hook's default
        ``self.project_id`` is used.
    :type project_id: str
    :return: datasets_list; an empty list when the project contains
        no datasets.
    :raises AirflowException: if the BigQuery API request fails

    Example of returned datasets_list: ::

        [
            {
                "kind":"bigquery#dataset",
                "location":"US",
                "id":"your-project:dataset_2_test",
                "datasetReference":{
                    "projectId":"your-project",
                    "datasetId":"dataset_2_test"
                }
            },
            {
                "kind":"bigquery#dataset",
                "location":"US",
                "id":"your-project:dataset_1_test",
                "datasetReference":{
                    "projectId":"your-project",
                    "datasetId":"dataset_1_test"
                }
            }
        ]
    """
    dataset_project_id = project_id if project_id else self.project_id

    try:
        # The API omits the 'datasets' key entirely when the project has
        # no datasets, so fall back to an empty list instead of raising
        # KeyError.
        datasets_list = self.service.datasets().list(
            projectId=dataset_project_id).execute().get('datasets', [])
        # Lazy %-args: the message is only rendered when INFO is enabled.
        self.log.info("Datasets List: %s", datasets_list)

    except HttpError as err:
        raise AirflowException(
            'BigQuery job failed. Error was: {}'.format(err.content))

    return datasets_list


class BigQueryCursor(BigQueryBaseCursor):
"""
Expand Down
62 changes: 62 additions & 0 deletions tests/contrib/hooks/test_bigquery_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,68 @@ def test_create_empty_dataset_duplicates_call_err(self,
{"datasetId": "test_dataset",
"projectId": "project_test2"}})

def test_get_dataset_without_dataset_id(self):
    # An empty dataset_id must be rejected with ValueError before any
    # API interaction takes place.
    with mock.patch.object(hook.BigQueryHook, 'get_service'):
        cursor = hook.BigQueryBaseCursor(
            mock.Mock(), "test_create_empty_dataset")
        with self.assertRaises(ValueError):
            cursor.get_dataset(dataset_id="", project_id="project_test")

def test_get_dataset(self):
    # The hook should return the dataset resource exactly as the API
    # delivered it.
    dataset_id = "test_dataset"
    project_id = "project_test"
    expected_result = {
        "kind": "bigquery#dataset",
        "location": "US",
        "id": "your-project:dataset_2_test",
        "datasetReference": {
            "projectId": "your-project",
            "datasetId": "dataset_2_test"
        }
    }

    bq_hook = hook.BigQueryBaseCursor(mock.Mock(), project_id)
    with mock.patch.object(bq_hook.service, 'datasets') as mock_datasets:
        mocked_get = mock_datasets.return_value.get(
            datasetId=dataset_id, projectId=project_id)
        mocked_get.execute.return_value = expected_result
        result = bq_hook.get_dataset(
            dataset_id=dataset_id, project_id=project_id)
    self.assertEqual(result, expected_result)

def test_get_datasets_list(self):
    # The raw list() payload wraps the datasets under a 'datasets' key;
    # the hook is expected to unwrap it.
    expected_result = {'datasets': [
        {
            "kind": "bigquery#dataset",
            "location": "US",
            "id": "your-project:dataset_2_test",
            "datasetReference": {
                "projectId": "your-project",
                "datasetId": "dataset_2_test"
            }
        },
        {
            "kind": "bigquery#dataset",
            "location": "US",
            "id": "your-project:dataset_1_test",
            "datasetReference": {
                "projectId": "your-project",
                "datasetId": "dataset_1_test"
            }
        }
    ]}
    # NOTE: removed a stray trailing '' literal that was silently
    # concatenated onto the string below.
    project_id = "project_test"

    mocked = mock.Mock()
    with mock.patch.object(hook.BigQueryBaseCursor(mocked, project_id).service,
                           'datasets') as MockService:
        MockService.return_value.list(
            projectId=project_id).execute.return_value = expected_result
        result = hook.BigQueryBaseCursor(
            mocked, "test_create_empty_dataset").get_datasets_list(
            project_id=project_id)
        self.assertEqual(result, expected_result['datasets'])


class TestTimePartitioningInRunJob(unittest.TestCase):
@mock.patch("airflow.contrib.hooks.bigquery_hook.LoggingMixin")
Expand Down

0 comments on commit 5eb3cd0

Please sign in to comment.