Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-3055] add get_dataset and get_datasets_list to bigquery_hook #3894

Merged
merged 3 commits into from
Oct 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions airflow/contrib/hooks/bigquery_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -1441,6 +1441,86 @@ def delete_dataset(self, project_id, dataset_id):
'BigQuery job failed. Error was: {}'.format(err.content)
)

def get_dataset(self, dataset_id, project_id=None):
    """
    Return the dataset resource for the given dataset ID.

    The underlying API call raises a 404 ``HttpError`` when the dataset
    does not exist, which is re-raised as an ``AirflowException``.

    :param dataset_id: The BigQuery Dataset ID
    :type dataset_id: str
    :param project_id: The GCP Project ID; falls back to the hook's
        default project when not supplied
    :type project_id: str
    :return: dataset_resource
    :raises ValueError: if ``dataset_id`` is empty or not a string
    :raises AirflowException: if the API request fails

    .. seealso::
        For more information, see Dataset Resource content:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
    """

    if not dataset_id or not isinstance(dataset_id, str):
        raise ValueError("dataset_id argument must be provided and has "
                         "a type 'str'. You provided: {}".format(dataset_id))

    dataset_project_id = project_id if project_id else self.project_id

    try:
        dataset_resource = self.service.datasets().get(
            datasetId=dataset_id, projectId=dataset_project_id).execute()
        # Lazy %-style args: the message is only rendered if actually emitted.
        self.log.info("Dataset Resource: %s", dataset_resource)
    except HttpError as err:
        raise AirflowException(
            'BigQuery job failed. Error was: {}'.format(err.content))

    return dataset_resource

def get_datasets_list(self, project_id=None):
    """
    Return the full list of BigQuery datasets in the given project.

    .. seealso::
        For more information, see:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list

    :param project_id: Google Cloud Project for which you
        try to get all datasets; falls back to the hook's default project
        when not supplied
    :type project_id: str
    :return: datasets_list; an empty list when the project contains no
        datasets (the API response omits the ``datasets`` key in that case)
    :raises AirflowException: if the API request fails

    Example of returned datasets_list: ::

        {
            "kind":"bigquery#dataset",
            "location":"US",
            "id":"your-project:dataset_2_test",
            "datasetReference":{
                "projectId":"your-project",
                "datasetId":"dataset_2_test"
            }
        },
        {
            "kind":"bigquery#dataset",
            "location":"US",
            "id":"your-project:dataset_1_test",
            "datasetReference":{
                "projectId":"your-project",
                "datasetId":"dataset_1_test"
            }
        }
        ]
    """
    dataset_project_id = project_id if project_id else self.project_id

    try:
        # The list response omits 'datasets' entirely when the project has
        # none; default to [] instead of raising KeyError.
        datasets_list = self.service.datasets().list(
            projectId=dataset_project_id).execute().get('datasets', [])
        # Lazy %-style args: the message is only rendered if actually emitted.
        self.log.info("Datasets List: %s", datasets_list)

    except HttpError as err:
        raise AirflowException(
            'BigQuery job failed. Error was: {}'.format(err.content))

    return datasets_list


class BigQueryCursor(BigQueryBaseCursor):
"""
Expand Down
62 changes: 62 additions & 0 deletions tests/contrib/hooks/test_bigquery_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,68 @@ def test_create_empty_dataset_duplicates_call_err(self,
{"datasetId": "test_dataset",
"projectId": "project_test2"}})

def test_get_dataset_without_dataset_id(self):
    # An empty dataset_id must be rejected before any API call happens.
    with mock.patch.object(hook.BigQueryHook, 'get_service'):
        cursor = hook.BigQueryBaseCursor(
            mock.Mock(), "test_create_empty_dataset")
        with self.assertRaises(ValueError):
            cursor.get_dataset(dataset_id="", project_id="project_test")

def test_get_dataset(self):
    """get_dataset should return the resource produced by the API call."""
    dataset_id = "test_dataset"
    project_id = "project_test"
    api_resource = {
        "kind": "bigquery#dataset",
        "location": "US",
        "id": "your-project:dataset_2_test",
        "datasetReference": {
            "projectId": "your-project",
            "datasetId": "dataset_2_test"
        }
    }

    cursor = hook.BigQueryBaseCursor(mock.Mock(), project_id)
    with mock.patch.object(cursor.service, 'datasets') as mock_datasets:
        mock_datasets.return_value.get(
            datasetId=dataset_id,
            projectId=project_id).execute.return_value = api_resource
        fetched = cursor.get_dataset(dataset_id=dataset_id,
                                     project_id=project_id)
    self.assertEqual(fetched, api_resource)

def test_get_datasets_list(self):
    """get_datasets_list should unwrap the 'datasets' key of the response."""
    expected_result = {'datasets': [
        {
            "kind": "bigquery#dataset",
            "location": "US",
            "id": "your-project:dataset_2_test",
            "datasetReference": {
                "projectId": "your-project",
                "datasetId": "dataset_2_test"
            }
        },
        {
            "kind": "bigquery#dataset",
            "location": "US",
            "id": "your-project:dataset_1_test",
            "datasetReference": {
                "projectId": "your-project",
                "datasetId": "dataset_1_test"
            }
        }
    ]}
    # NOTE: a stray '' literal was concatenated onto this string before;
    # it only parsed via implicit string-literal concatenation.
    project_id = "project_test"

    mocked = mock.Mock()
    with mock.patch.object(hook.BigQueryBaseCursor(mocked, project_id).service,
                           'datasets') as MockService:
        MockService.return_value.list(
            projectId=project_id).execute.return_value = expected_result
        result = hook.BigQueryBaseCursor(
            mocked, "test_create_empty_dataset").get_datasets_list(
            project_id=project_id)
        self.assertEqual(result, expected_result['datasets'])


class TestTimePartitioningInRunJob(unittest.TestCase):
@mock.patch("airflow.contrib.hooks.bigquery_hook.LoggingMixin")
Expand Down