Skip to content

Commit

Permalink
facilitator: refactor manifest fetching. (#207)
Browse files Browse the repository at this point in the history
This adds a new get_url function that wraps ureq gets, turning ureq's
synthetic_errors into Results, and using anyhow to add context about
which URL failed.

This also changes the manifest reader functions to take a slice of bytes
instead of a reader.
  • Loading branch information
jsha committed Nov 18, 2020
1 parent 4b621e1 commit 864561a
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 78 deletions.
4 changes: 2 additions & 2 deletions facilitator/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion facilitator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ structopt = "0.3"
tempfile = "3.1.0"
thiserror = "1.0"
tokio = { version = "0.2", features = ["rt-core", "io-util"] }
ureq = { version = "1.5.1", features = ["json"] }
ureq = { version = "1.5.2", features = ["json"] }
urlencoding = "1.1.1"
uuid = { version = "0.8", features = ["serde", "v4"] }

Expand Down
22 changes: 11 additions & 11 deletions facilitator/src/bin/facilitator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ring::signature::{
EcdsaKeyPair, KeyPair, UnparsedPublicKey, ECDSA_P256_SHA256_ASN1,
ECDSA_P256_SHA256_ASN1_SIGNING,
};
use std::{collections::HashMap, fs::File, io::Read, str::FromStr};
use std::{collections::HashMap, fs, fs::File, io::Read, str::FromStr};
use uuid::Uuid;

use facilitator::{
Expand Down Expand Up @@ -869,8 +869,8 @@ fn aggregate(sub_matches: &ArgMatches) -> Result<(), anyhow::Error> {

fn lint_manifest(sub_matches: &ArgMatches) -> Result<(), anyhow::Error> {
let manifest_base_url = sub_matches.value_of("manifest-base-url");
let manifest_file_reader = match sub_matches.value_of("manifest-path") {
Some(path) => Some(File::open(path).context("failed to open manifest file")?),
let manifest_body: Option<String> = match sub_matches.value_of("manifest-path") {
Some(f) => Some(fs::read_to_string(f)?),
None => None,
};

Expand All @@ -884,8 +884,8 @@ fn lint_manifest(sub_matches: &ArgMatches) -> Result<(), anyhow::Error> {
ManifestKind::IngestorGlobal => {
let manifest = if let Some(base_url) = manifest_base_url {
IngestionServerGlobalManifest::from_https(base_url)?
} else if let Some(reader) = manifest_file_reader {
IngestionServerGlobalManifest::from_reader(reader)?
} else if let Some(body) = manifest_body {
IngestionServerGlobalManifest::from_slice(body.as_bytes())?
} else {
return Err(anyhow!(
"one of manifest-base-url or manifest-path is required"
Expand All @@ -896,8 +896,8 @@ fn lint_manifest(sub_matches: &ArgMatches) -> Result<(), anyhow::Error> {
ManifestKind::DataShareProcessorGlobal => {
let manifest = if let Some(base_url) = manifest_base_url {
DataShareProcessorGlobalManifest::from_https(base_url)?
} else if let Some(reader) = manifest_file_reader {
DataShareProcessorGlobalManifest::from_reader(reader)?
} else if let Some(body) = manifest_body {
DataShareProcessorGlobalManifest::from_slice(body.as_bytes())?
} else {
return Err(anyhow!(
"one of manifest-base-url or manifest-path is required"
Expand All @@ -911,8 +911,8 @@ fn lint_manifest(sub_matches: &ArgMatches) -> Result<(), anyhow::Error> {
.context("instance is required when manifest-kind=data-share-processor-specific")?;
let manifest = if let Some(base_url) = manifest_base_url {
SpecificManifest::from_https(base_url, instance)?
} else if let Some(reader) = manifest_file_reader {
SpecificManifest::from_reader(reader)?
} else if let Some(body) = manifest_body {
SpecificManifest::from_slice(body.as_bytes())?
} else {
return Err(anyhow!(
"one of manifest-base-url or manifest-path is required"
Expand All @@ -923,8 +923,8 @@ fn lint_manifest(sub_matches: &ArgMatches) -> Result<(), anyhow::Error> {
ManifestKind::PortalServerGlobal => {
let manifest = if let Some(base_url) = manifest_base_url {
PortalServerGlobalManifest::from_https(base_url)?
} else if let Some(reader) = manifest_file_reader {
PortalServerGlobalManifest::from_reader(reader)?
} else if let Some(body) = manifest_body {
PortalServerGlobalManifest::from_slice(body.as_bytes())?
} else {
return Err(anyhow!(
"one of manifest-base-url or manifest-path is required"
Expand Down
22 changes: 22 additions & 0 deletions facilitator/src/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use anyhow::{anyhow, Context, Result};

pub(crate) fn get_url(url: &str) -> Result<String> {
let resp = ureq::get(url)
// By default, ureq will wait forever to connect or
// read.
.timeout_connect(10_000) // ten seconds
.timeout_read(10_000) // ten seconds
.call();
if resp.synthetic_error().is_some() {
Err(anyhow!(
"fetching {}: {}",
url,
resp.into_synthetic_error().unwrap()
))
} else if !resp.ok() {
Err(anyhow!("fetching {}: status {}", url, resp.status()))
} else {
resp.into_string()
.context(format!("reading body of {}", url))
}
}
1 change: 1 addition & 0 deletions facilitator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::io::Write;
pub mod aggregation;
pub mod batch;
pub mod config;
pub mod http;
pub mod idl;
pub mod intake;
pub mod manifest;
Expand Down
111 changes: 47 additions & 64 deletions facilitator/src/manifest.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::config::StoragePath;
use crate::http;
use anyhow::{anyhow, Context, Result};
use ring::signature::{UnparsedPublicKey, ECDSA_P256_SHA256_ASN1};
use serde::Deserialize;
use serde_json::from_reader;
use std::{collections::HashMap, io::Read, str::FromStr};
use ureq::Response;

use std::{collections::HashMap, str::FromStr};

// See discussion in SpecificManifest::batch_signing_public_key
const ECDSA_P256_SPKI_PREFIX: &[u8] = &[
Expand Down Expand Up @@ -66,16 +66,16 @@ pub struct DataShareProcessorServerIdentity {
impl DataShareProcessorGlobalManifest {
/// Loads the global manifest relative to the provided base path and returns
/// it. Returns an error if the manifest could not be loaded or parsed.
pub fn from_https(base_path: &str) -> Result<DataShareProcessorGlobalManifest> {
pub fn from_https(base_path: &str) -> Result<Self> {
let manifest_url = format!("{}/global-manifest.json", base_path);
DataShareProcessorGlobalManifest::from_reader(fetch_manifest(&manifest_url)?.into_reader())
DataShareProcessorGlobalManifest::from_slice(fetch_manifest(&manifest_url)?.as_bytes())
}

/// Loads the manifest from the provided std::io::Read. Returns an error if
/// the manifest could not be read or parsed.
pub fn from_reader<R: Read>(reader: R) -> Result<DataShareProcessorGlobalManifest> {
let manifest: DataShareProcessorGlobalManifest =
from_reader(reader).context("failed to decode JSON global manifest")?;
/// Loads the manifest from the provided String. Returns an error if
/// the manifest could not be parsed.
pub fn from_slice(json: &[u8]) -> Result<Self> {
let manifest: Self =
serde_json::from_slice(json).context("failed to decode JSON global manifest")?;
if manifest.format != 0 {
return Err(anyhow!("unsupported manifest format {}", manifest.format));
}
Expand Down Expand Up @@ -114,16 +114,16 @@ impl SpecificManifest {
/// Load the specific manifest for the specified peer relative to the
/// provided base path. Returns an error if the manifest could not be
/// downloaded or parsed.
pub fn from_https(base_path: &str, peer_name: &str) -> Result<SpecificManifest> {
pub fn from_https(base_path: &str, peer_name: &str) -> Result<Self> {
let manifest_url = format!("{}/{}-manifest.json", base_path, peer_name);
SpecificManifest::from_reader(fetch_manifest(&manifest_url)?.into_reader())
SpecificManifest::from_slice(fetch_manifest(&manifest_url)?.as_bytes())
}

/// Loads the manifest from the provided std::io::Read. Returns an error if
/// the manifest could not be read or parsed.
pub fn from_reader<R: Read>(reader: R) -> Result<SpecificManifest> {
let manifest: SpecificManifest =
from_reader(reader).context("failed to decode JSON specific manifest")?;
/// Loads the manifest from the provided String. Returns an error if
/// the manifest could not be parsed.
pub fn from_slice(json: &[u8]) -> Result<Self> {
let manifest: Self =
serde_json::from_slice(json).context("failed to decode JSON global manifest")?;
if manifest.format != 0 {
return Err(anyhow!("unsupported manifest format {}", manifest.format));
}
Expand Down Expand Up @@ -203,16 +203,16 @@ pub struct IngestionServerGlobalManifest {
impl IngestionServerGlobalManifest {
/// Loads the global manifest relative to the provided base path and returns
/// it. Returns an error if the manifest could not be loaded or parsed.
pub fn from_https(base_path: &str) -> Result<IngestionServerGlobalManifest> {
pub fn from_https(base_path: &str) -> Result<Self> {
let manifest_url = format!("{}/global-manifest.json", base_path);
IngestionServerGlobalManifest::from_reader(fetch_manifest(&manifest_url)?.into_reader())
IngestionServerGlobalManifest::from_slice(fetch_manifest(&manifest_url)?.as_bytes())
}

/// Loads the manifest from the provided std::io::Read. Returns an error if
/// the manifest could not be read or parsed.
pub fn from_reader<R: Read>(reader: R) -> Result<IngestionServerGlobalManifest> {
let manifest: IngestionServerGlobalManifest =
from_reader(reader).context("failed to decode JSON global manifest")?;
/// Loads the manifest from the provided String. Returns an error if
/// the manifest could not be parsed.
pub fn from_slice(json: &[u8]) -> Result<Self> {
let manifest: Self =
serde_json::from_slice(json).context("failed to decode JSON global manifest")?;
if manifest.format != 0 {
return Err(anyhow!("unsupported manifest format {}", manifest.format));
}
Expand Down Expand Up @@ -259,16 +259,16 @@ pub struct PortalServerGlobalManifest {
}

impl PortalServerGlobalManifest {
pub fn from_https(base_path: &str) -> Result<PortalServerGlobalManifest> {
pub fn from_https(base_path: &str) -> Result<Self> {
let manifest_url = format!("{}/global-manifest.json", base_path);
PortalServerGlobalManifest::from_reader(fetch_manifest(&manifest_url)?.into_reader())
PortalServerGlobalManifest::from_slice(fetch_manifest(&manifest_url)?.as_bytes())
}

/// Loads the manifest from the provided std::io::Read. Returns an error if
/// the manifest could not be read or parsed.
pub fn from_reader<R: Read>(reader: R) -> Result<PortalServerGlobalManifest> {
/// Loads the manifest from the provided String. Returns an error if
/// the manifest could not be parsed.
pub fn from_slice(json: &[u8]) -> Result<Self> {
let manifest: PortalServerGlobalManifest =
from_reader(reader).context("failed to decode JSON global manifest")?;
serde_json::from_slice(json).context("failed to decode JSON global manifest")?;
if manifest.format != 0 {
return Err(anyhow!("unsupported manifest format {}", manifest.format));
}
Expand Down Expand Up @@ -301,20 +301,11 @@ impl PortalServerGlobalManifest {
}

/// Obtains a manifest file from the provided URL
fn fetch_manifest(manifest_url: &str) -> Result<Response> {
fn fetch_manifest(manifest_url: &str) -> Result<String> {
if !manifest_url.starts_with("https://") {
return Err(anyhow!("Manifest must be fetched over HTTPS"));
}
let response = ureq::get(manifest_url)
// By default, ureq will wait forever to connect or
// read.
.timeout_connect(10_000) // ten seconds
.timeout_read(10_000) // ten seconds
.call();
if response.error() {
return Err(anyhow!("failed to fetch manifest: {:?}", response));
}
Ok(response)
http::get_url(manifest_url)
}

/// Attempts to parse the provided string as a PEM encoded PKIX
Expand Down Expand Up @@ -370,22 +361,19 @@ mod tests {
};
use ring::rand::SystemRandom;
use rusoto_core::Region;
use std::io::Cursor;

#[test]
fn load_data_share_processor_global_manifest() {
let reader = Cursor::new(
r#"
let json = br#"
{
"format": 0,
"server-identity": {
"aws-account-id": 12345678901234567,
"gcp-service-account-email": "service-account@project-name.iam.gserviceaccount.com"
}
}
"#,
);
let manifest = DataShareProcessorGlobalManifest::from_reader(reader).unwrap();
"#;
let manifest = DataShareProcessorGlobalManifest::from_slice(json).unwrap();
assert_eq!(manifest.format, 0);
assert_eq!(
manifest.server_identity,
Expand All @@ -399,7 +387,7 @@ mod tests {

#[test]
fn invalid_data_share_processor_global_manifests() {
let invalid_manifests = vec![
let invalid_manifests: Vec<&str> = vec![
// no format key
r#"
{
Expand Down Expand Up @@ -466,14 +454,13 @@ mod tests {
];

for invalid_manifest in &invalid_manifests {
let reader = Cursor::new(invalid_manifest);
DataShareProcessorGlobalManifest::from_reader(reader).unwrap_err();
DataShareProcessorGlobalManifest::from_slice(invalid_manifest.as_bytes()).unwrap_err();
}
}

#[test]
fn load_specific_manifest() {
let reader = Cursor::new(format!(
let json = format!(
r#"
{{
"format": 0,
Expand All @@ -494,8 +481,8 @@ mod tests {
}}
"#,
DEFAULT_INGESTOR_SUBJECT_PUBLIC_KEY_INFO
));
let manifest = SpecificManifest::from_reader(reader).unwrap();
);
let manifest = SpecificManifest::from_slice(json.as_bytes()).unwrap();

let mut expected_batch_keys = HashMap::new();
expected_batch_keys.insert(
Expand Down Expand Up @@ -637,8 +624,7 @@ mod tests {
];

for invalid_manifest in &invalid_manifests {
let reader = Cursor::new(invalid_manifest);
SpecificManifest::from_reader(reader).unwrap_err();
SpecificManifest::from_slice(invalid_manifest.as_bytes()).unwrap_err();
}
}

Expand Down Expand Up @@ -707,8 +693,7 @@ mod tests {
"#,
];
for invalid_manifest in &manifests_with_invalid_public_keys {
let reader = Cursor::new(invalid_manifest);
let manifest = SpecificManifest::from_reader(reader).unwrap();
let manifest = SpecificManifest::from_slice(invalid_manifest.as_bytes()).unwrap();
assert!(manifest.batch_signing_public_keys().is_err());
}
}
Expand Down Expand Up @@ -750,7 +735,7 @@ mod tests {
"#;

let manifest =
IngestionServerGlobalManifest::from_reader(Cursor::new(manifest_with_aws_identity))
IngestionServerGlobalManifest::from_slice(manifest_with_aws_identity.as_bytes())
.unwrap();
assert_eq!(
manifest.server_identity.aws_iam_entity,
Expand All @@ -762,7 +747,7 @@ mod tests {
assert!(batch_signing_public_keys.get("nosuchkey").is_none());

let manifest =
IngestionServerGlobalManifest::from_reader(Cursor::new(manifest_with_gcp_identity))
IngestionServerGlobalManifest::from_slice(manifest_with_gcp_identity.as_bytes())
.unwrap();
assert_eq!(manifest.server_identity.aws_iam_entity, None);
assert_eq!(
Expand Down Expand Up @@ -830,8 +815,7 @@ mod tests {
];

for invalid_manifest in &invalid_manifests {
let reader = Cursor::new(invalid_manifest);
IngestionServerGlobalManifest::from_reader(reader).unwrap_err();
IngestionServerGlobalManifest::from_slice(invalid_manifest.as_bytes()).unwrap_err();
}
}

Expand All @@ -845,7 +829,7 @@ mod tests {
}
"#;

let manifest = PortalServerGlobalManifest::from_reader(Cursor::new(manifest)).unwrap();
let manifest = PortalServerGlobalManifest::from_slice(manifest.as_bytes()).unwrap();
if let StoragePath::GCSPath(path) = manifest.sum_part_bucket(false).unwrap() {
assert_eq!(
path,
Expand Down Expand Up @@ -908,8 +892,7 @@ mod tests {
];

for invalid_manifest in &invalid_manifests {
let reader = Cursor::new(invalid_manifest);
PortalServerGlobalManifest::from_reader(reader).unwrap_err();
PortalServerGlobalManifest::from_slice(invalid_manifest.as_bytes()).unwrap_err();
}
}
}

0 comments on commit 864561a

Please sign in to comment.