diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index dd77df1283d80..dba4618e35905 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -1441,6 +1441,86 @@ def delete_dataset(self, project_id, dataset_id): 'BigQuery job failed. Error was: {}'.format(err.content) ) + def get_dataset(self, dataset_id, project_id=None): + """ + Method returns dataset_resource if dataset exist + and raised 404 error if dataset does not exist + + :param dataset_id: The BigQuery Dataset ID + :type dataset_id: str + :param project_id: The GCP Project ID + :type project_id: str + :return: dataset_resource + + .. seealso:: + For more information, see Dataset Resource content: + https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource + """ + + if not dataset_id or not isinstance(dataset_id, str): + raise ValueError("dataset_id argument must be provided and has " + "a type 'str'. You provided: {}".format(dataset_id)) + + dataset_project_id = project_id if project_id else self.project_id + + try: + dataset_resource = self.service.datasets().get( + datasetId=dataset_id, projectId=dataset_project_id).execute() + self.log.info("Dataset Resource: {}".format(dataset_resource)) + except HttpError as err: + raise AirflowException( + 'BigQuery job failed. Error was: {}'.format(err.content)) + + return dataset_resource + + def get_datasets_list(self, project_id=None): + """ + Method returns full list of BigQuery datasets in the current project + + .. seealso:: + For more information, see: + https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list + + :param project_id: Google Cloud Project for which you + try to get all datasets + :type project_id: str + :return: datasets_list + + Example of returned datasets_list: :: + + { + "kind":"bigquery#dataset", + "location":"US", + "id":"your-project:dataset_2_test", + "datasetReference":{ + "projectId":"your-project", + "datasetId":"dataset_2_test" + } + }, + { + "kind":"bigquery#dataset", + "location":"US", + "id":"your-project:dataset_1_test", + "datasetReference":{ + "projectId":"your-project", + "datasetId":"dataset_1_test" + } + } + ] + """ + dataset_project_id = project_id if project_id else self.project_id + + try: + datasets_list = self.service.datasets().list( + projectId=dataset_project_id).execute()['datasets'] + self.log.info("Datasets List: {}".format(datasets_list)) + + except HttpError as err: + raise AirflowException( + 'BigQuery job failed. Error was: {}'.format(err.content)) + + return datasets_list + class BigQueryCursor(BigQueryBaseCursor): """ diff --git a/tests/contrib/hooks/test_bigquery_hook.py b/tests/contrib/hooks/test_bigquery_hook.py index 84fe84043e582..77a31f032081b 100644 --- a/tests/contrib/hooks/test_bigquery_hook.py +++ b/tests/contrib/hooks/test_bigquery_hook.py @@ -360,6 +360,68 @@ def test_create_empty_dataset_duplicates_call_err(self, {"datasetId": "test_dataset", "projectId": "project_test2"}}) + def test_get_dataset_without_dataset_id(self): + with mock.patch.object(hook.BigQueryHook, 'get_service'): + with self.assertRaises(ValueError): + hook.BigQueryBaseCursor( + mock.Mock(), "test_create_empty_dataset").get_dataset( + dataset_id="", project_id="project_test") + + def test_get_dataset(self): + expected_result = { + "kind": "bigquery#dataset", + "location": "US", + "id": "your-project:dataset_2_test", + "datasetReference": { + "projectId": "your-project", + "datasetId": "dataset_2_test" + } + } + dataset_id = "test_dataset" + project_id = "project_test" + + bq_hook = hook.BigQueryBaseCursor(mock.Mock(), project_id) + with mock.patch.object(bq_hook.service, 'datasets') as MockService: + MockService.return_value.get(datasetId=dataset_id, + projectId=project_id).execute.\ + return_value = expected_result + result = bq_hook.get_dataset(dataset_id=dataset_id, + project_id=project_id) + self.assertEqual(result, expected_result) + + def test_get_datasets_list(self): + expected_result = {'datasets': [ + { + "kind": "bigquery#dataset", + "location": "US", + "id": "your-project:dataset_2_test", + "datasetReference": { + "projectId": "your-project", + "datasetId": "dataset_2_test" + } + }, + { + "kind": "bigquery#dataset", + "location": "US", + "id": "your-project:dataset_1_test", + "datasetReference": { + "projectId": "your-project", + "datasetId": "dataset_1_test" + } + } + ]} + project_id = "project_test"'' + + mocked = mock.Mock() + with mock.patch.object(hook.BigQueryBaseCursor(mocked, project_id).service, + 'datasets') as MockService: + MockService.return_value.list( + projectId=project_id).execute.return_value = expected_result + result = hook.BigQueryBaseCursor( + mocked, "test_create_empty_dataset").get_datasets_list( + project_id=project_id) + self.assertEqual(result, expected_result['datasets']) + class TestTimePartitioningInRunJob(unittest.TestCase): @mock.patch("airflow.contrib.hooks.bigquery_hook.LoggingMixin")